Reactive Extensions in .NET – Hot & Cold Observable

Sep 26th, 2010 | Category: Framework & Libraries

In my last post on the Reactive Extensions in .NET we discussed observable collections. Here we will look at the different types of observables and how they push values to their subscribers. In the example from the last post we saw that the observable collection starts pushing values to the subscriber once the Subscribe method is invoked on it.

Here I have written a very simple implementation of IObservable to make this clear.

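using System;
using System.Reactive.Disposables; // Disposable (System.Disposables in the early Rx builds)

public class MyColdObservable : IObservable<int>
{
    #region IObservable<int> Members

    // All values are produced inside Subscribe, so every subscriber
    // triggers its own full run of the sequence.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        try
        {
            for (int i = 0; i < 100; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
        // The sequence has already finished, so there is nothing to unsubscribe from.
        return Disposable.Empty;
    }

    #endregion
}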

Two subscribers subscribe to this observable as shown below:

MyColdObservable cold = new MyColdObservable();

IDisposable sub1 = cold.Subscribe((i) => Console.WriteLine("Observer#1::OnNext()::{0}", i),
               (e) => Console.WriteLine("Observer#1::OnError()::{0}", e),
               () => Console.WriteLine("Observer#1::OnCompleted()"));

IDisposable sub2 = cold.Subscribe((i) => Console.WriteLine("Observer#2::OnNext()::{0}", i),
               (e) => Console.WriteLine("Observer#2::OnError()::{0}", e),
               () => Console.WriteLine("Observer#2::OnCompleted()"));

The program prints out:

Observer#1::OnNext()::0
Observer#1::OnNext()::1

…..

……
Observer#1::OnNext()::99
Observer#1::OnCompleted()

Observer#2::OnNext()::0
Observer#2::OnNext()::1

…..

……
Observer#2::OnNext()::99
Observer#2::OnCompleted()

Here the observable starts pushing values only when the Subscribe method is called, and each subscriber receives the full sequence from the start. Such observables are referred to as cold observables. An example is an observable that fetches values from a database on each invocation of the Subscribe method and pushes them to that subscriber.
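The database scenario can be sketched without a real database. In the minimal example below, FetchRows is a hypothetical stand-in for a query, and QueryObservable, ListObserver and NoopDisposable are illustrative names that are not part of Rx. It only tries to show that a cold observable re-runs its producer for every subscription.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records everything it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { Completed = true; }
}

// Hypothetical no-op subscription handle (plays the role of Disposable.Empty).
public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Cold observable: the "query" runs inside Subscribe, once per subscriber.
public sealed class QueryObservable : IObservable<string>
{
    // Counts how many times the simulated query has been executed.
    public int FetchCount { get; private set; }

    // Stand-in for a database call (assumption: no real database is involved).
    private string[] FetchRows()
    {
        FetchCount++;
        return new[] { "row-1", "row-2" };
    }

    public IDisposable Subscribe(IObserver<string> observer)
    {
        foreach (string row in FetchRows())
        {
            observer.OnNext(row);
        }
        observer.OnCompleted();
        return new NoopDisposable();
    }
}
```

Subscribing two ListObserver<string> instances to the same QueryObservable leaves each of them with both rows, and FetchCount ends up at 2 because the simulated query ran once per subscription.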

Similarly, we can have a hot observable, which exists and pushes values even when there are no observers subscribed to it. A subscriber receives only the values produced from the moment it subscribes onward. Mouse-move events, timers and stock tickers are good examples of hot observables.

Let’s start building a simple hot observable step by step.

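using System;
using System.Collections.Generic;
using System.Threading;

public class MyHotObservable
{
    private List<IObserver<int>> observers = null;
    private readonly object lockObject = new object();

    // Starts producing values on a separate thread, subscribers or not.
    public void Start()
    {
        Thread t = new Thread(() => Push());
        t.Start();
    }

    private void Push()
    {
        int i = 0;
        // Snapshot of the current observers, taken under the lock so the
        // list is never read while another thread is modifying it.
        IObserver<int>[] snapshot = new IObserver<int>[0];
        try
        {
            while (i < 20)
            {
                Thread.Sleep(100);
                lock (lockObject)
                {
                    if (observers != null) snapshot = observers.ToArray();
                }
                foreach (IObserver<int> observer in snapshot)
                {
                    observer.OnNext(i);
                }
                i++;
            }
        }
        catch (Exception ex)
        {
            foreach (IObserver<int> observer in snapshot)
            {
                observer.OnError(ex);
            }
        }
    }
}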

The following points need to be noted about this class:

  • The class maintains a collection of IObserver<int> objects.
  • Inside the while loop, the Push method pushes the integer values to all the observers in that collection. If the collection is empty, the loop carries on without any observer to receive the values.
  • The Start method spawns a new thread and invokes the Push method on it.

Now let’s implement the IObservable<int> interface and its Subscribe method.

public class MyHotObservable : IObservable<int>
{
    private List<IObserver<int>> observers = null;
    private object lockObject = new object();
    ...............
    ...............

    #region IObservable<int> Members

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Register the observer; the list is created lazily on first use.
        lock (lockObject)
        {
            if (observers == null) observers = new List<IObserver<int>>();
            observers.Add(observer);
        }
        // The returned handle unsubscribes the observer when it is disposed.
        return Disposable.Create(() =>
        {
            lock (lockObject)
            {
                Console.WriteLine("Disposed");
                // This sample also signals completion to the observer on unsubscribe.
                observer.OnCompleted();
                observers.Remove(observer);
            }
        });
    }

    #endregion
}

In the Subscribe method we add the observer to the collection. The method returns an IDisposable object, which is created using the Create method of the Disposable class in the System.Disposables namespace (System.Reactive.Disposables in later Rx releases). The Create method:

  • Creates an instance of an IDisposable object.
  • Accepts an Action delegate as a parameter, which is invoked when the Dispose method of the IDisposable object is called.
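To make the idea concrete, here is a minimal sketch of what a Create-style helper could look like. ActionDisposable is a hypothetical name and this is not the Rx source; it only illustrates wrapping an Action so that it runs when Dispose is called. Running the action at most once is a design choice of this sketch.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the kind of object Disposable.Create returns.
public sealed class ActionDisposable : IDisposable
{
    private Action onDispose;

    public ActionDisposable(Action onDispose)
    {
        if (onDispose == null) throw new ArgumentNullException(nameof(onDispose));
        this.onDispose = onDispose;
    }

    public void Dispose()
    {
        // Swap the delegate out atomically so it runs at most once,
        // even if Dispose is called repeatedly or from several threads.
        Action action = Interlocked.Exchange(ref onDispose, null);
        if (action != null) action();
    }
}
```

With this helper, new ActionDisposable(() => Console.WriteLine("Disposed")) prints the message on the first call to Dispose and does nothing on later calls.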

 

The following code is used to test this class.

MyHotObservable hot = new MyHotObservable();
hot.Start();
IDisposable sub1 = hot.Subscribe((i) => Console.WriteLine("Observer#1::OnNext()::{0}", i),
               (e) => Console.WriteLine("Observer#1::OnError()::{0}", e),
               () => Console.WriteLine("Observer#1::OnCompleted()"));
Thread.Sleep(300);
IDisposable sub2 = hot.Subscribe((i) => Console.WriteLine("Observer#2::OnNext()::{0}", i),
               (e) => Console.WriteLine("Observer#2::OnError()::{0}", e),
               () => Console.WriteLine("Observer#2::OnCompleted()"));
Thread.Sleep(300);
sub1.Dispose();

Console.Read();
sub2.Dispose();

The program prints out the following:

Observer#1::OnNext()::1
Observer#1::OnNext()::2
Observer#1::OnNext()::3
Observer#2::OnNext()::3
Observer#1::OnNext()::4
Observer#2::OnNext()::4
Observer#1::OnNext()::5
Observer#2::OnNext()::5
Disposed
Observer#1::OnCompleted()
Observer#2::OnNext()::6

….

….
Observer#2::OnNext()::19

Here we start the hot observable, then two subscribers subscribe with a time delay between them, and after that the first subscription is disposed. The output above shows that the second observer starts receiving values later, and that it starts from 3 rather than from the first value of the sequence as in the cold observable case shown earlier. The exact values each observer starts with depend on thread timing.
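Because the threaded version depends on timing, the same late-subscriber behaviour can also be sketched deterministically. The example below is an illustration under its own assumptions: ManualHotSource, RecordingObserver and HotDemo are hypothetical names, values are pushed by explicit Publish calls instead of a timer thread, and the class is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records the values it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<int> Values { get; } = new List<int>();
    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hot source driven by explicit Publish calls (single-threaded sketch).
public sealed class ManualHotSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    // Values are produced whether or not anyone is subscribed.
    public void Publish(int value)
    {
        // Iterate over a copy so an observer may unsubscribe during OnNext.
        foreach (IObserver<int> observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Subscription(() => observers.Remove(observer));
    }

    // Runs the supplied action once when disposed.
    private sealed class Subscription : IDisposable
    {
        private Action unsubscribe;
        public Subscription(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose()
        {
            Action action = unsubscribe;
            unsubscribe = null;
            if (action != null) action();
        }
    }
}

public static class HotDemo
{
    public static void Run()
    {
        var source = new ManualHotSource();
        source.Publish(1);                          // nobody is listening yet
        var first = new RecordingObserver();
        IDisposable sub1 = source.Subscribe(first);
        source.Publish(2);
        var second = new RecordingObserver();
        source.Subscribe(second);
        source.Publish(3);
        sub1.Dispose();                             // first stops receiving
        source.Publish(4);
        Console.WriteLine(string.Join(",", first.Values));   // 2,3
        Console.WriteLine(string.Join(",", second.Values));  // 3,4
    }
}
```

Calling HotDemo.Run() from a console application's Main shows that the value published before any subscription is lost, and that the second observer never sees the value published before it subscribed.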

That covers cold and hot observables. In the next post we will discuss how a cold observable can be converted to a hot observable.

