Note

Access to this page requires authorization. You can try signing in or .

Access to this page requires authorization. You can try .

OrderablePartitioner<TSource> Class

Definition

Namespace:
System.Collections.Concurrent
Assemblies:
mscorlib.dll, System.Collections.Concurrent.dll
Assemblies:
netstandard.dll, System.Collections.Concurrent.dll
Assembly:
System.Collections.Concurrent.dll
Assembly:
mscorlib.dll
Assembly:
netstandard.dll
Source:
OrderablePartitioner.cs
Source:
OrderablePartitioner.cs
Source:
OrderablePartitioner.cs
Source:
OrderablePartitioner.cs
Source:
OrderablePartitioner.cs

Important

Some information relates to prerelease product that may be substantially modified before it’s released. Microsoft makes no warranties, express or implied, with respect to the information provided here.

Represents a particular manner of splitting an orderable data source into multiple partitions.

generic <typename TSource>
public ref class OrderablePartitioner abstract : System::Collections::Concurrent::Partitioner<TSource>
public abstract class OrderablePartitioner<TSource> : System.Collections.Concurrent.Partitioner<TSource>
type OrderablePartitioner<'Source> = class
 inherit Partitioner<'Source>
Public MustInherit Class OrderablePartitioner(Of TSource)
Inherits Partitioner(Of TSource)

Type Parameters

TSource

Type of the elements in the collection.

Inheritance
OrderablePartitioner<TSource>

Examples

The following example shows how to implement an orderable partitioner that returns one element at a time:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace OrderablePartitionerDemo
{

 // Simple partitioner that will extract one (index,item) pair at a time,
 // in a thread-safe fashion, from the underlying collection.
 class SingleElementOrderablePartitioner<T> : OrderablePartitioner<T>
 {
 // The collection being wrapped by this Partitioner
 IEnumerable<T> m_referenceEnumerable;

 // Class used to wrap m_index for the purpose of sharing access to it
 // between an InternalEnumerable and multiple InternalEnumerators
 private class Shared<U>
 {
 internal U Value;

 public Shared(U item)
 {
 Value = item;
 }
 }

 // Internal class that serves as a shared enumerable for the
 // underlying collection.
 private class InternalEnumerable : IEnumerable<KeyValuePair<long, T>>, IDisposable
 {
 IEnumerator<T> m_reader;
 bool m_disposed = false;
 Shared<long> m_index = null;

 // These two are used to implement Dispose() when static partitioning is being performed
 int m_activeEnumerators;
 bool m_downcountEnumerators;

 // "downcountEnumerators" will be true for static partitioning, false for
 // dynamic partitioning.
 public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
 {
 m_reader = reader;
 m_index = new Shared<long>(0);
 m_activeEnumerators = 0;
 m_downcountEnumerators = downcountEnumerators;
 }

 public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
 {
 if (m_disposed)
 throw new ObjectDisposedException("InternalEnumerable: Can't call GetEnumerator() after disposing");

 // For static partitioning, keep track of the number of active enumerators.
 if (m_downcountEnumerators) Interlocked.Increment(ref m_activeEnumerators);

 return new InternalEnumerator(m_reader, this, m_index);
 }

 IEnumerator IEnumerable.GetEnumerator()
 {
 return ((IEnumerable<KeyValuePair<long, T>>)this).GetEnumerator();
 }

 public void Dispose()
 {
 if (!m_disposed)
 {
 // Only dispose the source enumerator if you are doing dynamic partitioning
 if (!m_downcountEnumerators)
 {
 m_reader.Dispose();
 }
 m_disposed = true;
 }
 }

 // Called from Dispose() method of spawned InternalEnumerator. During
 // static partitioning, the source enumerator will be automatically
 // disposed once all requested InternalEnumerators have been disposed.
 public void DisposeEnumerator()
 {
 if (m_downcountEnumerators)
 {
 if (Interlocked.Decrement(ref m_activeEnumerators) == 0)
 {
 m_reader.Dispose();
 }
 }
 }
 }

 // Internal class that serves as a shared enumerator for
 // the underlying collection.
 private class InternalEnumerator : IEnumerator<KeyValuePair<long, T>>
 {
 KeyValuePair<long, T> m_current;
 IEnumerator<T> m_source;
 InternalEnumerable m_controllingEnumerable;
 Shared<long> m_index = null;
 bool m_disposed = false;

 public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable, Shared<long> index)
 {
 m_source = source;
 m_current = default(KeyValuePair<long, T>);
 m_controllingEnumerable = controllingEnumerable;
 m_index = index;
 }

 object IEnumerator.Current
 {
 get { return m_current; }
 }

 KeyValuePair<long, T> IEnumerator<KeyValuePair<long, T>>.Current
 {
 get { return m_current; }
 }

 void IEnumerator.Reset()
 {
 throw new NotSupportedException("Reset() not supported");
 }

 // This method is the crux of this class. Under lock, it calls
 // MoveNext() on the underlying enumerator, grabs Current and index,
 // and increments the index.
 bool IEnumerator.MoveNext()
 {
 bool rval = false;
 lock (m_source)
 {
 rval = m_source.MoveNext();
 if (rval)
 {
 m_current = new KeyValuePair<long, T>(m_index.Value, m_source.Current);
 m_index.Value = m_index.Value + 1;
 }
 else
 {
 m_current = default(KeyValuePair<long, T>);
 }
 }
 return rval;
 }

 void IDisposable.Dispose()
 {
 if (!m_disposed)
 {
 // Delegate to parent enumerable's DisposeEnumerator() method
 m_controllingEnumerable.DisposeEnumerator();
 m_disposed = true;
 }
 }
 }

 // Constructor just grabs the collection to wrap
 public SingleElementOrderablePartitioner(IEnumerable<T> enumerable)
 : base(true, true, true)
 {
 // Verify that the source IEnumerable is not null
 if (enumerable == null)
 throw new ArgumentNullException("enumerable");

 m_referenceEnumerable = enumerable;
 }

 // Produces a list of "numPartitions" IEnumerators that can each be
 // used to traverse the underlying collection in a thread-safe manner.
 // This will return a static number of enumerators, as opposed to
 // GetOrderableDynamicPartitions(), the result of which can be used to produce
 // any number of enumerators.
 public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int numPartitions)
 {
 if (numPartitions < 1)
 throw new ArgumentOutOfRangeException("NumPartitions");

 List<IEnumerator<KeyValuePair<long, T>>> list = new List<IEnumerator<KeyValuePair<long, T>>>(numPartitions);

 // Since we are doing static partitioning, create an InternalEnumerable with reference
 // counting of spawned InternalEnumerators turned on. Once all of the spawned enumerators
 // are disposed, dynamicPartitions will be disposed.
 var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
 for (int i = 0; i < numPartitions; i++)
 list.Add(dynamicPartitions.GetEnumerator());

 return list;
 }

 // Returns an instance of our internal Enumerable class. GetEnumerator()
 // can then be called on that (multiple times) to produce shared enumerators.
 public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
 {
 // Since we are doing dynamic partitioning, create an InternalEnumerable with reference
 // counting of spawned InternalEnumerators turned off. This returned InternalEnumerable
 // will need to be explicitly disposed.
 return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
 }

 // Must be set to true if GetDynamicPartitions() is supported.
 public override bool SupportsDynamicPartitions
 {
 get { return true; }
 }
 }

 class Program
 {
 static void Main()
 {
 //
 // First a fairly simple visual test
 //
 var someCollection = new string[] { "four", "score", "and", "twenty", "years", "ago" };
 var someOrderablePartitioner = new SingleElementOrderablePartitioner<string>(someCollection);
 Parallel.ForEach(someOrderablePartitioner, (item, state, index) =>
 {
 Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", item, index, Thread.CurrentThread.ManagedThreadId);
 });

 //
 // Now a test of static partitioning, using 2 partitions and 2 tasks
 //
 var staticPartitioner = someOrderablePartitioner.GetOrderablePartitions(2);

 // staticAction will consume the shared enumerable
 int partitionerListIndex = 0;
 Action staticAction = () =>
 {
 int myIndex = Interlocked.Increment(ref partitionerListIndex) - 1;
 var enumerator = staticPartitioner[myIndex];
 while (enumerator.MoveNext())
 Console.WriteLine("Static partitioning: item = {0}, index = {1}, thread id = {2}",
 enumerator.Current.Value, enumerator.Current.Key, Thread.CurrentThread.ManagedThreadId);
 enumerator.Dispose();
 };

 // Now launch two of them
 Parallel.Invoke(staticAction, staticAction);

 //
 // Now a more rigorous test of dynamic partitioning (used by Parallel.ForEach)
 //
 Console.WriteLine("OrderablePartitioner test: testing for index mismatches");
 List<int> src = Enumerable.Range(0, 100000).ToList();
 SingleElementOrderablePartitioner<int> myOP = new SingleElementOrderablePartitioner<int>(src);

 int counter = 0;
 bool mismatch = false;
 Parallel.ForEach(myOP, (item, state, index) =>
 {
 if (item != index) mismatch = true;
 Interlocked.Increment(ref counter);
 });

 if (mismatch) Console.WriteLine("OrderablePartitioner Test: index mismatch detected");

 Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", counter);
 }
 }
}
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module OrderablePartitionerDemo


 ' Simple partitioner that will extract one (index,item) pair at a time, 
 ' in a thread-safe fashion, from the underlying collection.
 Class SingleElementOrderablePartitioner(Of T)
 Inherits OrderablePartitioner(Of T)
 ' The collection being wrapped by this Partitioner
 Private m_referenceEnumerable As IEnumerable(Of T)

 ' Class used to wrap m_index for the purpose of sharing access to it
 ' between an InternalEnumerable and multiple InternalEnumerators
 Private Class [Shared](Of U)
 Friend Value As U

 Public Sub New(ByVal item As U)
 Value = item
 End Sub
 End Class

 ' Internal class that serves as a shared enumerable for the
 ' underlying collection.
 Private Class InternalEnumerable
 Implements IEnumerable(Of KeyValuePair(Of Long, T))
 Implements IDisposable

 Private m_reader As IEnumerator(Of T)
 Private m_disposed As Boolean = False
 Private m_index As [Shared](Of Long) = Nothing

 ' These two are used to implement Dispose() when static partitioning is being performed
 Private m_activeEnumerators As Integer
 Private m_downcountEnumerators As Boolean

 ' "downcountEnumerators" will be true for static partitioning, false for
 ' dynamic partitioning. 
 Public Sub New(ByVal reader As IEnumerator(Of T), ByVal downcountEnumerators As Boolean)
 m_reader = reader
 m_index = New [Shared](Of Long)(0)
 m_activeEnumerators = 0
 m_downcountEnumerators = downcountEnumerators
 End Sub

 Public Function GetEnumerator() As IEnumerator(Of KeyValuePair(Of Long, T)) _
 Implements IEnumerable(Of System.Collections.Generic.KeyValuePair(Of Long, T)).GetEnumerator

 If m_disposed Then
 Throw New ObjectDisposedException("InternalEnumerable: Can't call GetEnumerator() after disposing")
 End If

 ' For static partitioning, keep track of the number of active enumerators.
 If m_downcountEnumerators Then
 Interlocked.Increment(m_activeEnumerators)
 End If

 Return New InternalEnumerator(m_reader, Me, m_index)
 End Function

 Private Function GetEnumerator2() As IEnumerator Implements IEnumerable.GetEnumerator
 Return DirectCast(Me, IEnumerable(Of KeyValuePair(Of Long, T))).GetEnumerator()
 End Function

 Public Sub Dispose() Implements IDisposable.Dispose
 If Not m_disposed Then
 ' Only dispose the source enumerator if you are doing dynamic partitioning
 If Not m_downcountEnumerators Then
 m_reader.Dispose()
 End If
 m_disposed = True
 End If
 End Sub

 ' Called from Dispose() method of spawned InternalEnumerator. During
 ' static partitioning, the source enumerator will be automatically
 ' disposed once all requested InternalEnumerators have been disposed.
 Public Sub DisposeEnumerator()
 If m_downcountEnumerators Then
 If Interlocked.Decrement(m_activeEnumerators) = 0 Then
 m_reader.Dispose()
 End If
 End If
 End Sub
 End Class

 ' Internal class that serves as a shared enumerator for 
 ' the underlying collection.
 Private Class InternalEnumerator
 Implements IEnumerator(Of KeyValuePair(Of Long, T))
 Private m_current As KeyValuePair(Of Long, T)
 Private m_source As IEnumerator(Of T)
 Private m_controllingEnumerable As InternalEnumerable
 Private m_index As [Shared](Of Long) = Nothing
 Private m_disposed As Boolean = False


 Public Sub New(ByVal source As IEnumerator(Of T), ByVal controllingEnumerable As InternalEnumerable, ByVal index As [Shared](Of Long))
 m_source = source
 m_current = Nothing
 m_controllingEnumerable = controllingEnumerable
 m_index = index
 End Sub

 Private ReadOnly Property Current2() As Object Implements IEnumerator.Current
 Get
 Return m_current
 End Get
 End Property

 Private ReadOnly Property Current() As KeyValuePair(Of Long, T) Implements IEnumerator(Of KeyValuePair(Of Long, T)).Current
 Get
 Return m_current
 End Get
 End Property

 Private Sub Reset() Implements IEnumerator.Reset
 Throw New NotSupportedException("Reset() not supported")
 End Sub

 ' This method is the crux of this class. Under lock, it calls
 ' MoveNext() on the underlying enumerator, grabs Current and index, 
 ' and increments the index.
 Private Function MoveNext() As Boolean Implements IEnumerator.MoveNext
 Dim rval As Boolean = False
 SyncLock m_source
 rval = m_source.MoveNext()
 If rval Then
 m_current = New KeyValuePair(Of Long, T)(m_index.Value, m_source.Current)
 m_index.Value = m_index.Value + 1
 Else
 m_current = Nothing
 End If
 End SyncLock
 Return rval
 End Function

 Private Sub Dispose() Implements IDisposable.Dispose
 If Not m_disposed Then
 ' Delegate to parent enumerable's DisposeEnumerator() method
 m_controllingEnumerable.DisposeEnumerator()
 m_disposed = True
 End If
 End Sub

 End Class

 ' Constructor just grabs the collection to wrap
 Public Sub New(ByVal enumerable As IEnumerable(Of T))
 MyBase.New(True, True, True)

 ' Verify that the source IEnumerable is not null
 If enumerable Is Nothing Then
 Throw New ArgumentNullException("enumerable")
 End If

 m_referenceEnumerable = enumerable
 End Sub

 ' Produces a list of "numPartitions" IEnumerators that can each be
 ' used to traverse the underlying collection in a thread-safe manner.
 ' This will return a static number of enumerators, as opposed to
 ' GetOrderableDynamicPartitions(), the result of which can be used to produce
 ' any number of enumerators.
 Public Overloads Overrides Function GetOrderablePartitions(ByVal numPartitions As Integer) As IList(Of IEnumerator(Of KeyValuePair(Of Long, T)))
 If numPartitions < 1 Then
 Throw New ArgumentOutOfRangeException("NumPartitions")
 End If

 Dim list As New List(Of IEnumerator(Of KeyValuePair(Of Long, T)))(numPartitions)

 ' Since we are doing static partitioning, create an InternalEnumerable with reference
 ' counting of spawned InternalEnumerators turned on. Once all of the spawned enumerators
 ' are disposed, dynamicPartitions will be disposed.
 Dim dynamicPartitions = New InternalEnumerable(m_referenceEnumerable.GetEnumerator(), True)
 For i As Integer = 0 To numPartitions - 1
 list.Add(dynamicPartitions.GetEnumerator())
 Next

 Return list
 End Function

 ' Returns an instance of our internal Enumerable class. GetEnumerator()
 ' can then be called on that (multiple times) to produce shared enumerators.
 Public Overloads Overrides Function GetOrderableDynamicPartitions() As IEnumerable(Of KeyValuePair(Of Long, T))
 ' Since we are doing dynamic partitioning, create an InternalEnumerable with reference
 ' counting of spawned InternalEnumerators turned off. This returned InternalEnumerable
 ' will need to be explicitly disposed.
 Return New InternalEnumerable(m_referenceEnumerable.GetEnumerator(), False)
 End Function

 ' Must be set to true if GetDynamicPartitions() is supported.
 Public Overloads Overrides ReadOnly Property SupportsDynamicPartitions() As Boolean
 Get
 Return True
 End Get
 End Property
 End Class

 Class Program
 Shared Sub Main()
 '
 ' First a fairly simple visual test
 '
 Dim someCollection = New String() {"four", "score", "and", "twenty", "years", "ago"}
 Dim someOrderablePartitioner = New SingleElementOrderablePartitioner(Of String)(someCollection)
 Parallel.ForEach(someOrderablePartitioner,
 Sub(item, state, index)
 Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", item, index, Thread.CurrentThread.ManagedThreadId)
 End Sub)

 '
 ' Now a test of static partitioning, using 2 partitions and 2 tasks
 '
 Dim staticPartitioner = someOrderablePartitioner.GetOrderablePartitions(2)

 ' staticAction will consume the shared enumerable
 Dim partitionerListIndex As Integer = 0
 Dim staticAction As Action =
 Sub()
 Dim myIndex As Integer = Interlocked.Increment(partitionerListIndex) - 1
 Dim enumerator = staticPartitioner(myIndex)
 While enumerator.MoveNext()
 Console.WriteLine("Static partitioning: item = {0}, index = {1}, thread id = {2}", enumerator.Current.Value, enumerator.Current.Key, Thread.CurrentThread.ManagedThreadId)
 End While
 enumerator.Dispose()
 End Sub

 ' Now launch two of them
 Parallel.Invoke(staticAction, staticAction)

 '
 ' Now a more rigorous test of dynamic partitioning (used by Parallel.ForEach)
 '
 Console.WriteLine("OrderablePartitioner test: testing for index mismatches")
 Dim src As List(Of Integer) = Enumerable.Range(0, 100000).ToList()
 Dim myOP As New SingleElementOrderablePartitioner(Of Integer)(src)

 Dim counter As Integer = 0
 Dim mismatch As Boolean = False
 Parallel.ForEach(myOP,
 Sub(item, state, index)
 If item <> index Then
 mismatch = True
 End If
 Interlocked.Increment(counter)
 End Sub)

 If mismatch Then
 Console.WriteLine("OrderablePartitioner Test: index mismatch detected")
 End If



 Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", counter)
 End Sub
 End Class
End Module

Remarks

The implementation of the derived class is responsible for ordering the elements into key-value pairs in whatever manner is appropriate. For more information, see Custom Partitioners for PLINQ and TPL.

Constructors

Name Description
OrderablePartitioner<TSource>(Boolean, Boolean, Boolean)

Called from constructors in derived classes to initialize the OrderablePartitioner<TSource> class with the specified constraints on the index keys.

Properties

Name Description
KeysNormalized

Gets whether order keys are normalized.

KeysOrderedAcrossPartitions

Gets whether elements in an earlier partition always come before elements in a later partition.

KeysOrderedInEachPartition

Gets whether elements in each partition are yielded in the order of increasing keys.

SupportsDynamicPartitions

Gets whether additional partitions can be created dynamically.

(Inherited from Partitioner<TSource>)

Methods

Name Description
Equals(Object)

Determines whether the specified object is equal to the current object.

(Inherited from Object)
GetDynamicPartitions()

Creates an object that can partition the underlying collection into a variable number of partitions.

GetHashCode()

Serves as the default hash function.

(Inherited from Object)
GetOrderableDynamicPartitions()

Creates an object that can partition the underlying collection into a variable number of partitions.

GetOrderablePartitions(Int32)

Partitions the underlying collection into the specified number of orderable partitions.

GetPartitions(Int32)

Partitions the underlying collection into the given number of ordered partitions.

GetType()

Gets the Type of the current instance.

(Inherited from Object)
MemberwiseClone()

Creates a shallow copy of the current Object.

(Inherited from Object)
ToString()

Returns a string that represents the current object.

(Inherited from Object)

Extension Methods

Name Description
AsParallel<TSource>(Partitioner<TSource>)

Enables parallelization of a query, as sourced by a custom partitioner that is responsible for splitting the input sequence into partitions.

Applies to

Thread Safety

All public members of OrderablePartitioner<TSource> are thread-safe and may be called from multiple threads concurrently.

See also


Feedback

Was this page helpful?