Reactive Extensions in .NET – Observable Collections

Sep 12th, 2010 | Category: Framework & Libraries

Last week I was reading about the two new interfaces introduced in .NET Framework 4.0 – IObservable<T> and IObserver<T>. At first glance they seemed to be just simple interfaces providing a mechanism to implement the Observer (GoF) design pattern. But I soon found out that the really meaningful implementations of these are actually in the Reactive Extensions for .NET (Rx). This is a separate class library released as an MSDN DevLabs project and is not yet part of the .NET Framework.

This library is defined as

Rx is a library for composing asynchronous and event-based programs using observable collections.

Whoa! This definition seems a bit academic.

In this post and some of the following posts we will elaborate on this definition and try to understand the basic essence and purpose of this framework.

Before getting into observable collections, let’s take a look back at the enumerable collections that we already have in .NET. The two interfaces behind these collections are:

  • public interface IEnumerable<out T> : IEnumerable – A collection implementing this interface exposes an enumerator (iterator) for stepping through it. It exposes one method:
    • IEnumerator<T> GetEnumerator() – Returns the enumerator used for iteration.
  • public interface IEnumerator<out T> : IDisposable, IEnumerator – Used to iterate over a collection. It supports the following method and property:
    • bool MoveNext() – Moves the iterator to the next element in the collection and returns false if there is no next element, i.e. it has reached the end of the collection.
    • T Current { get; } – Retrieves the value of the element the iterator is currently pointing to.

The following code shows a simple iteration:

IEnumerable<int> list = Enumerable.Range(1, 10);
IEnumerator<int> en = list.GetEnumerator();
while (en.MoveNext())
    Console.WriteLine(en.Current);  // pull the current value out of the collection

So to get values out of an enumerable collection we need to interact with the collection and pull values out of it.
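
The same pull pattern is what a foreach loop does behind the scenes. The following is a minimal sketch, assuming a plain console program (the Program class and the range of 1 to 3 are chosen here only for illustration); it iterates once with foreach and once with the explicit enumerator calls that the loop roughly expands to:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class Program
{
    static void Main()
    {
        IEnumerable<int> list = Enumerable.Range(1, 3);

        // foreach calls GetEnumerator, MoveNext and Current for us.
        foreach (int value in list)
            Console.WriteLine(value);

        // Roughly what the loop above expands to: the consumer drives
        // the iteration and pulls each value when it is ready for it.
        using (IEnumerator<int> en = list.GetEnumerator())
        {
            while (en.MoveNext())
                Console.WriteLine(en.Current);
        }
    }
}
```

Both loops print 1, 2 and 3. In each case it is the consuming code that decides when the next value is fetched.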

Now let’s take a look at the interfaces IObservable<T> and IObserver<T>:

  • public interface IObservable<out T> – This is implemented by the collection to be observed or subscribed to.
    • IDisposable Subscribe(IObserver<T> observer) – Used by the observer to subscribe to the observable collection. Disposing the returned IDisposable cancels the subscription.
  • public interface IObserver<in T> – This is implemented by the observer to receive notifications.
    • void OnCompleted() – Informs the observer that the source has finished sending messages.
    • void OnError(Exception error) – Informs the observer that an exception/error has occurred.
    • void OnNext(T value) – Sends the next data value from the source.
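
To make the two interfaces concrete, here is a minimal hand-written sketch that uses only the interfaces shipped in .NET 4.0, without the Rx library. The class names RangeSource, ConsoleObserver and NoopDisposable are hypothetical and exist only for this illustration; a real implementation would also need to handle errors and unsubscription properly.

```csharp
using System;

// Hypothetical observable that pushes a fixed range of integers to each subscriber.
class RangeSource : IObservable<int>
{
    private readonly int _start;
    private readonly int _count;

    public RangeSource(int start, int count)
    {
        _start = start;
        _count = count;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push every value to the observer, then signal completion.
        for (int i = 0; i < _count; i++)
            observer.OnNext(_start + i);
        observer.OnCompleted();

        // The push is synchronous and already finished, so there is nothing to cancel.
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that writes each notification to the console.
class ConsoleObserver : IObserver<int>
{
    public void OnNext(int value) { Console.WriteLine("OnNext :: " + value); }
    public void OnError(Exception error) { Console.WriteLine("OnError :: " + error.Message); }
    public void OnCompleted() { Console.WriteLine("Completed"); }
}

class Program
{
    static void Main()
    {
        IObservable<int> source = new RangeSource(1, 3);
        IDisposable subscription = source.Subscribe(new ConsoleObserver());
        subscription.Dispose();
    }
}
```

Running this prints three OnNext lines for the values 1 to 3, followed by Completed. Note that the observer never asks for a value; the source decides when to call OnNext.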

The Reactive Extensions provide a set of APIs that help manipulate these observable collections. Of these, two classes are particularly important:

  • System.Linq.Observable – This class provides a set of methods for creating & performing LINQ/query operations on IObservable collections.
  • System.Linq.Observer – This class provides a set of methods to create Observers.

Let’s take a look at the following code:

IObservable<int> source = Observable.Range(1, 10);
IObserver<int> obs1 = Observer.Create<int>(
                       (x) => Console.WriteLine("OnNext :: " + x),
                       (ex) => Console.WriteLine("OnError :: " + ex.Message),
                       () => Console.WriteLine("Completed"));

IDisposable sub1 = source.Subscribe(obs1);

This program prints out:

OnNext :: 1
OnNext :: 2
OnNext :: 3
OnNext :: 4
OnNext :: 5
OnNext :: 6
OnNext :: 7
OnNext :: 8
OnNext :: 9
OnNext :: 10
Completed

Here we are doing the following things:

  • Creating an observable collection of integers using the Observable.Range method.
  • Creating an observer by passing three delegates to the Observer.Create method. These three delegates correspond to the OnNext, OnError and OnCompleted methods of IObserver.
  • Subscribing the observer to the collection by calling the Subscribe method on the observable. As soon as the subscription happens, the values are sent to the observer and the delegate corresponding to OnNext prints them out.
  • This is followed by the call to OnCompleted.

So here, to get values from the collection, we subscribe to it; the collection then pushes the values to us and we react to them.

In the following posts we will explore the API in more depth and try to find out how these observable collections can be of use in asynchronous and event-based programming.

  • Shaunak

    interesting stuff… thanks for sharing!

  • Anonymous

    @Shaunak thanks….

  • Dave

    This is the first thing I’ve read about Rx that actually made sense to me. Thanks!
