Wednesday, March 17, 2010

Selecting k smallest or largest elements

There are cases when you need to select a number of best (according to some definition) elements out of a finite sequence (list). For example, select the 10 most popular baby names in a particular year or the 10 biggest files on your hard drive.

While selecting a single minimum or maximum element can easily be done iteratively in O(n), selecting the k smallest or largest elements (k smallest for short) is not that simple.

It makes sense to take advantage of the composability of sequence APIs. We'll design an extension method with the signature defined below:

public static IEnumerable<TSource> TakeSmallest<TSource>(
 this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)

The name originates from the fact that selecting the k smallest elements can logically be expressed in terms of Enumerable.TakeWhile with a predicate that returns true if an element is one of the k smallest. As this logical predicate never changes (only the count does), it is baked into the method's name (instead of "While", which represents a varying predicate, we have "Smallest").

Now let’s find the solution.

If the whole list is sorted, the first k elements are what we are looking for.

public static IEnumerable<TSource> TakeSmallest<TSource>(
 this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
{
 return source.OrderBy(x => x, comparer).Take(count);
}

It is an O(n log n) solution, where n is the number of elements in the source sequence. We can do better.

A priority queue yields better performance characteristics when only a subset of the sorted sequence is required.

public static IEnumerable<TSource> TakeSmallest<TSource>(
 this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
{
 var queue = new PriorityQueue<TSource>(source, comparer);
 while (count > 0 && queue.Count > 0)
 {
  yield return queue.Dequeue();
  count--;
 }
}

It requires O(n) to build a priority queue based on a binary min-heap and O(k log n) to retrieve the first k elements. Better, but we'll improve more.
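The same approach can be sketched in a few lines of Python (not the post's C#), since the standard heapq module provides exactly this binary min-heap: heapify builds the heap in O(n), and each pop costs O(log n).

```python
import heapq

def take_smallest(source, count):
    """Heap-based selection: O(n) to heapify, O(k log n) to pop k elements."""
    heap = list(source)
    heapq.heapify(heap)  # build a binary min-heap in place, O(n)
    return [heapq.heappop(heap) for _ in range(min(count, len(heap)))]
```

For simple cases heapq.nsmallest(count, source) does the same job in one call.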

The Quicksort algorithm picks a pivot element and reorders elements such that the ones less than the pivot go before it while greater elements go after it (equal elements can go either way). After that the pivot is in its final position. Then both partitions are sorted recursively, making the whole sequence sorted. To prevent the worst case scenario, pivot selection can be randomized.

Basically we are interested in the k smallest elements themselves and not the ordering relation between them. Assuming partitioning has just completed, let's denote the set of elements before and including the pivot by L and the set of elements after the pivot by H. By the partition definition, L contains the |L| smallest elements (where |X| denotes the number of elements in a set X). If |L| equals k, we are done. If it is greater than k, look for the k smallest elements in L. Otherwise, as we already have |L| smallest elements, look for the remaining k - |L| smallest elements in H.
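Before the full C# implementation, here is the same recursion as a compact Python sketch (names and structure are mine, not from the post); it mirrors the rearrange-then-recurse logic with a randomized pivot.

```python
import random

def take_smallest(seq, k):
    """Partition-based selection; expected O(n). Returns the k smallest, unordered."""
    buf = list(seq)

    def rearrange(l, u, k):
        if l >= u:
            return
        # Randomized partition: swap a random pivot to the end, then sweep
        p = random.randint(l, u)
        buf[p], buf[u] = buf[u], buf[p]
        q = l
        for i in range(l, u):
            if buf[i] < buf[u]:
                buf[q], buf[i] = buf[i], buf[q]
                q += 1
        buf[q], buf[u] = buf[u], buf[q]  # pivot lands in its final position q
        s = q - l + 1                    # size of low partition, pivot included
        if k == s:
            return                       # low partition is exactly the answer
        if k < s:
            rearrange(l, q - 1, k)       # all k smallest are in the low partition
        else:
            rearrange(q + 1, u, k - s)   # low partition is in; find the rest in high

    rearrange(0, len(buf) - 1, min(k, len(buf)))
    return buf[:k]
```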

public static IEnumerable<TSource> TakeSmallest<TSource>(
  this IEnumerable<TSource> source, int count)
{
  return TakeSmallest(source, count, Comparer<TSource>.Default);
}

public static IEnumerable<TSource> TakeSmallest<TSource>(
  this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
{
  Contract.Requires<ArgumentNullException>(source != null);
  // Sieve handles the situation when count >= source.Count()
  Contract.Requires<ArgumentOutOfRangeException>(count > 0);
  Contract.Requires<ArgumentNullException>(comparer != null);

  return new Sieve<TSource>(source, count, comparer);
}

class Sieve<T> : IEnumerable<T>
{
  private readonly IEnumerable<T> m_source;
  private readonly IComparer<T> m_comparer;
  private readonly int m_count;

  private readonly Random m_random;

  public Sieve(IEnumerable<T> source, int count, IComparer<T> comparer)
  {
    m_source = source;
    m_count = count;
    m_comparer = comparer;
    m_random = new Random();
  }

  public IEnumerator<T> GetEnumerator()
  {
    var col = m_source as ICollection<T>;
    if (col != null && m_count >= col.Count)
    {
      // There is no point in copying the data
      return m_source.GetEnumerator();
    }
    var buf = m_source.ToArray();
    if (m_count >= buf.Length)
    {
      // Buffer already contains no more than the requested number of elements
      return buf.AsEnumerable().GetEnumerator();
    }
    // Find the solution
    return GetEnumerator(buf);
  }

  IEnumerator IEnumerable.GetEnumerator()
  {
    return GetEnumerator();
  }

  private IEnumerator<T> GetEnumerator(T[] buf)
  {
    var n = buf.Length;
    var k = m_count;
    // After rearrange is completed the first k
    // items are the smallest elements
    Rearrange(buf, 0, n - 1, k);
    for (int i = 0; i < k; i++)
    {
      yield return buf[i];
    }
  }

  private void Rearrange(T[] buf, int l, int u, int k)
  {
    if (l == u)
    {
      return;
    }
    // Partition elements around randomly selected pivot
    var q = RandomizedPartition(buf, l, u);
    // Compute size of low partition (includes pivot)
    var s = q - l + 1;
    // We are done as low partition is what we were looking for
    if (k == s)
    {
      return;
    }

    if (k < s)
    {
      // The group of smallest elements is smaller than
      // the low partition, so find it there
      Rearrange(buf, l, q - 1, k);
    }
    else
    {
      // Low partition is in smallest elements group, find the 
      // rest in high partition
      Rearrange(buf, q + 1, u, k - s);
    }
  }

  private int RandomizedPartition(T[] buf, int l, int u)
  {
    // Select pivot randomly and swap it with the last element
    // to prevent worst case scenario where pivot is the 
    // largest remaining element
    Swap(buf, m_random.Next(l, u + 1), u);
    // Divides elements into two partitions:
    // - Low partition where elements that are less than pivot 
    // and pivot itself
    // - High partition contains the rest 
    var k = l;
    for (var i = l; i < u; i++)
    {
      if (m_comparer.Compare(buf[i], buf[u]) < 0)
      {
        Swap(buf, k++, i);
      }
    }
    // Put pivot into its final location
    Swap(buf, k, u);
    return k;
  }

  private static void Swap(T[] a, int i, int j)
  {
    var tmp = a[i];
    a[i] = a[j];
    a[j] = tmp;
  }
}

The solution runs in expected O(n), which means quite good performance in practice. Let's run the thing.

const int count = 100;
const int max = 100;
var rnd = new Random();
var seq = Enumerable.Range(0, count).Select(_ => rnd.Next(max)).ToArray();
Func<int, int> i = x => x;

for(var k = 1; k < count / 2; k++)
{
  var a = seq.TakeSmallest(k).OrderBy(i);
  var b = seq.OrderBy(i).Take(k);

  Debug.Assert(a.SequenceEqual(b));
}

Enjoy!

Thursday, March 4, 2010

K-way merge

The classic merge (the one used in Merge Sort) takes as input some sorted lists and at each step outputs the element with the next smallest key, thus producing a sorted list that contains all the elements of the input lists.

An instance of a list is a computer representation of the mathematical concept of a finite sequence, that is, a tuple.

It is not always practical to have the whole sequence in memory because of its considerable size, nor to constrain the sequence to be finite, as only the first K elements may be needed. Thus our algorithm must produce a monotonically increasing (according to some comparison logic) potentially infinite sequence.

Two-way merge is the simplest variation, where two lists are merged (it is named OrderedMerge to avoid confusion with EnumerableEx.Merge).

public static IEnumerable<T> OrderedMerge<T>(
  this IEnumerable<T> first,
  IEnumerable<T> second,
  IComparer<T> comparer)
{
  using (var e1 = first.GetEnumerator())
  {
    using (var e2 = second.GetEnumerator())
    {
      var c1 = e1.MoveNext();
      var c2 = e2.MoveNext();
      while (c1 && c2)
      {
        if (comparer.Compare(e1.Current, e2.Current) < 0)
        {
          yield return e1.Current;
          c1 = e1.MoveNext();
        }
        else
        {
          yield return e2.Current;
          c2 = e2.MoveNext();
        }
      }
      if (c1 || c2)
      {
        var e = c1 ? e1 : e2;
        do
        {
          yield return e.Current;
        } while (e.MoveNext());
      }
    }
  }
}

This algorithm runs in O(n), where n is the number of merged elements (we may not measure it by the number of elements in the sequences because they are potentially infinite).

N-way merge is a more general algorithm that merges N monotonically increasing sequences into one. It is used in external sorting. Here is the most general overload signature (others are omitted as you can easily create them).

public static IEnumerable<T> OrderedMerge<T>(
  this IEnumerable<IEnumerable<T>> sources,
  IComparer<T> comparer)

A naive implementation takes advantage of the existing two-way merge and composition.

public static IEnumerable<T> NaiveOrderedMerge<T>(
  this IEnumerable<IEnumerable<T>> sources,
  IComparer<T> comparer)
{
  return sources.Aggregate((seed, curr) => seed.OrderedMerge(curr, comparer));
}

Let's denote merging two sequences Si and Sj, where i != j and both are within [0, m) (m is the number of sequences), by (Si, Sj). Then what the code above does is (((S0, S1), S2), …, Sm-1). This implementation is naive because fetching the next smallest element takes O(m), bringing the total running time to O(nm). We can do better than that.
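To make the pairwise composition concrete, here is a Python sketch of the same fold (using functools.reduce in place of LINQ's Aggregate, and heapq.merge as a stand-in two-way merge; both are standard library, but the pairing is my illustration, not the post's code):

```python
import heapq
from functools import reduce

def naive_ordered_merge(sources):
    # Fold the sequences pairwise: (((S0, S1), S2), ...).
    # Each element must bubble through up to m-1 nested merges,
    # which is where the O(nm) running time comes from.
    return reduce(lambda acc, cur: heapq.merge(acc, cur), sources)
```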

Recall that in my previous post we implemented a priority queue based on a binary heap that allows getting the next smallest element in O(log n), where n is the size of the queue. Here is a solution sketch:

  • The queue will hold non-empty sequences.
  • The priority of a sequence is its next element.
  • At each step we dequeue the sequence that has the smallest next element. This element is next in the merged sequence.
  • If the dequeued sequence is not empty, it must be enqueued back because it may contain the next smallest element in the merged sequence.

Thus the queue size never exceeds m (the number of sequences), making the total running time O(n log m).
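The sketch above maps directly onto a heap of iterators. Here is a minimal Python version (my illustration, using heapq tuples in place of the C# priority queue; the index in each tuple is only a tie-breaker so equal heads never compare iterators):

```python
import heapq

def ordered_merge(sources):
    """K-way merge with a heap of (head, index, iterator) tuples; O(n log m)."""
    heap = []
    for idx, src in enumerate(sources):
        it = iter(src)
        for head in it:                      # enqueue only non-empty sequences
            heap.append((head, idx, it))
            break
    heapq.heapify(heap)
    while heap:
        head, idx, it = heapq.heappop(heap)  # iterator positioned at the
        yield head                           # smallest remaining element
        for nxt in it:                       # advance; re-enqueue if not exhausted
            heapq.heappush(heap, (nxt, idx, it))
            break
```

This is essentially what the standard heapq.merge does internally.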

Another interesting aspect is resource management. Each sequence has associated resources that need to be released once the merged sequence terminates, normally or abnormally. The number of sequences is not known in advance. We will use the solution described in my previous post, Disposing sequence of resources.

Now let’s do it.

// Convenience overloads are not included, only the most general one
public static IEnumerable<T> OrderedMerge<T>(
  this IEnumerable<IEnumerable<T>> sources,
  IComparer<T> comparer)
{
  // Make sure sequence of ordered sequences is not null
  Contract.Requires<ArgumentNullException>(sources != null);
  // and it doesn't contain nulls
  Contract.Requires(Contract.ForAll(sources, s => s != null));
  Contract.Requires<ArgumentNullException>(comparer != null);
  // Precondition checking is done outside of iterator because
  // of its lazy nature
  return OrderedMergeHelper(sources, comparer);
}

private static IEnumerable<T> OrderedMergeHelper<T>(
  IEnumerable<IEnumerable<T>> sources,
  IComparer<T> elementComparer)
{
  // Each sequence is expected to be ordered according to 
  // the same comparison logic as elementComparer provides
  var enumerators = sources.Select(e => e.GetEnumerator());
  // Disposing sequence of lazily acquired resources as 
  // a single resource
  using (var disposableEnumerators = enumerators.AsDisposable())
  {
    // The code below holds the following loop invariant:
    // - Priority queue contains enumerators that positioned at 
    // sequence element
    // - The queue at the top has enumerator that positioned at 
    // the smallest element of the remaining elements of all 
    // sequences

    // Ensures that only non-empty sequences participate in the merge
    var nonEmpty = disposableEnumerators.Where(e => e.MoveNext());
    // Current value of enumerator is its priority 
    var comparer = new EnumeratorComparer<T>(elementComparer);
    // Use priority queue to get enumerator with smallest 
    // priority (current value)
    var queue = new PriorityQueue<IEnumerator<T>>(nonEmpty, comparer);

    // The queue is empty when all sequences are empty
    while (queue.Count > 0)
    {
      // Dequeue enumerator that positioned at element that 
      // is next in the merged sequence
      var min = queue.Dequeue();
      yield return min.Current;
      // Advance enumerator to next value
      if (min.MoveNext())
      {
        // If it has value that can be merged into resulting
        // sequence put it into the queue
        queue.Enqueue(min);
      }
    }
  }
}

// Provides comparison functionality for enumerators
private class EnumeratorComparer<T> : Comparer<IEnumerator<T>>
{
  private readonly IComparer<T> m_comparer;

  public EnumeratorComparer(IComparer<T> comparer)
  {
    m_comparer = comparer;
  }

  public override int Compare(
     IEnumerator<T> x, IEnumerator<T> y)
  {
    return m_comparer.Compare(x.Current, y.Current);
  }
}

It works well with infinite sequences and cases where only the first K elements are needed, and it fetches only the bare minimum out of the source sequences.
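The laziness is easy to demonstrate with genuinely infinite inputs; a quick Python analogue (my example, using itertools.count for unbounded sequences and heapq.merge as the lazy k-way merge):

```python
import heapq
from itertools import count, islice

evens = count(0, 2)  # 0, 2, 4, ... (infinite)
odds = count(1, 2)   # 1, 3, 5, ... (infinite)

# Only the first 10 elements are ever pulled from either sequence
first10 = list(islice(heapq.merge(evens, odds), 10))
```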

Run the thing.

// Function that generates sequence of length k of random numbers
Func<int, IEnumerable<int>> gen = k => Enumerable.Range(0, k)
  .Select(l => rnd.Next(max));
// Generate sequence of random lengths and each length project
// to a sequence of that length of random numbers
var seqs = gen(count).Select(k => gen(k)
  .OrderBy(l => l).AsEnumerable());

var p = -1;
foreach (var c in seqs.OrderedMerge(Comparer<int>.Default))
{
  Debug.Assert(p <= c);
  Console.WriteLine(c);
  p = c;
}

Enjoy!