In part one, a work stealing range was introduced that allows work items to be stolen in contiguous chunks of up to half of the available work space. Now it is time for the partitioner itself.
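The partitioner relies on just three members of that range. In case you don't have part one handy, here is a stub of the surface it assumes; only the signatures matter, the actual implementation lives in part one.

// Stub of the work stealing range from part one, reduced to the surface
// used by the partitioner below; see part one for the real implementation.
class WorkStealingRange
{
    public WorkStealingRange(int fromInclusive, int toExclusive)
    {
        // See part one for the actual state and algorithm
    }

    // Owner side: takes a single work item, or returns null once drained
    public int? TryTakeOne()
    {
        throw new NotImplementedException();
    }

    // Thief side: steals a contiguous chunk of up to half of the remaining
    // items as an inclusive-exclusive pair, or returns null if there is
    // nothing to steal
    public Tuple<int, int> TryStealRange()
    {
        throw new NotImplementedException();
    }
}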
If you recall, partitioning can be done either statically up front or dynamically on demand. As we are looking at the case of a known work space size, handing out chunks of work items dynamically on demand would be the usual way to deal with uneven work distribution. However, work stealing is a load balancing mechanism in itself, so static partitioning is used for the initial workload distribution and, as the computation goes on, work stealing balances it out.
Now that the obvious stuff is out of the way, there are two important things to consider: how work stealing is done and what the termination condition of the whole computation is.
Initial static partitioning feeds workers with contiguous chunks of work items to process. Each worker wraps its chunk into a work stealing range so that other workers can steal from it if needed, and keeps taking one item at a time until the range is drained, potentially with help from other workers. By that point the worker has accumulated the number of items it processed, which is required for proper termination detection. The worker then tries to steal from others while returning the processed items (essentially marking them as processed) through the partition list, which serves as the coordinator of work stealing and termination detection. Stealing is attempted until it either succeeds or termination is detected. Upon success the worker wraps the stolen range and continues processing as from the beginning.
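In code form, the lifecycle of a single worker looks roughly like this. It is only a conceptual sketch of what the Partition.MoveNext implementation below does, using the Partition and PartitionList types from the full listing; processItem stands in for the user supplied loop body.

// Simplified sketch of a single worker's lifecycle; processItem is a
// placeholder for the actual per item work.
static void RunWorker(Partition self, PartitionList list,
    WorkStealingRange range, Action<int> processItem)
{
    var processed = 0;
    while (true)
    {
        // Drain the currently owned range one item at a time
        var item = range.TryTakeOne();
        if (item != null)
        {
            processItem((int)item);
            processed++;
            continue;
        }
        // Range is drained: report processed items and try to steal
        Tuple<int, int> stolen;
        if (!list.TryReturnAndSteal(self, processed, out stolen))
            return; // termination detected, nothing left to steal
        // Wrap the stolen chunk and start over
        range = new WorkStealingRange(stolen.Item1, stolen.Item2);
        processed = 0;
    }
}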
Static partitioning assumes the work space size is known. As the work space doesn't grow over time (a deliberate decision; if growth is needed it can be done in phases, where work items from the current phase produce work items for the next phase), that size is the basis for termination detection. Initially the work space size is recorded, and every worker that encounters a drained work stealing range returns its processed items count by decreasing the remaining count. Once the count reaches zero the processing must terminate.
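The termination accounting is nothing more than a shared counter decremented atomically. Here is a minimal sketch of that idea on its own (the names are illustrative; in the listing below this logic lives inside PartitionList as m_remainingCount):

// Minimal sketch of the termination detection counter; in the real
// partitioner this lives inside the PartitionList coordinator.
class TerminationCounter
{
    // Remaining number of unprocessed items, set once to the work space size
    private int m_remaining;

    public TerminationCounter(int workSpaceSize)
    {
        m_remaining = workSpaceSize;
    }

    // Called by a worker whose local range is drained; returns true when
    // every item in the work space has been processed
    public bool ReturnProcessed(int processedCount)
    {
        return Interlocked.Add(ref m_remaining, -processedCount) <= 0;
    }
}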
Here is the work stealing range partitioner in the flesh.
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

class WorkStealingIndexRangePartitioner : Partitioner<int>
{
    private readonly int m_fromInclusive;
    private readonly int m_toExclusive;

    public WorkStealingIndexRangePartitioner(int fromInclusive, int toExclusive)
    {
        if (fromInclusive >= toExclusive)
            throw new ArgumentException();

        m_fromInclusive = fromInclusive;
        m_toExclusive = toExclusive;
    }

    public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentException();

        // Calculate range and partition size, rounding up so that the
        // partitions cover the whole range
        var rangeSize = m_toExclusive - m_fromInclusive;
        var partitionSize = Math.Max(1, (rangeSize + partitionCount - 1) / partitionCount);

        var partitionList = new PartitionList(rangeSize);
        var partitions = new Partition[partitionCount];
        // Create partitions by statically dividing work items
        // into even sized chunks which is ok even in case of
        // non uniform workload distribution as it will be
        // balanced out through work stealing
        var from = m_fromInclusive;
        for (var i = 0; i < partitionCount; i++)
        {
            var to = Math.Min(from + partitionSize, m_toExclusive);
            partitions[i] = new Partition(partitionList, from, to);
            from = to;
        }
        // Wire them through a coordinator
        partitionList.SetPartitions(partitions);

        return partitions;
    }

    // Partitioning coordinator
    class PartitionList
    {
        // Holds list of available partitions
        private List<Partition> m_partitions;
        // Holds number of remaining items to process
        private int m_remainingCount;

        public PartitionList(int count)
        {
            m_remainingCount = count;
        }

        public void SetPartitions(IEnumerable<Partition> partitions)
        {
            m_partitions = new List<Partition>(partitions);
        }

        // Returns the given number of items as processed and tries to
        // steal new work items from other partitions
        public bool TryReturnAndSteal(Partition to, int count, out Tuple<int, int> range)
        {
            // Move toward termination condition
            Interlocked.Add(ref m_remainingCount, -count);
            // Until either termination condition is
            // reached or successful steal attempt
            range = null;
            while (true)
            {
                // Enumerate through available partitions and try
                // to steal from them
                foreach (var from in m_partitions)
                {
                    // Check if nothing is left to steal
                    if (m_remainingCount <= 0)
                        return false;
                    // Skip requesting partition as it is empty
                    if (from == to)
                        continue;

                    range = from.TrySteal();
                    // Keep trying to steal from others if
                    // unsuccessful
                    if (range != null)
                        return true;
                }
            }
        }
    }

    // Work stealing partition
    class Partition : IEnumerator<int>
    {
        // Holds range items currently owned
        private WorkStealingRange m_workStealingRange;
        // Holds number of processed items
        private int m_localCount;
        // Holds reference to partitioning coordinator that in addition
        // controls the termination condition
        private readonly PartitionList m_list;
        // Holds current partition element or null if move next
        // was not called or returned false
        private int? m_current;

        public Partition(PartitionList list, int fromInclusive, int toExclusive)
        {
            m_list = list;
            m_workStealingRange = new WorkStealingRange(fromInclusive, toExclusive);
        }

        public int Current
        {
            get
            {
                if (m_current != null)
                    return (int)m_current;

                throw new InvalidOperationException();
            }
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        // Tries to steal from the local work stealing range
        public Tuple<int, int> TrySteal()
        {
            return m_workStealingRange.TryStealRange();
        }

        public bool MoveNext()
        {
            // First try to take an item locally from the available range
            var local = m_workStealingRange.TryTakeOne();
            if (local != null)
            {
                // Mark item as processed
                m_localCount++;
                // and set up current item
                m_current = local;
                return true;
            }
            // Otherwise try to steal from others
            Tuple<int, int> range;
            if (m_list.TryReturnAndSteal(this, m_localCount, out range))
            {
                // Stolen something
                var from = range.Item1;
                var to = range.Item2;
                // Keep very first element to yourself
                m_localCount = 1;
                m_current = from++;
                // If anything is left expose it to allow others
                // to steal
                if (to - from > 0)
                    m_workStealingRange = new WorkStealingRange(from, to);

                return true;
            }
            // Termination condition reached so there is nothing to steal
            // from others
            m_current = null;
            return false;
        }

        public void Dispose()
        {
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}
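For completeness, this is how the partitioner can be handed to Parallel.ForEach. The loop body below simulates an uneven workload and is made up purely for illustration.

using System;
using System.Threading;
using System.Threading.Tasks;

static class UsageExample
{
    static void Main()
    {
        var partitioner = new WorkStealingIndexRangePartitioner(0, 1000);
        long sum = 0;
        Parallel.ForEach(partitioner, i =>
        {
            // Simulate an uneven workload: every hundredth item is heavy
            if (i % 100 == 0)
                Thread.SpinWait(1000000);

            Interlocked.Add(ref sum, i);
        });
        Console.WriteLine(sum);
    }
}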
Rough benchmarking shows that on random workload input it performs as well as the standard partitioners for indexed collections, while providing good performance for the worst case scenario in workload distribution.
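If you want to reproduce such a comparison yourself, a rough harness along these lines will do. The skewed workload, item count and spin amounts are made up for illustration, not taken from the original benchmark, and the numbers will vary from machine to machine.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

static class SkewedWorkloadComparison
{
    static void Main()
    {
        const int count = 100000;
        // Skewed distribution: heavy items are clustered at the start of
        // the range, the kind of input that punishes a static even split
        // unless stealing rebalances it
        Action<int> body = i => Thread.SpinWait(i < count / 10 ? 50000 : 50);

        Measure("Parallel.For (default partitioning)",
            () => Parallel.For(0, count, body));

        Measure("WorkStealingIndexRangePartitioner",
            () => Parallel.ForEach(new WorkStealingIndexRangePartitioner(0, count), body));
    }

    static void Measure(string name, Action action)
    {
        var stopwatch = Stopwatch.StartNew();
        action();
        Console.WriteLine("{0}: {1} ms", name, stopwatch.ElapsedMilliseconds);
    }
}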