Note

Access to this page requires authorization. You can try signing in or .

Access to this page requires authorization. You can try .

How to: Implement Dynamic Partitions

The following example shows how to implement a custom System.Collections.Concurrent.OrderablePartitioner<TSource> that implements dynamic partitioning and can be used from certain overloads ForEach and from PLINQ.

Example

Each time a partition calls MoveNext on the enumerator, the enumerator provides the partition with one list element. In the case of PLINQ and ForEach, the partition is a Task instance. Because requests are happening concurrently on multiple threads, access to the current index is synchronized.

//
// An orderable dynamic partitioner for lists
//
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Linq;
using System.Numerics;

class OrderableListPartitioner<TSource> : OrderablePartitioner<TSource>
{
 private readonly IList<TSource> m_input;

 // Must override to return true.
 public override bool SupportsDynamicPartitions => true;

 public OrderableListPartitioner(IList<TSource> input) : base(true, false, true) =>
 m_input = input;

 public override IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
 {
 var dynamicPartitions = GetOrderableDynamicPartitions();
 var partitions =
 new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];

 for (int i = 0; i < partitionCount; i++)
 {
 partitions[i] = dynamicPartitions.GetEnumerator();
 }
 return partitions;
 }

 public override IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions() =>
 new ListDynamicPartitions(m_input);

 private class ListDynamicPartitions : IEnumerable<KeyValuePair<long, TSource>>
 {
 private IList<TSource> m_input;
 private int m_pos = 0;

 internal ListDynamicPartitions(IList<TSource> input) =>
 m_input = input;

 public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
 {
 while (true)
 {
 // Each task gets the next item in the list. The index is
 // incremented in a thread-safe manner to avoid races.
 int elemIndex = Interlocked.Increment(ref m_pos) - 1;

 if (elemIndex >= m_input.Count)
 {
 yield break;
 }

 yield return new KeyValuePair<long, TSource>(
 elemIndex, m_input[elemIndex]);
 }
 }

 IEnumerator IEnumerable.GetEnumerator() =>
 ((IEnumerable<KeyValuePair<long, TSource>>)this).GetEnumerator();
 }
}

class ConsumerClass
{
 static void Main()
 {
 var nums = Enumerable.Range(0, 10000).ToArray();
 OrderableListPartitioner<int> partitioner = new OrderableListPartitioner<int>(nums);

 // Use with Parallel.ForEach
 Parallel.ForEach(partitioner, (i) => Console.WriteLine(i));

 // Use with PLINQ
 var query = from num in partitioner.AsParallel()
 where num % 2 == 0
 select num;

 foreach (var v in query)
 Console.WriteLine(v);
 }
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module Module1
 Public Class OrderableListPartitioner(Of TSource)
 Inherits OrderablePartitioner(Of TSource)


 Private ReadOnly m_input As IList(Of TSource)

 Public Sub New(ByVal input As IList(Of TSource))
 MyBase.New(True, False, True)
 m_input = input
 End Sub

 ' Must override to return true.
 Public Overrides ReadOnly Property SupportsDynamicPartitions As Boolean
 Get
 Return True
 End Get
 End Property

 Public Overrides Function GetOrderablePartitions(ByVal partitionCount As Integer) As IList(Of IEnumerator(Of KeyValuePair(Of Long, TSource)))
 Dim dynamicPartitions = GetOrderableDynamicPartitions()
 Dim partitions(partitionCount - 1) As IEnumerator(Of KeyValuePair(Of Long, TSource))

 For i = 0 To partitionCount - 1
 partitions(i) = dynamicPartitions.GetEnumerator()
 Next

 Return partitions
 End Function

 Public Overrides Function GetOrderableDynamicPartitions() As IEnumerable(Of KeyValuePair(Of Long, TSource))
 Return New ListDynamicPartitions(m_input)
 End Function

 Private Class ListDynamicPartitions
 Implements IEnumerable(Of KeyValuePair(Of Long, TSource))

 Private m_input As IList(Of TSource)

 Friend Sub New(ByVal input As IList(Of TSource))
 m_input = input
 End Sub

 Public Function GetEnumerator() As IEnumerator(Of KeyValuePair(Of Long, TSource)) Implements IEnumerable(Of KeyValuePair(Of Long, TSource)).GetEnumerator
 Return New ListDynamicPartitionsEnumerator(m_input)
 End Function

 Public Function GetEnumerator1() As IEnumerator Implements IEnumerable.GetEnumerator
 Return CType(Me, IEnumerable).GetEnumerator()
 End Function
 End Class

 Private Class ListDynamicPartitionsEnumerator
 Implements IEnumerator(Of KeyValuePair(Of Long, TSource))

 Private m_input As IList(Of TSource)
 Shared m_pos As Integer = 0
 Private m_current As KeyValuePair(Of Long, TSource)

 Public Sub New(ByVal input As IList(Of TSource))
 m_input = input
 m_pos = 0
 Me.disposedValue = False
 End Sub

 Public ReadOnly Property Current As KeyValuePair(Of Long, TSource) Implements IEnumerator(Of KeyValuePair(Of Long, TSource)).Current
 Get
 Return m_current
 End Get
 End Property

 Public ReadOnly Property Current1 As Object Implements IEnumerator.Current
 Get
 Return Me.Current
 End Get
 End Property

 Public Function MoveNext() As Boolean Implements IEnumerator.MoveNext
 Dim elemIndex = Interlocked.Increment(m_pos) - 1
 If elemIndex >= m_input.Count Then
 Return False
 End If

 m_current = New KeyValuePair(Of Long, TSource)(elemIndex, m_input(elemIndex))
 Return True
 End Function

 Public Sub Reset() Implements IEnumerator.Reset
 m_pos = 0
 End Sub

 Private disposedValue As Boolean ' To detect redundant calls

 Protected Overridable Sub Dispose(ByVal disposing As Boolean)
 If Not Me.disposedValue Then
 m_input = Nothing
 m_current = Nothing
 End If
 Me.disposedValue = True
 End Sub

 Public Sub Dispose() Implements IDisposable.Dispose
 Dispose(True)
 GC.SuppressFinalize(Me)
 End Sub

 End Class

 End Class

 Class ConsumerClass

 Shared Sub Main()

 Console.BufferHeight = 20000
 Dim nums = Enumerable.Range(0, 2000).ToArray()

 Dim partitioner = New OrderableListPartitioner(Of Integer)(nums)

 ' Use with Parallel.ForEach
 Parallel.ForEach(partitioner, Sub(i) Console.Write("{0}:{1} ", i, Thread.CurrentThread.ManagedThreadId))

 Console.WriteLine("PLINQ -----------------------------------")


 ' create a new partitioner, since Enumerators are not reusable.
 Dim partitioner2 = New OrderableListPartitioner(Of Integer)(nums)
 ' Use with PLINQ
 Dim query = From num In partitioner2.AsParallel()
 Where num Mod 8 = 0
 Select num

 For Each v In query
 Console.Write("{0} ", v)
 Next

 Console.WriteLine("press any key")
 Console.ReadKey()
 End Sub
 End Class

End Module

This is an example of chunk partitioning, with each chunk consisting of one element. By providing more elements at a time, you could reduce the contention over the lock and theoretically achieve faster performance. However, at some point, larger chunks might require additional load-balancing logic in order to keep all threads busy until all the work is done.

See also


Feedback

Was this page helpful?

Additional resources