Monday, February 22, 2010

Right-hand side Enumerable.Zip

With Reactive Extensions for .NET (Rx) and .NET Framework 4, a new LINQ operator was introduced: Zip (Bart De Smet gives an excellent explanation of the idea and implementation details behind the Zip operator). In a nutshell, it merges two sequences into one by applying a selector function to the elements at the same position. The result is as long as the shorter sequence.

int[] numbers = { 1, 2, 3, 4 };
string[] words = { "one", "two", "three" };

numbers.Zip(words, (first, second) => first + " " + second)
 .Run(Console.WriteLine); 

// 1 one
// 2 two
// 3 three

While it seems simple and clear, there is a subtlety in its behavior. Zip is implemented roughly like this (argument checking omitted):

public static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
 using (IEnumerator<TFirst> fe = first.GetEnumerator())
 {
  using (IEnumerator<TSecond> se = second.GetEnumerator())
  {
   // What if the call to fe.MoveNext() throws an exception,
   // returns false or never returns? In all three cases
   // se.MoveNext() is never called for this iteration
   while (fe.MoveNext() && se.MoveNext())
   {
    yield return resultSelector(fe.Current, se.Current);
   }
  }
 }
}

A call to MoveNext on the first sequence’s enumerator may prevent MoveNext from ever being called on the second sequence’s enumerator if the first sequence throws an exception, terminates or gets stuck. This is explained in great detail in the “Zip’em together” section of More LINQ with System.Interactive – More combinators for your Swiss Army Knife, which is where the topic of this post came from.
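The asymmetry is easy to observe with the standard Enumerable.Zip, assuming it pulls the first sequence before the second as in the implementation shown above. The following is a minimal sketch: Tracked, Nothing and TrackedPulled are hypothetical helpers. Nothing is deliberately an iterator rather than an empty array, so an empty-input shortcut cannot skip the enumeration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ZipOrderDemo
{
    // Set when the body of Tracked() starts running, which happens
    // only on the first MoveNext call of its enumerator
    public static bool TrackedStarted;

    static IEnumerable<int> Tracked()
    {
        TrackedStarted = true;   // side effect of being asked for an element
        yield return 42;
    }

    // An empty sequence that is an iterator, not an array
    static IEnumerable<int> Nothing()
    {
        yield break;
    }

    // Zips Tracked() with an empty sequence, Tracked() being either the
    // first or the second argument, and reports whether it was pulled
    public static bool TrackedPulled(bool trackedIsFirst)
    {
        TrackedStarted = false;
        IEnumerable<int> zipped = trackedIsFirst
            ? Tracked().Zip(Nothing(), (a, b) => a + b)
            : Nothing().Zip(Tracked(), (a, b) => a + b);
        foreach (var item in zipped)
        {
            // never reached: one side is empty
        }
        return TrackedStarted;
    }
}
```

TrackedPulled(false) returns false. Once the first sequence reports that it is exhausted, the second one is never asked for an element, so its side effects never happen. Swapping the arguments with TrackedPulled(true) makes them happen.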

As an exercise, we will implement a Zip operator that fetches results from both sources simultaneously and combines their values or exceptions into the produced results.

Here is a solution sketch:

  • As either enumerator can get stuck, two worker threads are used to fetch results out of the sequences, and one coordination thread (actually the caller’s thread) combines results, terminations or exceptions.
  • The coordinator waits for a notification that
    • either both workers successfully fetched the next value out of their sequences
    • or either of them terminated, normally or abnormally (an exception is thrown).
  • The coordinator allows the workers to proceed only if both of them successfully fetched the next value. Before signaling the workers to proceed, the coordinator resets the primitive it uses to wait for worker notifications so that it supports the next iteration.
  • Workers notify the coordinator about fetched values and termination.
  • Once they have notified the coordinator, workers wait until
    • either all workers produced results and the coordinator allows them to fetch the next results from the sequences (to prevent fetching unnecessary results and potentially producing unnecessary side effects)
    • or the coordinator notifies them that the outer sequence terminated because either sequence terminated (normally or abnormally).
  • When all workers proceed, the primitive must be reset to support the next iteration.
  • Once the outer sequence is terminated, the last alive worker must dispose of the shared resources.

Zipping may be done on infinite sequences, fetching the next item from a sequence may take indefinitely long, and the consumer may wait indefinitely long before requesting the next zipped item. For these reasons, explicitly spawned threads are used so that thread pool threads are not seized for an indefinitely long time.

For now, the notification problems ignore abnormal notifications (we’ll take a look at them below).

The Task Parallel Library provides very convenient primitives for both waiting problems:

  • CountdownEvent - represents a synchronization primitive that is signaled when its count reaches zero.
  • Barrier - enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.

The coordinator needs to track the remaining work and be awakened when it is done. The CountdownEvent starts with the number of workers (in this case 2) and is reset to this value just before the workers are allowed to proceed to the next iteration.
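The following is a minimal sketch of the coordinator side in isolation, using only CountdownEvent. To keep it small, the workers here are short-lived threads spawned once per round, unlike the long-lived workers in the post. CountdownSketch is a hypothetical name.

```csharp
using System;
using System.Threading;

public static class CountdownSketch
{
    // Runs the given number of rounds. In each round two worker threads
    // signal once, and the coordinator (the calling thread) waits until
    // both have signaled. Returns the number of completed rounds and
    // reports the count observed right after the last Reset.
    public static int Run(int rounds, out int countAfterReset)
    {
        const int workers = 2;
        int completed = 0;
        countAfterReset = -1;
        using (var fetched = new CountdownEvent(workers))
        {
            for (int round = 0; round < rounds; round++)
            {
                var threads = new Thread[workers];
                for (int w = 0; w < workers; w++)
                {
                    // Each worker reports "value fetched" exactly once
                    threads[w] = new Thread(() => fetched.Signal());
                    threads[w].Start();
                }
                fetched.Wait();            // blocks until the count reaches zero
                completed++;
                foreach (var t in threads) t.Join();
                fetched.Reset(workers);    // re-arm with the number of workers
                countAfterReset = fetched.CurrentCount;
            }
        }
        return completed;
    }
}
```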

Things are a little trickier with the workers’ waiting problem. Barrier lets you set up the number of participants. If we set the number of participants equal to the number of workers, they will proceed to the next iteration as soon as they all arrive at the barrier. But this is not what we want, because workers must not proceed without the coordinator’s permission. So we’ll make the coordinator a participant as well. The coordinator arrives at the barrier once the next zipped value (the next iteration) is requested. After every participant passes the barrier, it automatically returns to its initial state, so no explicit reset is required.
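The following sketch combines both primitives, without cancellation. Two long-lived workers "fetch" synthetic values instead of reading real sequences. They signal a CountdownEvent and then wait at a Barrier whose third participant is the coordinator. LockstepSketch and its value formula are hypothetical, for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class LockstepSketch
{
    // Two workers produce values in lockstep. The coordinator collects
    // one pair per round and releases the workers only after reading it.
    public static List<string> Run(int rounds)
    {
        const int workers = 2;
        var results = new List<string>();
        var values = new int[workers];
        using (var fetched = new CountdownEvent(workers))
        using (var barrier = new Barrier(workers + 1)) // workers + coordinator
        {
            var threads = new Thread[workers];
            for (int w = 0; w < workers; w++)
            {
                int id = w; // copy the loop variable for the lambda
                threads[w] = new Thread(() =>
                {
                    for (int i = 0; i < rounds; i++)
                    {
                        values[id] = i * 10 + id; // "fetch" the next value
                        fetched.Signal();          // notify the coordinator
                        barrier.SignalAndWait();   // wait for permission
                    }
                }) { IsBackground = true };
                threads[w].Start();
            }

            for (int i = 0; i < rounds; i++)
            {
                fetched.Wait();            // both values are ready
                fetched.Reset(workers);    // re-arm before releasing workers
                results.Add(values[0] + "," + values[1]);
                barrier.SignalAndWait();   // let the workers fetch the next pair
            }
            foreach (var t in threads) t.Join();
        }
        return results;
    }
}
```

The Reset happens before the coordinator arrives at the barrier. At that point both workers are blocked in SignalAndWait, so no Signal call can be lost between rounds. Run(3) yields "0,1", "10,11" and "20,21".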

But what should happen when either sequence terminates normally or abnormally? The zip sequence must terminate as well, and we cannot wait for the other sequence to fetch its next value or terminate, because that may take indefinitely long. Fortunately, both primitives (CountdownEvent and Barrier) support the new .NET 4 cancellation framework, in particular:

The coordinator waits on the CountdownEvent instance with a CancellationToken that workers can cancel to notify it about termination.

Workers wait on the Barrier instance with a CancellationToken that the coordinator can cancel to notify them about the termination of the outer (zip) sequence.
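The following sketch shows both directions of cancellation in one deterministic scenario. Worker A fetches a value and waits at the barrier. Worker B "reaches the end" of its sequence and cancels the coordinator’s token. The coordinator then cancels the workers’ token. CancellationSketch and the outcome strings are hypothetical.

```csharp
using System;
using System.Threading;

public static class CancellationSketch
{
    // Returns { coordinator outcome, worker A outcome }
    public static string[] Run()
    {
        var outcomes = new string[2];
        using (var fetched = new CountdownEvent(2))
        using (var zipCancellation = new CancellationTokenSource())
        using (var barrier = new Barrier(3))
        using (var srcCancellation = new CancellationTokenSource())
        {
            // Worker A fetched a value and waits for permission to go on
            var workerA = new Thread(() =>
            {
                fetched.Signal();
                try
                {
                    barrier.SignalAndWait(srcCancellation.Token);
                    outcomes[1] = "released";
                }
                catch (OperationCanceledException)
                {
                    outcomes[1] = "cancelled";
                }
            });
            // Worker B's sequence completed: it cancels instead of signaling
            var workerB = new Thread(() => zipCancellation.Cancel());
            workerA.Start();
            workerB.Start();

            try
            {
                // Only one Signal ever happens, so this can end only by cancellation
                fetched.Wait(zipCancellation.Token);
                outcomes[0] = "both fetched";
            }
            catch (OperationCanceledException)
            {
                outcomes[0] = "terminated";
                srcCancellation.Cancel(); // wake up workers blocked at the barrier
            }
            workerA.Join();
            workerB.Join();
        }
        return outcomes;
    }
}
```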

The last thing is how to deal uniformly with values, exceptions and normal sequence termination. Fortunately, Reactive Extensions for .NET (Rx) has the right tool for that:

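Basically, a value, an exception and a termination are each represented by a type derived from Notification<T>, where T is the sequence element type. The Materialize operator turns a sequence into a sequence of such notifications, and that is what the workers below consume.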
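To show what materialization does, here is a self-contained sketch. NoteKind, Note<T> and MaterializeSketch are hypothetical stand-ins written for illustration. They are not Rx’s NotificationKind, Notification<T> or Materialize, whose actual shapes differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for Rx's notification types
public enum NoteKind { OnNext, OnError, OnCompleted }

public sealed class Note<T>
{
    private Note() { }
    public NoteKind Kind { get; private set; }
    public T Value { get; private set; }
    public Exception Error { get; private set; }

    public static Note<T> Next(T value) { return new Note<T> { Kind = NoteKind.OnNext, Value = value }; }
    public static Note<T> Fail(Exception error) { return new Note<T> { Kind = NoteKind.OnError, Error = error }; }
    public static Note<T> Completed() { return new Note<T> { Kind = NoteKind.OnCompleted }; }
}

public static class MaterializeSketch
{
    // Turns values, an exception or normal completion into notifications,
    // so that a consumer can handle all three uniformly
    public static IEnumerable<Note<T>> Materialize<T>(IEnumerable<T> source)
    {
        using (IEnumerator<T> e = source.GetEnumerator())
        {
            while (true)
            {
                Note<T> note;
                try
                {
                    // C# forbids yield inside a try block that has a catch,
                    // so the notification is built here and yielded below
                    if (!e.MoveNext())
                    {
                        break;
                    }
                    note = Note<T>.Next(e.Current);
                }
                catch (Exception ex)
                {
                    note = Note<T>.Fail(ex);
                }
                yield return note;
                if (note.Kind == NoteKind.OnError)
                {
                    yield break; // an error terminates the sequence
                }
            }
        }
        yield return Note<T>.Completed();
    }
}
```

A sequence either ends with exactly one OnCompleted or exactly one OnError notification, which is what lets the workers below treat "value", "done" and "failed" with a single loop.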

Let’s now put everything together.

public static class ZipEx
{
 public static IEnumerable<TResult> RightHandZip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
 {
  Contract.Requires(first != null);
  Contract.Requires(second != null);
  Contract.Requires(resultSelector != null);

  return new Zipper<TFirst, TSecond>().Zip(first, second, resultSelector);
 }

 class Zipper<TFirst, TSecond>
 {
  private const int c_sourceCount = 2;
  private int m_workersCount = c_sourceCount;

  private CountdownEvent m_zipEvent;
  private CancellationTokenSource m_zipCancellation;

  private Barrier m_srcBarrier;
  private CancellationTokenSource m_srcCancellation;

  private volatile Notification<TFirst> m_first;
  private volatile Notification<TSecond> m_second;

  public IEnumerable<TResult> Zip<TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
   // Coordinator tracks remaining work through
   // a countdown event
   m_zipEvent = new CountdownEvent(c_sourceCount);
   // Workers may use this cancellation token source
   // to awaken the coordinator in case of termination
   // of either sequence
   m_zipCancellation = new CancellationTokenSource();
   // Here we basically state that the coordinator is
   // also a barrier participant
   m_srcBarrier = new Barrier(c_sourceCount + 1);
   // Coordinator may use this cancellation token source
   // to awaken workers in case of outer sequence (zip)
   // termination
   m_srcCancellation = new CancellationTokenSource();

   // Spawn workers that will fetch results from both 
   // sequences simultaneously
   RunWorker(first, n => { m_first = n; });
   RunWorker(second, n => { m_second = n; });

   var token = m_zipCancellation.Token;
   while (true)
   {
    try
    {
     // Wait until either all workers fetched their next
     // values or any worker notified about termination
     // (either due to an exception or sequence completion)
     m_zipEvent.Wait(token);
    }
    catch (OperationCanceledException)
    {
     // Notify workers that zip sequence is terminated
     m_srcCancellation.Cancel();
     // If zip sequence is terminated due to exception(s) 
     // throw either AggregateException if both sequences 
     // resulted in exception or the exception that 
     // terminated either of them
     ThrowIfError();
     // Otherwise sequence is terminated due to one of 
     // the sequences completion
     yield break;
    }
    // Reset count down event for the next round
    m_zipEvent.Reset(c_sourceCount);
    // Yield next zipped value
    yield return resultSelector(m_first.Current, m_second.Current);
    // Only once the consumer asks for the next value do we
    // allow the workers to attempt to fetch their next values.
    // Zipper is a barrier participant that arrives at the
    // barrier once the next zipped value is requested
    m_srcBarrier.SignalAndWait();
   }
  }

  private void RunWorker<T>(IEnumerable<T> enumerable, Action<Notification<T>> update)
  {
   // In order to fetch results from sequences we will use 
   // manually spawned threads as we do not want to seize 
   // ThreadPool threads for indefinite time. 
   new Thread(() =>
      {
       var token = m_srcCancellation.Token;
       foreach (var notification in enumerable.Materialize())
       {
        update(notification);

        if (notification.Kind == NotificationKind.OnNext)
        {
         // Notify about a value successfully fetched
         // out of the sequence
         m_zipEvent.Signal();
        }
        else
        {
         // Either sequence completion or error
         // notifications terminate the zipped sequence
         m_zipCancellation.Cancel();
        }

        try
        {
         // Wait until the next zipped value is requested
         // or the zip sequence is terminated
         m_srcBarrier.SignalAndWait(token);
        }
        catch (OperationCanceledException)
        {
         // The last alive worker is responsible for
         // disposing of the shared resources
         DisposeIfLast();
         return;
        }
       }
      }) {IsBackground = true}.Start();
  }

  private void DisposeIfLast()
  {
   if (Interlocked.Decrement(ref m_workersCount) == 0)
   {
    DisposeAndNullify(ref m_zipEvent);
    DisposeAndNullify(ref m_srcBarrier);
    DisposeAndNullify(ref m_zipCancellation);
    DisposeAndNullify(ref m_srcCancellation);
   }
  }

  private void ThrowIfError()
  {
   var ex1 = ExtractErrorOrNothing(m_first);
   var ex2 = ExtractErrorOrNothing(m_second);

   if (ex1 != null && ex2 != null)
   {
    throw new AggregateException(ex1, ex2);
   }
   ThrowIfNotNull(ex1);
   ThrowIfNotNull(ex2);
  }

  private static Exception ExtractErrorOrNothing<T>(Notification<T> n)
  {
   if (n != null && n.Kind == NotificationKind.OnError)
   {
    return ((Notification<T>.OnError) n).Exception;
   }
   return null;
  }

  private static void ThrowIfNotNull(Exception ex)
  {
   if (ex != null)
   {
    throw ex;
   }
  }

  private static void DisposeAndNullify<T>(ref T res)
   where T : class, IDisposable
  {
   if (res != null)
   {
    res.Dispose();
    res = null;
   }
  }
 }
}

Let’s set up the example infrastructure

static IEnumerable<int> Infinite(string id)
{
  Console.Write("{0} -> X ", id);
  while(true) {}
  yield break;
}

static IEnumerable<int> Seq(string id, int start, int count)
{
  return Enumerable.Range(0, count).Select(x => x * 2 + start)
    .Do(x => Console.Write("{0} -> {1} ", id, x));
}

static IEnumerable<int> Abnormal(string id)
{
  return EnumerableEx.Throw<int>(new Exception())
    .Finally(() => Console.Write("{0} -> E ", id));
}

static void RunTest(IEnumerable<int> first, IEnumerable<int> second)
{
  Func<int, int, string> format = (f, s) => String.Format("[{0}, {1}] ", f, s);
  try
  {
    first.RightHandZip(second, format).Run(Console.Write);
  }
  catch (Exception ex)
  {
    Console.Write(ex.GetType());
  }
  Console.WriteLine();
}

and run some examples:

const int count = 3;
// Both sequences terminate
RunTest(Seq("F", 0, count), Seq("S", 1, count));
// Terminates normally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5]

// First normally terminates and second continues
RunTest(Seq("F", 0, count), Seq("S", 1, count + 1));
// Terminates normally
// F -> 0 S -> 1 [0, 1] S -> 3 F -> 2 [2, 3] F -> 4 S -> 5 [4, 5] S -> 7

// First abnormally terminates and second continues
RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count + 1));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] S -> 7 F -> E System.Exception

// Both terminate abnormally
RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count).Concat(Abnormal("S")));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] F -> E S -> E System.AggregateException

// First gets stuck and second terminates abnormally
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count).Concat(Abnormal("S")));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] S -> E F -> X System.Exception

// First gets stuck and second terminates normally
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count));
// Terminates normally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X

// First gets stuck and second continues
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count + 1));
// Gets stuck
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X S -> 7

That’s it.

Monday, February 8, 2010

Binary heap based priority queue

Designing a container that supports item ordering raises lots of interesting questions. To be more concrete, we will design a simple priority queue based on a binary min-heap that supports the following operations:

  • Enqueue – add an item to the queue with an associated priority.
  • Dequeue - remove the element from the queue that has the highest priority and return it.
  • Peek – look at the highest priority element without removing it.

A good design that solves the wrong problems isn’t better than a bad one, so the first step is to identify the right problems to solve. A priority queue maintains a set of items, each with an associated key (priority). Items leave the queue according to the ordering mechanism employed for keys (priorities). Basically, the two problems we need to solve (from the API design perspective) are how to represent:

  • association of a key (priority) and corresponding item
  • ordering mechanism for keys (priorities)

Association can be either explicit (PriorityQueue<TItem, TKey>, where the key type is explicitly stated) or implicit (PriorityQueue<TItem>, where the key type is of no interest). Each type parameter must have concrete consumers. The priority queue itself doesn’t care about keys (although priority queues with updatable priorities do), only about comparing them. Client code cannot benefit from explicit keys either: it defines what a key actually means, so it can easily access the key associated with an item. So there is no point in cluttering the API with irrelevant details (of what keys really are). Thus we will use PriorityQueue<T> (since there is now only one type parameter, we use the short name T for it) and let consumers provide the logic for comparing two items based on whatever they define as keys.

There are several options for representing the comparison mechanism.

The item type may be constrained to support comparison through a generic type parameter constraint:

class PriorityQueue<T> 
  where T : IComparable<T>
{
}

Though this approach benefits from a clearly stated comparison mechanism, it implies significant limitations:

  • It doesn’t naturally support comparing items of the same type by different aspects (for example, in one case objects of the Customer type are compared by number of orders and in another by the date of the last order). Of course, a consumer can create a lightweight wrapper that aggregates the object to compare and does the actual comparison, but this is not feasible because of the additional memory consumption and usage complexity.
  • It doesn’t naturally support changing the order direction (ascending <-> descending), so it may require adding that support to the data structure itself.

With those limitations in mind, we can use comparers, i.e. something that knows how to compare two objects:

  • A type that implements IComparer<T>, which benefits from .NET Framework support (great documentation and a default implementation, Comparer<T>.Default).
  • Or a delegate Func<T, T, int> that accepts two parameters of type T and returns an integer indicating whether the first is less than, equal to, or greater than the second. It benefits from the conveniences of anonymous functions.

Comparers are designed for particular usage scenarios, and a single comparer instance corresponds to a container of items. Thus the limitations mentioned above do not apply to comparers.
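To illustrate, here is a sketch with a hypothetical Customer type (not part of the original design) and two comparers that order it by different aspects. It also includes a hypothetical ReverseComparer<T> that flips the direction without any support from the container. The customer comparers assume non-null arguments.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical Customer type, used only to illustrate comparers
public class Customer
{
    public string Name { get; set; }
    public int Orders { get; set; }
    public DateTime LastOrder { get; set; }
}

// Compares customers by number of orders (assumes non-null arguments)
public class OrdersComparer : Comparer<Customer>
{
    public override int Compare(Customer x, Customer y)
    {
        return x.Orders.CompareTo(y.Orders);
    }
}

// Compares customers by the date of their last order
// (assumes non-null arguments)
public class LastOrderComparer : Comparer<Customer>
{
    public override int Compare(Customer x, Customer y)
    {
        return x.LastOrder.CompareTo(y.LastOrder);
    }
}

// Flips any comparer, turning ascending order into descending
public class ReverseComparer<T> : Comparer<T>
{
    private readonly IComparer<T> m_inner;

    public ReverseComparer(IComparer<T> inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        m_inner = inner;
    }

    public override int Compare(T x, T y)
    {
        return m_inner.Compare(y, x); // swapped arguments reverse the order
    }
}
```

For instance, passing new ReverseComparer<Customer>(new OrdersComparer()) to the PriorityQueue<T> constructor would make the min-heap dequeue the customer with the most orders first.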

Given the value of .NET Framework support for IComparer<T>, and how easy it is to create a wrapper that derives from Comparer<T> and delegates comparison to an aggregated function, we will use the IComparer<T> approach. (It may seem costless to also support the Func<T, T, int> mechanism and create the wrapper ourselves. In most cases, however, it is best to avoid providing several ways to do the same thing, because the potential confusion may outweigh the benefits.)
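The wrapper mentioned above can be as small as the following sketch. FuncComparer<T> is a hypothetical name, not a framework type.

```csharp
using System;
using System.Collections.Generic;

// Adapts a Func<T, T, int> to IComparer<T> by deriving from Comparer<T>
public sealed class FuncComparer<T> : Comparer<T>
{
    private readonly Func<T, T, int> m_compare;

    public FuncComparer(Func<T, T, int> compare)
    {
        if (compare == null) throw new ArgumentNullException("compare");
        m_compare = compare;
    }

    // Delegates the comparison to the aggregated function
    public override int Compare(T x, T y)
    {
        return m_compare(x, y);
    }
}
```

.NET Framework 4.5 and later ship a similar built-in adapter, Comparer<T>.Create. It takes a Comparison<T> delegate rather than a Func<T, T, int>.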

Now let’s put everything together.

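using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;

// Unbounded priority queue based on binary min heap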
public class PriorityQueue<T>
{
  private const int c_initialCapacity = 4;
  private readonly IComparer<T> m_comparer;
  private T[] m_items;
  private int m_count;

  public PriorityQueue()
    : this(Comparer<T>.Default)
  {
  }

  public PriorityQueue(IComparer<T> comparer)
    : this(comparer, c_initialCapacity)
  {
  }

  public PriorityQueue(IComparer<T> comparer, int capacity)
  {
    Contract.Requires<ArgumentOutOfRangeException>(capacity >= 0);
    Contract.Requires<ArgumentNullException>(comparer != null);

    m_comparer = comparer;
    m_items = new T[capacity];
  }

  public PriorityQueue(IEnumerable<T> source)
    : this(source, Comparer<T>.Default)
  {
  }

  public PriorityQueue(IEnumerable<T> source, IComparer<T> comparer)
  {
    Contract.Requires<ArgumentNullException>(source != null);
    Contract.Requires<ArgumentNullException>(comparer != null);

    m_comparer = comparer;
    // In most cases queue that is created out of sequence 
    // of items will be emptied step by step rather than 
    // new items added and thus initially the queue is 
    // not expanded but rather left full
    m_items = source.ToArray();
    m_count = m_items.Length;
    // Restore heap order
    FixWhole();
  }

  public int Capacity
  {
    get { return m_items.Length; }
  }

  public int Count
  {
    get { return m_count; }
  }

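  public void Enqueue(T e)
  {
    // A queue created out of a sequence is left full, and a
    // queue may be created with zero capacity, so make room
    // before inserting
    if (m_count == m_items.Length)
    {
      Expand(Math.Max(c_initialCapacity, m_items.Length*2));
    }
    m_items[m_count++] = e;
    // Restore heap if it was broken
    FixUp(m_count - 1);
    // Once items count reaches half of the queue capacity
    // it is doubled
    if (m_count >= m_items.Length/2)
    {
      Expand(m_items.Length*2);
    }
  }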

  public T Dequeue()
  {
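    // Preconditions may only reference members that are at least
    // as visible as the method, hence Count instead of m_count
    Contract.Requires<InvalidOperationException>(Count > 0);

    var e = m_items[0];
    m_items[0] = m_items[--m_count];
    // Clear the vacated slot so that the queue does not keep
    // a dequeued item alive
    m_items[m_count] = default(T);
    // Restore heap if it was broken
    FixDown(0);
    // Once the item count drops to one eighth of the queue
    // capacity, the capacity is halved so that items still
    // occupy one fourth of it (if it were halved when the
    // count drops to one fourth, items would occupy half of
    // the capacity afterwards and the next enqueued item
    // would require the queue to expand)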
    if (m_count <= m_items.Length/8)
    {
      Expand(m_items.Length/2);
    }

    return e;
  }

  public T Peek()
  {
    Contract.Requires<InvalidOperationException>(Count > 0);

    return m_items[0];
  }

  private void FixWhole()
  {
    // Using the bottom-up heap construction method, enforce
    // the heap property on the m_count occupied slots
    for (int k = m_count/2 - 1; k >= 0; k--)
    {
      FixDown(k);
    }
  }

  private void FixUp(int i)
  {
    // Make sure that starting with i-th node up to the root
    // the tree satisfies the heap property: if B is a child 
    // node of A, then key(A) ≤ key(B)
    for (int c = i, p = Parent(c); c > 0; c = p, p = Parent(p))
    {
      if (Compare(m_items[p], m_items[c]) <= 0)
      {
        break;
      }
      Swap(m_items, c, p);
    }
  }

  private void FixDown(int i)
  {
    // Make sure that starting with i-th node down to the leaf 
    // the tree satisfies the heap property: if B is a child 
    // node of A, then key(A) ≤ key(B)
    for (int p = i, c = FirstChild(p); c < m_count; p = c, c = FirstChild(c))
    {
      if (c + 1 < m_count && Compare(m_items[c + 1], m_items[c]) < 0)
      {
        c++;
      }
      if (Compare(m_items[p], m_items[c]) <= 0)
      {
        break;
      }
      Swap(m_items, p, c);
    }
  }

  private static int Parent(int i)
  {
    return (i - 1)/2;
  }

  private static int FirstChild(int i)
  {
    return i*2 + 1;
  }

  private int Compare(T a, T b)
  {
    return m_comparer.Compare(a, b);
  }

  private void Expand(int capacity)
  {
    Array.Resize(ref m_items, capacity);
  }

  private static void Swap(T[] arr, int i, int j)
  {
    var t = arr[i];
    arr[i] = arr[j];
    arr[j] = t;
  }
}

The example below prints the first 200 elements of the sequence of mscorlib types ordered by full name. Sorting the whole sequence first and then taking the first 200 elements is less efficient.

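class TypeNameComparer : Comparer<Type>
{
  public override int Compare(Type x, Type y)
  {
    // Code Contracts does not allow an override to add
    // preconditions, so nulls are handled explicitly:
    // null sorts before any type
    if (ReferenceEquals(x, y)) return 0;
    if (x == null) return -1;
    if (y == null) return 1;

    return String.Compare(x.FullName, y.FullName);
  }
}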

...

const int count = 200;
var types = typeof (object).Assembly.GetTypes();
var typesQueue = new PriorityQueue<Type>(types, new TypeNameComparer());

for (int i = 0; i < count && typesQueue.Count > 0; i++)
{
  Console.WriteLine(typesQueue.Dequeue());
}
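The efficiency remark before the example can be made concrete with a rough cost estimate. Assume n types and k = 200 dequeued items, and ignore the occasional array resize. Bottom-up heap construction (FixWhole) is linear, and each Dequeue costs O(log n):

```latex
T_{\text{heap}}(n, k) = \underbrace{O(n)}_{\text{FixWhole}} + \underbrace{k \cdot O(\log n)}_{k \text{ Dequeue calls}} = O(n + k \log n)
\qquad \text{vs.} \qquad
T_{\text{sort}}(n) = O(n \log n)
```

For k much smaller than n, the heap-based approach does asymptotically less work.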

That’s it.