Coordination Data Structures – BlockingCollections

Oct 2nd, 2009 | By | Category: Framework & Libraries

In my last post about Parallel Extensions in .NET, I mentioned the Coordination Data Structures (CDS), the new set of thread-safe collections introduced in the System.Collections.Concurrent namespace. In this post we will discuss the new BlockingCollection<T> class, which is designed for the standard Producer/Consumer pattern.

Producer/Consumer is one of the most common concurrent programming patterns. Producers and consumers share a common buffer: the producer adds items to the buffer and the consumer removes them. The main challenge is to ensure that the producer does not add an item when the buffer is full and, similarly, that the consumer does not try to remove an item when the buffer is empty.
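To make the challenge concrete, here is a minimal sketch of the kind of bounded buffer one would otherwise have to write by hand. The BoundedBuffer<T> class and its member names are hypothetical and exist only for this illustration; it is not how BlockingCollection<T> is implemented internally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical hand-rolled bounded buffer, for illustration only.
class BoundedBuffer<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly object gate = new object();
    private readonly int capacity;

    public BoundedBuffer(int capacity) { this.capacity = capacity; }

    public void Add(T item)
    {
        lock (gate)
        {
            // The producer waits while the buffer is full.
            while (items.Count == capacity) Monitor.Wait(gate);
            items.Enqueue(item);
            Monitor.PulseAll(gate); // wake any waiting consumer
        }
    }

    public T Take()
    {
        lock (gate)
        {
            // The consumer waits while the buffer is empty.
            while (items.Count == 0) Monitor.Wait(gate);
            T item = items.Dequeue();
            Monitor.PulseAll(gate); // wake any waiting producer
            return item;
        }
    }
}

class HandRolledDemo
{
    static void Main()
    {
        var buffer = new BoundedBuffer<int>(2);

        // One producer thread adds five items; the main thread consumes them.
        var producer = new Thread(() =>
        {
            for (int i = 0; i < 5; i++) buffer.Add(i);
        });
        producer.Start();

        for (int i = 0; i < 5; i++) Console.WriteLine("Taken " + buffer.Take());
        producer.Join();
    }
}
```

The locking, waiting and signalling shown here is exactly the bookkeeping that a ready-made blocking collection takes off your hands.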

BlockingCollection<T> wraps any collection that implements the new interface System.Collections.Concurrent.IProducerConsumerCollection<T> (a ConcurrentQueue<T> is used by default). The class itself implements:

  • System.Collections.Generic.IEnumerable<T>
  • System.Collections.ICollection
  • System.IDisposable

IProducerConsumerCollection<T> specifies, among others, the following two operations:

  • bool TryAdd(T item) – tries to add an item to the collection and returns false if the item could not be added (for example, because a bounded collection is full).
  • bool TryTake(out T item) – tries to remove an item from the collection and returns false if the collection is empty.
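Because any IProducerConsumerCollection<T> can serve as the underlying store, the order in which items come back depends on the collection you pass in. The following is a small sketch, assuming a ConcurrentStack<int> as the backing store and a capacity of 2 (both chosen only for illustration), that shows TryAdd and TryTake on a last-in, first-out buffer:

```csharp
using System;
using System.Collections.Concurrent;

class BackingStoreSketch
{
    static void Main()
    {
        // Assumption for illustration: a LIFO buffer bounded at 2 items.
        var buffer = new BlockingCollection<int>(new ConcurrentStack<int>(), 2);

        Console.WriteLine(buffer.TryAdd(1)); // True
        Console.WriteLine(buffer.TryAdd(2)); // True
        Console.WriteLine(buffer.TryAdd(3)); // False: the bound of 2 is reached

        int item;
        if (buffer.TryTake(out item))
        {
            // The stack hands back the most recently added item.
            Console.WriteLine("Taken " + item); // Taken 2
        }
    }
}
```

With the default ConcurrentQueue<T> the same calls would hand back 1 first, since a queue is first-in, first-out.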

Let us consider the following example:

<pre>

using System;
using System.Collections.Concurrent;

class Program
{
    // Bounded buffer: holds at most 2 items.
    static BlockingCollection&lt;int&gt; buffer = new BlockingCollection&lt;int&gt;(2);

    static void Main(string[] args)
    {
        Produce();
        Consume();
        Console.Read();
    }

    private static void Produce()
    {
        Console.WriteLine("Produce::Invoked");
        for (int i = 0; i &lt; 3; i++)
        {
            // TryAdd returns false immediately when the buffer is full.
            if (buffer.TryAdd(i)) Console.WriteLine("Added " + i);
            else Console.WriteLine("Could Not Add");
        }
    }

    private static void Consume()
    {
        int j;
        Console.WriteLine("Consume::Invoked");
        for (int i = 0; i &lt; 3; i++)
        {
            // TryTake returns false immediately when the buffer is empty.
            if (buffer.TryTake(out j)) Console.WriteLine("Taken " + j);
            else Console.WriteLine("Could Not Take");
        }
    }
}
</pre>

Here we have a BlockingCollection with a capacity of 2, and we try to add three items to the collection and then take three items from it. The third add and the third take each return false, so the program output will be:

Produce::Invoked
Added 0
Added 1
Could Not Add
Consume::Invoked
Taken 0
Taken 1
Could Not Take

So TryTake and TryAdd are non-blocking methods: if the collection is full or empty, they return immediately and let the producer or consumer carry on with its own work. Let us change the code a bit, as shown below. Here, instead of TryAdd and TryTake, we use the Add and Take methods of the BlockingCollection class.

<pre>
using System;
using System.Collections.Concurrent;

class Program
{
    static BlockingCollection&lt;int&gt; buffer = new BlockingCollection&lt;int&gt;(2);

    static void Main(string[] args)
    {
        Produce();
        Consume();
        Console.Read();
    }

    private static void Produce()
    {
        Console.WriteLine("Produce::Invoked");
        for (int i = 0; i &lt; 3; i++)
        {
            // Add (instead of TryAdd) blocks while the buffer is full.
            buffer.Add(i);
        }
    }

    private static void Consume()
    {
        Console.WriteLine("Consume::Invoked");
        for (int i = 0; i &lt; 3; i++)
        {
            // Take (instead of TryTake) blocks while the buffer is empty.
            buffer.Take();
        }
    }
}
</pre>

Here the program output will be:

Produce::Invoked

This is because, after two elements have been added, the Add method tries to add the third item and finds that the buffer is full. It therefore blocks the producer thread and waits for space to become available in the buffer. Since Produce and Consume run on the same thread here, Consume is never reached and the program hangs. It is more interesting to observe this when the producer and the consumer run on two different threads, as shown below:

<pre>
using System;
using System.Collections.Concurrent;
using System.Threading;

class Program
{
    static BlockingCollection&lt;int&gt; buffer = new BlockingCollection&lt;int&gt;(2);

    static void Main(string[] args)
    {
        // Run the producer and the consumer on separate thread-pool threads.
        ThreadPool.QueueUserWorkItem(new WaitCallback(Produce));
        ThreadPool.QueueUserWorkItem(new WaitCallback(Consume));

        Console.Read();
    }

    private static void Produce(object obj)
    {
        Console.WriteLine("Produce::Invoked");
        for (int i = 0; i &lt; 3; i++)
        {
            // Slow the producer down so the consumer has to wait.
            Thread.Sleep(1000);
            buffer.Add(i);
            Console.WriteLine("Item Added::" + i);
        }
    }

    private static void Consume(object obj)
    {
        Console.WriteLine("Consume::Invoked");
        for (int i = 0; i &lt; 3; i++)
        {
            // Take blocks until the producer has added an item.
            Console.WriteLine("Item Taken::" + buffer.Take());
        }
    }
}
</pre>

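Here we have slowed down the producer using Thread.Sleep, so the consumer thread is blocked in the Take method until the producer adds an item to the buffer. Because the two threads write to the console independently, the exact interleaving of lines can vary slightly between runs, but a typical output is: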

Produce::Invoked
Consume::Invoked
Item Added::0
Item Taken::0
Item Added::1
Item Taken::1
Item Added::2
Item Taken::2
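In the example above the consumer knows in advance that exactly three items will arrive. When the number of items is not known up front, BlockingCollection<T> also offers CompleteAdding and GetConsumingEnumerable. The following is only a sketch of that variation, using tasks instead of the thread pool (an assumption made here for brevity, not something the example above relies on):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class CompletionSketch
{
    static void Main()
    {
        using (var buffer = new BlockingCollection<int>(2))
        {
            Task producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 3; i++)
                {
                    buffer.Add(i); // blocks while the buffer is full
                }
                // Signal that no more items will be added.
                buffer.CompleteAdding();
            });

            Task consumer = Task.Factory.StartNew(() =>
            {
                // Blocks while the buffer is empty and ends once adding
                // is complete and the buffer has been drained.
                foreach (int item in buffer.GetConsumingEnumerable())
                {
                    Console.WriteLine("Item Taken::" + item);
                }
            });

            Task.WaitAll(producer, consumer);
        }
    }
}
```

The consumer loop no longer needs a hard-coded count; it simply stops when the producer has marked the collection as complete and the last item has been taken.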


  • John Dillinger

    In the first code snippet, I believe your capacity is set to 10