Friday, February 25, 2011

Merge sequences in parallel

In practice you may find yourself in a situation where you have several sequences of data that you want to drain in parallel, merging the results into a single sequence. This is what the Merge combinator for sequences does. Reactive Extensions had it for both enumerable and observable sequences; however, at some point it was decided to no longer support it for enumerable sequences, since the same behavior can be expressed through the corresponding combinator for observable sequences.

Implementing a Merge combinator for enumerable sequences is an interesting exercise, but to make it even more interesting let’s change the behavior a little bit. The original behavior of EnumerableEx.Merge was to drain the source sequences as fast as the workers could and buffer the results in a queue until they were consumed by the merged sequence enumerator. It can be implemented (perhaps not in the most efficient way, but still; a rough sketch follows the list) using:

  • BlockingCollection<T> to synchronize the producers that drain the original sequences and the consumer that merges elements from all sequences into the resulting sequence
  • CancellationTokenSource to notify producers that early termination is requested
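
A rough sketch of that buffered variant (my own illustration under the assumptions above, not the actual EnumerableEx.Merge source) could look like this:

static class BufferedMergeSketch
{
    public static IEnumerable<T> Merge<T>(IEnumerable<IEnumerable<T>> sources)
    {
        // Unbounded collection buffering produced elements until consumed
        var results = new BlockingCollection<T>();
        // Used to ask producers to stop when the consumer terminates early
        var cts = new CancellationTokenSource();

        var producers = sources
            .Select(source => Task.Factory.StartNew(() =>
            {
                foreach (var item in source)
                {
                    // Observe early termination request
                    if (cts.Token.IsCancellationRequested)
                        return;
                    // Drain as fast as possible; Add never blocks here
                    results.Add(item);
                }
            }))
            .ToArray();

        // Once all producers are done no more elements will be added
        Task.Factory.ContinueWhenAll(producers, _ => results.CompleteAdding());

        try
        {
            // Blocks until an element is available or adding is completed
            foreach (var item in results.GetConsumingEnumerable())
                yield return item;
        }
        finally
        {
            // Consumer abandoned the merged sequence: request early termination
            cts.Cancel();
        }
    }
}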

Instead, let’s change the behavior so that a producer may get the next element from its source sequence only once the previously “produced” element has been consumed (which basically means the merged sequence enumerator reached it). This is a sort of two-way synchronization between producer and consumer: the consumer cannot proceed unless there are ready elements, and a producer cannot proceed until its previous element is consumed. It is similar to a blocking collection, except that there is only one consumer, and producers are blocked not because the queue is full but until the previously enqueued element is dequeued. Let’s code the thing!

class Consumer<T>
{
    private readonly IEnumerable<IEnumerable<T>> _sources;

    private readonly object _sync = new object();

    private readonly Queue<Producer<T>> _globalQueue = new Queue<Producer<T>>();
    private readonly Queue<Producer<T>> _localQueue = new Queue<Producer<T>>();

    // No need to mark these fields as volatile since 
    // only the consumer thread updates and reads them
    private bool _done;
    private int _count;

    private readonly IList<Exception> _exceptions = new List<Exception>();

    public Consumer(IEnumerable<IEnumerable<T>> sources)
    {
        _sources = sources;
    }

    // Merges sources in parallel
    public IEnumerable<T> Merge()
    {
        Start();
        while (true)
        {
            Wait();
            while (_localQueue.Count > 0)
            {
                // Use local queue to yield values
                var producer = _localQueue.Dequeue();
                // Get the actual value out of ready producer
                // while simultaneously allowing it to proceed 
                // and observe early termination request or get 
                // next value
                var next = producer.GetNext(!_done);
                // Using Notification<T> from Rx to simplify
                // sequence processing
                if (next.Kind != NotificationKind.OnNext)
                {
                    _count--;
                    if (next.Kind == NotificationKind.OnError)
                    {
                        // Observed exception leads to early 
                        // termination however merge will allow 
                        // already running (not waiting) 
                        // producers to finish (potentially 
                        // bringing more exceptions to be 
                        // observed) while requesting waiters 
                        // to terminate
                        _done = true;
                        // Store observed exception to be further 
                        // thrown as part of the aggregate 
                        // exception
                        _exceptions.Add(next.Exception);
                    }
                }
                else
                {
                    yield return next.Value;
                }
            }
            // Loop until all producers finished
            if (_count == 0)
            {
                // Either normally
                if (_exceptions.Count == 0)
                    yield break;
                // Or exceptions were observed
                throw new AggregateException(_exceptions);
            }
        }
    }

    // Notifies consumer of ready elements
    public void Enqueue(Producer<T> producer)
    {
        // Notify consumer of a ready producer
        lock (_sync)
        {
            // Consumer will either observe non-empty 
            // global queue (if it is not waiting already) 
            // or pulse (if it is waiting)
            _globalQueue.Enqueue(producer);
            Monitor.Pulse(_sync);
        }
    }

    // Waits for ready elements
    private void Wait()
    {
        lock (_sync)
        {
            // As this is the only consumer draining the global queue,
            // once an empty queue is observed and the consumer is 
            // pulsed, the queue is guaranteed to be non-empty
            if (_globalQueue.Count == 0)
                Monitor.Wait(_sync);
            // Copy whatever available to local queue to further 
            // drain without bothering taking a lock
            while (_globalQueue.Count > 0)
                _localQueue.Enqueue(_globalQueue.Dequeue());
        }
    }

    // Starts producers
    private void Start()
    {
        try
        {
            foreach (var source in _sources)
            {
                var producer = new Producer<T>(this, source.GetEnumerator());
                Task.Factory.StartNew(producer.Enumerate);
                _count++;
            }
        }
        catch (Exception ex)
        {
            // If none of the producers started successfully 
            // just rethrow the exception
            if (_count == 0)
                throw;
            // Some producers have started; notify them of 
            // early termination
            _done = true;
            // Store observed exception to be further thrown as 
            // part of the aggregate exception
            _exceptions.Add(ex);
        }
    }
}

class Producer<T>
{
    private readonly object _sync = new object();
    private readonly Consumer<T> _consumer;

    private IEnumerator<T> _enum;

    private volatile Notification<T> _next;
    private volatile bool _cont = true;
    private volatile bool _awake;

    public Producer(Consumer<T> consumer, IEnumerator<T> enumerator)
    {
        _consumer = consumer;
        _enum = enumerator;
    }

    // Drains source sequence
    public void Enumerate()
    {
        try
        {
            // The continuation flag is initially true and is 
            // updated by the consumer every time it awakes the 
            // producer, either allowing the next element or 
            // requesting early termination
            while (_cont && _enum.MoveNext())
            {
                // Reset the awake flag; the producer will wait 
                // below to be awoken by the consumer
                _awake = false;
                // Notify consumer of a ready element
                _next = new Notification<T>.OnNext(_enum.Current);
                _consumer.Enqueue(this);

                lock (_sync)
                {
                    // If consumer was pretty quick and producer 
                    // missed the pulse it will observe the awake 
                    // flag and thus won't wait
                    if (!_awake)
                        Monitor.Wait(_sync);
                }
            }
            _next = new Notification<T>.OnCompleted();
        }
        catch (Exception ex)
        {
            _next = new Notification<T>.OnError(ex);
        }
        finally
        {
            _enum.Dispose();
            _enum = null;
        }
        // Notify consumer that the producer completed 
        _consumer.Enqueue(this);
    }

    // Awakes/notifies producer that it can proceed
    public Notification<T> GetNext(bool cont)
    {
        // Store the ready element in a local variable 
        // because once the producer is unleashed below 
        // it may overwrite the ready but not yet returned element
        var next = _next;

        lock (_sync)
        {
            // Set the awake flag in case the producer misses the pulse 
            _awake = true;
            _cont = cont;
            Monitor.Pulse(_sync);
        }

        return next;
    }
}

With Producer<T> and Consumer<T> in place, the Merge combinator can be implemented as:

static class EnumerableEx
{
    public static IEnumerable<T> Merge<T>(IEnumerable<IEnumerable<T>> sources)
    {
        Contract.Requires(sources != null);
        return new Consumer<T>(sources).Merge();
    }
}
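
A quick usage example (with hypothetical sequences, just for illustration): merging two integer sequences produced at different speeds yields their elements interleaved in the order they become available.

var slow = Enumerable.Range(0, 3).Select(x => { Thread.Sleep(100); return x; });
var fast = Enumerable.Range(100, 3);

foreach (var item in EnumerableEx.Merge(new[] { slow, fast }))
    Console.WriteLine(item);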

At any given moment at most n elements will be in the global queue, where n is the number of source sequences, since each producer enqueues at most one element and then waits until the consumer dequeues it.

Great exercise!

Sunday, February 13, 2011

Longest consecutive elements sequence

A tiny detail uncovered by looking at a problem from a different angle is usually the key to the solution. So many times I have looked at a great API design or problem solution and thought “how did I not see that?”. But sometimes I do see. This time it was the problem of finding the longest consecutive elements sequence within an unsorted array of integers. For example, in {5, 7, 3, 4, 9, 10, 1, 15, 1, 3, 12, 2, 11} the sought sequence is {1, 2, 3, 4, 5}.

The first thing that comes to mind is to sort the given array in O(n log n) and look for the longest consecutive elements sequence. Eh, we can do better than that.
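
For reference, a minimal sketch of that sorting baseline (my own illustration, not part of the final solution) might be:

// O(n log n) baseline: sort the distinct values, then scan for the longest run
static int LongestConsecutiveBySorting(IEnumerable<int> seq)
{
    var sorted = seq.Distinct().OrderBy(x => x).ToArray();
    int best = 0, current = 0;
    for (var i = 0; i < sorted.Length; i++)
    {
        // Extend the current run if this value follows the previous one
        current = i > 0 && sorted[i] == sorted[i - 1] + 1 ? current + 1 : 1;
        best = Math.Max(best, current);
    }
    return best;
}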

Using a bit vector indexed by the numbers from the original array may not be justified, since the space it requires depends on the value range and may be incomparable to the size of the original array (although the time complexity is O(n)).

Let’s look at the problem more closely. It may be reduced to a problem of efficient range manipulation. A disjoint-set structure or an interval tree offers O(n log n) building time, but neither takes into account the fact that we are dealing with integers. Knowing the range boundaries, we can definitely say which numbers are inside (for example, [1..3] contains 1, 2, 3). O(1) time for range manipulation operations and O(1) space per range are what we are looking for. We can get both using two hash tables:

  • ‘low’ with range start numbers as keys and range end numbers as values
  • ‘high’ with range end numbers as keys and range start numbers as values

For example, for a range [x..y] the tables will hold low[x] = y and high[y] = x. The algorithm looks like this:

  • Scan the original array and for each element i:
    • If it already belongs to some range, skip it
    • Otherwise create the unit-size range [i..i]
    • If there is an adjacent range [i+1..y] to the right, merge with it to produce the range [i..y]
    • If there is an adjacent range [x..i-1] to the left, merge with it as well (either [i..i] or the merged [i..y] will be merged with [x..i-1])
  • Scan the created ranges to find the longest one

For example, processing {3, 1, 2}: 3 creates [3..3], 1 creates [1..1], and 2 creates [2..2], which first merges with [3..3] on its right into [2..3] and then with [1..1] on its left into [1..3].

The only question left is how to check whether an element already belongs to some range, since we keep only the range boundaries in the hash tables. Any number is either processed previously (and is thus already in one of the ranges) or has a new range created out of it (and potentially merged with others). Thus, if a number has been seen before, it is already in some range and we do not need to process it again. So before scanning the original array we can simply remove any duplicates, in O(n) time and space.

class Solver
{
    public static Tuple<int, int> FindMaxRange(IEnumerable<int> seq)
    {
        // Generate all ranges and select maximum
        return EnumerateRanges(seq)
            .Aggregate((x, y) => Length(x) > Length(y) ? x : y);
    }

    public static IEnumerable<Tuple<int, int>> EnumerateRanges(IEnumerable<int> seq)
    {
        var low = new Dictionary<int, int>();
        var high = new Dictionary<int, int>();

        // Remove duplicates
        foreach (var val in seq.Distinct())
        {
            // Create unit size range
            low[val] = high[val] = val;
            // Merge [i..i] with [i+1..y]
            var endsWith = MergeWithNext(val, low, high, 1);
            // Merge [i..endsWith] with [x..i-1]
            MergeWithNext(endsWith, high, low, -1);
        }

        return low.Select(p => Tuple.Create(p.Key, p.Value));
    }

    static int MergeWithNext(int currStart, IDictionary<int, int> low, IDictionary<int, int> high, int sign)
    {
        var currEnd = low[currStart];
        var nextStart = currEnd + sign;
        if (low.ContainsKey(nextStart))
        {
            low[currStart] = low[nextStart];
            high[low[currStart]] = currStart;
            low.Remove(nextStart);
            high.Remove(currEnd);
        }
        return low[currStart];
    }

    static int Length(Tuple<int, int> t)
    {
        return t.Item2 - t.Item1;
    }
}

The solution has O(n) time and space complexity, assuming the hash table implementation offers O(1) time and space per operation/element.
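
For example, running it against the array from the beginning of the post:

// Prints (1, 5), i.e. the sequence {1, 2, 3, 4, 5}
var range = Solver.FindMaxRange(new[] { 5, 7, 3, 4, 9, 10, 1, 15, 1, 3, 12, 2, 11 });
Console.WriteLine(range);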