Wednesday, August 10, 2011

Load-balancing partitioner with work stealing, part two

In part one a work stealing range was introduced that allows work items to be stolen in contiguous chunks of up to half of the available work space. Now it is time for the partitioner itself.

If you recall, partitioning can be done either statically up front or dynamically on demand. Dynamically handing out chunks of work items on demand is the usual way to deal with uneven workload distribution. However, since work stealing is a load balancing mechanism itself and the work space size is known, static partitioning is used for the initial workload distribution and work stealing balances it out as the computation goes.

Now that the obvious stuff is out of the way there are two important things to consider: how work stealing is done and what the termination condition of the whole computation is.

Initial static partitioning feeds workers with contiguous chunks of work items to process. Each worker wraps its chunk into a work stealing range so that other workers can steal from it if needed, and keeps taking one item at a time until the range is drained, potentially with help from other workers. At that point the worker has accumulated the number of items it processed, which is required for proper termination detection. The worker then tries to steal from others while returning the processed items (basically marking them as processed) through the partition list that serves as the coordinator of work stealing and termination detection. Stealing is attempted until it either succeeds or termination is detected. Upon success the worker wraps the stolen range and continues processing as from the beginning.

Static partitioning assumes the work space size is known. As the work space doesn't grow over time (a deliberate decision; if growth is needed it can be done in phases where work items from the current phase produce work items for the next phase) it is the basis for termination detection. Initially the remaining count is set to the work space size, and every worker that encounters a drained work stealing range returns its processed items count by decreasing the remaining count. Once the count reaches zero the processing must terminate.

Here is the work stealing range partitioner in the flesh.

class WorkStealingIndexRangePartitioner : Partitioner<int>
{
    private readonly int m_fromInclusive;
    private readonly int m_toExclusive;

    public WorkStealingIndexRangePartitioner(int fromInclusive, int toExclusive)
    {
        if (fromInclusive >= toExclusive)
            throw new ArgumentException();

        m_fromInclusive = fromInclusive;
        m_toExclusive = toExclusive;
    }

    public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentException();
        // Calculate range and partition size
        var rangeSize = m_toExclusive - m_fromInclusive;
        var partitionSize = Math.Max(1, rangeSize / partitionCount);

        var partitionList = new PartitionList(rangeSize);
        var partitions = new Partition[partitionCount];
        // Create partitions by statically dividing work items 
        // into even sized chunks which is ok even in case of 
        // non uniform workload distribution as it will be 
        // balanced out through work stealing
        var from = m_fromInclusive;
        for (var i = 0; i < partitionCount; i++)
        {
            // Give the remainder to the last partition so that no
            // items are left uncovered when the range doesn't
            // divide evenly
            var to = i == partitionCount - 1
                ? m_toExclusive
                : Math.Min(from + partitionSize, m_toExclusive);
            partitions[i] = new Partition(partitionList, from, to);
            from = to;
        }
        // Wire them through a coordinator
        partitionList.SetPartitions(partitions);

        return partitions;
    }

    // Partitioning coordinator
    class PartitionList
    {
        // Holds list of available partitions
        private List<Partition> m_partitions;
        // Holds number of remaining items to process
        private int m_remainingCount;

        public PartitionList(int count)
        {
            m_remainingCount = count;
        }

        public void SetPartitions(IEnumerable<Partition> partitions)
        {
            m_partitions = new List<Partition>(partitions);
        }

        // Returns number of items as processed and tries to steal 
        // new work items from other partitions
        public bool TryReturnAndSteal(Partition to, int count, out Tuple<int, int> range)
        {
            // Move toward termination condition 
            Interlocked.Add(ref m_remainingCount, -count);
            // Until either termination condition is 
            // reached or successful steal attempt
            range = null;
            while (true)
            {
                // Enumerate through available partitions and try 
                // to steal from them
                foreach (var from in m_partitions)
                {
                    // Check if nothing to steal
                    if (m_remainingCount <= 0)
                        return false;
                    // Skip requesting partition as it is empty
                    if (from == to)
                        continue;

                    range = from.TrySteal();
                    // Keep trying to steal from others if 
                    // unsuccessful
                    if (range != null)
                        return true;
                }
            }
        }
    }

    // Work stealing partition 
    class Partition : IEnumerator<int>
    {
        // Holds range items currently owned
        private WorkStealingRange m_workStealingRange;
        // Holds number of processed items
        private int m_localCount;
        // Holds reference to partitioning coordinator that 
        // controls in addition termination condition
        private readonly PartitionList m_list;
        // Holds current partition element or null if MoveNext
        // hasn't been called yet or returned false
        private int? m_current;

        public Partition(PartitionList list, int fromInclusive, int toExclusive)
        {
            m_list = list;
            m_workStealingRange = new WorkStealingRange(fromInclusive, toExclusive);
        }

        public int Current
        {
            get
            {
                if (m_current != null)
                    return (int)m_current;

                throw new InvalidOperationException();
            }
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        // Tries to steal from local steal range
        public Tuple<int, int> TrySteal()
        {
            return m_workStealingRange.TryStealRange();
        }

        public bool MoveNext()
        {
            // First try to take an item locally from the available range
            var local = m_workStealingRange.TryTakeOne();
            if (local != null)
            {
                // Mark item as processed 
                m_localCount++;
                // and set up current item
                m_current = local;
                return true;
            }
            // Otherwise try to steal from others
            Tuple<int, int> range;
            if (m_list.TryReturnAndSteal(this, m_localCount, out range))
            {
                // Stolen something
                var from = range.Item1;
                var to = range.Item2;
                // Keep very first element to yourself
                m_localCount = 1;
                m_current = from++;
                // If anything left expose it to allow others
                // to steal
                if (to - from > 0)
                    m_workStealingRange = new WorkStealingRange(from, to);

                return true;
            }
            // Termination condition reached so nothing to steal 
            // from others
            return false;
        }

        public void Dispose()
        {
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}
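
To give a feel for how it plugs in, here is a minimal usage sketch; the items array and the ExpensiveWork function are hypothetical stand-ins for a real workload.

var partitioner = new WorkStealingIndexRangePartitioner(0, items.Length);
// PLINQ obtains static partitions via GetPartitions and runs the
// query on them in parallel; note the partitioner is not orderable
// so the order of results is not guaranteed
var results = partitioner.AsParallel()
    .Select(i => ExpensiveWork(items[i]))
    .ToArray();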

Rough benchmarking shows that on randomly distributed workloads it performs as well as the standard partitioners for indexed collections while providing good performance in the worst case of workload distribution.

Monday, August 8, 2011

Load-balancing partitioner with work stealing, part one

Data parallelism is a parallelization technique where data is separated into independent pieces and distributed across parallel computing nodes. This technique is at the core of Parallel LINQ, which partitions data into segments and executes the query on each segment in parallel. Depending on the scenario and expected workload distribution different partitioning schemes (I highly recommend reading the linked post before reading further) can be employed; they essentially fall into two types:

  • Static where partitioning is done up front, which is quite simple but may perform poorly when workload distribution is not even
  • Dynamic where partitioning is done on demand by providing chunks of work to idle workers, which deals better with uneven workload distribution

Both types have one thing in common: once a chunk of work is taken by a worker it is never given back until processed, meaning the worker has exclusive responsibility for it. In case of uneven workload distribution this may lead to poor performance, bounded by the performance of the slowest worker. For example, when most of the heavy work is handed out to a single worker, even though all other workers finish in parallel the overall computation is not done until the unlucky worker finishes executing the heavy work sequentially.

In order to deal with uneven workload distribution work stealing can be used. Joe Duffy explores in great detail how to build a custom thread pool that uses work stealing to balance workload among workers. That approach steals one work item at a time, which is in most cases sufficient since a work item usually produces other work items, so chances are that once an idle worker has processed a stolen work item it will have more work to do (otherwise it can go steal another work item if any).

In data parallelism scenarios stealing a chunk of work items in one shot may be more beneficial (to avoid high synchronization costs). Work stealing also benefits from an initial work distribution, compared to handing all work over to a single worker and letting the others steal from it. Thus static partitioning combined with work stealing of chunks of work items is what we are looking for. Static partitioning assumes the work space size is known, which in most cases it is.

Parallel LINQ uses the partitioner concept to abstract the partitioning mechanism. Before developing a custom partitioner (that will be part two of the series) the work stealing part must be in place:

  • Work space is known in advance, does not grow over time and provides indexed access
  • Thieves must be able to steal work items in contiguous chunks, ideally half of the available items

As the work space is known in advance it is represented through a range of integer values. So basically indexes will be the subject rather than the elements themselves, since it is quite easy to map them to the actual elements. The range is accessed from the lower bound by the owner of the range, which tries to take one index at a time. The higher bound of the range is represented by another range called the steal range. It defines the bounds of indexes that are eligible for stealing. Thieves will contend for the steal range with each other, and with the owner in case the very last item is in the steal range. Essentially the work space can be looked at as [low .. [mid .. high)) where [mid .. high) is the steal range and [low .. high) is the overall work space.

Stealing is the fate of idle workers and thus they take the burden, letting the owner be ignorant of their presence until they get too close:

  • Load available steal range and lower bound
  • If the steal range falls behind the lower bound, meaning no more items are left, return a value that indicates an unsuccessful steal attempt
  • Otherwise construct a new steal range and attempt to atomically compare and swap it in
    • If succeeded, the observed range was stolen
    • Otherwise either another thief succeeded; or the owner did, if the range contained the very last item; or the owner updated the steal range because, between the moment the thief observed the steal range and now, the owner consumed a lot of items and got close to the steal range

The owner must be able to take one item at a time without heavy synchronization as follows:

  • Reserve the item at the lower bound by advancing it by one; once the item is reserved it cannot be reached unless it is in the steal range
  • Load the available steal range; the order is really important, otherwise due to reordering the same item could be both stolen and taken by the owner
  • If the reserved lower bound item is not in the observed steal range
    • If the steal range is too close try to update it to a smaller one and don't worry if unsuccessful, as the next steal or local take will make it right
    • Return reserved item as successfully taken
  • If the reserved item is in the steal range it is the very last one, so contend for it with thieves
    • Try to atomically compare and swap in a steal range that falls behind to indicate no more items are left
    • If succeeded return reserved item as successfully taken
    • Otherwise the race was lost so return a value that indicates an unsuccessful take attempt
  • Otherwise the last item was stolen before the owner even contended for it

Now that the algorithm is in place here is the implementation.

class WorkStealingRange
{
    // Holds range that is available for stealing
    private volatile Tuple<int, int> m_stealRange;
    // Holds index next to be taken locally
    private volatile int m_low;

    public WorkStealingRange(int low, int high)
    {
        m_low = low;
        m_stealRange = CreateStealRange(low, high);
    }

    // Tries to steal a range of items 
    public Tuple<int, int> TryStealRange()
    {
        // Contend for available steal range
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;
        var low = m_low;
        // If steal range is behind lower bound it means no 
        // work items left 
        if (low > mid)
            // Return null to indicate failed steal attempt
            return null;
        // Calculate new steal range that will replace current
        // in case of success
        var newRange = CreateStealRange(low, mid);
        // Contend with other thieves and owner (in case steal 
        // range consists of the single last item)
        if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
            // Lost the race so indicate failed steal attempt
            return null;
        // Won contention for the steal range
        return oldRange;
    }

    // Tries to take one item locally
    public int? TryTakeOne()
    {
        var low = m_low;
        // Reserve item using exchange to avoid legal 
        // reordering with steal range read below
        Interlocked.Exchange(ref m_low, low + 1);
        // Now that the lowest element is reserved it is either
        // not available to thieves or it is the last one and
        // is in steal range
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;
        // If observed non empty steal range that doesn't 
        // contain reserved item it safe to return it as 
        // nobody can reach reserved item now
        if (low < mid)
        {
            var high = oldRange.Item2;
            // If there is not enough space ahead (at least twice 
            // the observed steal range) attempt to adjust the 
            // steal range to prevent stealing more than 
            // half of the items
            if (mid - low <= 2 * (high - mid))
            {
                // Try to make steal range 1/4 of available work
                // space
                var newRange = CreateStealRange(low, high);
                // Don't worry if failed as next steal or local 
                // take will fix it
                Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange);
            }
            // Return reserved item as it is not reachable 
            // by thieves
            return low;
        }
        // If observed steal range contains reserved item contend
        // for it with thieves
        if (low == mid)
        {
            // Create new range that falls behind to indicate 
            // termination
            var newRange = CreateStealRange(low, low);
            // Otherwise steal range contains only reserved item 
            // and must contend with the thieves for it
            if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
                // Lost the race, return null to indicate no 
                // more items available
                return null;
            // Won contention for the last item
            return low;
        }
        // No luck last item was stolen
        return null;
    }

    private static Tuple<int, int> CreateStealRange(int low, int high)
    {
        // If range is not empty create new one that is 
        // 1/4 of the available space
        if (low != high)
            return new Tuple<int, int>((low + 3 * high) / 4, high);
        // Otherwise create empty range that falls behind
        return new Tuple<int, int>(low - 1, low - 1);
    }
}
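
Before moving on, here is a tiny illustration (not part of the upcoming partitioner) of how the range is meant to be used: the owner drains it one index at a time while a thief, normally running on another thread, grabs a contiguous chunk.

var range = new WorkStealingRange(0, 100);

// Owner: keep taking one index at a time until the range is drained
for (var index = range.TryTakeOne(); index != null; index = range.TryTakeOne())
{
    // process the work item at position index.Value
}

// Thief (on another thread): try to grab a contiguous chunk
var stolen = range.TryStealRange();
if (stolen != null)
{
    // indexes [stolen.Item1, stolen.Item2) now belong to the thief
}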

Next time the custom partitioner that uses the work stealing range goes on the surgical table.

Tuesday, July 19, 2011

Infernal dinner synchronization problem

Imagine a group of hungry people with spoons sitting around a pot of stew. The spoon's handle is long enough to reach the pot, but it is longer than the arm and no one can feed himself. People are desperate. This is the picture described in a recently found piece of a lost chapter of Dante's Inferno. In order to help them Dante suggested they feed one another.

Only one person can feed another at a time. While feeding someone else a person cannot eat. People must not starve, meaning once hungry a person will eventually be fed. It is assumed that the spoon's handle allows feeding any other person except yourself.

We'll develop an algorithm to let the unfortunate ones synchronize with each other and not starve. It may seem similar to the dining philosophers problem, but the latter has a limited choice of selecting the order of taking forks and the degree of contention is low. In the infernal dinner problem, however, the choice space and the degree of contention are comparable with the problem size, which is the number of people (a person may choose to try to feed any other person while potentially contending with everyone but the person to be fed).

Here are a few important observations:

  • If everyone wants to eat or everyone wants to feed others at the same time they are doomed to deadlock. So at any given point in time at least one person must be willing to eat and at least one person to feed.
  • If a person fed someone he must eat next time, and if a person ate he must feed next time; thus they never reach the everyone-wants-the-same-thing situation, which is a deadlock.
  • In order to prevent starvation some sort of fairness must be guaranteed. One person must not be able to get fed infinitely many times while there are other people waiting.
  • People must somehow agree in pairs (a hungry one and a non-hungry one) to feed each other, and while doing so others must not be able to pair with them.

The first two are quite straightforward. Any person will either be hungry or not and every time a person eats the state changes. At the beginning at least one person is hungry and at least one is not.

The last two are more tricky. As you remember there are two types of people: those who are hungry and those who are not. Let's assume hungry people line up and wait to be fed. People that are willing to feed come, take hungry people one by one from the head of the line and feed them. If no hungry people are left they also line up and wait for hungry people; the roles basically switch. This is the idea of how hungry and non-hungry people can pair to feed each other. While a pair of people is outside of the queue nobody else can interfere with them.

The queue is represented through a linked list of the following form h->n0->n1->…->nk where h is a sentinel head node. Head and tail are never null as in case of an empty queue they both point to the sentinel node. Nodes are added at the tail and removed from the head. In order to remove a node from the beginning of the queue the head node must be advanced to its successor, which must not be null, otherwise the queue is considered empty. Adding is trickier. It is based on the fact that once the next of a node is set it is never changed. It is done in two steps. First the tail's next is set to the node to be added and upon success (at this point the node is visible to other threads) the tail is advanced to the newly added node. Because this process is not atomic other threads may observe the change half way through. In that case a thread may help to finish adding the node by advancing the tail and then retry its own operation.

Now to the line up part. Essentially there are two cases:

  • When the queue is empty or there are already nodes of the same type
    • a new node must be added to the end of the queue and waited upon until paired with someone else
  • Otherwise the node at the beginning must be removed and the waiting thread notified of the formed pair

Based on these rules the waiting queue will either be empty or contain nodes of the same type, which is equivalent to a line of either hungry or non-hungry people.

Here goes the implementation.

class SyncQueue<T>
{
    private volatile Node m_head;
    private volatile Node m_tail;

    public SyncQueue()
    {
        // Head is a sentinel node and will never be null
        m_head = new Node(default(T), false);
        m_tail = m_head;
    }

    public T Exchange(T value, bool mark, CancellationToken cancellationToken)
    {
        var node = new Node(value, mark);
        // Loop until values are exchanged with a thread of 
        // a different type
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var head = m_head;
            var tail = m_tail;

            // If the waiting queue is empty or already contains
            // same type of items
            if (head == tail || tail.m_mark == mark)
            {
                // Attempt to add current item to the end of the 
                // waiting queue
                var nextToTail = tail.m_next;
                // To avoid costly interlocked operations check 
                // if assumption about the tail is still correct
                if (tail != m_tail)
                    continue;
                // If next to what observed to be the tail is 
                // not null then the tail fell behind
                if (nextToTail != null)
                {
                    // Help to advance tail to the last node 
                    // and do not worry if it fails as 
                    // someone else succeeded in making tail up 
                    // to date
                    Interlocked.CompareExchange(ref m_tail, nextToTail, tail);
                    // And retry again
                    continue;
                }
                // Try to append current node to the end of the 
                // waiting queue by setting next of the tail
                // This is a linearization point of waiting case
                // (adding node to the end of the queue)
                if (Interlocked.CompareExchange(ref tail.m_next, node, null) != null) 
                    // Retry again if lost the race
                    continue;
                // Advance the tail with no check for success as 
                // in case of failure other thread is helping to
                // advance the tail
                Interlocked.CompareExchange(ref m_tail, node, tail);
                // Wait until exchange is complete
                var spin = new SpinWait();
                while (node.m_mark == mark)
                {
                    spin.SpinOnce();
                    cancellationToken.ThrowIfCancellationRequested();
                }
                // Correct value will be observed as reading mark 
                // implies load acquire semantics
                return node.m_value;
            }
            // Non empty waiting queue with items of a different 
            // type was observed thus attempt to exchange with 
            // thread waiting at the head of the queue through 
            // dequeueing
            var nextToHead = head.m_next;
            // Check if observed head is still consistent and it
            // has successor
            if (head != m_head || nextToHead == null)
                continue;
            // The observed non-empty queue can either grow, which
            // is fine as we are interested here in the head node,
            // or its head can be removed by another thread, in which
            // case the attempt below will fail and we retry again.
            // Attempt to advance head that is a sentinel node 
            // to its successor that holds sought value and is 
            // supposed to be new sentinel.
            // This is a linearization point of releasing case 
            // (removing node from the beginning of the queue)
            if (Interlocked.CompareExchange(ref m_head, nextToHead, head) != head) 
                // Retry if lost the race
                continue;
            // At this point head's successor is dequeued and no
            // longer reachable so values can be safely exchanged
            var local = nextToHead.m_value;
            nextToHead.m_value = value;
            // Switch mark to let waiting thread know that 
            // exchange is complete and making it the last store
            // with release semantics makes sure waiting thread 
            // will observe correct value
            nextToHead.m_mark = mark;

            return local;
        }
    }

    class Node
    {
        internal volatile Node m_next;
        internal volatile bool m_mark;
        internal T m_value;

        internal Node(T value, bool mark)
        {
            m_value = value;
            m_mark = mark;
        }
    }
}

class Human
{
    private volatile bool m_hungry;

    public Human(bool hungry)
    {
        m_hungry = hungry;
    }

    public void WaitAndEat(SyncQueue<Human> waitingQueue, CancellationToken cancellationToken)
    {
        var spin = new SpinWait();
        while (true)
        {
            spin.Reset();
            // The hell seems to have frozen =)
            cancellationToken.ThrowIfCancellationRequested();

            // Pair with someone either to feed hungry man 
            // or eat yourself if hungry
            var pairedWith = waitingQueue.Exchange(this, m_hungry, cancellationToken);
            if (!m_hungry)
            {
                // Feed hungry man
                pairedWith.Feed();
                // Become hungry after feeding so the eat/feed cycle
                // keeps going (see the observations above); otherwise
                // everyone eventually ends up willing to feed
                m_hungry = true;
            }
            else
                // Wait to be fed
                while (m_hungry)
                {
                    spin.SpinOnce();
                    cancellationToken.ThrowIfCancellationRequested();
                }
        }
    }

    private void Feed()
    {
        // Switch to non hungry as just ate
        m_hungry = !m_hungry;
    }
}
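
To tie it together, here is a rough driver sketch (the number of diners and the hungry/fed split are arbitrary): each person runs WaitAndEat on its own task until the dinner is cancelled.

var queue = new SyncQueue<Human>();
var cts = new CancellationTokenSource();
// Even mix of hungry and fed people so the dinner can start
var diners = Enumerable.Range(0, 8)
    .Select(i => new Human(i % 2 == 0))
    .ToArray();
var tasks = diners
    .Select(d => Task.Factory.StartNew(
        () => d.WaitAndEat(queue, cts.Token), cts.Token))
    .ToArray();
// ... let them dine for a while, then end the dinner
cts.Cancel();
try { Task.WaitAll(tasks); }
catch (AggregateException) { /* cancellation is expected */ }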

The infernal dinner is served =)

Thursday, June 30, 2011

Shared rooms synchronization problem

Recently I came across another interesting synchronization problem. Assume there are n rooms. Threads can enter and leave rooms. A room can hold an arbitrary number of threads. If a room holds at least one thread it is considered occupied. Only one room can be occupied at a time. With each room an exit action is associated that must be executed when the last thread leaves it. No threads are allowed to enter any room before the exit action has run to completion. Threads that are waiting to enter a room must eventually enter it. It is assumed that threads also leave the room at some point.

There are several cases for a thread attempting to enter a room:

  • If no room is occupied the thread can enter the required room
  • If the occupied room is the same as the required room, it is not empty (otherwise the exit action may be executing at the moment and entering is not allowed) and there are no other threads waiting for other rooms (otherwise others may starve, as a continuous stream of threads coming to the currently occupied room can prevent it from ever being set free), the thread can enter it
  • Otherwise thread must wait

Leaving room requires careful thought as well:

  • If the thread is not the last one it is free to go
  • Otherwise it must leave the room and execute the exit action while keeping the room occupied to prevent others from entering any room
    • Once the exit action is executed, if no other thread is waiting the last one is free to go
    • Otherwise it must wake up the waiting ones

The waiting and waking part can be done using Monitor.Wait and Monitor.PulseAll. Doing so with a single sync object is simple but quite inefficient, as every PulseAll will wake up all waiting threads (potentially waiting for different rooms) only for them to see that they are not allowed to enter yet, since only a single room can be occupied at a time. Instead each room will have its own local sync object to wait and pulse on. This allows waking up only the threads that are now allowed to enter the room they've been waiting on.

But this is where difficulties come. The decision to be made by an entering thread spans across rooms, so it still needs to be made under a global lock. Now the tricky part is: once the decision to wait is made, how do we avoid missing a wakeup? Attempting to do it like

lock (global)
{
    // Make decision to wait or return
    bool wait = ...;
    if (!wait)
        return;
}
// Somewhere here wakeup may be missed
lock (local)
{
    // Wait on local based on the decision made above
    Monitor.Wait(local);
}

is doomed to suffer from missed wakeups, as in between the global lock being released and the local one being acquired the last thread leaving the room may try to wake up waiters. So instead the local lock is acquired while still holding the global lock, which is released only after that.

var locked = false;
try
{
    Monitor.Enter(global, ref locked);
    // Make decision to wait or return
    bool wait = ...;
    if (!wait)
        return;
    
    // Acquire local while holding global
    lock (local)
    {
        // Release global
        Monitor.Exit(global);
        locked = false;
        // Wait on local based on the decision made above
        Monitor.Wait(local);
    }
}
finally
{
    if (locked)
        Monitor.Exit(global);
}

Threads indicate that they are willing to enter a room by maintaining a wait count for each room. It is also used to let threads enter the room in bulk. The last leaving thread picks up the room to be occupied next, adjusts the count and wakes up the waiting threads by pulsing the room local sync object.

In order to make sure that waiting threads enter a room eventually (guaranteeing starvation freedom) the following mechanism is used:

  • If some room is occupied a thread can enter that room only if no other threads are waiting to enter.
  • The last leaving thread runs through the other rooms in a circle, starting from the room right next to the currently occupied one, to see if any thread is waiting, and if such a room is found it lets the waiting threads enter in bulk.

Here goes the thing.

public class SharedRoomsLock
{
    // Holds room local locks
    private readonly object[] m_locks;
    // Holds number of threads waiting on to 
    // enter indexed by room numbers
    private readonly int[] m_waiting;
    // Holds actions to be executed for each room 
    // upon exit
    private readonly Action[] m_actions;
    // Holds number of threads currently in the 
    // occupied room
    private int m_count;
    // Holds index of the room currently occupied
    private int m_occupied = -1;

    public SharedRoomsLock(IEnumerable<Action> actions)
    {
        m_actions = actions.ToArray();

        var count = m_actions.Length;

        m_waiting = new int[count];
        m_locks = Enumerable.Range(0, count)
            .Select(_ => new object()).ToArray();
    }

    // Lock ownership is omitted for clarity however
    // can be added using ThreadLocal<T>
    public void Enter(int i)
    {
        var locked = false;
        try
        {
            Monitor.Enter(m_locks, ref locked);

            if (
                // If no room is occupied or
                m_occupied < 0 ||
                (
                // Occupied room that thread is trying 
                // to enter
                    m_occupied == i &&
                // and there is still someone in there
                    m_count > 0 &&
                // and no one waiting to enter other rooms
                    !m_waiting.Any(w => w > 0)))
            {
                m_occupied = i;
                m_count++;
                return;
            }
            // Otherwise indicate desire to enter the room
            m_waiting[i]++;
            // Acquire room local lock before releasing main
            // to avoid missed or incorrect wakeups
            lock (m_locks[i])
            {
                // Release main lock to allow others to 
                // proceed including waking thread
                Monitor.Exit(m_locks);
                locked = false;
                // Wait to be woken up by some last to leave
                // room thread, not necessarily immediately
                Monitor.Wait(m_locks[i]);
                // Once woken up thread can safely enter 
                // the room as count already adjusted 
            }
        }
        finally
        {
            if (locked)
                Monitor.Exit(m_locks);
        }
    }

    public void Exit(int i)
    {
        lock (m_locks)
        {
            // Indicate that thread left the room 
            if (--m_count > 0)
                // And leave it if not last
                return;
        }
        // If last, execute exit action, however not under 
        // the lock as that is quite dangerous due to reentrancy 
        m_actions[i]();
        // At this point room is still treated as 
        // occupied as only once exit action is executed 
        // it can be set free
        var locked = false;
        try
        {
            Monitor.Enter(m_locks, ref locked);
            // By default set room as not occupied
            m_occupied = -1;
            // Run through other rooms in circles to see if any 
            // thread is waiting thus ensuring some sort of 
            // fairness or at least starvation freedom
            for (var j = 1; j <= m_waiting.Length; j++)
            {
                var next = (i + j) % m_waiting.Length;
                var w = m_waiting[next];
                if (w > 0)
                {
                    // Found room with waiting threads so it 
                    // will be occupied next
                    m_occupied = next;
                    break;
                }
            }
            // If no one is waiting 
            if (m_occupied < 0)
                // There is nothing that can be done
                return;
            // At this point there are threads waiting to enter
            // the room so they are allowed to enter in one
            // shot 
            m_count = m_waiting[m_occupied];
            // Closing the doors right after them
            m_waiting[m_occupied] = 0;
            // Acquire room local lock before releasing main
            // to avoid missed or incorrect wakeups
            lock (m_locks[m_occupied])
            {
                // Releasing main lock is safe because the 
                // decision made under main lock will still be 
                // true as no other thread except for those 
                // already waiting will be able to wait on room
                // local lock that is currently held
                Monitor.Exit(m_locks);
                locked = false;
                // Wake up waiting to enter threads
                Monitor.PulseAll(m_locks[m_occupied]);
            }
        }
        finally
        {
            if (locked)
                Monitor.Exit(m_locks);
        }
    }
}
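
A usage sketch under the assumptions above (the room count and console messages are made up): each room gets an exit action, and a thread brackets its work with Enter and Exit.

var rooms = new SharedRoomsLock(new Action[]
{
    () => Console.WriteLine("room 0 exit action"),
    () => Console.WriteLine("room 1 exit action"),
    () => Console.WriteLine("room 2 exit action")
});

// Some thread that needs to work in room 1
rooms.Enter(1);
try
{
    // Do work that is only valid while room 1 is occupied
}
finally
{
    rooms.Exit(1);
}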

Monday, June 13, 2011

Sleeping barber synchronization problem

The sleeping barber problem is a classic synchronization problem proposed by Dijkstra that goes as follows:

A barbershop consists of a waiting room with n chairs, and the
barber room containing the barber chair. If there are no customers
to be served, the barber goes to sleep. If a customer enters the
barbershop and all chairs are occupied, then the customer leaves
the shop. If the barber is busy, but chairs are available, then the
customer sits in one of the free chairs. If the barber is asleep, the
customer wakes up the barber. Write a program to coordinate the
barber and the customers.

Back in the days when Dijkstra proposed this problem (it was in 1965) the rhythm of life was probably unhurried, barbers had a chance to sleep at work and customers could wait until the barber woke up.

Most of the solutions use some sort of WaitHandle to do the trick. For example, the classic solution is based on semaphores. Waiting on wait handles is not free.

But now let's change the analogy: the barber desperately needs customers, so when there are none he runs around in circles waiting impatiently. Customers are also quite busy, so if the waiting queue is empty they want to be served immediately, otherwise they pace from corner to corner while waiting.

I call this problem “crazy barber”. From the problem statement it follows that we should avoid using any “sleeping” mechanisms to do the synchronization.

A haircut is represented through an Action that the barber must execute and thus it cannot be null.

The number of chairs in the waiting room is known in advance and never changes. The waiting queue cannot overflow because any customer that sees no free chairs turns around and leaves the barbershop. So we can represent the waiting queue as a circular array of fixed size with two indices. Head points to the next request to be serviced, if not null. Tail points to the next free slot where a request can be put.

The barber waits for the next in line request (the one the head index points to) to become non null so it can be serviced. Once a reference to the request is obtained the barber nullifies the slot to mark it as free. Next the head index is advanced to make this slot available for use by customers. Once the barber completes the execution it notifies the waiting customer that it is free to go by changing the done flag.

A customer first checks if there are free slots. Then it competes with other customers for the free slot (the one the tail index points to). If successful it puts a request that combines the action and a done flag into the waiting queue array at the index of the just advanced tail. Once the request is queued the customer waits until the done flag changes.

Here goes “crazy barber”.

class CrazyBarber
{
    private readonly int m_capacity;
    // Circular array that holds queued items
    private volatile Request[] m_queue;
    // Points to next free slot
    private volatile int m_tail;
    // Points to a slot where next item to be executed 
    // is expected
    private volatile int m_head;

    public CrazyBarber(int capacity)
    {
        m_capacity = capacity;
        m_queue = new Request[m_capacity];
    }

    // Queues action for execution if there is free slot and 
    // waits for its execution completion
    public bool TryGetExecuted(Action action)
    {
        if (action == null)
            throw new ArgumentException();

        var waitToEnq = new SpinWait();
        while (true)
        {
            // Load tail first as if it will change compare and 
            // swap below will fail anyway
            var tail = m_tail;
            // Now load head and this is the linearization point 
            // of full queue case which results in unsuccessful 
            // attempt
            var head = m_head;
            // Check if queue has some free slots
            if (tail - head >= m_capacity)
                // The queue is full, no luck
                return false;
            // Create request before interlocked operation as 
            // it implies full barrier and thus will prevent
            // partially initialized request to be visible to 
            // worker loop
            var request = new Request { m_action = action };
            // Compete for the tail slot
            if (Interlocked.CompareExchange(ref m_tail, tail + 1, tail) != tail)
            {
                // We lost due to contention, spin briefly and 
                // retry
                waitToEnq.SpinOnce();
                continue;
            }
            var index = tail % m_capacity;
            // Here is the linearization point of successful 
            // attempt
            m_queue[index] = request;
            
            var waitToExe = new SpinWait();
            // Wait until the enqueued action is executed
            while (!request.m_done)
                waitToExe.SpinOnce();

            return true;
        }
    }

    // Runs single worker loop that does the execution and
    // must not be called from multiple threads
    public void Run(CancellationToken cancellationToken)
    {
        var waitToDeq = new SpinWait();
        while (true)
        {
            var head = m_head;
            var index = head % m_capacity;
            waitToDeq.Reset();
            // Though the array field is marked as volatile, access 
            // to its elements is not treated as volatile; 
            // however it is enough to make sure the loop condition 
            // is not optimized away
            // Wait until new item is available or cancellation 
            // is requested
            while (m_queue[index] == null)
            {
                if (cancellationToken.IsCancellationRequested)
                    return;
                waitToDeq.SpinOnce();
            }
            // Get request to be serviced and nullify it to 
            // mark slot as free for yourself
            var request = m_queue[index];
            m_queue[index] = null;
            // As there is only one worker advance head without
            // interlocked and here is the linearization point 
            // of making free slot
            m_head = head + 1;
            // Do not call TryGetExecuted from action or it will 
            // deadlock
            request.m_action();
            // Make sure ready notification is not made visible 
            // through reordering before action is completed
            // and store release guarantees that here
            request.m_done = true;
        }
    }

    class Request
    {
        internal Action m_action;
        internal volatile bool m_done;
    }
}
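
An illustrative setup (the capacity and workload are made up): the barber loop runs on its own task while a customer submits a haircut and learns whether it was served.

var barber = new CrazyBarber(3);
var cts = new CancellationTokenSource();
var barberLoop = Task.Factory.StartNew(() => barber.Run(cts.Token));

// A customer either gets a haircut or leaves if all chairs are taken
var served = barber.TryGetExecuted(() => Console.WriteLine("haircut done"));
if (!served)
    Console.WriteLine("no free chairs, leaving");

// Close the barbershop
cts.Cancel();
barberLoop.Wait();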

Although the tail index will overflow at some point, let's assume the "crazy" barber won't try to make around 2 billion haircuts =).

Monday, May 2, 2011

Concurrent Object Pool

Object pool is a set of initialized objects that are kept ready to use, rather than allocated and destroyed on demand.

Wikipedia

Using an object pool may improve performance when pooled object initialization cost and frequency of instantiation are high while at any period in time the number of objects in use is low (ThreadPool is a good example).

There are many questions to take into account when designing object pools. How to handle an acquire request when there are no free objects? In single threaded scenarios you may choose to create a new object or let the caller know that the request cannot be fulfilled. In multithreaded scenarios you may choose to wait for free objects until other threads release them. How to organize access to pooled objects? Based on your logic you may choose a most recently used first strategy, least recently used first or even random first. Besides that you need to provide synchronized access to the internal object storage.

Let's assume we have made decisions on handling the empty pool case (object pool growth strategy) and on access to pooled objects (most recently used first approach) and are now focused on minimizing synchronization costs for the internal object storage.

The simplest way of doing it is to protect any access with a lock. That will work, however under high contention it may result in lock convoys (in most cases the time spent within the lock is pretty short, only what is necessary to take a ready to use object). We may use a lock-free data structure such as ConcurrentStack<T> as the storage. Or we may try to reduce contention.

The idea is somewhat similar to the algorithm used in the ThreadPool, where each worker thread, in addition to the global work item queue, maintains a local double ended work item queue (one end is exposed to the owner and the other to worker threads that may want to steal work items if the global queue is empty). Each worker thread puts newly created work items into its local queue to avoid synchronization costs. When a worker thread is ready to process the next work item it first tries to dequeue from its local queue, if that fails it tries to get a work item from the global queue and lastly resorts to stealing from other threads (check out the cool series of articles from Joe Duffy on building a custom thread pool).

In order to build the object pool we will maintain local per thread storage of pooled objects in addition to global storage. Pooled objects are stored in segments of a selected size (it depends on usage scenarios so it must be specified during object pool creation). The global storage holds a set of segments. In order to acquire a pooled object a thread must:

  1. Get local segment (if any)
  2. If no local segment present or it is empty try to get segment out of global storage (if any)
  3. If global storage is empty create new segment
  4. Update local segment
  5. Get item from local segment and return it

Returning an object looks like this:

  1. Put item into local segment
  2. If the segment has grown beyond a threshold split it into two parts and put one of them back to global storage.

Thus most of the time a thread will work with its own local set of pooled objects. However this has its own cost. If, for example, a thread comes, acquires a pooled object and then never uses the pool again, we get an orphaned segment. So as usual it is a matter of tradeoff. If you have a limited set of threads that will work with the object pool for a long time this approach can be justified. If however you have many threads that work with the object pool for short periods of time and then never come back you'd probably better look into other approaches.

// Represents thread safe object pool
public class ConcurrentObjectPool<T>
{
    private readonly int m_segmentSize;
    // Thread local pool used without synchronization 
    // to reduce costs
    private ThreadLocal<Segment> m_localPool = new ThreadLocal<Segment>();
    // Global pool that is used once there is nothing or 
    // too much in local pool
    private readonly ConcurrentStack<Segment> m_globalPool = new ConcurrentStack<Segment>();
    // Factory function that is used from potentially multiple 
    // threads to produce pooled objects and thus must be thread 
    // safe
    private readonly Func<T> m_factory;

    public ConcurrentObjectPool(Func<T> factory, int segmentSize)
    {
        m_factory = factory;
        m_segmentSize = segmentSize;
    }

    // Acquires object from pool
    public PoolObject<T> Acquire()
    {
        var local = m_localPool.Value;
            
        T item;
        // Try to acquire pooled object from local pool
        // first to avoid synchronization penalties
        if (local != null && local.TryPop(out item))
            return new PoolObject<T>(item, this);
        // If failed (either due to empty or not yet 
        // initialized local pool) try to acquire segment
        // that will be local pool from global pool
        if (!m_globalPool.TryPop(out local))
        {
            // If failed create new segment using object 
            // factory
            var items = Enumerable.Range(0, m_segmentSize)
                .Select(_ => m_factory());
            local = new Segment(m_segmentSize, items);
        }
        m_localPool.Value = local;
        // Eventually get object from local non-empty pool    
        local.TryPop(out item);
        return new PoolObject<T>(item, this);
    }

    // Releases pooled object back to the pool; it is not 
    // publicly accessible to avoid multiple releases
    // of the same object
    internal void Release(T poolObject)
    {
        var local = m_localPool.Value;
        // Return object back to local pool first 
        var divided = local.Push(poolObject);
        // If local pool has grown beyond threshold 
        // return extra segment back to global pool
        if (divided != null)
            m_globalPool.Push(divided);
    }

    // Represents chunk of pooled objects
    class Segment
    {
        private readonly int m_size;
        // Using stack to store pooled objects assuming 
        // that hot objects (recently used) provide better 
        // locality
        private readonly Stack<T> m_items;

        public Segment(int size, IEnumerable<T> items)
        {
            m_size = size;
            m_items = new Stack<T>(items);
        }

        public bool TryPop(out T item)
        {
            item = default(T);
            // Pop item if any available
            if (m_items.Count > 0)
            {
                item = m_items.Pop();
                return true;
            }
            return false;
        }

        public Segment Push(T item)
        {
            m_items.Push(item);
            // If current segment size is still smaller
            // than twice of original size no need to split
            if (m_items.Count < 2 * m_size)
                return null;
            // Otherwise split current segment to get it 
            // pushed into global pool
            var items = Enumerable.Range(0, m_size)
                .Select(_ => m_items.Pop());
            return new Segment(m_size, items);
        }
    }
}

// Represents disposable wrapper around pooled object 
// that is used to return object back to the pool
public class PoolObject<T> : IDisposable
{
    private readonly ConcurrentObjectPool<T> m_pool;
    private bool m_disposed;

    private readonly T m_value;

    // Get reference to the pool to return value to
    public PoolObject(T value, ConcurrentObjectPool<T> pool)
    {
        m_value = value;
        m_pool = pool;
    }

    public T Value
    {
        get
        {
            // Make sure value can't be obtained (though we can't 
            // guarantee that it is not used) anymore after it is
            // released back to the pool
            ThrowIfDisposed();
            return m_value;
        }
    }

    public void Dispose()
    {
        if (m_disposed)
            return;
        // As we are disposing pooled object disposal basically 
        // equivalent to returning the object back to the pool
        m_disposed = true;
        m_pool.Release(m_value);
    }

    private void ThrowIfDisposed()
    {
        if (m_disposed)
            throw new ObjectDisposedException("Pool object has been disposed");
    }
}

And it is used somewhat like this

// Initialized elsewhere
ConcurrentObjectPool<T> pool = ...
...
using (var obj = pool.Acquire())
{
    // Use pooled object value
    Process(obj.Value);
}

I omitted the IDisposable implementation on the object pool to make the code simpler. However, in order to implement it we would need to track all segments in a separate collection, as otherwise thread local segments won't be accessible from the thread disposing the pool.
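
For the curious, below is one possible shape of that disposal. It assumes a hypothetical m_allSegments collection (say, a ConcurrentBag<Segment>) to which every newly created segment would also be added in Acquire, and that pooled objects may implement IDisposable; none of this is part of the code above.

public void Dispose()
{
    // m_allSegments is the hypothetical collection every created
    // segment is registered in, so thread local segments stay reachable
    foreach (var segment in m_allSegments)
    {
        T item;
        while (segment.TryPop(out item))
        {
            // Dispose pooled objects that support it
            var disposable = item as IDisposable;
            if (disposable != null)
                disposable.Dispose();
        }
    }
}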

Thursday, April 7, 2011

Concurrent set based on sorted singly linked list

A set is an abstract data structure that can store certain values, without any particular order, and no repeated values. Static sets do not change with time and allow only query operations, while mutable sets also allow the insertion and/or deletion of elements from the set.

Wikipedia

Though the set definition says it doesn't imply any particular order of values, that refers to how consuming code treats the set. One legitimate way (though not the most efficient one) to implement a set is to use a sorted singly linked list, which allows us to eliminate duplicate values. Other ways include, but are not limited to, a self-balancing binary search tree, skip lists or a hash table.

As the title says we are about to explore an algorithm for a concurrent set. Taking this into account we could use ConcurrentDictionary<TKey, TValue> to implement a concurrent set. Assuming that is done =) let's take a look at other options. At this point the .NET Framework has no support for a concurrent self-balancing binary search tree or skip lists, and building those is quite tricky. On the other hand a concurrent sorted singly linked list is still a feasible solution. This well known algorithm contains useful techniques.

For mutable sets at least the following operations must be supported: add, remove, contains. The simplest way to do this is to wrap any list modification with a lock, leading to coarse-grained synchronization. However under high contention this single lock will become a bottleneck; taking into account that all operations have O(n) time complexity, serializing them will lead to a significant performance hit.

Consider a list with the following contents: 1->3->4->6->7. Operations Add(2) and Add(5) even when run concurrently do not interfere, as they modify distinct areas of the list 1->(2)->3->4->(5)->6->7. The add operation affects the two consecutive nodes between which the new node must be added. The same is true for the remove operation (it affects two consecutive nodes: the node to be removed and its predecessor). These nodes will be used as sync roots to make thread safe modifications of the list. In order to prevent deadlocks nodes are always locked in the same order (predecessor first and then its successor).

What if we need to add a new node either at the beginning or at the end? In that case we do not have a pair of nodes. To work around this the list will contain two sentinel nodes (head and tail sentinels that remain in the list forever; any value is greater than the value in the head and less than the value in the tail). Basically any list looks like h->x0->x1->…->xn->t assuming h contains negative infinity and t contains positive infinity.

Let's assume we have a list: h->1->2->5->6->t. Two threads (A and B) concurrently execute operations Add(3) and Add(4) respectively. Both of them need to add a new node between the nodes that contain 2 and 5 (h->1->2->(x)->5->6->t). One of them will be the first to succeed in locking both nodes and will proceed with adding its new node (let's assume thread A succeeded). Once thread A has finished with its list modifications the list will be h->1->[2]->3->[5]->6->t yet thread B is still trying to lock the nodes in brackets. Thread B will eventually succeed in locking them, but its expectations may no longer be true, as happens in this case (the node that holds value 2 no longer points to the node that holds 5).

Between the moment a thread starts its attempt to lock the pair of nodes and the moment it eventually succeeds, the list can be modified by another thread, since locking two nodes is not atomic. Thus after a thread succeeds in locking it must validate that its expectations are still true.

However dealing with node removal is even more tricky. Assume we have a list h->1->2->5->6->t and thread A attempts Add(3) concurrently with thread B trying Remove(2) or Remove(5), and thread B happens to be first. In that case once thread A locks the nodes that contain 2 and 5 it may observe a list that is now h->1->5->6->t or h->1->2->6->t, meaning one of the locked nodes is no longer in the list, and adding a new value in between will lead to a lost value (if the predecessor was removed) or a resurrected value (if the successor was removed and the new node now points to it).

Thus once a thread succeeds in locking both nodes it must check that the locked nodes have not been removed from the list and that the predecessor still points to its previously observed successor. If the validation fails the operation must fall back and start again.

Well, if the node is removed from the list how do we know that? One way is to set the removed node's next reference to null. However this is a bad idea because another thread traversing the list without locks (and this is deliberate behavior) may observe an unexpected null reference. For example, assume we have a list h->1->2->3->6->t. Thread A tries Add(4) while thread B tries Remove(2). Assume that thread A, while searching for the 3->6 pair of nodes (to add 4 in between), moved its current reference to the node that contains 2 and was preempted. Then thread B succeeds in Remove(2). If thread B set the removed node's next reference to null, once thread A wakes up it will miserably fail even though the sought nodes are still in the list. Thus a list node's next reference must always be non-null.

So how do we remove nodes? We must preserve next pointers even after a node is removed. The node is simply marked as removed and then its predecessor’s next is updated, so the removed node is no longer reachable but still points to other list nodes and any in-progress traversals won’t break. From the memory perspective we rely on the garbage collector to reclaim memory occupied by unreachable nodes.

Because of the removal mechanism chosen we can safely traverse the list with no locks, and this is true for all three operations. The Contains operation only needs to validate the node it finds.

Now let’s code the thing.

public class ConcurrentSet<T>
{
    private readonly IComparer<T> m_comparer;
    // Sentinel nodes
    private readonly Node m_head;
    private readonly Node m_tail;

    public ConcurrentSet(IComparer<T> comparer)
    {
        m_comparer = comparer;
        // Sentinel nodes cannot be removed from the
        // list and logically contain negative and 
        // positive infinity values from T
        m_tail = new Node(default(T));
        m_head = new Node(default(T), m_tail);
    }

    // Adds item to the set if no such item 
    // exists
    public bool Add(T item)
    {
        // Continue attempting until succeeded 
        // or failed
        while (true)
        {
            Node pred, curr;
            // Find where new node must be added
            Find(item, out pred, out curr);
            // Locks nodes starting from predecessor
            // to synchronize concurrent access
            lock (pred)
            {
                lock (curr)
                {
                    // Check if found nodes still
                    // meet expectations
                    if (!Validate(pred, curr))
                        continue;
                    // If the value is already in the
                    // set we are done
                    if (Equal(curr, item))
                        return false;
                    // Otherwise add new node
                    var node = new Node(item, curr);
                    // At this point new node becomes
                    // reachable
                    pred.m_next = node;
                    return true;
                }
            }
        }
    }

    // Removes item from the list if such item
    // exists
    public bool Remove(T item)
    {
        // Continue attempting until succeeded 
        // or failed
        while (true)
        {
            Node pred, curr;
            // Find node that must be removed and 
            // its predecessor
            Find(item, out pred, out curr);
            // Locks nodes starting from predecessor
            // to synchronize concurrent access
            lock (pred)
            {
                lock (curr)
                {
                    // Check if found nodes still
                    // meet expectations
                    if (!Validate(pred, curr))
                        continue;
                    // If the value is not in the set 
                    // we are done
                    if (!Equal(curr, item))
                        return false;
                    // Otherwise mark node as removed 
                    curr.m_removed = true;
                    // And make it unreachable
                    pred.m_next = curr.m_next;
                    return true;
                }
            }
        }
    }

    // Checks if given item exists in the list
    public bool Contains(T item)
    {
        Node pred, curr;
        Find(item, out pred, out curr);

        return !curr.m_removed && Equal(curr, item);
    }

    // Searches for a pair of consecutive nodes such that
    // the curr node contains a value equal to or greater
    // than the given item
    void Find(T item, out Node pred, out Node curr)
    {
        // Traverse the list without locks as removed
        // nodes still point to other nodes
        pred = m_head;
        curr = m_head.m_next;
        while (Less(curr, item))
        {
            pred = curr;
            curr = curr.m_next;
        }
    }

    static bool Validate(Node pred, Node curr)
    {
        // Validate that pair of nodes previously 
        // found still meets the expectations 
        // which essentially is checking whether 
        // nodes still point to each other and no one
        // was removed from the list
        return !pred.m_removed &&
                !curr.m_removed &&
                pred.m_next == curr;
    }

    bool Less(Node node, T item)
    {
        return node != m_tail &&
                m_comparer.Compare(node.m_value, item) < 0;
    }

    bool Equal(Node node, T item)
    {
        return node != m_tail &&
                m_comparer.Compare(node.m_value, item) == 0;
    }

    class Node
    {
        internal readonly T m_value;
        internal volatile Node m_next;
        internal volatile bool m_removed;

        internal Node(T value, Node next = null)
        {
            m_value = value;
            m_next = next;
        }
    }
}

This algorithm uses fine grained synchronization that improves concurrency, and lazy node removal allows us to traverse the list with no locks at all. This is quite important as the Contains operation is usually called more frequently than Add and Remove.
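
For completeness, a minimal usage sketch (illustrative only; Parallel.For merely plays the role of several concurrent callers, and the final contents depend on interleaving):

var set = new ConcurrentSet<int>(Comparer<int>.Default);
// Concurrent writers: duplicate adds return false, removals of
// absent items return false, the list stays consistent throughout
Parallel.For(0, 10000, i =>
{
    set.Add(i % 100);
    if (i % 3 == 0)
        set.Remove(i % 100);
});
// Lock-free lookup; the result depends on the interleaving above
Console.WriteLine(set.Contains(42));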

Hope this pile of bits makes sense to you =).

Monday, March 28, 2011

Merge binary search trees in place

Binary search tree is a fundamental data structure used for searching and sorting. It is also a common problem to merge two binary search trees into one.

The simplest solution is to take every element of one tree and insert it into the other. This may be really inefficient: it depends on how well the target tree is balanced and it doesn’t take the structure of the source tree into account.

A more efficient way is to use insertion into the root. Assuming we have two trees A and B, we insert the root of tree A into tree B and, using rotations, move the inserted node up to become the new root of tree B. Next we recursively merge the left and right sub-trees of trees A and B. This algorithm takes the structure of both trees into account, but insertion still depends on how balanced the target tree is.

We can look at the problem from a different perspective. A binary search tree keeps its nodes in sorted order. Merging two trees means arranging nodes from both trees in sorted order, which sounds exactly like the merge phase of merge sort. However trees cannot be consumed by that algorithm directly, so we first convert each tree into a sorted singly linked list built out of the tree nodes themselves, then merge the two lists into a single sorted linked list. This list gives the sorted order for the sought tree and must then be converted back into a tree. We’ve got the plan, let’s go for it.
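
The snippets below operate on a TreeNode<T> type that is not shown here; a minimal shape compatible with the code (an assumption on my part) could look like this:

public class TreeNode<T>
{
    public T Value { get; set; }
    public TreeNode<T> Left { get; set; }
    public TreeNode<T> Right { get; set; }
}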

In order to convert a binary search tree into a sorted singly linked list we traverse the tree in order, converting sub-trees into lists and appending them to the resulting one.

// Converts tree to sorted singly linked list and appends it 
// to the head of the existing list and returns new head.
// Left pointers are used as next pointer to form singly
// linked list thus basically forming degenerate tree of 
// single left oriented branch. Head of the list points 
// to the node with greatest element.
static TreeNode<T> ToSortedList<T>(TreeNode<T> tree, TreeNode<T> head)
{
    if (tree == null)
        // Nothing to convert and append
        return head;
    // Do conversion using in order traversal
    // Convert first left sub-tree and append it to 
    // existing list
    head = ToSortedList(tree.Left, head);
    // Append root to the list and use it as new head
    tree.Left = head;
    // Convert right sub-tree and append it to list 
    // already containing left sub-tree and root
    return ToSortedList(tree.Right, tree);
}

Merging sorted linked lists is quite straightforward.

// Merges two sorted singly linked lists into one and 
// calculates the size of merged list. Merged list uses 
// right pointers to form singly linked list thus forming 
// degenerate tree of single right oriented branch. 
// Head points to the node with smallest element.
static TreeNode<T> MergeAsSortedLists<T>(TreeNode<T> left, TreeNode<T> right, IComparer<T> comparer, out int size)
{
    TreeNode<T> head = null;
    size = 0;
    // See merge phase of merge sort for linked lists
    // with the only difference in that this implementations
    // reverts the list during merge
    while (left != null || right != null)
    {
        TreeNode<T> next;
        if (left == null)
            next = DetachAndAdvance(ref right);
        else if (right == null)
            next = DetachAndAdvance(ref left);
        else
            next = comparer.Compare(left.Value, right.Value) > 0
                        ? DetachAndAdvance(ref left)
                        : DetachAndAdvance(ref right);
        next.Right = head;
        head = next;
        size++;
    }
    return head;
}

static TreeNode<T> DetachAndAdvance<T>(ref TreeNode<T> node)
{
    var tmp = node;
    node = node.Left;
    tmp.Left = null;
    return tmp;
}

Rebuilding the tree from the sorted linked list is the interesting part. To build a balanced tree we must know the number of nodes in the final tree, which is why it is calculated during the merge phase. Knowing the size allows us to distribute nodes uniformly and build a tree that is optimal from the height perspective. Optimality depends on usage scenarios; here we assume that every element in the tree is equally likely to be sought.

// Converts singly linked list into binary search tree 
// advancing list head to next unused list node and 
// returning created tree root
static TreeNode<T> ToBinarySearchTree<T>(ref TreeNode<T> head, int size)
{
    if (size == 0)
        // Zero sized list converts to null 
        return null;

    TreeNode<T> root;
    if (size == 1)
    {
        // Unit sized list converts to a node with 
        // left and right pointers set to null
        root = head;
        // Advance head to next node in list
        head = head.Right;
        // Left pointers were already nullified during the merge
        // so only the right pointer needs to be cleared
        root.Right = null;
        return root;
    }

    var leftSize = size / 2;
    var rightSize = size - leftSize - 1;
    // Create left subtree out of half of the list nodes
    var leftRoot = ToBinarySearchTree(ref head, leftSize);
    // List head now points to the root of the subtree
    // being created
    root = head;
    // Advance list head and the rest of the list will 
    // be used to create right subtree
    head = head.Right;
    // Link left subtree to the root
    root.Left = leftRoot;
    // Create right subtree and link it to the root
    root.Right = ToBinarySearchTree(ref head, rightSize);
    return root;
}

Now putting everything together.

public static TreeNode<T> Merge<T>(TreeNode<T> left, TreeNode<T> right, IComparer<T> comparer)
{
    Contract.Requires(comparer != null);

    if (left == null || right == null)
        return left ?? right;
    // Convert both trees to sorted lists using original tree nodes 
    var leftList = ToSortedList(left, null);
    var rightList = ToSortedList(right, null);
    int size;
    // Merge sorted lists and calculate merged list size
    var list = MergeAsSortedLists(leftList, rightList, comparer, out size);
    // Convert sorted list into optimal binary search tree
    return ToBinarySearchTree(ref list, size);
}

This solution has O(n + m) time complexity and O(1) additional space complexity (not counting the recursion stack), where n and m are the sizes of the trees being merged.
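
A hedged usage sketch, assuming the TreeNode<T> shape above and that Merge is defined in some static class that is in scope:

//  tree A:    2         tree B:    3
//            / \                  / \
//           1   5                0   4
var a = new TreeNode<int> { Value = 2,
    Left = new TreeNode<int> { Value = 1 },
    Right = new TreeNode<int> { Value = 5 } };
var b = new TreeNode<int> { Value = 3,
    Left = new TreeNode<int> { Value = 0 },
    Right = new TreeNode<int> { Value = 4 } };
// The result reuses the original nodes; an in-order traversal
// yields 0, 1, 2, 3, 4, 5 and the tree has minimal height
var merged = Merge(a, b, Comparer<int>.Default);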

Thursday, March 3, 2011

Shared restroom synchronization problem

Assume you own a bar that has a single restroom with n stalls. In order to avoid lawsuits you want to make sure that only people of the same gender can be in the restroom at the same time and that no accidents happen (nobody pees their pants). How would you write a synchronization algorithm for this?

Translated into concurrency language, we want to build a synchronization algorithm that allows no more than n threads of the same kind inside the critical section, and it should be starvation free (threads that are trying to enter the critical section eventually enter it).

The problem can be divided into three parts:

  • How to allow only threads of the same kind to enter critical section
  • How to limit number of threads inside critical section to configured number
  • How to make the whole thing starvation free

Mutual exclusion algorithms have a nice composition property with respect to starvation freedom. Assume you have two starvation free mutual exclusion algorithms A and B. Combined in the following way they form another starvation free mutual exclusion algorithm (see the sketch below):

  • Enter code A
  • Enter code B
  • Critical section
  • Leave code B
  • Leave code A
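
To make the composition concrete, here is a minimal sketch assuming two hypothetical starvation free locks lockA and lockB that expose Enter/Leave; the restroom solution below instantiates exactly this shape with WhiteBlackLock as algorithm A and SemaphoreSlim as algorithm B:

lockA.Enter();
try
{
    lockB.Enter();
    try
    {
        // Critical section protected by both algorithms
    }
    finally
    {
        lockB.Leave();
    }
}
finally
{
    lockA.Leave();
}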

Limiting the number of threads inside the critical section to a configured number is easily solved with SemaphoreSlim (starvation free). So the remaining problem is allowing only threads of the same kind to enter the critical section.

Let’s denote two kinds of threads: black and white. Assume a thread tries to enter the critical section. The following cases are possible:

  • No other threads are in the critical section, so it can enter
  • There are threads of the same color in the critical section
    • no threads of the different color are waiting, so it can enter
    • there are waiting threads of the different color, so it cannot enter to prevent starvation of the waiting threads and must wait
  • There are threads of the different color in the critical section so it must wait

Now, to prevent starvation, we switch the turn to the other color once the group of threads of the current color leaves the critical section. The turn holds the color that is currently allowed to enter the critical section. However if no threads are in the critical section a thread may enter even if it’s not its turn (it basically captures the turn).

class WhiteBlackLock
{
    private readonly object _sync = new object();
    private readonly int[] _waiting = new int[2];
    private int _turn;
    private int _count;

    public IDisposable EnterWhite()
    {
        return Enter(0);
    }

    public IDisposable EnterBlack()
    {
        return Enter(1);
    }

    private IDisposable Enter(int color)
    {
        lock (_sync)
        {
            if (_waiting[1 - _turn] == 0 && 
                (_count == 0 || _turn == color))
            {
                // Nobody of the other color is waiting and either
                // no one is in the critical section or this thread
                // has the same color as those inside
                _count++;
                _turn = color;
            }
            else
            {
                // Either somebody is waiting to enter critical 
                // section or this thread has a different color 
                // than the ones already in the critical section 
                // and thus wait with the rest of the same color
                _waiting[color]++;
                // Wait until this group is admitted (Leave resets the counter)
                while (_waiting[color] > 0)
                    Monitor.Wait(_sync);
            }
            // Wrap critical section leaving in a disposable to 
            // enable convenient use with using statement
            return new Disposable(this);
        }
    }

    private void Leave()
    {
        lock (_sync)
        {
            // Indicate completion
            if (--_count != 0) 
                return;
            // If this is the last one of the current group make 
            // way for threads of other color to run by switching 
            // turn
            _turn = 1 - _turn;
            // Before threads are awoken count must be set to 
            // waiting group size so that they can properly report 
            // their completion and not change turn too fast
            _count = _waiting[_turn];
            // Indicate that the current group can enter the
            // critical section
            _waiting[_turn] = 0;
            // Wake up waiting threads
            Monitor.PulseAll(_sync);
        }
    }

    class Disposable : IDisposable
    {
        private readonly WhiteBlackLock _lock;
        private int _disposed;

        public Disposable(WhiteBlackLock @lock)
        {
            _lock = @lock;
        }

        public void Dispose()
        {
            // Dispose may be called multiple times; make sure
            // only the first call leaves the critical section
            if (Interlocked.Exchange(ref _disposed, 1) == 0)
                _lock.Leave();
        }
    }
}

In order to avoid lock ownership tracking, leaving the critical section is represented by a disposable.

var semaphore = new SemaphoreSlim(n);
var whiteBlackLock = new WhiteBlackLock();

// Worker thread code to enter critical section
using (whiteBlackLock.EnterWhite())
{
    semaphore.Wait();
    // Critical section goes here
    semaphore.Release();
}
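
A slightly larger hedged sketch of how white and black workers might exercise the combined lock (thread counts and delays are arbitrary):

var tasks = new List<Task>();
for (var i = 0; i < 8; i++)
{
    var white = i % 2 == 0;
    tasks.Add(Task.Factory.StartNew(() =>
    {
        using (white ? whiteBlackLock.EnterWhite() : whiteBlackLock.EnterBlack())
        {
            semaphore.Wait();
            try
            {
                // Only threads of one color are here at any moment
                // and no more than n of them at a time
                Thread.Sleep(10);
            }
            finally
            {
                semaphore.Release();
            }
        }
    }));
}
Task.WaitAll(tasks.ToArray());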

Now your restroom can safely serve the needs of your customers =).

Friday, February 25, 2011

Merge sequences in parallel

In practice you may find yourself in a situation where you have several sequences of data that you want to drain in parallel, merging the results into a single sequence. This is what the Merge combinator for sequences does. Reactive Extensions used to provide it for both enumerable and observable sequences; at some point it was dropped for enumerable sequences since it can be expressed through the same combinator for observable sequences.

I find it quite interesting to implement the Merge combinator for enumerable sequences, but to make it more interesting let’s change the behavior a little bit. The original behavior of EnumerableEx.Merge was to drain source sequences as fast as the workers can, buffering results in a queue until they are consumed through the merged sequence’s enumerator. It can be implemented (maybe not in the most efficient way, but still) using:

  • BlockingCollection<T> to synchronize producers that drain original sequences and consumer that merges elements from all sequences into resulting sequence
  • CancellationTokenSource to notify producers that early termination is requested

Instead, let’s change the behavior so that a producer may proceed to the next element of its source sequence only once the previously “produced” element has been consumed (which basically means the merged sequence’s enumerator has reached it). This is a sort of two-way synchronization between producer and consumer: the consumer cannot proceed unless there are ready elements, and a producer cannot proceed until its previous element is consumed. It is similar to a blocking collection, except there is only one consumer and producers are blocked not because the queue is full but until their previously enqueued element is dequeued. Let’s code the thing!

class Consumer<T>
{
    private readonly IEnumerable<IEnumerable<T>> _sources;

    private readonly object _sync = new object();

    private readonly Queue<Producer<T>> _globalQueue = new Queue<Producer<T>>();
    private readonly Queue<Producer<T>> _localQueue = new Queue<Producer<T>>();

    // No need to mark these fields as volatile as 
    // only consumer thread updates and reads them
    private bool _done;
    private int _count;

    private readonly IList<Exception> _exceptions = new List<Exception>();

    public Consumer(IEnumerable<IEnumerable<T>> sources)
    {
        _sources = sources;
    }

    // Merges sources in parallel
    public IEnumerable<T> Merge()
    {
        Start();
        while (true)
        {
            Wait();
            while (_localQueue.Count > 0)
            {
                // Use local queue to yield values
                var producer = _localQueue.Dequeue();
                // Get the actual value out of ready producer
                // while simultaneously allowing it to proceed 
                // and observe early termination request or get 
                // next value
                var next = producer.GetNext(!_done);
                // Using Notification<T> from Rx to simplify
                // sequence processing
                if (next.Kind != NotificationKind.OnNext)
                {
                    _count--;
                    if (next.Kind == NotificationKind.OnError)
                    {
                        // Observed exception leads to early 
                        // termination however merge will allow 
                        // already running (not waiting) 
                        // producers to finish (potentially 
                        // bringing more exceptions to be 
                        // observed) while requesting waiters 
                        // to terminate
                        _done = true;
                        // Store observed exception to be further 
                        // thrown as part of the aggregate 
                        // exception
                        _exceptions.Add(next.Exception);
                    }
                }
                else
                {
                    yield return next.Value;
                }
            }
            // Loop until all producers finished
            if (_count == 0)
            {
                // Either normally
                if (_exceptions.Count == 0)
                    yield break;
                // Or exceptions were observed
                throw new AggregateException(_exceptions);
            }
        }
    }

    // Notifies consumer of ready elements
    public void Enqueue(Producer<T> producer)
    {
        // Notify consumer of a ready producer
        lock (_sync)
        {
            // Consumer will either observe non-empty 
            // global queue (if it is not waiting already) 
            // or pulse (if it is waiting)
            _globalQueue.Enqueue(producer);
            Monitor.Pulse(_sync);
        }
    }

    // Waits for ready elements
    private void Wait()
    {
        lock (_sync)
        {
            // Since this is the only consumer draining the global
            // queue, once it observes an empty queue and is then
            // pulsed, the queue is guaranteed to be non-empty
            if (_globalQueue.Count == 0)
                Monitor.Wait(_sync);
            // Copy whatever available to local queue to further 
            // drain without bothering taking a lock
            while (_globalQueue.Count > 0)
                _localQueue.Enqueue(_globalQueue.Dequeue());
        }
    }

    // Starts producers
    private void Start()
    {
        try
        {
            foreach (var source in _sources)
            {
                var producer = new Producer<T>(this, source.GetEnumerator());
                Task.Factory.StartNew(producer.Enumerate);
                _count++;
            }
        }
        catch (Exception ex)
        {
            // If none of producers are started successfully 
            // just rethrow the exception
            if (_count == 0)
                throw;
            // Some producers started notify them of early 
            // termination
            _done = true;
            // Store observed exception to be further thrown as 
            // part of the aggregate exception
            _exceptions.Add(ex);
        }
    }
}

class Producer<T>
{
    private readonly object _sync = new object();
    private readonly Consumer<T> _consumer;

    private IEnumerator<T> _enum;

    private volatile Notification<T> _next;
    private volatile bool _cont = true;
    private volatile bool _awake;

    public Producer(Consumer<T> consumer, IEnumerator<T> enumerator)
    {
        _consumer = consumer;
        _enum = enumerator;
    }

    // Drains source sequence
    public void Enumerate()
    {
        try
        {
            // Keep draining the source while the consumer allows it;
            // the continuation flag is initially true and is updated
            // by the consumer every time it awakes this producer
            while (_cont && _enum.MoveNext())
            {
                // Reset the awake flag and wait to be
                // awoken by the consumer
                _awake = false;
                // Notify consumer of a ready element
                _next = new Notification<T>.OnNext(_enum.Current);
                _consumer.Enqueue(this);

                lock (_sync)
                {
                    // If consumer was pretty quick and producer 
                    // missed the pulse it will observe the awake 
                    // flag and thus won't wait
                    if (!_awake)
                        Monitor.Wait(_sync);
                }
            }
            _next = new Notification<T>.OnCompleted();
        }
        catch (Exception ex)
        {
            _next = new Notification<T>.OnError(ex);
        }
        finally
        {
            _enum.Dispose();
            _enum = null;
        }
        // Notify consumer that the producer completed 
        _consumer.Enqueue(this);
    }

    // Awakes/notifies producer that it can proceed
    public Notification<T> GetNext(bool cont)
    {
        // Store the ready element in a local variable because once
        // the producer is unleashed below it may overwrite the element
        // before it has been returned
        var next = _next;

        lock (_sync)
        {
            // Set awake flag in case the producer misses the pulse
            _awake = true;
            _cont = cont;
            Monitor.Pulse(_sync);
        }

        return next;
    }
}

With Producer<T> and Consumer<T> in place Merge combinator can be implemented as:

static class EnumerableEx
{
    public static IEnumerable<T> Merge<T>(IEnumerable<IEnumerable<T>> sources)
    {
        Contract.Requires(sources != null);
        return new Consumer<T>(sources).Merge();
    }
}

At any given moment at most n elements will be in the global queue, where n is the number of source sequences.
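
A hedged usage sketch (the implementation above relies on Rx’s Notification<T> type, and the order of elements in the merged sequence is nondeterministic):

var sources = new IEnumerable<int>[]
{
    Enumerable.Range(0, 5),
    Enumerable.Range(100, 5),
    Enumerable.Range(200, 5)
};
// Each source is drained on its own task; a producer fetches its next
// element only after its previous one has been yielded by the loop below
foreach (var item in EnumerableEx.Merge(sources))
    Console.WriteLine(item);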

Great exercise!

Sunday, February 13, 2011

Longest consecutive elements sequence

A tiny detail that can be uncovered by looking at a problem from a different angle is usually the key to the solution. So many times I have looked at a great API design or problem solution thinking “how did I not see that”. But sometimes I do see it. One such problem is searching for the longest consecutive elements sequence within an unsorted array of integers. For example, in {5, 7, 3, 4, 9, 10, 1, 15, 1, 3, 12, 2, 11} the sought sequence is {1, 2, 3, 4, 5}.

The first thing that comes to mind is to sort the given array in O(n log n) and look for the longest consecutive run. Eh, we can do better than that.

Using a bit vector indexed by the numbers from the original array may not be justified because the required space depends on the value range rather than on the array size (although time complexity is O(n)).

Let’s look at the problem more closely. It can be reduced to a problem of efficient range manipulation. Disjoint-set structures or interval trees offer O(n log n) building time, but they do not take advantage of the fact that we are dealing with integers. Knowing a range’s boundaries we can say exactly which numbers are in it (for example, [1..3] contains 1, 2, 3). What we are looking for is O(1) time per range manipulation operation with O(1) space per range. We can achieve this using two hash tables:

  • ‘low’ with range start numbers as keys and range end numbers as values
  • ‘high’ with range end numbers as keys and range start numbers as values

For example, for a range [x..y] the tables will hold low[x] = y and high[y] = x. The algorithm is as follows:

  • Scan original array and for each element:
    • If it already belongs to any range skip it
    • Otherwise create unit size range [i..i]
    • If there is a range next to the right [i+1.. y] merge with it to produce [i..y] range
    • If there is a range next to the left [x..i-1] merge with it as well (either [i..i] or merged [i..y] will be merged with [x..i-1])
  • Scan through created ranges to find out the longest

The only question left is how to check whether an element already belongs to some range, since only range boundaries are kept in the hash tables. Any number is either processed previously (and thus belongs to one of the ranges) or a new range is created out of it (and potentially merged with others). So if a number was seen previously it is in some range and we do not need to process it again, which means that before scanning the original array we can simply remove duplicates in O(n) time and space.

class Solver
{
    public static Tuple<int, int> FindMaxRange(IEnumerable<int> seq)
    {
        // Generate all ranges and select maximum
        return EnumerateRanges(seq)
            .Aggregate((x, y) => Length(x) > Length(y) ? x : y);
    }

    public static IEnumerable<Tuple<int, int>> EnumerateRanges(IEnumerable<int> seq)
    {
        var low = new Dictionary<int, int>();
        var high = new Dictionary<int, int>();

        // Remove duplicates
        foreach (var val in seq.Distinct())
        {
            // Create unit size range
            low[val] = high[val] = val;
            // Merge [i..i] with [i+1..y]
            var endsWith = MergeWithNext(val, low, high, 1);
            // Merge [i..endsWith] with [x..i-1]
            MergeWithNext(endsWith, high, low, -1);
        }

        return low.Select(p => Tuple.Create(p.Key, p.Value));
    }

    static int MergeWithNext(int currStart, IDictionary<int, int> low, IDictionary<int, int> high, int sign)
    {
        var currEnd = low[currStart];
        var nextStart = currEnd + sign;
        if (low.ContainsKey(nextStart))
        {
            low[currStart] = low[nextStart];
            high[low[currStart]] = currStart;
            low.Remove(nextStart);
            high.Remove(currEnd);
        }
        return low[currStart];
    }

    static int Length(Tuple<int, int> t)
    {
        return t.Item2 - t.Item1;
    }
}

The solution has O(n) time and space complexity assuming the hash table implementation has O(1) time and space complexity for each operation/element.
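
A quick hedged check against the example array from the beginning of the post:

var seq = new[] { 5, 7, 3, 4, 9, 10, 1, 15, 1, 3, 12, 2, 11 };
var range = Solver.FindMaxRange(seq);
// Prints (1, 5), i.e. the consecutive run {1, 2, 3, 4, 5}
Console.WriteLine(range);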

Wednesday, January 5, 2011

Parallel string matching

String matching is about searching for an occurrence (first or all occurrences, which makes a difference from a parallelization point of view as you will see shortly) of a pattern in a given text.

This problem naturally fits the data parallelism scenario, although the details depend on whether we want to find the first or all occurrences of a pattern. Parallel search for all occurrences is simpler since the search space is static (the whole text needs to be looked through). The first occurrence, in contrast, may be anywhere in the text, so reducing the search space is what improves performance; otherwise the solution degenerates either to sequential search up to the first occurrence or to a parallel search for all occurrences that then picks the first one.

In order to find all occurrences of a pattern the text can be split into chunks that are processed in parallel. Each chunk overlaps with its immediate neighbor (except for the last one) by no more than length(pattern) - 1 characters to cover occurrences that span a chunk boundary.

public static class ParallelAlgorithms
{
    // Find all occurrences of a pattern in a text
    public static IEnumerable<int> IndexesOf(this string text, string pattern, int startIndex, int count)
    {
        var proc = Environment.ProcessorCount;
        // Do range partitioning
        var chunk = (count + proc - 1) / proc;
        var indexes = new IEnumerable<int>[proc];

        Parallel.For(0, proc, 
            p => 
            {
                // Define overlapping with its immediate 
                // neighbor chunk except for the last one
                var before = p * chunk;
                var now = Math.Min(chunk + pattern.Length - 1, count - p * chunk);
                // Sequentially search for pattern occurrences
                // and store local result
                indexes[p] = SequentialIndexesOf(text, pattern, startIndex + before, now);
            });
        return indexes.SelectMany(p => p);
    }

    static IEnumerable<int> SequentialIndexesOf(string text, string pattern, int startIndex, int count)
    {
        for (var i = 0; i <= count - pattern.Length;)
        {
            var found = text.IndexOf(pattern, startIndex + i, count - i);
            // No pattern occurrence is found till the end
            if (found == -1)
                break;
            yield return found;
            // Proceed from the position next to the found one;
            // found is absolute while i is relative to startIndex
            i = found - startIndex + 1;
        }
    }
}

Now, for the first occurrence scenario, the search space must be reduced to minimize the amount of unnecessary work (which will be non-zero in most cases due to speculative processing).

One way to do this is to split the text into chunks and process them in parallel such that if the pattern is found in chunk(i), processing of any chunk(j) where j > i is cancelled. Assuming n is the length of the text and p is the logical processor count, in the worst case (when the first occurrence of the pattern is at the end of the first chunk) the amount of unnecessary work will be (p - 1)*n/p. On the other hand, such range partitioning performs poorly when the workload is unbalanced.

What we can do instead is dynamic partitioning, where the whole text is split into groups of chunks: chunks within a group have the same length, and the chunk length doubles from one group to the next. The group size is set to the number of logical processors. Thus, if c denotes a chunk of unit size, the text is split like c, c, c, …, cc, cc, cc, …, cccc, cccc, cccc, … Now this sequence of chunks can be processed in parallel (respecting order), breaking at the first found occurrence. In the worst case at most p*c unnecessary work is done, where c is the maximum chunk size.

This partitioning strategy is used in PLINQ and is called chunk partitioning. Although text can be treated as a sequence of characters and chunk partitioning could be used out of the box, that is not what we want, as we would then process individual characters rather than chunks of text. Instead we’ll produce the sequence of chunks manually and, using a single item partitioner and Parallel.ForEach, process them in parallel respecting order.

public static class ParallelAlgorithms
{
    // Find first occurrence of a pattern in a text
    public static int IndexOf(this string text, string pattern, int startIndex, int count)
    {
        var minChunkSize = pattern.Length << 5;
        var maxChunkSize = minChunkSize << 3;
        // Create sequence of chunks
        var chunks = PartitionRangeToChunks(startIndex, count, minChunkSize, maxChunkSize, Environment.ProcessorCount);
        // Process chunks in parallel respecting order
        var chunkPartitioner = SingleItemPartitioner.Create(chunks);
        var concurrentBag = new ConcurrentBag<int>();
        Parallel.ForEach(chunkPartitioner,
            (range, loop) =>
            {
                var start = range.Item1;
                var length = Math.Min(startIndex + count - start, range.Item2 + pattern.Length - 1);
                var index = text.IndexOf(pattern, start, length);
                // No pattern occurrences in this chunk
                if (index < 0)
                    return;
                // Store shared result
                concurrentBag.Add(index);
                // Let running parallel iterations complete and 
                // prevent starting new ones
                loop.Break();
            });
        // Pick first occurrence or indicate no occurrence
        return concurrentBag.Count > 0 ? concurrentBag.Min() : -1;
    }

    static IEnumerable<Tuple<int, int>> PartitionRangeToChunks(int start, int count, int minChunkSize, int maxChunkSize, int doubleAfter)
    {
        var end = start + count;
        var chunkSize = Math.Min(minChunkSize, count);
        while (start < end)
        {
            for (var i = 0; i < doubleAfter && start < end; i++)
            {
                var next = Math.Min(end, start + chunkSize);
                yield return Tuple.Create(start, next - start);
                start = next;
            }

            chunkSize = Math.Min(maxChunkSize, chunkSize * 2);
        }
    }
}
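
A hedged usage sketch of both searches. Note that the IndexOf extension above has the same signature as the instance method String.IndexOf(string, int, int), which takes precedence in C# overload resolution, so the parallel versions are invoked through the class name here (SingleItemPartitioner is assumed to come from the ParallelExtensionsExtras samples):

var text = "abracadabra abracadabra";
// All occurrences: prints 0, 7, 12, 19
foreach (var index in ParallelAlgorithms.IndexesOf(text, "abra", 0, text.Length))
    Console.WriteLine(index);
// First occurrence: prints 0
Console.WriteLine(ParallelAlgorithms.IndexOf(text, "abra", 0, text.Length));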

Search in parallel! =)