Monday, December 27, 2010

Parallel graph search

A graph is a concept used to describe relationships between things: vertices (or nodes) represent the things and edges represent the relationships. A graph is made up of two finite sets (vertices and edges) and is denoted by G = (V, E). Nodes are usually denoted by non-negative integers (for example, a graph with 3 nodes has nodes 0, 1, 2) and edges are pairs of nodes (s, t) where s is the source and t is the target node.

Searching for a simple path (a path with no repeated vertices) that connects two nodes is one of the basic problems solved with graph search. We need to visit each node at most once (meaning visited nodes are tracked) to find out whether there is any path between two given nodes. Time complexity for graph search is O(V + E) as every node and every edge is examined at most once.

Let’s parallelize the thing to speed it up.

Graph representation is irrelevant for our case, so let’s assume an implementation of the following public API is in place:

public class Graph
{
    // Initializes new graph with given number of nodes and 
    // set of edges
    public Graph(int nodes, IEnumerable<Tuple<int, int>> edges);
    // Gets number of nodes in the graph
    public int Nodes { get; }
    // Gets list of nodes adjacent to given one
    public IEnumerable<int> AdjacentTo(int node);
}
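
For illustration, here is how such a graph might be constructed (a small sketch assuming the implementation treats edges as directed source/target pairs, as the definition above suggests):

var graph = new Graph(4, new[]
{
    Tuple.Create(0, 1),
    Tuple.Create(1, 2),
    Tuple.Create(2, 3),
    Tuple.Create(0, 3)
});
// graph.AdjacentTo(0) is expected to yield 1 and 3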

Starting with a source node, for each node being processed the search condition is checked and the search is expanded to adjacent nodes that have not been visited yet. The search continues until the condition is met or no reachable unvisited nodes are left.

The first part of the solution is to process nodes in parallel until none are left or early termination is requested. The idea is to maintain two work queues, one for the current phase and one for the next. Items from the current phase queue are processed in parallel while new work items are added to the next phase queue. At the end of a phase the queues are swapped. While this mechanism covers the “while not empty” scenario, cooperative cancellation covers early search termination.

public static class ParallelAlgorithms
{
    public static void DoWhileNotEmpty<T>(IEnumerable<T> seed, Action<T, Action<T>> body, CancellationToken cancellationToken)
    {
        // Maintain two work queues for current and next phases
        var curr = new ConcurrentQueue<T>(seed);
        var next = new ConcurrentQueue<T>();
        var parallelOptions = new ParallelOptions {CancellationToken = cancellationToken};
        // Until there is something to do
        while (!curr.IsEmpty)
        {
            // Function to add work for the next phase
            Action<T> add = next.Enqueue;
            // Process current work queue in parallel while 
            // populating next one
            Parallel.ForEach(curr, parallelOptions, t => body(t, add));
            // Switch queues and empty next one
            curr = next;
            next = new ConcurrentQueue<T>();
        }
    }
}

Now that we have the processing mechanism we need to add visited node tracking. As part of our solution we must find a path that connects two given nodes if one exists, so instead of just remembering which nodes were visited we’ll keep track of the parent node used to visit each node. Unvisited nodes are marked with -1. Once the search is finished we can reconstruct the path backwards from target to source by following parent links.

public static class ParallelGraph
{
    public static int[] Search(this Graph graph, int source, Func<Tuple<int, int>, bool> func)
    {
        const int undef = -1;
        // Start with dummy loop edge
        var seed = Enumerable.Repeat(Tuple.Create(source, source), 1);
        // Mark all nodes as not yet visited
        var tree = Enumerable.Repeat(undef, graph.Nodes).ToArray();
        tree[source] = source;
        // Cancellation token source used to exit graph search 
        // once search condition is met.
        var cancellationTokenSource = new CancellationTokenSource();
        try
        {
            // Until there are reachable not visited nodes 
            ParallelAlgorithms
                .DoWhileNotEmpty(seed, (edge, add) =>
            {
                var from = edge.Item2;

                // and search condition is not met
                if (!func(edge))
                    cancellationTokenSource.Cancel();

                // Try expand search to adjacent nodes
                foreach (var to in graph.AdjacentTo(from))
                {
                    // Expand search to not yet visited nodes 
                    // (otherwise if node is expanded and 
                    // then checked during processing we may 
                    // end up with lots of already visited 
                    // nodes in memory which is a problem 
                    // for large dense graphs) marking nodes 
                    // to visit next
                    if (Interlocked.CompareExchange(ref tree[to], from, undef) != undef)
                        continue;
                    add(Tuple.Create(from, to));
                }
            }, cancellationTokenSource.Token);
        }
        catch (OperationCanceledException ex)
        {
            // Check if exception originated from parallel 
            // processing cancellation request
            if (ex.CancellationToken != cancellationTokenSource.Token)
                throw;
        }
        // In the end we have spanning tree that connects
        // all the nodes reachable from source
        return tree;
    }

    public static IEnumerable<int> FindPath(this Graph graph, int from, int to)
    {
        // Search within graph until reached 'to'
        var tree = Search(graph, from, edge => edge.Item2 != to);
        // Check if there is a path that connects 'from' and 'to'
        if (tree[to] == -1)
            return Enumerable.Empty<int>();
        // Reconstruct path that connects 'from' and 'to'
        var path = new List<int>();
        while (tree[to] != to)
        {
            path.Add(to);
            to = tree[to];
        }
        path.Add(to);
        // Reverse path as it is reconstructed backwards
        return Enumerable.Reverse(path);
    }
}
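
Here is a small usage sketch (the graph and the expected output are made up for illustration):

var graph = new Graph(5, new[]
{
    Tuple.Create(0, 1),
    Tuple.Create(1, 2),
    Tuple.Create(2, 3),
    Tuple.Create(0, 4)
});
// Expected to print: 0 1 2 3
Console.WriteLine(String.Join(" ", graph.FindPath(0, 3)));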

Done!

Thursday, November 11, 2010

Joining Microsoft

Find a job you like and you add five days to every week.

- H. Jackson Brown, Jr.

I decided to join Microsoft in an attempt to make my week longer. Lots of things to learn and challenging problems to solve. But this is where interesting topics come from.

I will continue chasing the state of the art soon. For now I need to get materialized in the Seattle area =).

Sunday, October 17, 2010

Parallel merge sort

A divide and conquer algorithm solves a problem by:

  1. dividing problem into two or more smaller independent sub-problems of the same type of problem
  2. solving each sub-problem recursively
  3. and combining their results.

Sub-problem independence makes divide and conquer algorithms natural for dynamic task parallelism. Quicksort is a good example (actually you can find its parallel version here). We’ll focus on parallel merge sort as it parallelizes better. Basically the term parallelism means the ratio T1/T∞ where T1 is the execution time on a single processor and T∞ is the execution time on an infinite number of processors.

Let’s implement sequential merge sort first as parallel merge sort will use it. In terms of divide and conquer, merge sort is defined as follows:

  1. If the array is of size 0 or 1 it is already sorted; otherwise divide it into two sub-arrays of about half the size
  2. recursively sort each sub-array
  3. and merge sorted sub-arrays into one sorted array.

It is quite straightforward, with a small improvement that allows us to avoid unnecessary allocations and data copying.

class MergeSortHelper<T>
{
    private readonly IComparer<T> _comparer;

    public MergeSortHelper()
        : this(Comparer<T>.Default)
    {
    }

    public MergeSortHelper(IComparer<T> comparer)
    {
        _comparer = comparer;
    }

    public void MergeSort(T[] array, int low, int high, bool parallel)
    {
        // Create a copy of the original array. Switching between
        // original array and its copy will allow to avoid 
        // additional array allocations and data copying.
        var copy = (T[]) array.Clone();
        if (parallel)
            ParallelMergeSort(array, copy, low, high, GetMaxDepth());
        else
            SequentialMergeSort(array, copy, low, high);
    }

    private void SequentialMergeSort(T[] to, T[] temp, int low, int high)
    {
        if (low >= high)
            return;
        var mid = (low + high) / 2;
        // On the way down the recursion tree both arrays have 
        // the same data so we can switch them. Sort two 
        // sub-arrays first so that they are placed into the temp 
        // array.
        SequentialMergeSort(temp, to, low, mid);
        SequentialMergeSort(temp, to, mid + 1, high);
        // Once temp array contains two sorted sub-arrays
        // they are merged into target array.
        SequentialMerge(to, temp, low, mid, mid + 1, high, low);
        // On the way up either we are done as the target array
        // is the original array and now contains required 
        // sub-array sorted or it is the temp array from previous 
        // step and contains smaller sub-array that will be 
        // merged into the target array from previous step 
        // (which is the temp array of this step and so we 
        // can destroy its contents).
    }

    // Although sub-arrays being merged in sequential version 
    // are adjacent that is not the case for parallel version 
    // and thus sub-arrays boundaries must be specified 
    // explicitly.
    private void SequentialMerge(T[] to, T[] temp, int lowX, int highX, int lowY, int highY, int lowTo)
    {
        var highTo = lowTo + highX - lowX + highY - lowY + 1;
        for (; lowTo <= highTo; lowTo++)
        {
            if (lowX > highX)
                to[lowTo] = temp[lowY++];
            else if (lowY > highY)
                to[lowTo] = temp[lowX++];
            else
                to[lowTo] = Less(temp[lowX], temp[lowY])
                                ? temp[lowX++]
                                : temp[lowY++];
        }
    }

    private bool Less(T x, T y)
    {
        return _comparer.Compare(x, y) < 0;
    }
    
    ...

Now we need to parallelize it. Let’s get the obvious out of the way: sorting the two sub-arrays can be done in parallel just like in parallel quicksort. We can proceed to the merge step only once both sub-arrays are sorted.

Parallelizing merge is a more interesting task. The sequential version looks like a single indivisible task. We need to find a way to separate merging into independent tasks that can be run in parallel. The idea behind the algorithm is the following:

  1. Let’s assume we want to merge sorted arrays X and Y. Select the median element X[m] of X. Elements in X[ .. m-1] are less than or equal to X[m]. Using binary search find the index k of the first element in Y greater than X[m]. Thus Y[ .. k-1] are less than or equal to X[m] as well. Elements in X[m+1 .. ] are greater than or equal to X[m] and Y[k .. ] are greater. So merge(X, Y) can be defined as concat(merge(X[ .. m-1], Y[ .. k-1]), X[m], merge(X[m+1 .. ], Y[k .. ]))
  2. now we can recursively in parallel do merge(X[ .. m-1], Y[ .. k–1]) and merge(X[m+1 .. ], Y[k .. ])
  3. and then concat results.

Although the algorithm description is pretty abstract, there are more detailed comments in the code below.

    ...

    private const int SEQUENTIAL_THRESHOLD = 2048;

    // Recursion depth is utilized to limit number of spawned 
    // tasks. 
    private void ParallelMergeSort(T[] to, T[] temp, int low, int high, int depth)
    {
        if (high - low + 1 <= SEQUENTIAL_THRESHOLD || depth <= 0)
        {
            // Resort to sequential algorithm if either 
            // recursion depth limit is reached or sub-problem 
            // size is not big enough to solve it in parallel.
            SequentialMergeSort(to, temp, low, high);
            return;
        }

        var mid = (low + high) / 2;
        // The same target/temp arrays switching technique 
        // as in sequential version applies in parallel 
        // version. sub-arrays are independent and thus can 
        // be sorted in parallel.
        depth--;
        Parallel.Invoke(
            () => ParallelMergeSort(temp, to, low, mid, depth),
            () => ParallelMergeSort(temp, to, mid + 1, high, depth)
            );
        // Once both tasks have run to completion merge sorted
        // sub-arrays in parallel.
        ParallelMerge(to, temp, low, mid, mid + 1, high, low, depth);
    }

    // As parallel merge is itself recursive the same mechanism
    // for limiting the number of tasks (recursion depth) is used.
    private void ParallelMerge(T[] to, T[] temp, int lowX, int highX, int lowY, int highY, int lowTo, int depth)
    {
        var lengthX = highX - lowX + 1;
        var lengthY = highY - lowY + 1;

        if (lengthX + lengthY <= SEQUENTIAL_THRESHOLD || depth <= 0)
        {
            // Resort to sequential algorithm in case of small 
            // sub-problem or deep recursion.
            SequentialMerge(to, temp, lowX, highX, lowY, highY, lowTo);
            return;
        }

        if (lengthX < lengthY)
        {
            // Make sure that the X range is no smaller than the Y
            // range, swapping them if needed.
            ParallelMerge(to, temp, lowY, highY, lowX, highX, lowTo, depth);
            return;
        }

        // Get median of the X sub-array. As X sub-array is 
        // sorted it means that X[lowX .. midX - 1] are less 
        // than or equal to median and X[midX + 1 .. highX]
        // are greater or equal to median.
        var midX = (lowX + highX) / 2;
        // Find element in the Y sub-array that is strictly 
        // greater than X[midX]. Again as Y sub-array is 
        // sorted Y[lowY .. midY - 1] are less than or equal 
        // to X[midX] and Y[midY .. highY] are greater than 
        // X[midX].
        var midY = BinarySearch(temp, lowY, highY, temp[midX]);
        // Now we can compute final position in the target 
        // array of median of the X sub-array.
        var midTo = lowTo + midX - lowX + midY - lowY;
        to[midTo] = temp[midX];
        // The rest is to merge X[lowX .. midX - 1] with
        // Y[lowY .. midY - 1] and X[midX + 1 .. highX]
        // with Y[midY .. highY], preceding and following the
        // median respectively in the target array. As the
        // pairs are independent from their final position
        // perspective they can be merged in parallel.
        depth--;
        Parallel.Invoke(
            () => ParallelMerge(to, temp, lowX, midX - 1, lowY, midY - 1, lowTo, depth),
            () => ParallelMerge(to, temp, midX + 1, highX, midY, highY, midTo + 1, depth)
            );
    }

    // Searches the low to high range for the index of the
    // first element that is strictly greater than the provided
    // value. If all elements within the range are smaller than
    // or equal to the value, the index of the element just past
    // the range is returned.
    private int BinarySearch(T[] from, int low, int high, T lessThanOrEqualTo)
    {
        high = Math.Max(low, high + 1);
        while (low < high)
        {
            var mid = (low + high) / 2;
            if (Less(from[mid], lessThanOrEqualTo))
                low = mid + 1;
            else
                high = mid;
        }
        return low;
    }

    private static int GetMaxDepth()
    {
        // Although at each step we split the unsorted array
        // into two equally sized sub-arrays, sorting them may
        // not be perfectly balanced because parallel merge
        // may not be balanced. So we add a few extra levels of
        // task creation to keep the CPUs busy.
        return (int) Math.Log(Environment.ProcessorCount, 2) + 4;
    }
}

Now that the thing is ready let’s do some comparison. We’ll use a small benchmark helper.

public class Benchmark
{
    public static IEnumerable<long> Run<T>(Func<int, T[]> generator, Action<T[]> action, int times)
    {
        var samples = new long[times];
        var sw = new Stopwatch();
        for (var i = 0; i < times; i++)
        {
            var input = generator(i);
            sw.Restart();
            action(input);
            sw.Stop();
            samples[i] = sw.ElapsedMilliseconds;
        }
        return samples;
    }
}

We'll calculate and compare average running time for a number of samples for sequential quicksort, parallel quicksort and parallel merge sort.

const int count = 10 * 1000 * 1000;
const int iterations = 10;
var rnd = new Random();

var input = new int[count];
Func<int, int[]> gen = i =>
                {
                    // Avoid allocating large array for every
                    // run and just fill existing one 
                    for (var j = 0; j < input.Length; j++)
                        input[j] = rnd.Next();

                    return input;
                };

Action<int[]> seqArraySort, parallelQuickSort, parallelMergeSort;
// If Array.Sort<T>(T[] a, IComparer<T> c) sees
// Comparer<T>.Default it resorts to the unmanaged CLR provided
// sorting implementation, so we use a dummy comparer here to
// force it to use the managed quicksort.
seqArraySort = a => Array.Sort(a, new IntComparer());
// Parallel quicksort implementation from TPL extras.
// We use dummy comparer as well because at some point
// parallel quicksort resorts back to Array.Sort
parallelQuickSort = a => ParallelAlgorithms.Sort(a, new IntComparer());
// Our parallel merge sort implementation
parallelMergeSort = a => new MergeSortHelper<int>().MergeSort(a, 0, a.Length - 1, true);

var sq = Benchmark.Run(gen, seqArraySort, iterations).Average();
var pq = Benchmark.Run(gen, parallelQuickSort, iterations).Average();
var pm = Benchmark.Run(gen, parallelMergeSort, iterations).Average();
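
The IntComparer type referenced above isn’t shown in the post; a minimal version could look like this:

class IntComparer : IComparer<int>
{
    public int Compare(int x, int y)
    {
        return x.CompareTo(y);
    }
}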

On a two-core machine parallel merge sort was more than 2x faster than sequential quicksort and up to 25% faster than parallel quicksort, at the cost of additional O(n) space. Still it is a good example of how to use dynamic task parallelism. Enjoy!

Sunday, September 19, 2010

Traverse binary tree in level order by spiral

Another puzzle is at stake, folks. This time it is binary tree related (not necessarily a binary search tree, as we are not interested in data relations but rather in the binary tree structure). We need to traverse a binary tree level by level (a level is defined as the set of all nodes at the same distance from the root) in such a way that the traversal direction changes from level to level, thus forming a spiral. For example, consider the binary tree below (it is rotated 90 degrees counterclockwise). An asterisk means a NIL node.

                *
            20
                *
         19
                   *
               18
                   *
            17
                   *
               13
                   *
      12
          *
   11
       *
10
             *
          7
             *
       6
             *
          5
             *
    4
          *
       2
          *

Desired traversal for this tree will be: 10, 4, 11, 12, 6, 2, 5, 7, 19, 20, 17, 13, 18. Dissected into levels it looks like:

  1. 10
  2. 4, 11
  3. 12, 6, 2
  4. 5, 7, 19
  5. 20, 17
  6. 13, 18

It may not be obvious how to approach the problem. We’ll start with the well-known problem of traversing a binary tree level by level (also known as breadth-first traversal). It can be done using a queue.

static IEnumerable<T> LevelOrderLeftToRight<T>(TreeNode<T> root)
{
    if (root == null)
       yield break;

    var next = new Queue<TreeNode<T>>();
    next.Enqueue(root);

    while(next.Count > 0)
    {
       var node = next.Dequeue();
       yield return node.Data;

       EnqueueIfNotNull(next, node.Left);
       EnqueueIfNotNull(next, node.Right);
    }
}

static void EnqueueIfNotNull<T>(Queue<T> queue, T value)
   where T:class
{
    if (value != null)
       queue.Enqueue(value);
}

The queue contains yet-to-be-examined nodes in level by level, left to right order (as we first enqueue the left child and then the right). At each step the node at the front is dequeued, examined, and its children are enqueued for further examination, thus preserving the level by level, left to right order. It yields the following traversal: 10, 4, 11, 2, 6, 12, 5, 7, 19, 17, 20, 13, 18. Let’s split the node sequence into levels:

  1. 10
  2. 4, 11
  3. 2, 6, 12
  4. 5, 7, 19
  5. 17, 20
  6. 13, 18

That looks close to the sought-for order except that every odd-numbered level must be reversed. With all nodes in the same container (queue) it is hard to do this. Let’s change the code a little bit so that adjacent levels are separated into different containers.

static IEnumerable<T> LevelOrderLeftToRight<T>(TreeNode<T> root)
{
    if (root == null)
       yield break;

    var curr = new Queue<TreeNode<T>>();
    var next = new Queue<TreeNode<T>>();
    next.Enqueue(root);

   do
   {
      // Swap level containers
      Swap(ref curr, ref next);
      // Examine all nodes at current level
      while (curr.Count > 0)
      {
         var node = curr.Dequeue();
         yield return node.Data;
         // Fill next level preserving order
         EnqueueIfNotNull(next, node.Left);
         EnqueueIfNotNull(next, node.Right);
      }
      // Continue until next level has nodes
   } while (next.Count > 0);
}

static void Swap<T>(ref T a, ref T b)
{
   var tmp = a;
   a = b;
   b = tmp;
}

With adjacent levels separated we can change the order within levels. The next level is the set of child nodes from the current level. A FIFO container (queue) makes sure that child nodes of an earlier examined node will also be examined earlier. But this is the opposite of what we are looking for. Change it to a LIFO container (stack)! And that’s it. Child nodes of an earlier examined node will be examined later.

static IEnumerable<T> LevelOrderBySpiral<T>(TreeNode<T> root)
{
   if (root == null)
      yield break;

   var curr = new Stack<TreeNode<T>>();
   var next = new Stack<TreeNode<T>>();
   // Specifies direction for the next level
   var leftToRight = true;
   next.Push(root);

   do
   {
      Swap(ref curr, ref next);
      while (curr.Count > 0)
      {
         var node = curr.Pop();
         yield return node.Data;
         // If next level must be traversed from left to right
         // we must first push right child node and then left
         // and in opposite order if next level will be 
         // traversed from right to left
         PushIfNotNull(next, leftToRight ? node.Right : node.Left);
         PushIfNotNull(next, leftToRight ? node.Left : node.Right);
      }
      // Change direction within level
      leftToRight = !leftToRight;
   } while (next.Count > 0);
}

static void PushIfNotNull<T>(Stack<T> stack, T value)
   where T : class
{
   if (value != null)
      stack.Push(value);
}

The code yields the sought-for order.
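
The TreeNode<T> type isn’t defined in the post; a minimal version that the code above assumes, together with a small usage sketch covering the top three levels of the example tree, could look like this:

class TreeNode<T>
{
    public T Data;
    public TreeNode<T> Left;
    public TreeNode<T> Right;
}

var root = new TreeNode<int>
{
    Data = 10,
    Left = new TreeNode<int>
    {
        Data = 4,
        Left = new TreeNode<int> { Data = 2 },
        Right = new TreeNode<int> { Data = 6 }
    },
    Right = new TreeNode<int>
    {
        Data = 11,
        Right = new TreeNode<int> { Data = 12 }
    }
};

// Prints: 10 4 11 12 6 2
Console.WriteLine(String.Join(" ", LevelOrderBySpiral(root)));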

Tuesday, August 24, 2010

External merge sort

If the data to be sorted doesn’t fit into main memory, external sorting is applicable. External merge sort can be separated into two phases:

  1. Create initial runs (a run is a sequence of records that are in correct relative order, or in other words a sorted chunk of the original file).
  2. Merge created runs into single sorted file.

To implement this algorithm I will use solutions from my previous posts, so it may be helpful to look at them.

Let’s assume that at most M records are allowed to be loaded into main memory at the same time. One way to create initial runs is to successively read M records from the original file, sort them in memory and write them back to disk. However we will use an approach that allows us to create longer runs. It is called replacement selection.

The core structure behind this algorithm is a priority queue. Taking the current minimum element out of the queue one by one forms an ordered sequence, and this is exactly what a run is. The algorithm can be described as follows:

  1. Create two priority queues (that will contain items for current and next runs respectively) with capacity of M records.
  2. Prefill current run priority queue from unsorted file with M records.
  3. Create current run if there are elements in the current run priority queue:
    1. Take minimum element out of current run priority queue and append it to current run (basically write it to disk).
    2. Take next element from unsorted file (this is the replacement part of the algorithm) and compare it with just appended element.
    3. If it is less, it cannot be part of the current run (otherwise the order would be destroyed) and thus it is queued to the next run priority queue.
    4. Otherwise it is part of the current run and it is queued to the current run priority queue.
    5. Continue steps 1 through 4 until current run priority queue is empty.
  4. Switch current and next runs priority queues and repeat step 3.

At any given moment at most M records are loaded into main memory, as every element written into the current run is replaced with a single element from the unsorted file if any (depending on the comparison it goes either into the current or the next run).

The next step is to merge the created initial runs. For the merge step we will use a simplified algorithm (more advanced algorithms work with multiple physical devices to distribute runs, take into account data locality, etc.) based on k-way merge:

  1. Append created runs into a queue.
  2. While there is more than one run in the queue:
    1. Dequeue and merge K runs into a single run and put it into the queue.
  3. The remaining run represents the original file sorted.

Yeap, it is that simple. And let’s code it.

The implementation abstracts the file structure and reading/writing details, making the algorithm more concise and easier to understand.

abstract class ExternalSorter<T>
{
	private readonly IComparer<T> m_comparer;
	private readonly int m_capacity;
	private readonly int m_mergeCount;

	protected ExternalSorter(IComparer<T> comparer, int capacity, int mergeCount)
	{
		m_comparer = comparer;
		m_capacity = capacity;
		m_mergeCount = mergeCount;
	}

	// Sorts unsorted file and returns sorted file name
	public string Sort(string unsorted)
	{
		var runs = Distribute(unsorted);
		return Merge(runs);
	}

	// Write run to disk and return created file name
	protected abstract string Write(IEnumerable<T> run);
	// Read run from file with given name
	protected abstract IEnumerable<T> Read(string name);

	// Merge step in this implementation is simpler than 
	// the one used in polyphase merge sort - it doesn't
	// take into account distribution over devices
	private string Merge(IEnumerable<string> runs)
	{
		var queue = new Queue<string>(runs);
		var runsToMerge = new List<string>(m_mergeCount);
		// Until single run is left do merge
		while (queue.Count > 1)
		{
			// Select no more runs than we are allowed to
			// merge at a time
			var count = m_mergeCount;
			while (queue.Count > 0 && count-- > 0)
				runsToMerge.Add(queue.Dequeue());
			// Perform n-way merge on selected runs; in polyphase
			// merge sort n is dictated by the number of physical
			// devices the runs are distributed over, but here we
			// simply use the configured merge count
			var merged = runsToMerge.Select(Read).OrderedMerge(m_comparer);
			queue.Enqueue(Write(merged));

			runsToMerge.Clear();
		}
		// Last run represents source file sorted
		return queue.Dequeue();
	}

	// Distributes unsorted file into several sorted chunks
	// called runs (run is a sequence of records that are 
	// in correct relative order)
	private IEnumerable<string> Distribute(string unsorted)
	{
		var source = Read(unsorted);
		using (var enumerator = source.GetEnumerator())
		{
			var curr = new PriorityQueue<T>(m_comparer);
			var next = new PriorityQueue<T>(m_comparer);
			// Prefill priority queue to capacity which is used 
			// to create runs
			while (curr.Count < m_capacity && enumerator.MoveNext())
				curr.Enqueue(enumerator.Current);
			// Until unsorted source and priority queues are 
			// exhausted
			while (curr.Count > 0)
			{
				// Create next run and write it to disk
				var sorted = CreateRun(enumerator, curr, next);
				var run = Write(sorted);

				yield return run;

				Swap(ref curr, ref next);
			}
		}
	}

	private IEnumerable<T> CreateRun(IEnumerator<T> enumerator, PriorityQueue<T> curr, PriorityQueue<T> next)
	{
		while (curr.Count > 0)
		{
			var min = curr.Dequeue();
			yield return min;
			// When the unsorted source is exhausted MoveNext
			// returns false and the current queue is simply
			// emptied step by step
			if (!enumerator.MoveNext())
				continue;

			// Check if current run can be extended with 
			// next element from unsorted source
			if (m_comparer.Compare(enumerator.Current, min) < 0)
			{
				// As the current element is less than the min of
				// the current run it may also be less than
				// elements that are already in the current
				// run and thus this element goes into the
				// next run
				next.Enqueue(enumerator.Current);
			}
			else
			{
				// Extend current run
				curr.Enqueue(enumerator.Current);
			}
		}
	}

	private static void Swap<U>(ref U a, ref U b)
	{
		var tmp = a;
		a = b;
		b = tmp;
	}
}

In the example below I created a type that sorts text files containing a single number per line.

class TextFileOfNumbersExternalSorter : ExternalSorter<int>
{
	public TextFileOfNumbersExternalSorter(int capacity, int mergeCount)
		: base(Comparer<int>.Default, capacity, mergeCount)
	{
	}

	protected override string Write(IEnumerable<int> run)
	{
		var file = Path.GetTempFileName();
		using (var writer = new StreamWriter(file))
		{
			run.Run(writer.WriteLine);
		}
		return file;
	}

	protected override IEnumerable<int> Read(string name)
	{
		using (var reader = new StreamReader(name))
		{
			while (!reader.EndOfStream)
				yield return Int32.Parse(reader.ReadLine());
		}
		File.Delete(name);
	}
}

That is used like this:

// capacity, mergeCount and unsortedFileName are initialized elsewhere
var sorter = new TextFileOfNumbersExternalSorter(capacity, mergeCount);
var sortedFileName = sorter.Sort(unsortedFileName);
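
For completeness, here is a hypothetical helper (not part of the original sorter) that produces an unsorted input file in the format the sorter expects, one random number per line:

static string CreateUnsortedFile(int count)
{
	var file = Path.GetTempFileName();
	var rnd = new Random();
	using (var writer = new StreamWriter(file))
	{
		for (var i = 0; i < count; i++)
			writer.WriteLine(rnd.Next());
	}
	return file;
}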

That’s it folks!

Saturday, August 21, 2010

Minimum window that contains all characters

Like most of life's problems, this one can be solved with bending

- Bender B.Rodrigues

Let’s bend another problem. Given a set of characters P and a string T, find the minimum window in T that contains all characters in P. The solution is restricted to O(length(T)) time complexity. For example, given a string T “of characters and as” and a set of characters P in the form of a string “aa s”, the minimum window is “and as”.

The problem can be broken into two parts:

  • How to select window?
  • How to check that selected window contains all characters from P?

Selecting every possible window (all unique pairs (i, j) where 0 <= i <= j < length(T)) will lead to a solution worse than O(length(T)^2) because you still need to check whether all characters from P are within the selected window. Instead we will check every possible window ending position. Thus there are only length(T) ending positions to consider.

Any feasible window has length equal to or greater than length(P). Performing a full recheck for every considered window results in a solution no better than O(length(T)*length(P)). Instead we need to reuse the check results from the previous iteration.

Now we need to make sure that checking whether a particular character is in P is done in an optimal way, taking into account that a particular character may appear more than once and the window thus must contain the appropriate number of occurrences. We will use a hash table that maps unique characters from P to their counts for fast lookup.

And now let’s tie all things together.

  • Until the end of T is reached, move the current window ending position by one.
  • Append the next character to the end of the previous window, which up to this moment doesn’t contain all necessary characters. The char-to-count map is used to track the number of characters left to find: if the character is in P its count is decremented. The count may become negative, meaning that there are more occurrences than required.
  • If the unmatched character count goes to zero the window contains all required characters. However there may be redundant characters, so we try to compact the current window. It is ok to do this as we are looking for the minimum window and any window extended from this one won’t be better.
  • Once the window is compacted compare it with the minimum one and update the minimum if needed.
  • If the current window contains all the characters, remove the first one from it simply by moving the starting position by one, so that at each iteration the previous window doesn’t contain all the characters (there is no point in appending new characters to a window that already contains all of the required ones).

Code the thing! =)

static string FindMinWindow(string t, string p)
{
	// Create char to count mapping for fast lookup
	// as some characters may appear more than once
	var charToCount = new Dictionary<char, int>();
	foreach (var c in p)
	{
		if (!charToCount.ContainsKey(c))
			charToCount.Add(c, 0);
		charToCount[c]++;
	}

	var unmatchesCount = p.Length;
	int minWindowLength = t.Length + 1, minWindowStart = -1;
	int currWindowStart = 0, currWindowEnd = 0;
	for (; currWindowEnd < t.Length; currWindowEnd++)
	{
		var c = t[currWindowEnd];
		// Skip chars that are not in P
		if (!charToCount.ContainsKey(c))
			continue;
		// Reduce unmatched characters count
		charToCount[c]--;
		if (charToCount[c] >= 0)
			// But do this only while the count is still non-negative
			// as the count may go negative, which means
			// that there are more occurrences than required
			unmatchesCount--;

		// No complete match, so continue searching
		if (unmatchesCount > 0)
			continue;

		// Shrink the window as much as possible by removing
		// either chars that are not in P or those that
		// are in P but occur more times than required
		c = t[currWindowStart];
		var contains = charToCount.ContainsKey(c);
		while (!contains || charToCount[c] < 0)
		{
			if (contains)
				// Return character to P
				charToCount[c]++;

			c = t[++currWindowStart];
			contains = charToCount.ContainsKey(c);
		}

		if (minWindowLength > currWindowEnd - currWindowStart + 1)
		{
			minWindowLength = currWindowEnd - currWindowStart + 1;
			minWindowStart = currWindowStart;
		}

		// Remove the first char from the window - it definitely
		// belongs to P because we stopped at it during the shrink phase
		charToCount[c]++;
		unmatchesCount++;
		currWindowStart++;
	}

	return minWindowStart > -1 ?
	       t.Substring(minWindowStart, minWindowLength) :
	       String.Empty;
}

Every character is examined at most twice (once when appended to the end of the window and once during compaction) so the whole solution has O(length(T)) time complexity, assuming hash table lookup is an O(1) operation.
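
Running the example from the beginning of the post:

// Expected to print: and as
Console.WriteLine(FindMinWindow("of characters and as", "aa s"));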

Tuesday, August 17, 2010

Print numbers by spiral

Recently I came across a simple yet interesting coding problem. So here is the deal. You are given a positive integer N. Print the first N^2 positive integers in matrix form in such a way that within the matrix the numbers form a spiral starting from its center and going clockwise. For example, for N = 5 the matrix to be printed is:

21 22 23 24 25
20 7 8 9 10
19 6 1 2 11
18 5 4 3 12
17 16 15 14 13

Optimize it for speed and space.

One way to approach it is to create an N x N matrix, fill it with numbers that form a spiral and then print the whole matrix row by row. But this solution has O(N^2) space complexity. Let’s try to reach O(1) space complexity.

The key observation here is how matrix changes when N changes by 1.

N = 1.

1

N = 2.

1 2
4 3

N = 3.

7 8 9
6 1 2
5 4 3

N = 4.

7 8 9 10
6 1 2 11
5 4 3 12
16 15 14 13

Can you see the pattern here? At every step we extend the previous matrix (P) with an additional column and row (C). If N is even we extend the previous matrix of size N - 1 with a right column and a bottom row

P C
C C

and with left column and top row if it is odd

C C
C P

This leads us to a naturally recursive algorithm. We have the following cases:

  1. Print whole row of the current matrix (top when N is odd or bottom when N is even).
  2. Print row from previous matrix of size N - 1 first and then print value that belongs to current matrix (when N is even).
  3. Print value that belongs to current matrix and then print row from previous matrix of size N - 1 (when N is odd).
  4. Print matrix line by line.

So basically to print a row we need to know matrix size N and row index. Here goes the solution.

static void Print(int n)
{
	for(int i = 0; i < n; i++)
	{
		PrintLine(n, i);
		Console.WriteLine();
	}
}

static void PrintLine(int n, int i)
{
	// Number of integers in current matrix
	var n2 = n*n;
	// Number of integers in previous matrix of size n - 1
	var m2 = (n - 1)*(n - 1);

	if (n % 2 == 0)
	{
		if (i == n - 1)
		{
			// n is even and we are at the last row so just 
			// print it
			for(int k = n2; k > n2 - n; k--)
			{
				PrintNum(k);
			}
		}
		else
		{
			// Print row from previous matrix of size n - 1 
			// first and then print value that belongs to current 
			// matrix. Previous matrix is at the top left corner 
			// so no need to adjust row index
			PrintLine(n - 1, i);
			// Skip all integers from previous matrix and upper
			// ones in this column as integers must form clockwise
			// spiral
			PrintNum(m2 + 1 + i);
		}
	}
	else
	{
		if (i == 0)
		{
			// n is odd and we are at the first row so just 
			// print it
			for(int k = m2 + n; k <= n2; k++)
			{
				PrintNum(k);
			}
		}
		else
		{
			// Print value that belongs to current matrix and
			// then print row from previous matrix of size n - 1
			// Skip all integers from previous matrix and bottom
			// ones in this column as integers must form clockwise
			// spiral
			PrintNum(m2 + n - i);
			// Previous matrix is at the bottom right corner so
			// row index must be reduced by 1
			PrintLine(n - 1, i - 1);
		}
	}
}

static void PrintNum(int n)
{
	Console.Write("{0, -4}  ", n);
}

If the stack is not counted then this solution has O(1) space complexity, otherwise O(N).
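
For example, printing the N = 3 matrix from above gives:

Print(3);

// Output (spacing approximate):
// 7     8     9
// 6     1     2
// 5     4     3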

Sunday, April 4, 2010

Suffix array

Finding all occurrences of a pattern (of length m) in a text (of length n) is a quite commonly encountered string matching problem. For example, you hit Ctrl-F in your browser and type the string you want to find while the browser highlights every occurrence of the typed string on the page.

The naive solution is at each iteration to “shift” the pattern along the text by 1 position and check whether all characters of the pattern match the corresponding characters in the text. This solution has O((n - m + 1)*m) complexity.

If either the pattern or the text is fixed it can be preprocessed to speed up the search. For example, if the pattern is fixed we can use the Knuth-Morris-Pratt algorithm to preprocess it in O(m) time and make the search for its occurrences O(n).

Fixed text that is queried many times can also be preprocessed to support fast pattern search. One way to do this is to build a suffix array. The idea behind it is pretty simple: it is basically a list of the suffixes of the subject text (a suffix starts at some position inside the string and runs till the end of the string) sorted in lexicographical order. For example, for the “mississippi” string we have the following:

i
ippi
issippi
ississippi
mississippi
pi
ppi
sippi
sissippi
ssippi
ssissippi

However due to string immutability in .NET it is not practical to represent each suffix as a separate string, as that requires O(n^2) space. So instead the starting positions of suffixes will be sorted. But why are suffixes selected in the first place? Because searching for every occurrence of a pattern is basically searching for every suffix that starts with the pattern.

Once they are sorted we can use binary search to find lower and upper bounds that enclose all suffixes that start with the pattern. Comparison of a suffix with a pattern during binary search should take into account only m (length of the pattern) characters as we are looking for suffixes that start with the pattern.

// Suffix array represents simple text indexing mechanism.
public class SuffixArray : IEnumerable<int>
{
  private const int c_lower = 0;
  private const int c_upper = -1;

  private readonly string m_text;
  private readonly int[] m_pos;
  private readonly int m_lower;
  private readonly int m_upper;

  SuffixArray(string text, int[] pos, int lower, int upper)
  {
    m_text = text;
    m_pos = pos;
    // Inclusive lower and upper boundaries define search range.
    m_lower = lower;
    m_upper = upper;
  }

  public static SuffixArray Build(string text)
  {
    Contract.Requires<ArgumentException>(!String.IsNullOrEmpty(text));

    var length = text.Length;
    // Sort starting positions of suffixes in lexicographical 
    // order.
    var pos = Enumerable.Range(0, length).ToArray();
    Array.Sort(pos, (x, y) => String.Compare(text, x, text, y, length));
    // By default all suffixes are in search range.
    return new SuffixArray(text, pos, 0, text.Length - 1);
  }

  public SuffixArray Search(string str)
  {
    Contract.Requires<ArgumentException>(!String.IsNullOrEmpty(str));

    // Search range is empty so nothing to narrow.
    if (m_lower > m_upper)
      return this;
    // Otherwise search for boundaries that enclose all 
    // suffixes that start with supplied string.
    var lower = Search(str, c_lower);
    var upper = Search(str, c_upper);
    // Once precomputed sorted suffixes positions don't change
    // but the boundaries do so that next refinement 
    // can be done within smaller range and thus faster.
    // For example, you may narrow search range to suffixes 
    // that start with "ab" and then search within this smaller
    // search range suffixes that start with "abc".
    return new SuffixArray(m_text, m_pos, lower + 1, upper);
  }

  public IEnumerator<int> GetEnumerator()
  {
    // Enumerates starting positions of suffixes that fall 
    // into search range.
    for (var i = m_lower; i <= m_upper; i++)
      yield return m_pos[i];
  }

  IEnumerator IEnumerable.GetEnumerator()
  {
    return GetEnumerator();
  }

  private int Compare(string w, int i)
  {
    // Comparison takes into account maximum length(w) 
    // characters. For example, strings "ab" and "abc" 
    // are thus considered equal.
    return String.Compare(w, 0, m_text, m_pos[i], w.Length);
  }

  private int Search(string w, int bound)
  {
    // Depending on bound value binary search results 
    // in either lower or upper boundary.
    int x = m_lower - 1, y = m_upper + 1;
    if (Compare(w, m_lower) < 0)
      return x;
    if (Compare(w, m_upper) > 0)
      return y;
    while (y - x > 1)
    {
      var m = (x + y)/2;
      // If bound equals 0 the left boundary advances to the
      // median only if the subject is strictly greater than the
      // median, and thus the search results in the lower bound
      // (position that precedes the first suffix equal to or
      // greater than the subject w). Otherwise the search
      // results in the upper bound (position that precedes the
      // first suffix that is greater than the subject).
      if (Compare(w, m) > bound)
        x = m;
      else
        y = m;
    }
    return x;
  }
}

This implementation is simple (it has O(n^2 log n) complexity to sort and O(m log n) to search, where n stands for the text length and m for the pattern length) and can be improved. It doesn’t take into account the fact that suffixes, not arbitrary strings, are being sorted. Suffixes also share common prefixes and that may be used to speed up the binary search.

Here is an example of narrowing the search.

var str = ...;
var sa = SuffixArray.Build(str);
string pat;
while ((pat = Console.ReadLine()) != String.Empty)
{
  sa = sa.Search(pat);
  foreach (var pos in sa)
  {
    Console.WriteLine(str.Substring(pos));
  }
}

Happy Easter, folks!

Wednesday, March 17, 2010

Selecting k smallest or largest elements

There are cases when you need to select a number of best (according to some definition) elements out of a finite sequence (list). For example, select the 10 most popular baby names in a particular year or the 10 biggest files on your hard drive.

While selecting a single minimum or maximum element can easily be done iteratively in O(n), selecting the k smallest or largest elements (k smallest for short) is not that simple.

It makes sense to take advantage of the composability of sequence APIs. We’ll design an extension method with the signature defined below:

public static IEnumerable<TSource> TakeSmallest<TSource>(
 this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)

The name originates from the fact that selecting the k smallest elements can logically be expressed in terms of Enumerable.TakeWhile with a predicate that returns true if an element is one of the k smallest. As the logical predicate doesn’t change (only the count does) it is burned into the method’s name (instead of “While”, which represents a changing predicate, we have “Smallest”).

Now let’s find the solution.

If the whole list is sorted, the first k elements are what we are looking for.

public static IEnumerable<TSource> TakeSmallest<TSource>(
 this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
{
 return source.OrderBy(x => x, comparer).Take(count);
}

It is an O(n log n) solution where n is the number of elements in the source sequence. We can do better.

A priority queue yields better performance characteristics if only a subset of the sorted sequence is required.

public static IEnumerable<TSource> TakeSmallest<TSource>(
 this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
{
 var queue = new PriorityQueue<TSource>(source, comparer);
 while (count > 0 && queue.Count > 0)
 {
  yield return queue.Dequeue();
  count--;
 }
}

It requires O(n) to build a priority queue based on a binary min heap and O(k log n) to retrieve the first k elements. Better, but we’ll improve more.
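
The PriorityQueue<T> type comes from an earlier post and isn’t shown here; a minimal binary min-heap sketch compatible with the way it is used in these posts (a comparer-only constructor, a seeding constructor, Count, Enqueue and Dequeue) could look like this:

public class PriorityQueue<T>
{
  private readonly List<T> m_heap = new List<T>();
  private readonly IComparer<T> m_comparer;

  public PriorityQueue(IComparer<T> comparer)
  {
    m_comparer = comparer;
  }

  public PriorityQueue(IEnumerable<T> items, IComparer<T> comparer)
    : this(comparer)
  {
    m_heap.AddRange(items);
    // Floyd's heap construction runs in O(n)
    for (var i = m_heap.Count / 2 - 1; i >= 0; i--)
      SiftDown(i);
  }

  public int Count
  {
    get { return m_heap.Count; }
  }

  public void Enqueue(T item)
  {
    // Add item at the end and sift it up to its place
    m_heap.Add(item);
    var i = m_heap.Count - 1;
    while (i > 0)
    {
      var parent = (i - 1) / 2;
      if (m_comparer.Compare(m_heap[i], m_heap[parent]) >= 0)
        break;
      Swap(i, parent);
      i = parent;
    }
  }

  public T Dequeue()
  {
    // Take the minimum from the root, move the last element
    // to the root and sift it down to restore heap order
    var min = m_heap[0];
    var last = m_heap.Count - 1;
    m_heap[0] = m_heap[last];
    m_heap.RemoveAt(last);
    if (m_heap.Count > 0)
      SiftDown(0);
    return min;
  }

  private void SiftDown(int i)
  {
    while (true)
    {
      var left = 2 * i + 1;
      var right = 2 * i + 2;
      var smallest = i;
      if (left < m_heap.Count && m_comparer.Compare(m_heap[left], m_heap[smallest]) < 0)
        smallest = left;
      if (right < m_heap.Count && m_comparer.Compare(m_heap[right], m_heap[smallest]) < 0)
        smallest = right;
      if (smallest == i)
        return;
      Swap(i, smallest);
      i = smallest;
    }
  }

  private void Swap(int i, int j)
  {
    var tmp = m_heap[i];
    m_heap[i] = m_heap[j];
    m_heap[j] = tmp;
  }
}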

The quicksort algorithm picks a pivot element and reorders elements such that the ones less than the pivot go before it while greater elements go after it (equal elements can go either way). After that the pivot is in its final position. Then both partitions are sorted recursively, making the whole sequence sorted. To prevent the worst case scenario, pivot selection can be randomized.

Basically we are interested in the k smallest elements themselves and not the ordering relation between them. Assuming partitioning has just completed, let’s denote the set of elements that are before the pivot (including the pivot itself) by L and the set of elements that are after the pivot by H. According to the partition definition L contains the |L| (where |X| denotes the number of elements in a set X) smallest elements. If |L| is equal to k we are done. If k is less than |L| we look for the k smallest elements in L. Otherwise, as we already have the |L| smallest elements, we look for the remaining k - |L| smallest elements in H.

public static IEnumerable<TSource> TakeSmallest<TSource>(
  this IEnumerable<TSource> source, int count)
{
  return TakeSmallest(source, count, Comparer<TSource>.Default);
}

public static IEnumerable<TSource> TakeSmallest<TSource>(
  this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
{
  Contract.Requires<ArgumentNullException>(source != null);
  // Sieve handles situation when count >= source.Count()
  Contract.Requires<ArgumentOutOfRangeException>(count > 0);
  Contract.Requires<ArgumentNullException>(comparer != null);

  return new Sieve<TSource>(source, count, comparer);
}

class Sieve<T> : IEnumerable<T>
{
  private readonly IEnumerable<T> m_source;
  private readonly IComparer<T> m_comparer;
  private readonly int m_count;

  private readonly Random m_random;

  public Sieve(IEnumerable<T> source, int count, IComparer<T> comparer)
  {
    m_source = source;
    m_count = count;
    m_comparer = comparer;
    m_random = new Random();
  }

  public IEnumerator<T> GetEnumerator()
  {
    var col = m_source as ICollection<T>;
    if (col != null && m_count >= col.Count)
    {
      // There is no point in copying data
      return m_source.GetEnumerator();
    }
    var buf = m_source.ToArray();
    if (m_count >= buf.Length)
    {
      // Buffer already contains exactly the required number of elements
      return buf.AsEnumerable().GetEnumerator();
    }
    // Find the solution
    return GetEnumerator(buf);
  }

  IEnumerator IEnumerable.GetEnumerator()
  {
    return GetEnumerator();
  }

  private IEnumerator<T> GetEnumerator(T[] buf)
  {
    var n = buf.Length;
    var k = m_count;
    // After rearrange is completed the first k
    // items are the smallest elements
    Rearrange(buf, 0, n - 1, k);
    for (int i = 0; i < k; i++)
    {
      yield return buf[i];
    }
  }

  private void Rearrange(T[] buf, int l, int u, int k)
  {
    if (l == u)
    {
      return;
    }
    // Partition elements around randomly selected pivot
    var q = RandomizedPartition(buf, l, u);
    // Compute size of low partition (includes pivot)
    var s = q - l + 1;
    // We are done as low partition is what we were looking for
    if (k == s)
    {
      return;
    }

    if (k < s)
    {
      // Smallest elements group is less than low partition
      // find it there
      Rearrange(buf, l, q - 1, k);
    }
    else
    {
      // Low partition is in smallest elements group, find the 
      // rest in high partition
      Rearrange(buf, q + 1, u, k - s);
    }
  }

  private int RandomizedPartition(T[] buf, int l, int u)
  {
    // Select pivot randomly and swap it with the last element
    // to prevent worst case scenario where pivot is the 
    // largest remaining element
    Swap(buf, m_random.Next(l, u + 1), u);
    // Divides elements into two partitions:
    // - Low partition where elements that are less than pivot 
    // and pivot itself
    // - High partition contains the rest 
    var k = l;
    for (var i = l; i < u; i++)
    {
      if (m_comparer.Compare(buf[i], buf[u]) < 0)
      {
        Swap(buf, k++, i);
      }
    }
    // Put pivot into its final location
    Swap(buf, k, u);
    return k;
  }

  private static void Swap(T[] a, int i, int j)
  {
    var tmp = a[i];
    a[i] = a[j];
    a[j] = tmp;
  }
}

The solution runs in expected O(n) time, which means quite good performance in practice. Let’s run the thing.

const int count = 100;
const int max = 100;
var rnd = new Random();
var seq = Enumerable.Range(0, count).Select(_ => rnd.Next(max)).ToArray();
Func<int, int> i = x => x;

for(var k = 1; k < count / 2; k++)
{
  var a = seq.TakeSmallest(k).OrderBy(i);
  var b = seq.OrderBy(i).Take(k);

  Debug.Assert(a.SequenceEqual(b));
}

Enjoy!

Thursday, March 4, 2010

K-way merge

The classic merge (the one used in merge sort) takes as input some sorted lists and at each step outputs the element with the next smallest key, thus producing a sorted list that contains all the elements of the input lists.

An instance of a list is a computer representation of the mathematical concept of a finite sequence, that is, a tuple.

It is not always practical to have the whole sequence in memory because of its considerable size, nor to constrain the sequence to be finite, as only the first K elements may be needed. Thus our algorithm must produce a monotonically increasing (according to some comparison logic) potentially infinite sequence.

Two-way merge is the simplest variation where two lists are merged (it is named OrderedMerge to avoid confusion with EnumerableEx.Merge).

public static IEnumerable<T> OrderedMerge<T>(
  this IEnumerable<T> first,
  IEnumerable<T> second,
  IComparer<T> comparer)
{
  using (var e1 = first.GetEnumerator())
  {
    using (var e2 = second.GetEnumerator())
    {
      var c1 = e1.MoveNext();
      var c2 = e2.MoveNext();
      while (c1 && c2)
      {
        if (comparer.Compare(e1.Current, e2.Current) < 0)
        {
          yield return e1.Current;
          c1 = e1.MoveNext();
        }
        else
        {
          yield return e2.Current;
          c2 = e2.MoveNext();
        }
      }
      if (c1 || c2)
      {
        var e = c1 ? e1 : e2;
        do
        {
          yield return e.Current;
        } while (e.MoveNext());
      }
    }
  }
}

This algorithm runs in O(n) where n is the number of merged elements (we cannot measure it by the number of elements in the sequences because they are potentially infinite).
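
A quick usage sketch of the two-way merge:

var first = new[] { 1, 3, 5, 7 };
var second = new[] { 2, 4, 6 };
// Prints 1 through 7, one number per line
first.OrderedMerge(second, Comparer<int>.Default)
  .Run(Console.WriteLine);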

N-way merge is a more general algorithm that merges N monotonically increasing sequences into one. It is used in external sorting. Here is the most general overload signature (others are omitted as you can easily create them).

public static IEnumerable<T> OrderedMerge<T>(
  this IEnumerable<IEnumerable<T>> sources,
  IComparer<T> comparer)

A naive implementation takes advantage of the existing two-way merge and composition.

public static IEnumerable<T> NaiveOrderedMerge<T>(
  this IEnumerable<IEnumerable<T>> sources,
  IComparer<T> comparer)
{
  return sources.Aggregate((seed, curr) => seed.OrderedMerge(curr, comparer));
}

Let’s denote merging two sequences Si and Sj, where i != j and both are within [0, m) (m is the number of sequences), by (Si, Sj). Then what the code above does is (((S0, S1), S2), …, Sm-1). This implementation is naive because fetching the next smallest element takes O(m), making the total running time O(nm). We can do better than that.

Recall that in my previous post we implemented a priority queue based on a binary heap that allows getting the next smallest element in O(log n) where n is the size of the queue. Here is a solution sketch:

  • The queue will hold non-empty sequences.
  • The priority of a sequence is its next element.
  • At each step we dequeue the sequence that has the smallest next element. This element is the next one in the merged sequence.
  • If the dequeued sequence is not empty it must be enqueued back because it may contain the next smallest element in the merged sequence.

Thus the queue size doesn’t exceed m (the number of sequences), making the total running time O(n log m).

Another interesting aspect is resource management. Each sequence has associated resources that need to be released once the merged sequence terminates, normally or abnormally. The number of sequences is not known in advance. We will use the solution that is described in my previous post Disposing sequence of resources.

Now let’s do it.

// Convenience overloads are not included only most general one
public static IEnumerable<T> OrderedMerge<T>(
  this IEnumerable<IEnumerable<T>> sources,
  IComparer<T> comparer)
{
  // Make sure sequence of ordered sequences is not null
  Contract.Requires<ArgumentNullException>(sources != null);
  // and it doesn't contain nulls
  Contract.Requires(Contract.ForAll(sources, s => s != null));
  Contract.Requires<ArgumentNullException>(comparer != null);
  // Precondition checking is done outside of iterator because
  // of its lazy nature
  return OrderedMergeHelper(sources, comparer);
}

private static IEnumerable<T> OrderedMergeHelper<T>(
  IEnumerable<IEnumerable<T>> sources,
  IComparer<T> elementComparer)
{
  // Each sequence is expected to be ordered according to 
  // the same comparison logic as elementComparer provides
  var enumerators = sources.Select(e => e.GetEnumerator());
  // Disposing sequence of lazily acquired resources as 
  // a single resource
  using (var disposableEnumerators = enumerators.AsDisposable())
  {
    // The code below holds the following loop invariant:
    // - Priority queue contains enumerators that positioned at 
    // sequence element
    // - The queue at the top has enumerator that positioned at 
    // the smallest element of the remaining elements of all 
    // sequences

    // Ensures that only non empty sequences participate  in merge
    var nonEmpty = disposableEnumerators.Where(e => e.MoveNext());
    // Current value of enumerator is its priority 
    var comparer = new EnumeratorComparer<T>(elementComparer);
    // Use priority queue to get enumerator with smallest 
    // priority (current value)
    var queue = new PriorityQueue<IEnumerator<T>>(nonEmpty, comparer);

    // The queue is empty when all sequences are empty
    while (queue.Count > 0)
    {
      // Dequeue enumerator that positioned at element that 
      // is next in the merged sequence
      var min = queue.Dequeue();
      yield return min.Current;
      // Advance enumerator to next value
      if (min.MoveNext())
      {
        // If it has value that can be merged into resulting
        // sequence put it into the queue
        queue.Enqueue(min);
      }
    }
  }
}

// Provides comparison functionality for enumerators
private class EnumeratorComparer<T> : Comparer<IEnumerator<T>>
{
  private readonly IComparer<T> m_comparer;

  public EnumeratorComparer(IComparer<T> comparer)
  {
    m_comparer = comparer;
  }

  public override int Compare(
     IEnumerator<T> x, IEnumerator<T> y)
  {
    return m_comparer.Compare(x.Current, y.Current);
  }
}

It works well with infinite sequences and with cases where we need only the first K elements, as it fetches only the bare minimum out of the source sequences.
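
For example, here is a minimal sketch of that behavior (the Multiples helper is hypothetical, introduced only for this illustration): merging two infinite ordered sequences and taking just the first few elements advances each source only far enough to produce them.

// Hypothetical helper: infinite ordered sequence of multiples of step
static IEnumerable<int> Multiples(int step)
{
  for (var i = 0; ; i++)
  {
    yield return i * step;
  }
}

...

// Only the first five merged elements are computed: 0, 0, 2, 3, 4
var firstFive = new[] { Multiples(2), Multiples(3) }
  .OrderedMerge(Comparer<int>.Default)
  .Take(5);
foreach (var x in firstFive)
{
  Console.WriteLine(x);
}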

Run the thing.

// Sample parameters (chosen arbitrarily for the example)
const int count = 10;
const int max = 100;
var rnd = new Random();
// Function that generates a sequence of k random numbers
Func<int, IEnumerable<int>> gen = k => Enumerable.Range(0, k)
  .Select(l => rnd.Next(max));
// Generate a sequence of random lengths and project each length
// to an ordered sequence of that length of random numbers
var seqs = gen(count).Select(k => gen(k)
  .OrderBy(l => l).AsEnumerable());

var p = -1;
foreach (var c in seqs.OrderedMerge(Comparer<int>.Default))
{
  Debug.Assert(p <= c);
  Console.WriteLine(c);
  p = c;
}

Enjoy!

Monday, February 22, 2010

Right-hand side Enumerable.Zip

With Reactive Extensions for .NET (Rx) and .NET Framework 4 a new LINQ operator was introduced – Zip (Bart De Smet gives an excellent explanation of the idea and implementation details behind the Zip operator). In a nutshell it merges two sequences into one by applying a selector function to pairs of elements.

int[] numbers = { 1, 2, 3, 4 };
string[] words = { "one", "two", "three" };

numbers.Zip(words, (first, second) => first + " " + second)
 .Run(Console.WriteLine); 

// 1 one
// 2 two
// 3 three

While it seems pretty simple and clear there is a subtlety in its behavior. Zip is implemented something like this (omitting argument checking):

public static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
  using (IEnumerator<TFirst> fe = first.GetEnumerator())
  {
    using (IEnumerator<TSecond> se = second.GetEnumerator())
    {
      // What if a call to fe.MoveNext() throws an exception,
      // returns false or never returns?
      while (fe.MoveNext() && se.MoveNext())
      {
        yield return resultSelector(fe.Current, se.Current);
      }
    }
  }
}

A call to MoveNext on the first sequence's enumerator may prevent MoveNext from ever being called on the second sequence's enumerator, because the first one throws an exception, terminates or gets stuck (this is explained in great detail in the "Zip'em together" section of More LINQ with System.Interactive – More combinators for your Swiss Army Knife, which is where the topic of this post came from).

As an exercise we will implement a Zip operator that fetches results from both sources simultaneously, combining their results or exceptions into the produced elements.

Here is a solution sketch:

  • As either enumerator can get stuck, two worker threads are used to fetch results out of the sequences, plus one coordination thread (actually the caller's thread) that combines results, terminations and exceptions.
  • The coordinator waits for notification that
    • either both workers successfully fetched the next value out of their sequences
    • or either of them terminated, normally or abnormally (an exception was thrown).
  • The coordinator allows the workers to proceed only if both workers successfully fetched the next value. Before signaling the workers to proceed, the coordinator resets the primitive it waits on so that it can collect notifications from the workers in the next iteration.
  • Workers notify the coordinator of fetched values and of termination.
  • Once they have notified the coordinator, the workers wait until
    • either all workers produced results and the coordinator allows them to fetch the next results from the sequences (in order to prevent fetching unnecessary results and potentially producing unnecessary side effects)
    • or the coordinator notified them of outer sequence termination due to either of the sequences terminating (normally or abnormally).
  • When all workers proceed, the primitive must be reset to support the next iteration.
  • Once the outer sequence is terminated, the last alive worker must dispose of the shared resources.

Since zipping may be done on infinite sequences, fetching the next item from a sequence may take infinitely long, or the consumer may wait indefinitely before asking for the next zipped item, explicitly spawned threads are used to avoid seizing thread pool threads for an indefinitely long time.

For now the notification problems do not take abnormal termination into account (we'll take a look at that below).

The Task Parallel Library provides very convenient primitives for both waiting problems:

  • CountdownEvent - represents a synchronization primitive that is signaled when its count reaches zero.
  • Barrier - enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.

The coordinator needs to track the remaining work and be awakened when it is done. The CountdownEvent starts with the number of workers (in this case 2) and is reset back to this value just before allowing the workers to proceed to the next iteration.

Things are a little bit trickier with the workers' wait problem. Barrier allows setting up the number of participants. If we set the number of participants equal to the number of workers, then once they all arrive at the barrier they will proceed to the next iteration. But this is not what we want, as workers must not proceed without the coordinator's permission. So we make the coordinator a participant as well: it arrives at the barrier once the next zipped value is requested. After everyone passes the barrier it automatically returns to its initial state, so no explicit reset is required.
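
As a tiny isolated sketch of this idea (hypothetical, just to illustrate the participant count), no worker passes the barrier until the coordinator arrives as well:

// Two workers plus the coordinator participate in the barrier
var barrier = new Barrier(3);
for (var i = 0; i < 2; i++)
{
  new Thread(() =>
  {
    // A worker does its part of the phase and waits at the barrier
    barrier.SignalAndWait();
  }) { IsBackground = true }.Start();
}
// Workers remain blocked until the coordinator also arrives
barrier.SignalAndWait();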

But what to do in case of normal or abnormal termination of either sequence? The zip sequence must be terminated as well, and we cannot wait for the other sequence to fetch its next value or terminate because that may take an indefinitely long time. Fortunately both primitives (CountdownEvent and Barrier) support the new .NET 4 Cancellation Framework, and in particular:

The coordinator waits on the CountdownEvent instance with a CancellationToken that the workers can use to propagate cancellation, thus notifying about termination.

The workers wait on the Barrier instance with a CancellationToken that the coordinator can use to notify about outer sequence (zip sequence) termination.

The last thing is how to deal uniformly with values, exceptions and normal sequence termination. Fortunately Reactive Extensions for .NET (Rx) has the right tool for that:

Basically a value, an exception and termination are each represented by a type derived from Notification<T>, where T is the sequence element type.
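
As a tiny illustration (assuming the Materialize and Throw operators from System.Interactive that appear in the code below), values, errors and completion all surface as ordinary notification elements:

// Two values followed by an error: the materialized sequence yields
// OnNext, OnNext, OnError notifications instead of throwing
var source = new[] { 1, 2 }
  .Concat(EnumerableEx.Throw<int>(new Exception("boom")));
foreach (var n in source.Materialize())
{
  Console.WriteLine(n.Kind);
}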

Let’s now put everything together.

public static class ZipEx
{
 public static IEnumerable<TResult> RightHandZip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
 {
  Contract.Requires(first != null);
  Contract.Requires(second != null);
  Contract.Requires(resultSelector != null);

  return new Zipper<TFirst, TSecond>().Zip(first, second, resultSelector);
 }

 class Zipper<TFirst, TSecond>
 {
  private const int c_sourceCount = 2;
  private int m_workersCount = c_sourceCount;

  private CountdownEvent m_zipEvent;
  private CancellationTokenSource m_zipCancellation;

  private Barrier m_srcBarrier;
  private CancellationTokenSource m_srcCancellation;

  private volatile Notification<TFirst> m_first;
  private volatile Notification<TSecond> m_second;

  public IEnumerable<TResult> Zip<TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  {
   // Coordinator tracks remaining work through
   // the countdown event
   m_zipEvent = new CountdownEvent(c_sourceCount);
   // Workers may use this cancellation token source
   // to awake the coordinator in case of termination of
   // either sequence
   m_zipCancellation = new CancellationTokenSource();
   // Here we basically state that the coordinator is also
   // a barrier participant
   m_srcBarrier = new Barrier(c_sourceCount + 1);
   // Coordinator may use this cancellation token source
   // to awake workers in case of outer sequence (zip)
   // termination
   m_srcCancellation = new CancellationTokenSource();

   // Spawn workers that will fetch results from both 
   // sequences simultaneously
   RunWorker(first, n => { m_first = n; });
   RunWorker(second, n => { m_second = n; });

   var token = m_zipCancellation.Token;
   while (true)
   {
    try
    {
     // Wait until either all workers fetched next
     // values or any worker notified on completion
     // (either due to an exception or sequence completion)
     m_zipEvent.Wait(token);
    }
    catch (OperationCanceledException)
    {
     // Notify workers that zip sequence is terminated
     m_srcCancellation.Cancel();
     // If zip sequence is terminated due to exception(s) 
     // throw either AggregateException if both sequences 
     // resulted in exception or the exception that 
     // terminated either of them
     ThrowIfError();
     // Otherwise sequence is terminated due to one of 
     // the sequences completion
     yield break;
    }
    // Reset count down event for the next round
    m_zipEvent.Reset(c_sourceCount);
    // Yield next zipped value
    yield return resultSelector(m_first.Current, m_second.Current);
    // Only once the consumer asks for the next value do we allow
    // workers to attempt to fetch the next value.
    // Zipper is a barrier participant that arrives at the
    // barrier once the next zipped value is requested
    m_srcBarrier.SignalAndWait();
   }
  }

  private void RunWorker<T>(IEnumerable<T> enumerable, Action<Notification<T>> update)
  {
   // In order to fetch results from sequences we will use 
   // manually spawned threads as we do not want to seize 
   // ThreadPool threads for indefinite time. 
   new Thread(() =>
      {
       var token = m_srcCancellation.Token;
       foreach (var notification in enumerable.Materialize())
       {
        update(notification);

        if (notification.Kind == NotificationKind.OnNext)
        {
         // Notify on successfully fetched value out
         // of sequence
         m_zipEvent.Signal();
        }
        else
        {
         // Either sequence completion or error
         // notifications terminate the zipped sequence
         m_zipCancellation.Cancel();
        }

        try
        {
         // Wait until the next zipped value is requested or
         // the zip sequence is terminated
         m_srcBarrier.SignalAndWait(token);
        }
        catch (OperationCanceledException)
        {
      // Last alive worker is responsible for resources 
      // disposal
         DisposeIfLast();
         return;
        }
       }
      }) {IsBackground = true}.Start();
  }

  private void DisposeIfLast()
  {
   if (Interlocked.Decrement(ref m_workersCount) == 0)
   {
    DisposeAndNullify(ref m_zipEvent);
    DisposeAndNullify(ref m_srcBarrier);
    DisposeAndNullify(ref m_zipCancellation);
    DisposeAndNullify(ref m_srcCancellation);
   }
  }

  private void ThrowIfError()
  {
   var ex1 = ExtractErrorOrNothing(m_first);
   var ex2 = ExtractErrorOrNothing(m_second);

   if (ex1 != null && ex2 != null)
   {
    throw new AggregateException(ex1, ex2);
   }
   ThrowIfNotNull(ex1);
   ThrowIfNotNull(ex2);
  }

  private static Exception ExtractErrorOrNothing<T>(Notification<T> n)
  {
   if (n != null && n.Kind == NotificationKind.OnError)
   {
    return ((Notification<T>.OnError) n).Exception;
   }
   return null;
  }

  private static void ThrowIfNotNull(Exception ex)
  {
   if (ex != null)
   {
    throw ex;
   }
  }

  private static void DisposeAndNullify<T>(ref T res)
   where T : class, IDisposable
  {
   if (res != null)
   {
    res.Dispose();
    res = null;
   }
  }
 }
}

Let's set up the examples infrastructure

static IEnumerable<int> Infinite(string id)
{
  Console.Write("{0} -> X ", id);
  while(true) {}
  yield break;
}

static IEnumerable<int> Seq(string id, int start, int count)
{
  return Enumerable.Range(0, count).Select(x => x * 2 + start)
    .Do(x => Console.Write("{0} -> {1} ", id, x));
}

static IEnumerable<int> Abnormal(string id)
{
  return EnumerableEx.Throw<int>(new Exception())
    .Finally(() => Console.Write("{0} -> E ", id));
}

static void RunTest(IEnumerable<int> first, IEnumerable<int> second)
{
  Func<int, int, string> format = (f, s) => String.Format("[{0}, {1}] ", f, s);
  try
  {
    first.RightHandZip(second, format).Run(Console.Write);
  }
  catch (Exception ex)
  {
    Console.Write(ex.GetType());
  }
  Console.WriteLine();
}

and run some examples

const int count = 3;
// Both sequences terminate
RunTest(Seq("F", 0, count), Seq("S", 1, count));
// Terminates normally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5]

// First normally terminates and second continues
RunTest(Seq("F", 0, count), Seq("S", 1, count + 1));
// Terminates normally
// F -> 0 S -> 1 [0, 1] S -> 3 F -> 2 [2, 3] F -> 4 S -> 5 [4, 5] S -> 7

// First abnormally terminates and second continues
RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count + 1));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] S -> 7 F -> E System.Exception

// Both terminate abnormally
RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count).Concat(Abnormal("S")));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] F -> E S -> E System.AggregateException

// First gets stuck and second terminates abnormally
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count).Concat(Abnormal("S")));
// Terminates abnormally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] S -> E F -> X System.Exception

// First gets stuck and second terminates normally
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count));
// Terminates normally
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X

// First gets stuck and second continues
RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count + 1));
// Gets stuck
// F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X S -> 7

That’s it.

Monday, February 8, 2010

Binary heap based priority queue

Designing a container that supports item ordering raises lots of interesting design questions. To be more concrete, we will design a simple priority queue based on a binary min heap that supports the following operations:

  • Enqueue – add an item to the queue with an associated priority.
  • Dequeue - remove the element from the queue that has the highest priority and return it.
  • Peek – look at the highest priority element without removing it.

Good design that solves the wrong problems isn't better than bad design. So the first step is to identify the right problems to solve. A priority queue maintains a set of items with associated keys (priorities). Items get off the queue based on the ordering mechanism employed for the keys (priorities). Basically the two problems we need to solve (from an API design perspective) are the ways to represent:

  • association of a key (priority) and corresponding item
  • ordering mechanism for keys (priorities)

The association can be either explicit (PriorityQueue<TItem, TKey>, where the key type is explicitly stated) or implicit (PriorityQueue<TItem>, where the key type is of no interest). Each type parameter must have concrete consumers. The priority queue itself doesn't care about keys (although priority queues with updateable priority do) but rather about comparing keys. Client code cannot benefit from explicit keys either, because it can easily access the associated key; the client code defines what a key actually means. So there is no point in cluttering the API with irrelevant details (of what keys really are). Thus we will use PriorityQueue<T> (as we now have only one type parameter we will use a short name for it) and let consumers provide logic for comparing two items based on whatever the consumer defines as keys.

There are several options for representing the comparison mechanism.

The item type may be constrained to support comparison through a generic type parameter constraint:

class PriorityQueue<T> 
  where T : IComparable<T>
{
}

Though this approach benefits from a clearly stated comparison mechanism, it implies significant limitations:

  • It doesn't naturally support comparing items of the same type using different aspects (for example, in one case objects of the Customer type are compared using the number of orders and in the other using the date of the last order). Of course the consumer can create a lightweight wrapper that aggregates the object to compare and does the actual comparison, but that is not attractive from the perspectives of additional memory consumption and additional usage complexity.
  • It doesn't naturally support changing the order direction (ascending <-> descending), and thus it may require adding support into the data structure itself.

With those limitations in mind we can use comparers – something that knows how to compare two objects:

  • A type that implements IComparer<T>, which benefits from .NET Framework support (great documentation support and a default implementation).
  • or a delegate Func<T, T, int> that accepts two parameters of type T and returns an integer indicating whether one is less than, equal to, or greater than the other. It benefits from the conveniences of anonymous functions.

Comparers are designed for particular usage scenarios, and a single instance corresponds to an items container. Thus the limitations mentioned above do not apply to comparers.

Taking into account the value of .NET Framework support for IComparer<T>, and the fact that it is easy to create a wrapper that derives from Comparer<T> and delegates comparison to an aggregated function, we will use the IComparer<T> approach. Although it seems costless to also support the Func<T, T, int> mechanism and create the wrapper ourselves, in most cases it is best to avoid providing means to do the same thing in multiple ways; otherwise the potential confusion may outweigh the benefits.
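
For reference, here is a minimal sketch of such a wrapper (the FunctionComparer name is hypothetical, introduced only for illustration): it derives from Comparer<T> and delegates comparison to an aggregated function.

// Hypothetical wrapper: adapts a Func<T, T, int> to IComparer<T>
public class FunctionComparer<T> : Comparer<T>
{
  private readonly Func<T, T, int> m_compare;

  public FunctionComparer(Func<T, T, int> compare)
  {
    Contract.Requires<ArgumentNullException>(compare != null);
    m_compare = compare;
  }

  public override int Compare(T x, T y)
  {
    return m_compare(x, y);
  }
}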

Now putting everything together.

// Unbounded priority queue based on binary min heap
public class PriorityQueue<T>
{
  private const int c_initialCapacity = 4;
  private readonly IComparer<T> m_comparer;
  private T[] m_items;
  private int m_count;

  public PriorityQueue()
    : this(Comparer<T>.Default)
  {
  }

  public PriorityQueue(IComparer<T> comparer)
    : this(comparer, c_initialCapacity)
  {
  }

  public PriorityQueue(IComparer<T> comparer, int capacity)
  {
    Contract.Requires<ArgumentOutOfRangeException>(capacity >= 0);
    Contract.Requires<ArgumentNullException>(comparer != null);

    m_comparer = comparer;
    m_items = new T[capacity];
  }

  public PriorityQueue(IEnumerable<T> source)
    : this(source, Comparer<T>.Default)
  {
  }

  public PriorityQueue(IEnumerable<T> source, IComparer<T> comparer)
  {
    Contract.Requires<ArgumentNullException>(source != null);
    Contract.Requires<ArgumentNullException>(comparer != null);

    m_comparer = comparer;
    // In most cases queue that is created out of sequence 
    // of items will be emptied step by step rather than 
    // new items added and thus initially the queue is 
    // not expanded but rather left full
    m_items = source.ToArray();
    m_count = m_items.Length;
    // Restore heap order
    FixWhole();
  }

  public int Capacity
  {
    get { return m_items.Length; }
  }

  public int Count
  {
    get { return m_count; }
  }

  public void Enqueue(T e)
  {
    // The array may be full (for example, right after the queue
    // was created from a sequence and left full), so expand it
    // before adding the item
    if (m_count == m_items.Length)
    {
      Expand(Math.Max(m_items.Length*2, c_initialCapacity));
    }
    m_items[m_count++] = e;
    // Restore heap if it was broken
    FixUp(m_count - 1);
    // Once items count reaches half of the queue capacity 
    // it is doubled 
    if (m_count >= m_items.Length/2)
    {
      Expand(m_items.Length*2);
    }
  }

  public T Dequeue()
  {
    Contract.Requires<InvalidOperationException>(m_count > 0);

    var e = m_items[0];
    m_items[0] = m_items[--m_count];
    // Restore heap if it was broken
    FixDown(0);
    // Once the items count drops to one eighth of the queue
    // capacity, the capacity is reduced by half so that items
    // still occupy only one fourth of it (if we reduced when
    // the count reached one fourth, after the reduction items
    // would occupy half of the capacity and the next enqueued
    // item would require expanding the queue again)
    if (m_count <= m_items.Length/8)
    {
      Expand(m_items.Length/2);
    }

    return e;
  }

  public T Peek()
  {
    Contract.Requires<InvalidOperationException>(m_count > 0);

    return m_items[0];
  }

  private void FixWhole()
  {
    // Using bottom-up heap construction method enforce
    // heap property
    for (int k = m_items.Length/2 - 1; k >= 0; k--)
    {
      FixDown(k);
    }
  }

  private void FixUp(int i)
  {
    // Make sure that starting with i-th node up to the root
    // the tree satisfies the heap property: if B is a child 
    // node of A, then key(A) ≤ key(B)
    for (int c = i, p = Parent(c); c > 0; c = p, p = Parent(p))
    {
      if (Compare(m_items[p], m_items[c]) < 0)
      {
        break;
      }
      Swap(m_items, c, p);
    }
  }

  private void FixDown(int i)
  {
    // Make sure that starting with i-th node down to the leaf 
    // the tree satisfies the heap property: if B is a child 
    // node of A, then key(A) ≤ key(B)
    for (int p = i, c = FirstChild(p); c < m_count; p = c, c = FirstChild(c))
    {
      if (c + 1 < m_count && Compare(m_items[c + 1], m_items[c]) < 0)
      {
        c++;
      }
      if (Compare(m_items[p], m_items[c]) < 0)
      {
        break;
      }
      Swap(m_items, p, c);
    }
  }

  private static int Parent(int i)
  {
    return (i - 1)/2;
  }

  private static int FirstChild(int i)
  {
    return i*2 + 1;
  }

  private int Compare(T a, T b)
  {
    return m_comparer.Compare(a, b);
  }

  private void Expand(int capacity)
  {
    Array.Resize(ref m_items, capacity);
  }

  private static void Swap(T[] arr, int i, int j)
  {
    var t = arr[i];
    arr[i] = arr[j];
    arr[j] = t;
  }
}

The example below prints the top 200 elements from the sequence of mscorlib types ordered by full name (sorting the whole sequence first and then taking the first 200 elements would be less efficient).

class TypeNameComparer : Comparer<Type>
{
  public override int Compare(Type x, Type y)
  {
    Contract.Requires(x != null);
    Contract.Requires(y != null);

    return x.FullName.CompareTo(y.FullName);
  }
}

...

const int count = 200;
var types = typeof (object).Assembly.GetTypes();
var typesQueue = new PriorityQueue<Type>(types, new TypeNameComparer());

for (int i = 0; i < count && typesQueue.Count > 0; i++)
{
  Console.WriteLine(typesQueue.Dequeue());
}

That’s it.

Sunday, January 17, 2010

Queue based on a single stack

Looking at things from different perspectives allows us to understand them better. On the other hand, mind-bending practice improves your ability to find solutions.

Previously we were Disposing sequence of resources with Reactive Extensions. This time we will build a FIFO (first in, first out) collection based on a single LIFO (last in, first out) collection with no additional explicit storage.

It is not as insane as it looks. Assume that items come out of the stack in the order they must appear in the queue (FIFO). Choosing the opposite order is also possible, but it is not practical (see below). To make this happen we simply need to make sure that items in the stack (LIFO) are placed in the opposite order: items queued first must appear at the top of the stack. This basically means that in order to enqueue an item all existing items must be popped, the new item pushed, and then the existing items pushed back in the inverse of pop order. But we have a requirement of no additional explicit storage, so we store the items implicitly through recursion (on the call stack).

public class StackBasedQueue<T> : IEnumerable<T>
{
  private readonly Stack<T> m_items;

  public StackBasedQueue()
    : this(Enumerable.Empty<T>())
  {
  }

  public StackBasedQueue(IEnumerable<T> items)
  {
    // Items must be reversed as we want first 
    // item to appear on top of stack
    m_items = new Stack<T>(items.Reverse());
  }

  public int Count
  {
    get { return m_items.Count; }
  }

  public void Enqueue(T item)
  {
    // If stack is empty then simply push item
    // as it will be the first and the last item 
    // in the queue
    if (m_items.Count == 0)
    {
      m_items.Push(item);
      return;
    }

    // The item must be placed at the bottom of the stack
    // To do this existent items must be popped, the item  
    // pushed and then existent items pushed inversely to 
    // pop order
    var tmp = m_items.Pop();
    Enqueue(item);
    m_items.Push(tmp);
  }

  public T Dequeue()
  {
    ThrowIfEmpty();
    // If stack is not empty item on top of it 
    // is next to be dequeued or peeked
    return m_items.Pop();
  }

  public T Peek()
  {
    ThrowIfEmpty();
    return m_items.Peek();
  }

  public IEnumerator<T> GetEnumerator()
  {
    // As items queued first must appear at the top of the
    // stack we can enumerate items directly
    return m_items.GetEnumerator();
  }

  IEnumerator IEnumerable.GetEnumerator()
  {
    return GetEnumerator();
  }

  private void ThrowIfEmpty()
  {
    if (Count == 0)
    {
      throw new InvalidOperationException("The queue is empty.");
    }
  }
}

Enqueue is an O(n) operation (where n is the number of items in the stack). Dequeue and Peek are O(1) operations. Enumerating through all items is an O(n) operation. Choosing the opposite order would make enumerating through all items an O(n^2) operation, which is not practical.
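
For a quick sanity check, here is a minimal usage sketch: despite being backed by a single stack, the collection behaves as FIFO.

var queue = new StackBasedQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
Console.WriteLine(queue.Dequeue()); // 1
Console.WriteLine(queue.Dequeue()); // 2
Console.WriteLine(queue.Peek());    // 3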

This is just an exercise, so it must not be used in real-world scenarios (at some point the queue may become big enough that the next attempt to enqueue an item results in a StackOverflowException); use the standard Queue<T> instead.