TPL DataFlow–BufferBlock<T> & Load Balancing

Mar 27th, 2011 | By | Category: Framework & Libraries

Let’s consider a situation where a data source churns out data as a continuous stream and passes it to some data processing units (for example, methods), as shown in the figure below. The processing units are load balanced, i.e. if one unit is still busy processing, the next element coming out of the data source is handed over to the next processing unit.

 

[Figure TDF: a data source feeding load-balanced processing units]

In this post we will see how to implement such a data processing unit using the TPL DataFlow library, which is now available through MSDN DevLabs. The DataFlow library is built on top of the Task Parallel Library (TPL) to make it easy to develop programs involving dataflow networks. TPL DataFlow provides a set of foundational interfaces and a set of built-in, ready-to-use dataflow block classes, as shown below.

 

 

[Figure TDF1: TPL DataFlow interfaces and built-in dataflow block classes]

  • IDataflowBlock is the basic interface implemented by all the dataflow blocks.
  • ISourceBlock<T> is implemented by all the dataflow blocks that are designed to act as a data source from which data can be received.
  • ITargetBlock<T> is implemented by all the dataflow blocks that need to act as a target to which data can be passed or posted.
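To make the two roles concrete, here is a minimal sketch that views a single block through both interfaces. It assumes the System.Threading.Tasks.Dataflow library as later released (not necessarily the preview build used in this post), and the class name InterfaceSketch is just an illustrative choice.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class InterfaceSketch
{
    static void Main()
    {
        var buffer = new BufferBlock<int>();

        // The same block seen through its two roles.
        ITargetBlock<int> target = buffer; // data can be posted to it
        ISourceBlock<int> source = buffer; // data can be received from it

        target.Post(42);              // Post is an extension method on ITargetBlock<T>
        int value = source.Receive(); // Receive is an extension method on ISourceBlock<T>

        Console.WriteLine(value);     // prints 42
    }
}
```

BufferBlock<T> implements both interfaces, which is why it can sit between a producer and its consumers.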

Among the various built-in dataflow blocks, BufferBlock and ActionBlock are the most useful for our purpose.

  • BufferBlock<T> can act as a target block to which data can be passed. It buffers this data and passes it on to other target blocks, acting as a data source for them.
  • ActionBlock<TInput> is a target block which receives an input element and executes a method on that element via a delegate.
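The pairing of these two blocks can be sketched as follows. This is only an illustration, assuming the released System.Threading.Tasks.Dataflow API; DataflowLinkOptions and PropagateCompletion come from that release and do not appear in the code of this post, and the class name is hypothetical.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class BufferToActionSketch
{
    static void Main()
    {
        var buffer = new BufferBlock<int>();

        // The delegate runs once for every element the block receives.
        var printer = new ActionBlock<int>(n => Console.WriteLine("Got {0}", n));

        // Forward buffered items to the action block, and forward completion too
        // (an assumption of this sketch, so that we can wait for the work to finish).
        buffer.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 3; i++)
        {
            buffer.Post(i);
        }

        buffer.Complete();         // no more input will arrive
        printer.Completion.Wait(); // block until every item has been handled
    }
}
```

With a single target and the default settings, the items are handled one at a time in the order they were posted.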

So we can implement the dataflow network discussed at the beginning of the post using BufferBlock and ActionBlock, as shown below:

[Figure TDF2: a BufferBlock linked to three ActionBlock instances]

This is what we are planning to do:

  • Some code block will post data to the BufferBlock using its Post(T item) method.
  • This BufferBlock is linked to 3 ActionBlock instances using the LinkTo(ITargetBlock<T> target) method of BufferBlock.

Note that BufferBlock does not hand over copies of the input data to all the target blocks it is linked to. Instead, it hands each item to one target block only. Here we expect that when one target is busy processing, the next item will be handed over to another target. Now let’s refer to the code below:
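The "one target only" behavior can be checked with a small counting sketch. It assumes the released System.Threading.Tasks.Dataflow API (including DataflowLinkOptions, which is not used elsewhere in this post); the class and variable names are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class SingleDeliverySketch
{
    static void Main()
    {
        int handled = 0;
        var buffer = new BufferBlock<int>();

        // Both targets bump the same counter for every item they receive.
        var first = new ActionBlock<int>(n => { Interlocked.Increment(ref handled); });
        var second = new ActionBlock<int>(n => { Interlocked.Increment(ref handled); });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(first, linkOptions);
        buffer.LinkTo(second, linkOptions);

        for (int i = 1; i <= 5; i++)
        {
            buffer.Post(i);
        }

        buffer.Complete();
        Task.WaitAll(first.Completion, second.Completion);

        // Each item went to exactly one target, so the total equals the number posted.
        Console.WriteLine("Handled {0} of 5 posted items", handled); // prints "Handled 5 of 5 posted items"
    }
}
```

If the buffer handed a copy to every linked target, the counter would reach 10 instead of 5.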

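using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        BufferBlock<int> bb = new BufferBlock<int>();

        // A1 is deliberately slower (100 ms) than A2 and A3 (50 ms each).
        ActionBlock<int> a1 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(100);
            Console.WriteLine("Action A1 executing with value {0}", a);
        });

        ActionBlock<int> a2 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A2 executing with value {0}", a);
        });

        ActionBlock<int> a3 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A3 executing with value {0}", a);
        });

        // Link the buffer to all three targets.
        bb.LinkTo(a1);
        bb.LinkTo(a2);
        bb.LinkTo(a3);

        // Producer: post the values 1 to 10, one every 50 ms.
        Task t = new Task(() =>
        {
            int i = 0;
            while (i < 10)
            {
                Thread.Sleep(50);
                i++;
                bb.Post(i);
            }
        });
        t.Start();

        // Keep the process alive until a key is pressed.
        Console.Read();
    }
}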

When executed it produces the following output:

Action A1 executing with value 1
Action A1 executing with value 2
Action A1 executing with value 3
Action A1 executing with value 4
Action A1 executing with value 5
Action A1 executing with value 6
Action A1 executing with value 7
Action A1 executing with value 8
Action A1 executing with value 9
Action A1 executing with value 10

This shows that only one target, A1, is processing all the data, even while it is busy (because of the Thread.Sleep(100) added on purpose). Why?

This is because all target blocks are greedy by default: they accept and buffer input even when they are not yet able to process it. Since A1 is linked first, it accepts every item the BufferBlock offers. To change this behavior, we have to set greedy to false in the DataflowBlockOptions when initializing each ActionBlock, as shown below.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        BufferBlock<int> bb = new BufferBlock<int>();

        ActionBlock<int> a1 = new ActionBlock<int>(a =>
            {
                Thread.Sleep(100);
                Console.WriteLine("Action A1 executing with value {0}", a);
            },
            new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
                                     maxDegreeOfParallelism: 1, maxMessagesPerTask: 1,
                                     cancellationToken: CancellationToken.None,
                                     // Not greedy
                                     greedy: false));

        ActionBlock<int> a2 = new ActionBlock<int>(a =>
            {
                Thread.Sleep(50);
                Console.WriteLine("Action A2 executing with value {0}", a);
            },
            new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
                                     maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
                                     cancellationToken: CancellationToken.None,
                                     greedy: false));

        ActionBlock<int> a3 = new ActionBlock<int>(a =>
            {
                Thread.Sleep(50);
                Console.WriteLine("Action A3 executing with value {0}", a);
            },
            new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
                                     maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
                                     cancellationToken: CancellationToken.None,
                                     greedy: false));

        bb.LinkTo(a1);
        bb.LinkTo(a2);
        bb.LinkTo(a3);

        // Producer: post the values 1 to 10, one every 50 ms.
        Task t = new Task(() =>
        {
            int i = 0;
            while (i < 10)
            {
                Thread.Sleep(50);
                i++;
                bb.Post(i);
            }
        });
        t.Start();

        // Keep the process alive until a key is pressed.
        Console.Read();
    }
}

The output of this program is:

Action A1 executing with value 1
Action A2 executing with value 3
Action A1 executing with value 2
Action A3 executing with value 6
Action A3 executing with value 7
Action A3 executing with value 8
Action A2 executing with value 5
Action A3 executing with value 9
Action A1 executing with value 4
Action A2 executing with value 10

This clearly shows the data being distributed across the three ActionBlock instances, as expected.
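The code above targets the preview build of the library. In the later released System.Threading.Tasks.Dataflow package, as far as I am aware, ActionBlock takes an ExecutionDataflowBlockOptions that has no greedy setting; a comparable effect is usually obtained by bounding each target's capacity so that it declines new items while it is busy. The following is a sketch under that assumption, not a drop-in replacement for the code above; CreateWorker is a hypothetical helper introduced only for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BoundedCapacitySketch
{
    // Hypothetical helper: builds a worker that holds at most one message at a time.
    static ActionBlock<int> CreateWorker(string name, int delayMs)
    {
        return new ActionBlock<int>(value =>
            {
                Thread.Sleep(delayMs);
                Console.WriteLine("Action {0} executing with value {1}", name, value);
            },
            // With a capacity of 1, a busy worker declines further items,
            // so the buffer offers them to the next linked target instead.
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
    }

    static void Main()
    {
        var bb = new BufferBlock<int>();
        var a1 = CreateWorker("A1", 100);
        var a2 = CreateWorker("A2", 50);
        var a3 = CreateWorker("A3", 50);

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        bb.LinkTo(a1, linkOptions);
        bb.LinkTo(a2, linkOptions);
        bb.LinkTo(a3, linkOptions);

        // Producer: post the values 1 to 10, one every 50 ms.
        for (int i = 1; i <= 10; i++)
        {
            Thread.Sleep(50);
            bb.Post(i);
        }

        // Signal the end of input and wait for all workers to drain.
        bb.Complete();
        Task.WaitAll(a1.Completion, a2.Completion, a3.Completion);
    }
}
```

The exact interleaving of the output depends on timing and scheduling, so it will vary from run to run, but the values should no longer all land on A1.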

