Reactive Extensions in .NET – ConnectableObservable

Oct 15th, 2010 | By | Category: Framework & Libraries

In my last post on Reactive Extensions in .NET I had discussed about Hot and Cold Observable and their behavior.In this post we will discuss on how a cold observable can be converted to a hot one using the System.Collections.Generic.IConnectableObservable.

The classes and interfaces involved in achieving this is shown in the diagram below.

 connobs

IConnectableObservable provides Connect() method to establish connection to an observable(IObservable) collection.The ConnectableObservable class implements this interface and acts a mediator in providing a hot observable like behavior out of a cold observable collection.

The following lines of code demonstrates how the ConnectableObservable class can be used to convert a cold observable created using Observable.Range method to hot observable.

IObservable<int> source = Observable.Range(1, 10);
ConnectableObservable<int> c = new ConnectableObservable<int>(source);
IDisposable sub1 = c.Subscribe<int>
                (
                  ((x) => Console.WriteLine("{0}::OnNext::{1}", 1, x)),
                  ((e) => Console.WriteLine("{0}::Error::{1}", 1, e.Message)),
                  () => Console.WriteLine("{0}::Completed", 1)
                );
Thread t3 = new Thread(() => c.Connect());
t3.Start();
Thread.Sleep(15);
IDisposable sub2 = c.Subscribe<int>
                (
                  ((x) => Console.WriteLine("{0}::OnNext::{1}", 2, x)),
                  ((e) => Console.WriteLine("{0}::Error::{1}", 2, e.Message)),
                  () => Console.WriteLine("{0}::Completed", 2)
                );
Console.Read();
sub1.Dispose();
sub2.Dispose();

The program prints the following:

1::OnNext::1
1::OnNext::2
2::OnNext::2
1::OnNext::3
2::OnNext::3
……

1::OnNext::9
2::OnNext::9
1::OnNext::10
2::OnNext::10
1::Completed
2::Completed

So what’s going on in here?

  • The ConnectableObservable class has an observable(cold) collection as the backing source.
  • Subscriber 1(sub1) subscribes to the ConnectableObservable.No values are pushed at this point to the subscriber.
  • On invoking the Connect method on the ConnectableObservable class it establishes a subscription to the backing source and starts receiving the values.The values received are passed on to Subscriber1.Note, the Connect method is invoked in a separate thread to prevent blocking.
  • After few milliseconds delay Subscriber2 subscribes to the ConnectableObservable and starts receiving the values immediately from the current position(2 in this case) till the end(like subscribing to a hot observable)

connobsd

So the ConnectableObservable subscribes/connects to the cold observable as the single subscriber and pushes the values to the subscribers like a hot observable as it receives them from the source.

The connectable observable collection can also be created using the Observable.Publish method as shown below:

IConnectableObservable<int> c = Observable.Publish<int>(Observable.Range(1, 10)); 

IDisposable sub1 = c.Subscribe<int>
                 (
                   ((x) => Console.WriteLine("{0}::OnNext::{1}", 1, x)),
                   ((e) => Console.WriteLine("{0}::Error::{1}", 1, e.Message)),
                   () => Console.WriteLine("{0}::Completed", 1)
                 );
Thread t3 = new Thread(() => c.Connect());
t3.Start();
Thread.Sleep(15);
IDisposable sub2 = c.Subscribe<int>
                 (
                   ((x) => Console.WriteLine("{0}::OnNext::{1}", 2, x)),
                   ((e) => Console.WriteLine("{0}::Error::{1}", 2, e.Message)),
                   () => Console.WriteLine("{0}::Completed", 2)
                 );
Console.Read();
sub1.Dispose();
sub2.Dispose();

Kick It on DotNetKicks.com
Tags: ,
  • http://topsy.com/trackback?url=http%3A%2F%2Fcodingndesign.com%2Fblog%2F%3Fp%3D186&utm_source=pingback&utm_campaign=L2 Tweets that mention Reactive Extensions in .NET – ConnectableObservable – Coding N Design — Topsy.com

    [...] This post was mentioned on Twitter by Sankarsan Bose, Sankarsan Bose. Sankarsan Bose said: RT @sankarsan Reactive Extensions in .NET – ConnectableObservable http://bit.ly/d4HbCP #RxDotNet #Rx #DotNet #CSharp [...]

  • http://real-url.org/twitted.php?id=27760597930 Twitted by Mozito18

    [...] This post was Twitted by Mozito18 [...]

  • http://real-url.org/twitted.php?id=28585130512 Twitted by progg_ru

    [...] This post was Twitted by progg_ru [...]