Wednesday, August 10, 2011

Load-balancing partitioner with work stealing, part two

In part one work stealing range was introduced that allows stealing of work items in contiguous chunks up to half of available work space. Now is the time for the partitioner itself.

If you recall partitioning can be done either statically up front or dynamically on demand. As we are looking at the case of known work space size handing out chunk of work items on demand dynamically is the way to deal with uneven work distribution. Since work stealing is a load balancing mechanism itself static partitioning for initial workload distribution and as computation goes work stealing is used to balance it.

Now as the obvious stuff is out of the way there two important things to consider: how work stealing is done and what is the termination condition of the whole computation.

Initial static partitioning feeds workers with contiguous chunks of work items to process. Each worker wraps obtained chunk into work stealing range so that other workers if needed can steal from it and continues to take one item at a time until work stealing range is drained potentially with help from other workers. At this point worker accumulated number of processed items as it is required to do the proper termination detection. Worker tries to steal from others while returning back processed items (or basically marking them as processed) through partition list that serves as a coordinator of work stealing and termination detection. Stealing is attempted until succeeded or termination is detected. Upon success worker wraps stolen range and continues its processing as from the beginning.

Static partitioning assumes work space size knowledge. As work space doesn't grow over time (deliberate decision and if grow is needed it can be done in phases where work items from current phase result in work items for the next phase) it is the basis for termination detection. Initially work space size is set and every worker that encounters drained work stealing range returns back processed items count by decreasing remaining count. Once count reaches zero the processing must be terminated.

Here is work stealing range partitioner in the flesh.

class WorkStealingIndexRangePartitioner : Partitioner<int>
{
    private readonly int m_fromInclusive;
    private readonly int m_toExclusive;

    public WorkStealingIndexRangePartitioner(int fromInclusive, int toExclusive)
    {
        if (fromInclusive >= toExclusive)
            throw new ArgumentException();

        m_fromInclusive = fromInclusive;
        m_toExclusive = toExclusive;
    }

    public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentException();
        // Calculate range and partition size
        var rangeSize = m_toExclusive - m_fromInclusive;
        var partitionSize = Math.Max(1, rangeSize / partitionCount);

        var partitionList = new PartitionList(rangeSize);
        var partitions = new Partition[partitionCount];
        // Create partitiions by statically diving work items 
        // into even sized chunks which is ok even in case 
        // non uniform workload distribution as it will be 
        // balanced out through work stealing
        var from = m_fromInclusive;
        for (var i = 0; i < partitionCount; i++)
        {
            var to = Math.Min(from + partitionSize, m_toExclusive);
            partitions[i] = new Partition(partitionList, from, to);
            from = to;
        }
        // Wire them through a coordinator
        partitionList.SetPartitions(partitions);

        return partitions;
    }

    // Partitioning coordinator
    class PartitionList
    {
        // Holds list of available partitions
        private List<Partition> m_partitions;
        // Holds number of remaining items to process
        private int m_remainingCount;

        public PartitionList(int count)
        {
            m_remainingCount = count;
        }

        public void SetPartitions(IEnumerable<Partition> partitions)
        {
            m_partitions = new List<Partition>(partitions);
        }

        // Return number of items as processed and tries to steal 
        // new work items from other partitions
        public bool TryReturnAndSteal(Partition to, int count, out Tuple<int, int> range)
        {
            // Move toward termination condition 
            Interlocked.Add(ref m_remainingCount, -count);
            // Until either termination condition is 
            // reached or successful steal attempt
            range = null;
            while (true)
            {
                // Enumerate through available partitions and try 
                // to steal from them
                foreach (var from in m_partitions)
                {
                    // Check if nothing to steal
                    if (m_remainingCount <= 0)
                        return false;
                    // Skip requesting partition as it is empty
                    if (from == to)
                        continue;

                    range = from.TrySteal();
                    // Keep trying to steal from others if 
                    // unsuccessful
                    if (range != null)
                        return true;
                }
            }
        }
    }

    // Work stealing partition 
    class Partition : IEnumerator<int>
    {
        // Holds range items currently owned
        private WorkStealingRange m_workStealingRange;
        // Holds number of processed items
        private int m_localCount;
        // Holds reference to partitioning coordinator that 
        // controls in addition termination condition
        private readonly PartitionList m_list;
        // Holds current partition element or null if move next
        // was called or returned false
        private int? m_current;

        public Partition(PartitionList list, int fromInclusive, int toExclusive)
        {
            m_list = list;
            m_workStealingRange = new WorkStealingRange(fromInclusive, toExclusive);
        }

        public int Current
        {
            get
            {
                if (m_current != null)
                    return (int)m_current;

                throw new InvalidOperationException();
            }
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        // Tries to steal from local steal range
        public Tuple<int, int> TrySteal()
        {
            return m_workStealingRange.TryStealRange();
        }

        public bool MoveNext()
        {
            // First try to take item local from available range
            var local = m_workStealingRange.TryTakeOne();
            if (local != null)
            {
                // Mark item as processed 
                m_localCount++;
                // and set up current item
                m_current = local;
                return true;
            }
            // Otherwise try to steal from others
            Tuple<int, int> range;
            if (m_list.TryReturnAndSteal(this, m_localCount, out range))
            {
                // Stolen something
                var from = range.Item1;
                var to = range.Item2;
                // Keep very first element to yourself
                m_localCount = 1;
                m_current = from++;
                // If anything left expose it to allow others
                // to steal
                if (to - from > 0)
                    m_workStealingRange = new WorkStealingRange(from, to);

                return true;
            }
            // Termination condition reached so nothing to steal 
            // from others
            return false;
        }

        public void Dispose()
        {
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}

Rough benchmarking shows that on random workload input it performs as well as standard partitioners for indexed collections while providing good performance for worst case scenario in workload distribution.

Monday, August 8, 2011

Load-balancing partitioner with work stealing, part one

Data parallelism is a computing parallelization technique where data can be separated into independent pieces and distributed across parallel computing nodes. This technique is the core of the Parallel LINQ that partitions data into segments and executes query on each segment in parallel. Depending on scenarios and expected workload distribution different partitioning schemes (I highly recommend to read linked post before reading further) can be employed that can be essentially divided into two types:

  • Static where partitioning is done up front which is quite simple but may perform poorly when workload distribution is not even
  • Dynamic where partition is done on demand by providing chunks of work to idle workers which better deals with uneven workload distribution

Both types has one thing in common which is once chunk of work is taken by worker it is never given back until processed meaning worker has exclusive responsibility for it. In case of uneven workload distribution it may lead to poor performance which is the performance of the slowest worker. For example, when most of the heavy work is handed out to a single worker even though all other workers are finished in parallel the overall computation is not finished until unlucky worker will finish executing heavy work sequentially.

In order to deal with uneven workload distribution work stealing can be used. Joe Duffy explores in great details of how to build custom thread pool that uses work stealing to balance workload among workers. The approach allows to steal one work item which is in most cases sufficient as work item usually produces other work items and so chances are that idle worker once processed stolen work item will have more work to do (otherwise it can go steal other work item if any).

In data parallelism scenarios stealing a chunk of work items in one shot may be more beneficial (to avoid high synchronization costs). Work stealing benefits from initial work distribution compared to having all work handed over to a single worker and let the others to steal from it. Thus static partitioning with work stealing of chunks of work items is what we are looking for. Static partitioning assumes work space size knowledge which in most cases there.

Parallel LINQ uses partitioner concept to abstract partitioning mechanism. Before developing custom partitioner (this will be part two of the series) work stealing part must be in place:

  • Work space is known in advance, it is not growing over time and provides indexed access
  • Thieves must be able to steal work items in contiguous chunks ideally half of the available items

As work space is known in advance it is represented through a range of integer values. So basically indexes will be the subject rather than elements themselves as it quite easy to map to the actual elements. Range will be accessed from lower bound by the owner of the range and every time will try to take one index. Higher bound of the range is represented basically by other range called steal range. It defines bounds of indexes that are eligible for stealing. Thieves will contend for the steal range with each other and with owner in case the very last item is in the steal range. Essentially work space can be looked at as [low .. [mid .. high)) where [mid .. high) is a steal range and [low .. high) is overall work space.

Stealing is the fate of idle workers and thus they take the burden letting owner be ignorant of their presence until they are too close:

  • Load available steal range and lower bound
  • If steal range falls behind meaning no more items left return value that indicates unsuccessful steal attempt
  • Otherwise construct new steal range and attempt to atomically compare and swap it
    • If succeeded observed range was stolen
    • otherwise either other thief succeeded; or the owner if the range contained the very last item; or owner updated steal range as between the moment thief observed steal range and now owner consumed a lot of items and is close to steal range

Owner must be able to take one item at a time without heavy synchronization as follows:

  • Reserve item at the lower bound by advancing it by one and once the item is reserved it cannot be reached unless it is in the steal range
  • Load available steal range; the order is really important otherwise due to reordering the same item can stolen and taken by owner
  • If reserved lower bound item is not in observed steal range
    • If steal range is too close try to update to a smaller one and don't worry if unsuccessfully as next steal or local take will make right
    • Return reserved item as successfully taken
  • If reserved item in the steal range meaning this the very last one and so contend for it with thieves
    • Try to atomically compare and swap to a steal range that falls behind to indicate no more items left
    • If succeeded return reserved item as successfully taken
    • Otherwise lost the race so return value that indicates unsuccessful take attempt
  • Otherwise the last item was stolen before owner even contended for it

Now that algorithm is in place here is the implementation.

class WorkStealingRange
{
    // Holds range that is available for stealing
    private volatile Tuple<int, int> m_stealRange;
    // Holds index next to be taken locally
    private volatile int m_low;

    public WorkStealingRange(int low, int high)
    {
        m_low = low;
        m_stealRange = CreateStealRange(low, high);
    }

    // tries to steal range of items 
    public Tuple<int, int> TryStealRange()
    {
        // Contend for available steal range
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;
        var low = m_low;
        // If steal range is behind lower bound it means no 
        // work items left 
        if (low > mid)
            // Return null to indicate failed steal attempt
            return null;
        // Calculate new steal range that will replace current
        // in case of success
        var newRange = CreateStealRange(low, mid);
        // Contend with other thieves and owner (in case steal 
        // range consists of the single last item)
        if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
            // Lost the race so indicate failed steal attempt
            return null;
        // Won contention for the steal range
        return oldRange;
    }

    // Tries to take one item locally
    public int? TryTakeOne()
    {
        var low = m_low;
        // Reserve item using exchange to avoid legal 
        // reordering with steal range read below
        Interlocked.Exchange(ref m_low, low + 1);
        // Now that the lowest element is reserved it is either
        // not avaible to thieves or it is the last one and
        // is in steal range
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;
        // If observed non empty steal range that doesn't 
        // contain reserved item it safe to return it as 
        // nobody can reach reserved item now
        if (low < mid)
        {
            var high = oldRange.Item2;
            // If ahead not enough space in particular at least 
            // two times of observed steal range attempt to 
            // adjust steal range to prevent stealing more than 
            // half of items
            if (mid - low <= 2 * (high - mid))
            {
                // Try to make steal range 1/4 of available work
                // space
                var newRange = CreateStealRange(low, high);
                // Don't worry if failed as next steal or local 
                // take will fix it
                Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange);
            }
            // Return reserved item as it is not reachable 
            // by thieves
            return low;
        }
        // If observed steal range contains reserved item contend
        // for it with thieves
        if (low == mid)
        {
            // Create new range that falls behind to indicate 
            // termination
            var newRange = CreateStealRange(low, low);
            // Otherwise steal range contains only reserved item 
            // and must contend with the thieves for it
            if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
                // Lost the race, return null to indicate no 
                // more items available
                return null;
            // Won contention for the last item
            return low;
        }
        // No luck last item was stolen
        return null;
    }

    private static Tuple<int, int> CreateStealRange(int low, int high)
    {
        // If range is not empty create new one that is 
        // 1/4 of the available space
        if (low != high)
            return new Tuple<int, int>((low + 3 * high) / 4, high);
        // Otherwise create empty range that falls behind
        return new Tuple<int, int>(low - 1, low - 1);
    }
}

Next time custom partitioner that uses work stealing range on the surgical table.