Monday, February 22, 2010

Right-hand side Enumerable.Zip

With Reactive Extensions for .NET (Rx) and .NET Framework 4, a new LINQ operator was introduced: Zip (Bart De Smet gives an excellent explanation of the idea and implementation details behind the Zip operator). In a nutshell, it merges two sequences into one by applying a selector function to the elements at the same position in both sequences.

int[] numbers = { 1, 2, 3, 4 };
string[] words = { "one", "two", "three" };

numbers.Zip(words, (first, second) => first + " " + second)
 .Run(Console.WriteLine); 

// 1 one
// 2 two
// 3 three

While it seems simple and clear, there is a subtlety in its behavior. Zip is implemented roughly like this (argument checking omitted):

public static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
 using (IEnumerator<TFirst> fe = first.GetEnumerator())
 {
  using (IEnumerator<TSecond> se = second.GetEnumerator())
  {
   // What if the call to fe.MoveNext() throws an exception,
   // returns false or never returns? In all three cases
   // se.MoveNext() is never called.
   while (fe.MoveNext() && se.MoveNext())
   {
    yield return resultSelector(fe.Current, se.Current);
   }
  }
 }
}

A call to MoveNext on the first sequence’s enumerator may prevent MoveNext from ever being called on the second sequence’s enumerator, because the first sequence throws an exception, terminates or gets stuck (this is explained in great detail in the “Zip’em together” section of More LINQ with System.Interactive – More combinators for your Swiss Army Knife, which is where the topic of this post came from).
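The short-circuit is easy to observe with the BCL's own Enumerable.Zip. Below is a minimal sketch (the class and method names are made up for illustration): an endless, instrumented second sequence is zipped with a first sequence that either throws after one element or is empty, and the sketch counts how many times the second sequence is pulled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ZipPullDemo
{
    // Counts how many times the second sequence was advanced.
    private static int s_secondPulls;

    // An endless second sequence that records every MoveNext reaching it.
    private static IEnumerable<int> Second()
    {
        while (true)
        {
            s_secondPulls++;
            yield return s_secondPulls;
        }
    }

    // A first sequence that yields one value and then throws.
    private static IEnumerable<int> ThrowingFirst()
    {
        yield return 1;
        throw new InvalidOperationException("first sequence failed");
    }

    // Zips `first` with Second() and returns how often Second() was pulled.
    public static int SecondPullsWith(IEnumerable<int> first)
    {
        s_secondPulls = 0;
        try
        {
            foreach (var pair in first.Zip(Second(), (f, s) => f + s)) { }
        }
        catch (InvalidOperationException)
        {
            // The first sequence's exception surfaces; Second() is not asked again.
        }
        return s_secondPulls;
    }

    public static int WhenFirstThrows() => SecondPullsWith(ThrowingFirst());
    public static int WhenFirstIsEmpty() => SecondPullsWith(Enumerable.Empty<int>());
}
```

With a throwing first sequence the count stays at 1 (the pull that produced the first pair), and with an empty one it stays at 0: the second sequence never gets a chance to report its own state.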

As an exercise, we will implement a Zip operator that fetches values from both sources simultaneously and combines their values or exceptions into the produced results.

Here is a solution sketch:

  • Since either enumerator can get stuck, two worker threads fetch values out of the sequences, and one coordinator thread (actually the caller’s thread) combines their values, terminations or exceptions.
  • The coordinator waits for a notification that
    • either both workers have successfully fetched the next value out of their sequences
    • or either of them has terminated, normally or abnormally (an exception was thrown).
  • The coordinator allows the workers to proceed only if both have successfully fetched the next value. Before signaling them to proceed, the coordinator resets the primitive it waits on for worker notifications, so that it can be reused in the next iteration.
  • Workers notify the coordinator about fetched values and about termination.
  • After notifying the coordinator, workers wait until
    • either all workers have produced results and the coordinator allows them to fetch the next results from the sequences (to avoid fetching unnecessary results and potentially producing unnecessary side effects)
    • or the coordinator notifies them that the outer sequence has terminated because either sequence terminated (normally or abnormally).
  • When all workers proceed, the primitive they wait on must be reset to support the next iteration.
  • Once the outer sequence terminates, the last alive worker must dispose the shared resources.

Zipping may be done on infinite sequences, fetching the next item from a sequence may take indefinitely long, and the consumer may wait indefinitely long before explicitly requesting the next zipped item. Therefore explicitly spawned threads are used, so that thread pool threads are not seized for an indefinitely long time.

For now, the two waiting problems are considered without abnormal notifications (we’ll come back to them below).

The Task Parallel Library provides very convenient primitives for both waiting problems:

  • CountdownEvent - represents a synchronization primitive that is signaled when its count reaches zero.
  • Barrier - enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.

The coordinator needs to track the remaining work and be awakened when it is done. A CountdownEvent starts with the number of workers (in this case 2) and is reset back to this value just before the workers are allowed to proceed to the next iteration.
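A minimal sketch of the coordinator's side, using only System.Threading.CountdownEvent (the class name and the way workers are simulated are assumptions for illustration): two short-lived threads stand in for the workers, each signals once per round, and the coordinator waits for the count to reach zero before re-arming the event with Reset.

```csharp
using System.Threading;

public static class CountdownSketch
{
    // Runs `rounds` rounds in which two worker threads each signal once,
    // and returns how many rounds the coordinator saw complete.
    public static int RunRounds(int rounds)
    {
        const int workerCount = 2;
        int completed = 0;
        using (var allFetched = new CountdownEvent(workerCount))
        {
            for (int r = 0; r < rounds; r++)
            {
                var workers = new Thread[workerCount];
                for (int w = 0; w < workerCount; w++)
                {
                    // Each worker "fetches a value" and tells the coordinator.
                    workers[w] = new Thread(() => allFetched.Signal());
                    workers[w].Start();
                }

                // Coordinator: block until the count reaches zero.
                allFetched.Wait();
                completed++;

                // Reset is not thread-safe, so let this round's workers
                // finish before re-arming the event for the next round.
                foreach (var t in workers) t.Join();
                allFetched.Reset(workerCount);
            }
        }
        return completed;
    }
}
```

CountdownEvent.Reset is documented as not safe to call concurrently with other members of the same instance, which is why this sketch joins the round's threads before re-arming the event.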

The workers’ waiting problem is a little trickier. Barrier lets us set the number of participants. If we set it equal to the number of workers, then once they all arrive at the barrier they proceed to the next iteration. But this is not what we want, as workers must not proceed without the coordinator’s permission. So we make the coordinator a participant as well: it arrives at the barrier once the next zipped value (next iteration) is requested. After everyone passes the barrier, it automatically returns to its initial state, so no explicit reset is required.
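The following sketch (hypothetical names; a Thread.Sleep stands in for a slow consumer) shows the effect of making the coordinator a third Barrier participant: each worker may complete one fetch per phase, but cannot start the next one until the coordinator has also called SignalAndWait.

```csharp
using System.Threading;

public static class BarrierSketch
{
    // Returns true if no worker ever fetched ahead of the coordinator's permission.
    public static bool WorkersWaitForCoordinator(int rounds)
    {
        const int workerCount = 2;
        var fetched = new int[workerCount];
        bool ranAhead = false;

        // Workers plus the coordinator are all barrier participants.
        using (var barrier = new Barrier(workerCount + 1))
        {
            var workers = new Thread[workerCount];
            for (int w = 0; w < workerCount; w++)
            {
                int id = w; // copy for the closure
                workers[w] = new Thread(() =>
                {
                    for (int i = 0; i < rounds; i++)
                    {
                        Interlocked.Increment(ref fetched[id]); // "fetch" the next value
                        barrier.SignalAndWait();               // wait for permission to go on
                    }
                });
                workers[w].Start();
            }

            for (int i = 0; i < rounds; i++)
            {
                // A slow consumer: give workers a chance to run ahead (they must not).
                Thread.Sleep(5);
                for (int k = 0; k < workerCount; k++)
                {
                    // In phase i a worker can have started at most i + 1 fetches.
                    if (Volatile.Read(ref fetched[k]) > i + 1) ranAhead = true;
                }
                barrier.SignalAndWait(); // the consumer asked for the next value
            }

            foreach (var t in workers) t.Join();
        }
        return !ranAhead;
    }
}
```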

But what should happen when either sequence terminates, normally or abnormally? The zip sequence must terminate as well, and we cannot wait for the other sequence to fetch its next value or terminate, because that may take indefinitely long. Fortunately, both primitives (CountdownEvent and Barrier) support the new .NET 4 Cancellation Framework, in particular:

The coordinator waits on the CountdownEvent instance with a CancellationToken that the workers can use to propagate cancellation, thus notifying it about termination.

The workers wait on the Barrier instance with a CancellationToken that the coordinator can use to notify them about the termination of the outer (zip) sequence.
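Both cancellation paths can be sketched in isolation (hypothetical names; plain threads stand in for the workers). In the first method the countdown never reaches zero, so only the token can wake the coordinator; in the second, the coordinator never arrives at the barrier, so cancelling the token is what releases the worker. Both waits check the token before blocking as well as while blocked, so the outcome does not depend on which thread runs first.

```csharp
using System;
using System.Threading;

public static class CancellationSketch
{
    // Coordinator side: one worker fetched a value, the other one's sequence ended,
    // so the count stays at 1 and only the token can wake the coordinator.
    public static string CoordinatorSeesTermination()
    {
        using (var allFetched = new CountdownEvent(2))
        using (var sourceEnded = new CancellationTokenSource())
        {
            var fetcher = new Thread(() => allFetched.Signal());
            var finisher = new Thread(() => sourceEnded.Cancel());
            fetcher.Start();
            finisher.Start();

            string outcome;
            try
            {
                allFetched.Wait(sourceEnded.Token);
                outcome = "value";
            }
            catch (OperationCanceledException)
            {
                outcome = "terminated";
            }

            fetcher.Join();
            finisher.Join();
            return outcome;
        }
    }

    // Worker side: the coordinator never arrives at the barrier because the zip
    // sequence is over; cancelling the worker's token releases it instead.
    public static bool WorkerReleasedByCancellation()
    {
        using (var barrier = new Barrier(2)) // one worker + the coordinator
        using (var zipEnded = new CancellationTokenSource())
        {
            bool released = false;
            var worker = new Thread(() =>
            {
                try
                {
                    barrier.SignalAndWait(zipEnded.Token);
                }
                catch (OperationCanceledException)
                {
                    released = true;
                }
            });
            worker.Start();

            zipEnded.Cancel(); // the coordinator announces termination
            worker.Join();
            return released;
        }
    }
}
```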

The last question is how to deal uniformly with values, exceptions and normal sequence termination. Fortunately, Reactive Extensions for .NET (Rx) has the right tool for that: the Materialize operator.

It represents each value, exception and termination as an instance of a type derived from Notification<T>, where T is the sequence element type.
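To illustrate the idea without Rx, here is a minimal sketch with hypothetical Note<T> and NoteKind types standing in for Notification<T> and NotificationKind (these are not Rx's API): every outcome of MoveNext, whether a value, an exception or the end of the sequence, becomes a notification, and the materialized sequence ends right after an error or completion notification.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's notification kinds.
public enum NoteKind { OnNext, OnError, OnCompleted }

// Hypothetical minimal notification: a value, an error, or completion.
public sealed class Note<T>
{
    public NoteKind Kind { get; }
    public T Value { get; }
    public Exception Error { get; }

    private Note(NoteKind kind, T value, Exception error)
    {
        Kind = kind;
        Value = value;
        Error = error;
    }

    public static Note<T> Next(T value) => new Note<T>(NoteKind.OnNext, value, null);
    public static Note<T> Fail(Exception error) => new Note<T>(NoteKind.OnError, default(T), error);
    public static Note<T> Done() => new Note<T>(NoteKind.OnCompleted, default(T), null);
}

public static class MaterializeSketch
{
    // Turns every MoveNext outcome (value, exception, end) into a Note<T>,
    // so no exception from the source escapes to the consumer.
    public static IEnumerable<Note<T>> ToNotes<T>(this IEnumerable<T> source)
    {
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                Note<T> note;
                try
                {
                    note = e.MoveNext() ? Note<T>.Next(e.Current) : Note<T>.Done();
                }
                catch (Exception ex)
                {
                    note = Note<T>.Fail(ex);
                }
                yield return note;           // C# forbids yield return inside try/catch
                if (note.Kind != NoteKind.OnNext)
                    yield break;             // error and completion both end the sequence
            }
        }
    }

    // Sample source: two values, then an exception.
    public static IEnumerable<int> TwoThenThrow()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("boom");
    }
}
```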

Let’s now put everything together.

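using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Threading;
// Materialize, Notification<T> and NotificationKind come from Reactive Extensions for .NET (Rx)

public static class ZipEx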
{
 public static IEnumerable<TResult> RightHandZip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
 {
  Contract.Requires(first != null);
  Contract.Requires(second != null);
  Contract.Requires(resultSelector != null);

  return new Zipper<TFirst, TSecond>().Zip(first, second, resultSelector);
 }

 class Zipper<TFirst, TSecond>
 {
  private const int c_sourceCount = 2;
  private int m_workersCount = c_sourceCount;

  private CountdownEvent m_zipEvent;
  private CancellationTokenSource m_zipCancellation;

  private Barrier m_srcBarrier;
  private CancellationTokenSource m_srcCancellation;

  private volatile Notification<TFirst> m_first;
  private volatile Notification<TSecond> m_second;

  public IEnumerable<TResult> Zip<TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
   // The coordinator tracks the remaining work through
   // a countdown event
   m_zipEvent = new CountdownEvent(c_sourceCount);
   // Workers use this cancellation token source to
   // wake the coordinator when either sequence
   // terminates
   m_zipCancellation = new CancellationTokenSource();
   // Here we state that the coordinator is also a
   // barrier participant
   m_srcBarrier = new Barrier(c_sourceCount + 1);
   // The coordinator uses this cancellation token source
   // to wake the workers when the outer (zip) sequence
   // terminates
   m_srcCancellation = new CancellationTokenSource();

   // Spawn workers that will fetch results from both 
   // sequences simultaneously
   RunWorker(first, n => { m_first = n; });
   RunWorker(second, n => { m_second = n; });

   var token = m_zipCancellation.Token;
   while (true)
   {
    try
    {
     // Wait until either all workers fetched the next
     // values or any worker notified about completion
     // (due to either an exception or sequence completion)
     m_zipEvent.Wait(token);
    }
    catch (OperationCanceledException)
    {
     // Notify workers that zip sequence is terminated
     m_srcCancellation.Cancel();
     // If zip sequence is terminated due to exception(s) 
     // throw either AggregateException if both sequences 
     // resulted in exception or the exception that 
     // terminated either of them
     ThrowIfError();
     // Otherwise sequence is terminated due to one of 
     // the sequences completion
     yield break;
    }
    // Reset count down event for the next round
    m_zipEvent.Reset(c_sourceCount);
    // Yield next zipped value
    yield return resultSelector(m_first.Current, m_second.Current);
    // Only once the consumer asks for the next value do we
    // allow the workers to attempt to fetch the next values.
    // The zipper is a barrier participant that arrives at
    // the barrier once the next zipped value is requested
    m_srcBarrier.SignalAndWait();
   }
  }

  private void RunWorker<T>(IEnumerable<T> enumerable, Action<Notification<T>> update)
  {
   // In order to fetch results from sequences we will use 
   // manually spawned threads as we do not want to seize 
   // ThreadPool threads for indefinite time. 
   new Thread(() =>
   {
    var token = m_srcCancellation.Token;
    foreach (var notification in enumerable.Materialize())
    {
     update(notification);

     if (notification.Kind == NotificationKind.OnNext)
     {
      // Notify the coordinator that the next value was
      // successfully fetched out of the sequence
      m_zipEvent.Signal();
     }
     else
     {
      // Either a completion or an error notification
      // terminates the zipped sequence
      m_zipCancellation.Cancel();
     }

     try
     {
      // Wait until the next zipped value is requested or
      // the zip sequence is terminated
      m_srcBarrier.SignalAndWait(token);
     }
     catch (OperationCanceledException)
     {
      // The last alive worker is responsible for
      // disposing the shared resources
      DisposeIfLast();
      return;
     }
    }
   }) {IsBackground = true}.Start();
  }

  private void DisposeIfLast()
  {
   if (Interlocked.Decrement(ref m_workersCount) == 0)
   {
    DisposeAndNullify(ref m_zipEvent);
    DisposeAndNullify(ref m_srcBarrier);
    DisposeAndNullify(ref m_zipCancellation);
    DisposeAndNullify(ref m_srcCancellation);
   }
  }

  private void ThrowIfError()
  {
   var ex1 = ExtractErrorOrNothing(m_first);
   var ex2 = ExtractErrorOrNothing(m_second);

   if (ex1 != null && ex2 != null)
   {
    throw new AggregateException(ex1, ex2);
   }
   ThrowIfNotNull(ex1);
   ThrowIfNotNull(ex2);
  }

  private static Exception ExtractErrorOrNothing<T>(Notification<T> n)
  {
   if (n != null && n.Kind == NotificationKind.OnError)
   {
    return ((Notification<T>.OnError) n).Exception;
   }
   return null;
  }

  private static void ThrowIfNotNull(Exception ex)
  {
   if (ex != null)
   {
    throw ex;
   }
  }

  private static void DisposeAndNullify<T>(ref T res)
   where T : class, IDisposable
  {
   if (res != null)
   {
    res.Dispose();
    res = null;
   }
  }
 }
}

Let’s set up the example infrastructure

static IEnumerable<int> Infinite(string id)
{
  Console.Write("{0} -> X ", id);
  while(true) {}
  yield break;
}

static IEnumerable<int> Seq(string id, int start, int count)
{
  return Enumerable.Range(0, count).Select(x => x * 2 + start)
    .Do(x => Console.Write("{0} -> {1} ", id, x));
}

static IEnumerable<int> Abnormal(string id)
{
  return EnumerableEx.Throw<int>(new Exception())
    .Finally(() => Console.Write("{0} -> E ", id));
}

static void RunTest(IEnumerable<int> first, IEnumerable<int> second)
{
  Func<int, int, string> format = (f, s) => String.Format("[{0}, {1}] ", f, s);
  try
  {
    first.RightHandZip(second, format).Run(Console.Write);
  }
  catch (Exception ex)
  {
    Console.Write(ex.GetType());
  }
  Console.WriteLine();
}

and run some examples

const int count = 3;
// Both sequences terminate
RunTest(Seq("F", 0, count), Seq("S", 1, count));
// Terminates normally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5]

// First normally terminates and second continues
RunTest(Seq("F", 0, count), Seq("S", 1, count + 1));
// Terminates normally
// F -> 0 S -> 1 [0, 1] S -> 3 F -> 2 [2, 3] F -> 4 S -> 5 [4, 5] S -> 7

// First abnormally terminates and second continues
RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count + 1));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] S -> 7 F -> E System.Exception

// Both terminate abnormally
RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count).Concat(Abnormal("S")));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] F -> E S -> E System.AggregateException

// First gets stuck and second terminates abnormally
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count).Concat(Abnormal("S")));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] S -> E F -> X System.Exception

// First gets stuck and second terminates normally
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count));
// Terminates normally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X

// First gets stuck and second continues
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count + 1));
// Gets stuck
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X S -> 7
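The helpers Seq and Abnormal rely on System.Interactive's Do, Throw and Finally. If those are not at hand, roughly equivalent stand-ins can be written with plain LINQ and an iterator. The sketch below (a hypothetical PlainHelpers class that logs to a TextWriter instead of the console, so the output is easy to check) is an approximation, not the System.Interactive implementation.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class PlainHelpers
{
    // Stand-in for Seq: logs each element as it is produced (the role of Do).
    public static IEnumerable<int> Seq(string id, int start, int count, TextWriter log)
    {
        return Enumerable.Range(0, count).Select(x =>
        {
            int value = x * 2 + start;
            log.Write("{0} -> {1} ", id, value);
            return value;
        });
    }

    // Stand-in for Abnormal: the first MoveNext throws (the role of Throw),
    // and the finally block logs when enumeration ends (the role of Finally).
    public static IEnumerable<int> Abnormal(string id, TextWriter log)
    {
        try
        {
            yield return Fail(); // Fail() throws before anything is yielded
        }
        finally
        {
            log.Write("{0} -> E ", id);
        }
    }

    private static int Fail() => throw new Exception();
}
```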

That’s it.
