Tuesday, August 24, 2010

External merge sort

If the data to be sorted doesn’t fit into main memory, external sorting is applicable. External merge sort can be separated into two phases:

  1. Create initial runs (a run is a sequence of records in correct relative order, in other words a sorted chunk of the original file).
  2. Merge the created runs into a single sorted file.

To implement this algorithm I will use solutions from my previous posts, so it may be helpful to look at them first.

Let’s assume that at most M records are allowed to be loaded into main memory at the same time. One way to create initial runs is to successively read M records from the original file, sort them in memory and write them back to disk. However, we will use an approach called replacement selection that allows us to create longer runs (a sketch of the straightforward variant is shown right below for contrast).
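
A hypothetical sketch of that straightforward variant might look like this; it borrows the Read/Write abstractions and the m_capacity/m_comparer fields of the ExternalSorter<T> class shown later, so treat it purely as an illustration:

// Hypothetical sketch of the straightforward variant: read up to
// m_capacity records, sort them in memory, write them out as a run,
// repeat until the source is exhausted
private IEnumerable<string> DistributeSimple(string unsorted)
{
	var buffer = new List<T>(m_capacity);
	foreach (var record in Read(unsorted))
	{
		buffer.Add(record);
		if (buffer.Count < m_capacity)
			continue;

		// Buffer is full - sort it and flush it as a run
		buffer.Sort(m_comparer);
		yield return Write(buffer);
		buffer.Clear();
	}
	// Flush the last, possibly shorter, run
	if (buffer.Count > 0)
	{
		buffer.Sort(m_comparer);
		yield return Write(buffer);
	}
}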

The core structure behind this algorithm is a priority queue. Taking the current minimum element out of the queue one by one forms an ordered sequence, and that is exactly what a run is. The algorithm can be described as follows:

  1. Create two priority queues (one holding items for the current run, the other for the next run) with a capacity of M records.
  2. Prefill the current run priority queue with M records from the unsorted file.
  3. While there are elements in the current run priority queue, create the current run:
    1. Take the minimum element out of the current run priority queue and append it to the current run (basically write it to disk).
    2. Take the next element from the unsorted file (this is the replacement part of the algorithm) and compare it with the just-appended element.
    3. If it is less, it cannot be part of the current run (otherwise the order would be broken), so it is queued into the next run priority queue.
    4. Otherwise it is part of the current run and is queued into the current run priority queue.
    5. Repeat steps 1 through 4 until the current run priority queue is empty.
  4. Swap the current and next run priority queues and repeat step 3.

At any given moment at most M records are loaded into main memory, because every element written to the current run is replaced by a single element from the unsorted file, if any is left (depending on the comparison it goes into either the current or the next run priority queue).
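
For example, with M = 3 and the input 5, 1, 8, 2, 9, 3 the queue is prefilled with 5, 1, 8. The minimum 1 is written out and 2 is read in; 2 is not less than 1, so it joins the current run queue. Then 2 is written and 9 is read (joins the current run), 5 is written and 3 is read (3 is less than 5, so it goes to the next run queue), and finally 8 and 9 are written with nothing left to read. The first run ends up as 1, 2, 5, 8, 9 even though only 3 records were ever held in memory; the second run is just 3.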

The next step is to merge the created initial runs. For the merge step we will use a simplified algorithm (more advanced algorithms work with multiple physical devices to distribute runs, take data locality into account, etc.) based on k-way merge:

  1. Put the created runs into a queue.
  2. While there is more than one run in the queue:
    1. Dequeue up to K runs, merge them into a single run and put it back into the queue.
  3. The remaining run is the sorted original file.
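
For example, with five runs R1 … R5 and K = 2 the queue evolves like this: R1 and R2 are merged into R6, then R3 and R4 into R7, then R5 and R6 into R8, and finally R7 and R8 into the single remaining run, which is the sorted file.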

Yep, it is that simple. Now let’s code it.

The implementation abstracts away the file structure and reading/writing details, which makes the algorithm more concise and easier to understand.

using System.Collections.Generic;
using System.Linq;

abstract class ExternalSorter<T>
{
	private readonly IComparer<T> m_comparer;
	private readonly int m_capacity;
	private readonly int m_mergeCount;

	protected ExternalSorter(IComparer<T> comparer, int capacity, int mergeCount)
	{
		m_comparer = comparer;
		m_capacity = capacity;
		m_mergeCount = mergeCount;
	}

	// Sorts unsorted file and returns sorted file name
	public string Sort(string unsorted)
	{
		var runs = Distribute(unsorted);
		return Merge(runs);
	}

	// Write run to disk and return created file name
	protected abstract string Write(IEnumerable<T> run);
	// Read run from file with given name
	protected abstract IEnumerable<T> Read(string name);

	// Merge step in this implementation is simpler than 
	// the one used in polyphase merge sort - it doesn't
	// take into account distribution over devices
	private string Merge(IEnumerable<string> runs)
	{
		var queue = new Queue<string>(runs);
		var runsToMerge = new List<string>(m_mergeCount);
		// Merge until a single run is left
		while (queue.Count > 1)
		{
			// Take at most mergeCount runs for this merge pass
			var count = m_mergeCount;
			while (queue.Count > 0 && count-- > 0)
				runsToMerge.Add(queue.Dequeue());
			// Perform a k-way merge on the selected runs; in a 
			// polyphase merge sort k would depend on the number 
			// of physical devices holding the runs, here it is 
			// simply mergeCount
			var merged = runsToMerge.Select(Read).OrderedMerge(m_comparer);
			queue.Enqueue(Write(merged));

			runsToMerge.Clear();
		}
		// The last remaining run is the sorted source file
		return queue.Dequeue();
	}

	// Distributes the unsorted file into several sorted 
	// chunks called runs (a run is a sequence of records 
	// in correct relative order)
	private IEnumerable<string> Distribute(string unsorted)
	{
		var source = Read(unsorted);
		using (var enumerator = source.GetEnumerator())
		{
			var curr = new PriorityQueue<T>(m_comparer);
			var next = new PriorityQueue<T>(m_comparer);
			// Prefill the current run priority queue up to 
			// capacity
			while (curr.Count < m_capacity && enumerator.MoveNext())
				curr.Enqueue(enumerator.Current);
			// Until unsorted source and priority queues are 
			// exhausted
			while (curr.Count > 0)
			{
				// Create the current run and write it to disk
				var sorted = CreateRun(enumerator, curr, next);
				var run = Write(sorted);

				yield return run;

				Swap(ref curr, ref next);
			}
		}
	}

	private IEnumerable<T> CreateRun(IEnumerator<T> enumerator, PriorityQueue<T> curr, PriorityQueue<T> next)
	{
		while (curr.Count > 0)
		{
			var min = curr.Dequeue();
			yield return min;
			// Once the unsorted source is exhausted MoveNext 
			// returns false and the current run queue is 
			// simply drained step by step
			if (!enumerator.MoveNext())
				continue;

			// Check if current run can be extended with 
			// next element from unsorted source
			if (m_comparer.Compare(enumerator.Current, min) < 0)
			{
				// The element is less than the one just written 
				// to the current run, so appending it would 
				// break the order - it is deferred to the 
				// next run
				next.Enqueue(enumerator.Current);
			}
			else
			{
				// Extend current run
				curr.Enqueue(enumerator.Current);
			}
		}
	}

	private static void Swap<U>(ref U a, ref U b)
	{
		var tmp = a;
		a = b;
		b = tmp;
	}
}
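
The class above relies on the PriorityQueue<T> and OrderedMerge helpers from the previous posts mentioned earlier. In case you don’t have them at hand, minimal stand-ins (deliberately naive, linear-scan versions that just match how they are used here) could look roughly like this:

using System.Collections.Generic;
using System.Linq;

// Naive stand-in for PriorityQueue<T>: Dequeue scans for the current
// minimum, so it is O(n) per operation, but the interface matches usage
class PriorityQueue<T>
{
	private readonly List<T> m_items = new List<T>();
	private readonly IComparer<T> m_comparer;

	public PriorityQueue(IComparer<T> comparer)
	{
		m_comparer = comparer;
	}

	public int Count
	{
		get { return m_items.Count; }
	}

	public void Enqueue(T item)
	{
		m_items.Add(item);
	}

	public T Dequeue()
	{
		var min = 0;
		for (var i = 1; i < m_items.Count; i++)
			if (m_comparer.Compare(m_items[i], m_items[min]) < 0)
				min = i;
		var item = m_items[min];
		m_items.RemoveAt(min);
		return item;
	}
}

// Lazy k-way merge of already sorted sequences into one sorted sequence
static class OrderedMergeExtensions
{
	public static IEnumerable<T> OrderedMerge<T>(
		this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer)
	{
		// Open an enumerator per source and keep only non-empty ones
		var enumerators = sources
			.Select(s => s.GetEnumerator())
			.Where(e => e.MoveNext())
			.ToList();
		try
		{
			while (enumerators.Count > 0)
			{
				// Pick the enumerator holding the smallest current element
				var min = 0;
				for (var i = 1; i < enumerators.Count; i++)
					if (comparer.Compare(enumerators[i].Current, enumerators[min].Current) < 0)
						min = i;

				yield return enumerators[min].Current;

				// Advance it and drop it once its sequence is exhausted
				if (!enumerators[min].MoveNext())
				{
					enumerators[min].Dispose();
					enumerators.RemoveAt(min);
				}
			}
		}
		finally
		{
			foreach (var e in enumerators)
				e.Dispose();
		}
	}
}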

In the example below I created a type that sorts text files containing a single number per line.

using System;
using System.Collections.Generic;
using System.IO;

class TextFileOfNumbersExternalSorter : ExternalSorter<int>
{
	public TextFileOfNumbersExternalSorter(int capacity, int mergeCount)
		: base(Comparer<int>.Default, capacity, mergeCount)
	{
	}

	protected override string Write(IEnumerable<int> run)
	{
		var file = Path.GetTempFileName();
		using (var writer = new StreamWriter(file))
		{
			run.Run(writer.WriteLine);
		}
		return file;
	}

	protected override IEnumerable<int> Read(string name)
	{
		using (var reader = new StreamReader(name))
		{
			while (!reader.EndOfStream)
				yield return Int32.Parse(reader.ReadLine());
		}
		// Run files are temporary, so delete them once fully read 
		// (note this also removes the original unsorted file 
		// passed to Sort)
		File.Delete(name);
	}
}
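
The Run call used in Write above is presumably also a helper from the earlier posts (a simple ForEach over a sequence); a hypothetical stand-in could be:

using System;
using System.Collections.Generic;

static class SequenceExtensions
{
	// Applies the given action to every element of the sequence
	public static void Run<T>(this IEnumerable<T> source, Action<T> action)
	{
		foreach (var item in source)
			action(item);
	}
}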

It is used like this:

// capacity, mergeCount and unsortedFileName are initialized elsewhere
var sorter = new TextFileOfNumbersExternalSorter(capacity, mergeCount);
var sortedFileName = sorter.Sort(unsortedFileName);
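
And, purely as an illustration, a hypothetical end-to-end check (file names and parameters here are arbitrary; the usual System, System.Diagnostics, System.IO and System.Linq usings are assumed):

// Hypothetical smoke test: generate a file of random numbers,
// sort it externally and verify that the output is ordered
var unsortedFileName = Path.GetTempFileName();
var random = new Random();
File.WriteAllLines(unsortedFileName,
	Enumerable.Range(0, 100000)
		.Select(i => random.Next().ToString())
		.ToArray());

// Keep at most 1000 records in memory and merge 4 runs at a time
var sorter = new TextFileOfNumbersExternalSorter(1000, 4);
var sortedFileName = sorter.Sort(unsortedFileName);

var previous = Int32.MinValue;
foreach (var line in File.ReadAllLines(sortedFileName))
{
	var current = Int32.Parse(line);
	Debug.Assert(previous <= current);
	previous = current;
}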

That’s it folks!
