In practice you may find yourself in a situation where you have several sequences of data that you want to drain in parallel, merging the results into a single sequence. This is what the Merge combinator for sequences does. Reactive Extensions had it for both enumerable and observable sequences; however, at some point it was decided to no longer support it for enumerable sequences in Reactive Extensions, as it can be expressed through the same combinator for observable sequences.
I find it quite interesting to implement Merge combinator for enumerable sequences but to make it more interesting let’s change behavior a little bit. The original behavior of the EnumerableEx.Merge was to drain source sequences as fast as workers can and buffer results in a resulting queue until it is consumed by merged sequence enumerator. It can be implemented (maybe this is not the most efficient way but still) using
- BlockingCollection<T> to synchronize producers that drain original sequences and consumer that merges elements from all sequences into resulting sequence
- CancellationTokenSource to notify producers that early termination is requested
Instead let’s change the behavior so that a producer is allowed to proceed with getting the next element from its source sequence only once the previously “produced” element has been consumed (which basically means the merged sequence enumerator has reached it). This is a sort of two-way synchronization between producer and consumer, where the consumer cannot proceed unless there are ready elements and a producer cannot proceed unless its previous element has been consumed. It is somewhat similar to a blocking collection. However, in this case there is only one consumer, and producers aren’t blocked because the queue is full but rather until the previously enqueued element is dequeued. Let’s code the thing!
/// <summary>
/// Merges several source sequences into one. Every source is drained by its own
/// <see cref="Producer{T}"/> task; a producer hands over one element and then waits
/// until the consumer has taken it before it asks its source for the next one.
/// </summary>
class Consumer<T>
{
    private readonly IEnumerable<IEnumerable<T>> _sources;
    private readonly object _sync = new object();

    // Filled by producers under _sync.
    private readonly Queue<Producer<T>> _globalQueue = new Queue<Producer<T>>();

    // Filled and drained by the consumer only, so it is accessed without a lock.
    private readonly Queue<Producer<T>> _localQueue = new Queue<Producer<T>>();

    // Every producer that was started; used to release them when the merged
    // sequence stops being enumerated.
    private readonly List<Producer<T>> _producers = new List<Producer<T>>();

    // No need to mark these fields as volatile as only the consumer side
    // updates and reads them.
    private bool _done;
    private int _count;
    private readonly IList<Exception> _exceptions = new List<Exception>();

    public Consumer(IEnumerable<IEnumerable<T>> sources)
    {
        _sources = sources;
    }

    /// <summary>
    /// Merges the sources in parallel.
    /// </summary>
    /// <returns>
    /// A lazily evaluated sequence of the elements of all sources, in the order the
    /// producers made them available.
    /// </returns>
    /// <exception cref="AggregateException">
    /// Thrown once every producer has finished if at least one failure was observed.
    /// </exception>
    public IEnumerable<T> Merge()
    {
        Start();

        try
        {
            // The count is checked BEFORE waiting: with no started producers
            // nobody would ever pulse, and waiting would block forever.
            while (_count > 0)
            {
                Wait();

                // Use the local queue to yield values.
                while (_localQueue.Count > 0)
                {
                    var producer = _localQueue.Dequeue();

                    // Get the actual value out of the ready producer while
                    // simultaneously allowing it to proceed: it either observes
                    // the early termination request or fetches its next value.
                    var next = producer.GetNext(!_done);

                    // Notification<T> from Rx is used to simplify sequence
                    // processing.
                    if (next.Kind == NotificationKind.OnNext)
                    {
                        yield return next.Value;
                        continue;
                    }

                    // OnCompleted or OnError: this producer has finished.
                    _count--;

                    if (next.Kind == NotificationKind.OnError)
                    {
                        // An observed exception leads to early termination.
                        // Producers that are already running (not waiting) are
                        // allowed to finish, potentially bringing more exceptions
                        // to be observed, while waiters are asked to terminate.
                        _done = true;

                        // Stored to be thrown later as part of the aggregate
                        // exception.
                        _exceptions.Add(next.Exception);
                    }
                }
            }
        }
        finally
        {
            // Runs on normal completion and also when the caller disposes the
            // enumerator before the end (e.g. First()/Take()). Without this,
            // producers parked in their wait would stay blocked forever and
            // never dispose their source enumerators.
            foreach (var started in _producers)
            {
                started.Cancel();
            }
        }

        // All producers finished: either normally, or exceptions were observed.
        if (_exceptions.Count > 0)
        {
            throw new AggregateException(_exceptions);
        }
    }

    /// <summary>
    /// Notifies the consumer that <paramref name="producer"/> has a notification ready.
    /// </summary>
    public void Enqueue(Producer<T> producer)
    {
        lock (_sync)
        {
            // The consumer will either observe a non-empty global queue (if it
            // is not waiting yet) or the pulse (if it is waiting).
            _globalQueue.Enqueue(producer);
            Monitor.Pulse(_sync);
        }
    }

    // Waits for ready producers and moves them to the local queue.
    private void Wait()
    {
        lock (_sync)
        {
            // Re-check the condition after every wake-up instead of assuming
            // that being woken implies a non-empty queue.
            while (_globalQueue.Count == 0)
            {
                Monitor.Wait(_sync);
            }

            // Copy whatever is available to the local queue so it can be
            // drained without taking the lock again.
            while (_globalQueue.Count > 0)
            {
                _localQueue.Enqueue(_globalQueue.Dequeue());
            }
        }
    }

    // Starts one producer task per source.
    private void Start()
    {
        try
        {
            foreach (var source in _sources)
            {
                var producer = new Producer<T>(this, source.GetEnumerator());
                Task.Factory.StartNew(producer.Enumerate);
                _producers.Add(producer);
                _count++;
            }
        }
        catch (Exception ex)
        {
            // If none of the producers were started successfully just rethrow.
            if (_count == 0)
            {
                throw;
            }

            // Some producers were started: request early termination.
            _done = true;

            // Stored to be thrown later as part of the aggregate exception.
            _exceptions.Add(ex);
        }
    }
}

/// <summary>
/// Drains a single source sequence on behalf of a <see cref="Consumer{T}"/>,
/// publishing one notification at a time.
/// </summary>
class Producer<T>
{
    private readonly object _sync = new object();
    private readonly Consumer<T> _consumer;
    private IEnumerator<T> _enum;
    private volatile Notification<T> _next;
    private volatile bool _cont = true;
    private volatile bool _awake;
    private volatile bool _cancelled;

    public Producer(Consumer<T> consumer, IEnumerator<T> enumerator)
    {
        _consumer = consumer;
        _enum = enumerator;
    }

    /// <summary>
    /// Drains the source sequence. After publishing an element the producer waits
    /// until the consumer calls <see cref="GetNext"/> or <see cref="Cancel"/>.
    /// Always ends by publishing a terminal notification (completed or error).
    /// </summary>
    public void Enumerate()
    {
        Notification<T> last;
        try
        {
            while (_cont && !_cancelled && _enum.MoveNext())
            {
                // Reset the awake flag before publishing; the consumer can set
                // it again only after it has seen this producer in its queue.
                _awake = false;

                // Notify the consumer of a ready element.
                _next = new Notification<T>.OnNext(_enum.Current);
                _consumer.Enqueue(this);

                lock (_sync)
                {
                    // If the consumer was quick and the pulse was missed, the
                    // awake flag is already set and no wait happens. The loop
                    // re-checks the flags after every wake-up.
                    while (!_awake && !_cancelled)
                    {
                        Monitor.Wait(_sync);
                    }
                }
            }

            last = new Notification<T>.OnCompleted();
        }
        catch (Exception ex)
        {
            last = new Notification<T>.OnError(ex);
        }

        try
        {
            _enum.Dispose();
        }
        catch (Exception ex)
        {
            // A failing Dispose must not prevent the terminal notification,
            // otherwise the consumer would wait for this producer forever.
            // An earlier enumeration failure takes precedence.
            if (last.Kind != NotificationKind.OnError)
            {
                last = new Notification<T>.OnError(ex);
            }
        }

        _enum = null;

        // Notify the consumer that the producer has finished.
        _next = last;
        _consumer.Enqueue(this);
    }

    /// <summary>
    /// Returns the ready notification and lets the producer proceed.
    /// </summary>
    /// <param name="cont">
    /// <c>true</c> to let the producer fetch its next element; <c>false</c> to ask
    /// it to stop.
    /// </param>
    public Notification<T> GetNext(bool cont)
    {
        // Capture the ready notification first: once the producer is released
        // below it may overwrite the field with its next notification.
        var next = _next;
        lock (_sync)
        {
            // Set the awake flag in case the producer misses the pulse.
            _awake = true;
            _cont = cont;
            Monitor.Pulse(_sync);
        }
        return next;
    }

    /// <summary>
    /// Asks the producer to stop and releases it if it is waiting. Unlike the
    /// awake flag, the cancellation flag is never reset, so the request is not
    /// lost if the producer is currently busy fetching an element.
    /// </summary>
    public void Cancel()
    {
        lock (_sync)
        {
            _cancelled = true;
            Monitor.Pulse(_sync);
        }
    }
}
With Producer<T> and Consumer<T> in place Merge combinator can be implemented as:
static class EnumerableEx
{
    /// <summary>
    /// Merges the given sequences into a single sequence, draining them in parallel.
    /// </summary>
    /// <param name="sources">The sequences to merge; must not be null.</param>
    /// <returns>A lazily evaluated sequence of the elements of all sources.</returns>
    public static IEnumerable<T> Merge<T>(IEnumerable<IEnumerable<T>> sources)
    {
        Contract.Requires(sources != null);
        return MergeIterator(sources);
    }

    // Creates a fresh Consumer<T> for every enumeration. Consumer<T> keeps its
    // merge state (done flag, producer count, collected exceptions, queues) in
    // instance fields, so sharing one instance between enumerations of the
    // returned sequence would carry that state over from one run to the next.
    private static IEnumerable<T> MergeIterator<T>(IEnumerable<IEnumerable<T>> sources)
    {
        foreach (var item in new Consumer<T>(sources).Merge())
        {
            yield return item;
        }
    }
}
At any given moment at most n elements will be in a global queue where n is the number of source sequences.
Great exercise!