Monday, May 2, 2011

Concurrent Object Pool

Object pool is a set of initialized objects that are kept ready to use, rather than allocated and destroyed on demand.

Wikipedia

An object pool can improve performance when pooled objects are expensive to initialize, are instantiated frequently, and only a few of them are in use at any given time (ThreadPool is a good example).

There are many questions to take into account when designing an object pool. How should an acquire request be handled when there are no free objects? In single-threaded scenarios you may choose to create a new object or let the caller know that the request cannot be fulfilled. In multithreaded scenarios you may also choose to wait until other threads release objects. How should access to pooled objects be organized? Depending on your logic you may hand out the most recently used object first, the least recently used first, or even a random one. Besides that, you need to provide synchronized access to the internal object storage.
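To make the "wait for free objects" option concrete, here is a minimal sketch of a fixed-size pool whose Acquire blocks until another thread releases an object. BlockingObjectPool and its members are hypothetical names used only for illustration; the sketch assumes the pool is pre-filled and never grows, and it relies on BlockingCollection<T> wrapping a ConcurrentStack<T> to get most-recently-used-first ordering.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical fixed-size pool: callers wait when no object is free
public class BlockingObjectPool<T>
{
    private readonly BlockingCollection<T> m_items;

    public BlockingObjectPool(Func<T> factory, int capacity)
    {
        // ConcurrentStack as the underlying store means the most
        // recently released object is handed out first
        m_items = new BlockingCollection<T>(new ConcurrentStack<T>());
        for (int i = 0; i < capacity; i++)
            m_items.Add(factory());
    }

    // Blocks until some thread releases an object
    public T Acquire()
    {
        return m_items.Take();
    }

    // Non-blocking variant: lets the caller know that the
    // request cannot be fulfilled right now
    public bool TryAcquire(out T item)
    {
        return m_items.TryTake(out item);
    }

    public void Release(T item)
    {
        m_items.Add(item);
    }
}
```

TryAcquire corresponds to the "let the caller know" choice, while Acquire corresponds to waiting for another thread.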

Let’s assume that we have decided how to handle the empty pool case (the object pool growth strategy) and how to order access to pooled objects (most recently used first), and are now focused on minimizing the cost of synchronizing access to the internal object storage.

The simplest way of doing this is to protect every access with a lock. That will work, but under high contention it may result in lock convoys, even though the time spent inside the lock is in most cases very short (just long enough to take a ready-to-use object). We may use a lock-free data structure such as ConcurrentStack<T> as storage. Or we may try to reduce contention.
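For reference, the lock-based baseline might look roughly like the sketch below. LockedObjectPool is a hypothetical name, and the growth strategy (create a new object when the pool is empty) is an assumption made for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical baseline: every access to the storage takes the same lock
public class LockedObjectPool<T>
{
    private readonly object m_sync = new object();
    private readonly Stack<T> m_items = new Stack<T>();
    private readonly Func<T> m_factory;

    public LockedObjectPool(Func<T> factory)
    {
        m_factory = factory;
    }

    public T Acquire()
    {
        lock (m_sync)
        {
            // Most recently released object is handed out first
            if (m_items.Count > 0)
                return m_items.Pop();
        }
        // Assumed growth strategy: create a new object when the pool
        // is empty. The factory runs outside the lock to keep the
        // critical section short.
        return m_factory();
    }

    public void Release(T item)
    {
        lock (m_sync)
        {
            m_items.Push(item);
        }
    }
}
```

Swapping the Stack<T> and the lock for a ConcurrentStack<T> with TryPop and Push gives the lock-free variant with the same shape.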

The idea is somewhat similar to the algorithm used in ThreadPool, where each worker thread maintains, in addition to the global work item queue, a local double-ended queue of work items (one end is exposed to the owner and the other to worker threads that may want to steal work items when the global queue is empty). Each worker thread puts newly created work items into its local queue to avoid synchronization costs. When a worker thread is ready to process the next work item, it first tries to dequeue from its local queue; if that fails it tries to get a work item from the global queue, and lastly it resorts to stealing from other threads (check out the cool series of articles from Joe Duffy on building a custom thread pool).
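The lookup order described above can be sketched as follows. This is only an illustration of the order (local, then global, then steal), not how ThreadPool is actually implemented: WorkerQueues and its members are hypothetical names, workers are identified by a plain index, and each local deque is guarded by a simple lock instead of a specialized work-stealing deque.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical sketch of the local -> global -> steal lookup order
public class WorkerQueues<TWork>
{
    private readonly ConcurrentQueue<TWork> m_global = new ConcurrentQueue<TWork>();
    private readonly LinkedList<TWork>[] m_local;

    public WorkerQueues(int workerCount)
    {
        m_local = new LinkedList<TWork>[workerCount];
        for (int i = 0; i < workerCount; i++)
            m_local[i] = new LinkedList<TWork>();
    }

    public void EnqueueGlobal(TWork work)
    {
        m_global.Enqueue(work);
    }

    // The owner adds new work items at the tail of its own deque
    public void EnqueueLocal(int worker, TWork work)
    {
        var deque = m_local[worker];
        lock (deque) deque.AddLast(work);
    }

    public bool TryDequeue(int worker, out TWork work)
    {
        // 1. Owner end of the local deque (newest item first)
        if (TryTake(m_local[worker], true, out work))
            return true;
        // 2. Global queue
        if (m_global.TryDequeue(out work))
            return true;
        // 3. Steal from the opposite end of other workers' deques
        for (int i = 0; i < m_local.Length; i++)
        {
            if (i != worker && TryTake(m_local[i], false, out work))
                return true;
        }
        return false;
    }

    private static bool TryTake(LinkedList<TWork> deque, bool ownerEnd, out TWork work)
    {
        lock (deque)
        {
            if (deque.Count == 0)
            {
                work = default(TWork);
                return false;
            }
            var node = ownerEnd ? deque.Last : deque.First;
            work = node.Value;
            deque.Remove(node);
            return true;
        }
    }
}
```

The object pool below borrows the same shape: a thread-local store that is touched without contention most of the time, backed by a shared global store.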

In order to build the object pool we will maintain local per-thread storage of pooled objects in addition to global storage. Pooled objects are stored in segments of a selected size (it depends on usage scenarios, so it must be specified when the object pool is created). Global storage holds a set of segments. In order to acquire a pooled object a thread must:

  1. Get local segment (if any)
  2. If there is no local segment or it is empty, try to get a segment out of global storage (if any)
  3. If global storage is empty, create a new segment
  4. Make that segment the thread's local segment
  5. Get an item from the local segment and return it

Returning an object looks like this:

  1. Put the item into the local segment
  2. If the segment has grown beyond the threshold, split it into two parts and put one of them back into global storage.

Thus most of the time a thread will work with its own local set of pooled objects. However, this has its own cost. If, for example, a different thread comes along, acquires a pooled object and then never uses the pool again, we’ll get an orphaned segment. So as usual it is a matter of tradeoff. If you have a limited set of threads that will work with the object pool for a long time, this approach can be justified. If, however, you have many threads that work with the object pool for short periods of time and then never come back, you’d probably better look into other approaches.

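using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Represents thread safe object pool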
public class ConcurrentObjectPool<T>
{
    private readonly int m_segmentSize;
    // Thread local pool used without synchronization 
    // to reduce costs
    private ThreadLocal<Segment> m_localPool = new ThreadLocal<Segment>();
    // Global pool that is used once there is nothing or 
    // too much in local pool
    private readonly ConcurrentStack<Segment> m_globalPool = new ConcurrentStack<Segment>();
    // Factory function that is potentially called from multiple
    // threads to produce pooled objects and thus must be thread 
    // safe
    private readonly Func<T> m_factory;

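    public ConcurrentObjectPool(Func<T> factory, int segmentSize)
    {
        if (factory == null)
            throw new ArgumentNullException("factory");
        // An empty segment could never satisfy Acquire
        if (segmentSize <= 0)
            throw new ArgumentOutOfRangeException("segmentSize");
        m_factory = factory;
        m_segmentSize = segmentSize;
    }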

    // Acquires object from pool
    public PoolObject<T> Acquire()
    {
        var local = m_localPool.Value;
            
        T item;
        // Try to acquire pooled object from local pool
        // first to avoid synchronization penalties
        if (local != null && local.TryPop(out item))
            return new PoolObject<T>(item, this);
        // If failed (either due to empty or not yet 
        // initialized local pool) try to acquire segment
        // that will be local pool from global pool
        if (!m_globalPool.TryPop(out local))
        {
            // If failed create new segment using object 
            // factory
            var items = Enumerable.Range(0, m_segmentSize)
                .Select(_ => m_factory());
            local = new Segment(m_segmentSize, items);
        }
        m_localPool.Value = local;
        // Eventually get object from local non-empty pool    
        local.TryPop(out item);
        return new PoolObject<T>(item, this);
    }

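    // Releases pooled object back to the pool, however 
    // it is not accessible publicly to avoid multiple releases
    // of the same object
    internal void Release(T poolObject)
    {
        var local = m_localPool.Value;
        // The releasing thread may never have acquired from this
        // pool (the wrapper can be disposed on another thread),
        // so create an empty local segment on demand
        if (local == null)
        {
            local = new Segment(m_segmentSize, Enumerable.Empty<T>());
            m_localPool.Value = local;
        }
        // Return object back to local pool first 
        var divided = local.Push(poolObject);
        // If local pool has grown beyond threshold 
        // return extra segment back to global pool
        if (divided != null)
            m_globalPool.Push(divided);
    }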

    // Represents chunk of pooled objects
    class Segment
    {
        private readonly int m_size;
        // Using stack to store pooled objects assuming 
        // that hot objects (recently used) provide better 
        // locality
        private readonly Stack<T> m_items;

        public Segment(int size, IEnumerable<T> items)
        {
            m_size = size;
            m_items = new Stack<T>(items);
        }

        public bool TryPop(out T item)
        {
            item = default(T);
            // Pop item if any available
            if (m_items.Count > 0)
            {
                item = m_items.Pop();
                return true;
            }
            return false;
        }

        public Segment Push(T item)
        {
            m_items.Push(item);
            // If current segment size is still smaller
            // than twice of original size no need to split
            if (m_items.Count < 2 * m_size)
                return null;
            // Otherwise split current segment to get it 
            // pushed into global pool
            var items = Enumerable.Range(0, m_size)
                .Select(_ => m_items.Pop());
            return new Segment(m_size, items);
        }
    }
}

// Represents disposable wrapper around pooled object 
// that is used to return object back to the pool
public class PoolObject<T> : IDisposable
{
    private readonly ConcurrentObjectPool<T> m_pool;
    private bool m_disposed;

    private readonly T m_value;

    // Get reference to the pool to return value to
    public PoolObject(T value, ConcurrentObjectPool<T> pool)
    {
        m_value = value;
        m_pool = pool;
    }

    public T Value
    {
        get
        {
            // Make sure value can't be obtained (though we can't 
            // guarantee that it is not used) anymore after it is
            // released back to the pool
            ThrowIfDisposed();
            return m_value;
        }
    }

    public void Dispose()
    {
        if (m_disposed)
            return;
        // As we are disposing pooled object disposal basically 
        // equivalent to returning the object back to the pool
        m_disposed = true;
        m_pool.Release(m_value);
    }

    private void ThrowIfDisposed()
    {
        if (m_disposed)
            // The single-string constructor takes the object name, 
            // not a message
            throw new ObjectDisposedException(GetType().Name);
    }
}

And it is used somewhat like this:

// Initialized elsewhere
ConcurrentObjectPool<T> pool = ...
...
using (var obj = pool.Acquire())
{
    // Use pooled object value
    Process(obj.Value);
}

I omitted the IDisposable implementation on the object pool to keep the code simpler. In order to implement it we would need to track all segments in a separate collection, as otherwise thread-local segments won’t be accessible from the thread disposing the pool.
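A minimal sketch of that tracking idea, kept separate from the pool above so it stays short: TrackedSegments is a hypothetical helper, segments are plain Stack<T> instances, and pooled objects are assumed to be IDisposable. It also assumes that no other thread touches the segments while Dispose runs, and it cannot reach objects that are currently acquired.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: shows only segment tracking and disposal
public class TrackedSegments<T> : IDisposable where T : IDisposable
{
    // Every segment ever created, reachable from any thread
    private readonly ConcurrentBag<Stack<T>> m_allSegments = new ConcurrentBag<Stack<T>>();
    private readonly ThreadLocal<Stack<T>> m_local = new ThreadLocal<Stack<T>>();

    // Returns the calling thread's segment, creating and
    // registering it on first use
    public Stack<T> GetLocalSegment()
    {
        var segment = m_local.Value;
        if (segment == null)
        {
            segment = new Stack<T>();
            m_allSegments.Add(segment);
            m_local.Value = segment;
        }
        return segment;
    }

    public void Dispose()
    {
        // Walk all registered segments, including those that
        // belong to other threads, and dispose what they hold
        foreach (var segment in m_allSegments)
        {
            while (segment.Count > 0)
                segment.Pop().Dispose();
        }
        m_local.Dispose();
    }
}
```

On .NET 4.5 and later, a ThreadLocal<T> created with trackAllValues set to true exposes a Values property that could play the role of the separate collection for thread-local segments.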