Event aggregator in .NET

A few days ago I had to choose an event aggregator implementation to use in a project (ASP.NET in this case). Of course, there are plenty of alternatives, probably the most famous being the one from Prism. Since that one has dependencies on WPF (or Silverlight), I had to look elsewhere for ASP.NET.

After a bit of digging, I realized that the answer was in front of me: for recent .NET Framework versions (4.0 and above at least) we already have a library (from Microsoft) that comes with an event aggregator implementation: Reactive Extensions (RX).
It’s free and open source (https://rx.codeplex.com/ – Apache license).

The class I was looking for is Subject<T>: http://msdn.microsoft.com/en-us/library/hh229173(v=vs.103).aspx. Despite its name, it has everything needed for an event aggregator.

If we come back to the definition, the event aggregator pattern is very simple: ‘Channel events from multiple objects into a single object to simplify registration for clients. An Event Aggregator is a simple element of indirection. In its simplest form you have it register with all the source objects you are interested in, and have all target objects register with the Event Aggregator. The Event Aggregator responds to any event from a source object by propagating that event to the target objects.’

public sealed class Subject<T> : ISubject<T>, ISubject<T, T>, IObserver<T>, IObservable<T>, IDisposable

Let’s take a look at Subject<T> from RX: ‘A subject acts similar to a proxy. It performs as both a subscriber and a publisher. This is accomplished by supporting the IObserver and IObservable interfaces. The IObserver interface can be used to subscribe the subject to multiple streams or sequences of data. The data is then published through its IObservable interface to all subscribed observers.’

If we replace ‘subscribe’ with ‘register’, ‘observable’ with ‘source objects’, ‘observer’ with ‘target objects’, and ‘streams or sequences of data’ with ‘events’, we get an event aggregator, despite the different terminology.

We are interested in two methods:
– public void OnNext(T value) – used by the source to ‘trigger’ (publish) an event
– public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext) – used by the target object to register (subscribe)

where T is the (custom) event type. In this context, ‘event’ does not mean a .NET Framework event, but the generic concept of an event. T carries just the event payload and can be any type. It is somewhat similar to EventArgs in .NET.

How can we use the Subject<T> class as an event aggregator? Easy.
The only precondition is to have a way to pass the same Subject instance to all interested parties (using a DI container or some other way). We build the Subject easily:

Subject<MyCustomEventType> subject = new Subject<MyCustomEventType>();

Subscribing is just as easy from anywhere in the application:

// ...
var subscription = subject.AsObservable().Subscribe(ev =>
{
    // ... do something with the event
});
// ...
// unsubscribe when we are no longer interested, or before our class is disposed
subscription.Dispose();

Trigger the event from a very different and distant class:

// ...
subject.OnNext(new MyCustomEventType(...));
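Put together, the three fragments above could look like the following minimal console sketch. It assumes the Rx assemblies (System.Reactive) are referenced; OrderPlaced and Demo are hypothetical names invented for the illustration, standing in for your own event type and classes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical event payload; any type can play this role.
public sealed class OrderPlaced
{
    public OrderPlaced(int orderId) { OrderId = orderId; }
    public int OrderId { get; private set; }
}

public static class Demo
{
    public static List<int> Run()
    {
        // In a real application this single instance would be shared,
        // for example through a DI container.
        var subject = new Subject<OrderPlaced>();
        var received = new List<int>();

        // Target object: registers with the aggregator.
        IDisposable subscription = subject.AsObservable()
            .Subscribe(ev => received.Add(ev.OrderId));

        // Source object: publishes two events.
        subject.OnNext(new OrderPlaced(1));
        subject.OnNext(new OrderPlaced(2));

        // Once the subscription is disposed, this handler no longer sees events.
        subscription.Dispose();
        subject.OnNext(new OrderPlaced(3));

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 2"
    }
}
```

The third event is published after Dispose(), so the handler never sees it.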

Since we have access to RX source code (http://rx.codeplex.com/SourceControl/changeset/view/2b5dbddd740b#Rx/NET/Source/System.Reactive.Linq/Reactive/Subjects/Subject.cs)
we can verify what’s really happening behind the scenes:

public IDisposable Subscribe(IObserver<T> observer)
{
    // ...
    var obs = oldObserver as Observer<T>;
    if (obs != null)
    {
        newObserver = obs.Add(observer);
    }
    else
    {
        newObserver = new Observer<T>(new ImmutableList<IObserver<T>>(new[] { oldObserver, observer }));
    }
    // ...
    return new Subscription(this, observer);
}

Each time an observer is added, it is appended to a list maintained by the (internal) Observer<T> class. Inside Observer<T> we have:

internal class Observer<T> : IObserver<T>
{
    private readonly ImmutableList<IObserver<T>> _observers;
    // ...
}

ImmutableList is just a thread-safe collection that builds a new array each time an element is added to it.
A similar collection will probably be released publicly by Microsoft soon: https://nuget.org/packages/Microsoft.Bcl.Immutable
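As a rough illustration of that copy-on-add behaviour, here is a minimal sketch. SnapshotList<T> is a hypothetical name and this is not the Rx implementation; it only shows why a publisher iterating over one array is unaffected when a subscriber is added at the same time.

```csharp
using System;

// Hypothetical illustration only; this is not the Rx source code.
public sealed class SnapshotList<T>
{
    private readonly T[] _data;

    public SnapshotList() { _data = new T[0]; }
    private SnapshotList(T[] data) { _data = data; }

    // This class never modifies the array after construction,
    // so readers can iterate over it without taking a lock.
    public T[] Data { get { return _data; } }

    // Returns a new list with the extra element; the current instance is left untouched.
    public SnapshotList<T> Add(T value)
    {
        var newData = new T[_data.Length + 1];
        Array.Copy(_data, newData, _data.Length);
        newData[_data.Length] = value;
        return new SnapshotList<T>(newData);
    }
}

public static class SnapshotListDemo
{
    public static void Main()
    {
        var empty = new SnapshotList<string>();
        var one = empty.Add("first");
        var two = one.Add("second");

        Console.WriteLine(empty.Data.Length); // prints 0
        Console.WriteLine(one.Data.Length);   // prints 1
        Console.WriteLine(two.Data.Length);   // prints 2
    }
}
```

The trade-off is an array copy on every add, which is usually acceptable when subscriptions change far less often than events are published.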

When an event is triggered/published, the Subject class just iterates this collection:

public void OnNext(T value)
{
    foreach (var observer in _observers.Data)
        observer.OnNext(value);
}

To sum up, Subject<T> can be used as a basic event aggregator, even though it lacks some of Prism’s fancier features, such as publishing events on the subscriber’s UI thread.
Instead, we can use the full power of Reactive Extensions to filter the ‘stream’ of events in any way we want:

    using (subject.AsObservable()
                  .Where(se => se.Status == 1)
                  .Subscribe(se =>
                  {
                      eventWasRaised = true;
                      // ...
                  }))
    {
        // ...
    }
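The same filtering operators also make it possible to share one subject among several event types. The sketch below is one possible wrapper, assuming the Rx assemblies are referenced; EventAggregator, GetEvent and Publish are hypothetical names chosen for the illustration, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper around a single Subject<object>; names are illustrative only.
public sealed class EventAggregator : IDisposable
{
    private readonly Subject<object> _subject = new Subject<object>();

    // Targets see only the events whose runtime type matches TEvent.
    public IObservable<TEvent> GetEvent<TEvent>()
    {
        return _subject.OfType<TEvent>();
    }

    // Sources publish any payload type through the same channel.
    public void Publish<TEvent>(TEvent ev)
    {
        _subject.OnNext(ev);
    }

    public void Dispose()
    {
        _subject.Dispose();
    }
}

public static class AggregatorDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        using (var aggregator = new EventAggregator())
        using (aggregator.GetEvent<string>()
                         .Where(s => s.Length > 3)
                         .Subscribe(s => received.Add(s)))
        {
            aggregator.Publish(42);      // ignored: not a string
            aggregator.Publish("hi");    // ignored: filtered out by Where
            aggregator.Publish("hello"); // delivered
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "hello"
    }
}
```

Value-type payloads are boxed on their way through Subject<object>; if that matters, a dedicated Subject<T> per event type is an alternative.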

Of course, a quick Google search reveals that the above idea is nothing new and has been used by other people before:
http://machadogj.com/2011/3/yet-another-event-aggregator-using-rx.html or https://github.com/shiftkey/Reactive.EventAggregator
The only disadvantage is that it brings a dependency on Reactive Extensions.
