Monday, March 5, 2012

Parallel partition phase for quick sort

A while ago I was wrapping my head around parallel merge sort. Since it requires additional O(n) space in practice it is the best choice if available memory is not constrained. Otherwise it is better to consider quick sort. As in merge sort in quick sort we have two phases:

  • rearrange elements into two partitions such that left one contains elements less than or equal to the selected pivot element and greater or equal to the pivot elements are in the right one
  • recursively sort (independent) partitions

Second phase is naturally parallelized using task parallelism since partitions are independent (partition elements will remain inside partition boundaries when the sort of the whole array is finished). You can find an example of this behavior in parallel quick sort. It is a good start. But the first phase is still contributes O(n) at each recursion level. By parallelizing partition phase we can further speed up quick sort.

Sequential version of partition phase is pretty straightforward.

class PartitionHelper<T>
{
    private readonly T[] m_arr;
    private readonly IComparer<T> m_comparer;

    public PartitionHelper(T[] arr, IComparer<T> comparer)
    {
        m_arr = arr;
        m_comparer = comparer;
    }

    // Moves elements within range around pivot sequentially and
    // returns position of the first element equal to the pivot.
    public int SequentialPartition(T pivot, int from, int to)
    {
        var j = from;
        for (var i = from; i < to; i++) {
            if (m_comparer.Compare(m_arr[i], pivot) < 0) {
                SwapElements(i, j++);
            }
        }
        return j;
    }

    private void SwapElements(int from, int to)
    {
        var tmp = m_arr[from];
        m_arr[from] = m_arr[to];
        m_arr[to] = tmp;
    }
    
...

An interesting point is that we do not know in advance how the partitioning will be done since it is data dependent (position of an element depends on other elements). Still independent pieces can be carved out. Here is the core idea.

Let’s assume an array that looks like below where x denotes some element, p denotes selected pivot, e is equal, l is less and g is greater elements than pivot.

e l l (g l g e e l) x x x x x x x x x (l l g g e l) g e g p

           left                                    right

Let’s assume we selected two blocks of elements within array called left (containing elements g l g e e l) such that all elements before are less or equal to the pivot and right (that holds elements l l g g e l) such that all elements after it are greater or equal to the pivot from left and right ends respectively. After the partitioning against pivot is done left block will hold elements less than or equal to the pivot and right block will contain elements greater that or equal to the pivot. In our example left block contains two g elements that do not belong there and right block holds three l elements that must not be there. But this means  that we can swap two l elements from right block with two g elements from left block and left block will comply with partitioning against pivot.

e l l (l l l e e l) x x x x x x x x x (g g g g e l) g e g p

           left                                   right

Overall after blocks rearrange operation at least one of them contains correct elements (if the number of elements to be moved in each block is not equal).

...

    // Enum that indicates which of the blocks are in 
    // in place after arrangment.
    private enum InPlace
    {
        Left,
        Right,
        Both
    }

    // Tries to rearranges elements of the two blocks such that
    // right block contains elements greater or equal to the 
    // pivot and/or left block contains elements less than or 
    // equal to the pivot. At least one of the blocks is 
    // correctly reaaranged.
    private InPlace ArrangeBlocks(T pivot, ref int leftFrom, int leftTo, ref int rightFrom, int rightTo)
    {
        while (leftFrom < leftTo && rightFrom < rightTo) {
            while (m_comparer.Compare(m_arr[leftFrom], pivot) <= 0 && ++leftFrom < leftTo) {
            }

            while (m_comparer.Compare(m_arr[rightFrom], pivot) >= 0 && ++rightFrom < rightTo) {
            }

            if (leftFrom == leftTo || rightFrom == rightTo) {
                break;
            }

            SwapElements(leftFrom++, rightFrom++);
        }

        if (leftFrom == leftTo && rightFrom == rightTo) {
            return InPlace.Both;
        }

        if (leftFrom == leftTo) {
            return InPlace.Left;
        }

        return InPlace.Right;
    }

...

Then we can select next left block try to do the same. Repeat it until blocks meet piece by piece making left and right parts of the array as partitioning wants it to be.  Sequential block based algorithm looks like this:

  • select block size and pick block from left and right ends of the array
  • rearrange elements of the two blocks
  • pick next block from the same end if all elements of the block are in place (as the partitioning wants them to be)
  • repeat until all blocks are processed
  • do sequential partitioning of the remaining block (since from a pair of blocks at most one block may remain not rearranged) if one exists

Interesting bit is that pairs of blocks can be independently rearranged. Workers can pick blocks concurrently from corresponding ends of array and in parallel rearrange elements.

A block once taken by a worker should not be accessible by other workers. When no more blocks left worker must stop. Basically we have two counters (number of blocks taken from left and right ends of the array). In order to take a block we must atomically increment corresponding counter and check that sum of the two counters is less than equal to total number of blocks otherwise all blocks are exhausted and worker must stop. Doing under a lock is simple and acceptable for large arrays and blocks but inefficient for small arrays and blocks.

We will pack two counters into a single 32 bit value where lower 16 bits are for right blocks counter and higher 16 bits are for left blocks. To increment right and left blocks counters 1 and 1<<16 must be added to combined value respectively. Atomically updated combined value allows to extract individual counters and make decision on whether block was successfully taken or not.

Since each worker may attempt to race for the last not taken block care should be taken of overflow. So only 15 bits are used for each counter and so it will require 1<<15 workers to cause overflow that is not realistic.

...

    // Class that maintains taken blocks in a thread-safe way.
    private class BlockCounter
    {
        private const int c_minBlockSize = 1024;

        private readonly int m_blockCount;
        private readonly int m_blockSize;
        private int m_counter;
        private const int c_leftBlock = 1 << 16;
        private const int c_rightBlock = 1;
        private const int c_lowWordMask = 0x0000FFFF;

        public BlockCounter(int size)
        {
            // Compute block size given that we have only 15 bits
            // to hold block count.
            m_blockSize = Math.Max(size/Int16.MaxValue, c_minBlockSize);
            m_blockCount = size/m_blockSize;
        }

        // Gets selected block size based on total number of 
        // elements and minimum block size.
        public int BlockSize
        {
            get { return m_blockSize; }
        }

        // Gets total number of blocks that is equal to the 
        // total number devided evenly by the block size.
        public int BlockCount
        {
            get { return m_blockCount; }
        }

        // Takes a block from left end and returns a value which 
        // indicates whether taken block is valid since due to 
        // races a block that is beyond allowed range can be 
        // taken.
        public bool TakeLeftBlock(out int left)
        {
            int ignore;
            return TakeBlock(c_leftBlock, out left, out ignore);
        }

        // Takes a block from ringt end and returns its validity. 
        public bool TakeRightBlock(out int right)
        {
            int ignore;
            return TakeBlock(c_rightBlock, out ignore, out right);
        }

        // Atomically takes a block either from left or right end 
        // by incrementing higher or lower word of a single 
        // double word and checks that the sum of taken blocks 
        // so far is still within allowed limit.
        private bool TakeBlock(int block, out int left, out int right)
        {
            var counter = unchecked((uint) Interlocked.Add(ref m_counter, block));
            // Extract number of taken blocks from left and right 
            // ends.
            left = (int) (counter >> 16);
            right = (int) (counter & c_lowWordMask);
            // Check that the sum of taken blocks is within 
            // allowed range and decrement them to represent 
            // most recently taken blocks indices.
            return left-- + right-- <= m_blockCount;
        }
    }

...

With multiple workers rearranging pairs of blocks we may end up with “wholes”.

(l l e) (l g l) (l e e) (l l l) (g g e) (e l l) x x x (g g e) (l g e) (e e g)

   l0     l1        l2      l3       l4       l5              r0         r1      r2

In the example above blocks l1, l4 and r1 are the wholes in left and right partitions of the array meaning they were not completely rearranged. We must compact left and right partitions such that they contain no wholes.

(l l e) (e l l) (l e e) (l l l) (g g e) (l g l) x x x (l g e) (g g e) (e e g)

  l0      l5       l2        l3      l4        l1              r1       r0        r2

Now we can do sequential partitioning of range between the end of the left most rearranged block (l3) and beginning of the right most rearranged block (r0).

...

    // A threshold of range size below which parallel partition
    // will switch to sequential implementation otherwise 
    // parallelization will not be justified.
    private const int c_sequentialThreshold = 8192;

    // Moves elements within range around pivot in parallel and 
    // returns position of the first element equal to the pivot.
    public int ParallelPartition(T pivot, int from, int to)
    {
        var size = to - from;
        // If range is too narrow resort to sequential 
        // partitioning.
        if (size < c_sequentialThreshold) {
            return SequentialPartition(pivot, from, to);
        }

        var counter = new BlockCounter(size);
        var blockCount = counter.BlockCount;
        var blockSize = counter.BlockSize;
        // Workers will process pairs of blocks and so number 
        // of workers should be less than half the number of 
        // blocks.
        var workerCount = Math.Min(Environment.ProcessorCount, blockCount / 2);
        // After the worker is done it must report blocks that 
        // were not rearranged
        var leftRemaining = AllocateRemainingArray(workerCount);
        var rightRemaining = AllocateRemainingArray(workerCount);
        // and left most and right most rearranged blocks.
        var leftMostBlocks = AllocateMostArray(workerCount);
        var rightMostBlocks = AllocateMostArray(workerCount);

        Action<int> worker = index =>
        {
            int localLeftMost = -1, localRightMost = -1;

            var leftBlock = localLeftMost;
            var rightBlock = localRightMost;
            int leftFrom = 0, leftTo = 0;
            int rightFrom = 0, rightTo = 0;
            var result = InPlace.Both;
            // Until all blocks are exhausted try to rearrange 
            while (true) {
                // Depending on the previous step one or two 
                // blocks must taken.
                if (result == InPlace.Left || 
                    result == InPlace.Both) {
                    // Left or both blocks wre successfully 
                    // rearranged so we need to update left most 
                    // block.
                    localLeftMost = leftBlock;
                    // and try to take block from the left end.
                    if (!counter.TakeLeftBlock(out leftBlock)) {
                        break;
                    }

                    leftFrom = from + leftBlock*blockSize;
                    leftTo = leftFrom + blockSize;
                }

                if (result == InPlace.Right || 
                    result == InPlace.Both) {
                    // Right or both blocks were successfully 
                    // rearranged update right most and take new 
                    // right block.
                    localRightMost = rightBlock;
                    if (!counter.TakeRightBlock(out rightBlock)){
                        break;
                    }

                    rightTo = to - rightBlock*blockSize;
                    rightFrom = rightTo - blockSize;
                }
                // Try to rearrange elements of the two blocks 
                // such that elements of the right block are 
                // greater or equal to pivot and left block 
                // contains elements less than or equal to pivot.
                result = ArrangeBlocks(pivot, ref leftFrom, leftTo, ref rightFrom, rightTo);
                // At least one of the blocks is correctly 
                // rearranged and if we are lucky - two of them.
            }
            // If the end of right block was not rearranged mark 
            // it as remaining to be arranged.
            if (rightFrom != rightTo) {
                rightRemaining[index] = rightBlock;
            }
            // Same for the left block.
            if (leftFrom != leftTo) {
                leftRemaining[index] = leftBlock;
            }
            // Update worker local left most and right most 
            // arranged blocks.
            leftMostBlocks[index] = localLeftMost;
            rightMostBlocks[index] = localRightMost;
        };

        Parallel.For(0, workerCount, worker);
        // Compact arranged blocks from both ends so that all non
        // arranged blocks lie consecutively between arranged 
        // left and right blocks.
        var leftMostBlock = ArrangeRemainingBlocks(from, blockSize, leftRemaining, leftMostBlocks.Max(), 1);
        var rightMostBlock = ArrangeRemainingBlocks(to - blockSize, blockSize, rightRemaining, rightMostBlocks.Max(), -1);
        // Do sequential partitioning of the inner most area.
        return SequentialPartition(pivot, from + (leftMostBlock + 1) * blockSize, to - (rightMostBlock + 1)*blockSize);
    }

    // Moves rearranged blocks to cover holes such that all 
    // rearranged blocks are consecutive. Basically it does 
    // compaction and returns most rearranged block.
    private int ArrangeRemainingBlocks(int bound, int blockSize, int[] remaining, int mostBlock, int sign)
    {
        Array.Sort(remaining);
        var j = Array.FindLastIndex(remaining, b => b < mostBlock);
        for (var i = 0; i < remaining.Length && remaining[i] <= mostBlock;) {
            if (remaining[j] == mostBlock) {
                j--;
            }
            else {
                SwapBlocks(bound + sign * remaining[i] * blockSize, bound + sign * mostBlock * blockSize, blockSize);
                i++;
            }
            mostBlock--;
        }
        return mostBlock;
    }

    private static int[] AllocateRemainingArray(int workerCount)
    {
        return Enumerable.Repeat(Int32.MaxValue, workerCount).ToArray();
    }

    private static int[] AllocateMostArray(int workerCount)
    {
        return Enumerable.Repeat(-1, workerCount).ToArray();
    }

    // Swaps two blocks
    private void SwapBlocks(int from, int to, int blockSize)
    {
        for (var i = 0; i < blockSize; i++) {
            SwapElements(from + i, to + i);
        }
    }
}

Now we have parallel implementation of the quick sort partition phase. Experiments with random generated arrays of integer values show that it helps to speed up parallel quick sort by approximately 50% on a 8 way machine.