Sharing in RX: Publish, Replay, and Multicast
10 Nov 2011
State is a tricky thing in RX, especially when we have more than one subscriber to a stream. Consider a fairly innocuous setup:
    var interval = Observable.Interval(TimeSpan.FromMilliseconds(500));
    interval.Subscribe(i => Console.WriteLine("First: {0}", i));
    interval.Subscribe(i => Console.WriteLine("Second: {0}", i));
At first glance it might look like I’m setting up two listeners to a single interval pulse, but what’s actually happening is that each time I call Subscribe, I create a new timer to tick values. Imagine how bad this could be if instead of an interval I was, say, sending a message across the network and waiting for a response.
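To see the per-subscriber behaviour without waiting on a timer, here is a minimal sketch that swaps Observable.Interval for a hand-rolled Observable.Create source and counts how often it gets subscribed to. The source and the subscriptions counter are illustrative stand-ins, not part of the original example, and the sketch assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int subscriptions = 0;

// Hypothetical stand-in for an expensive source such as a network request.
var source = Observable.Create<int>(observer =>
{
    subscriptions++;                    // runs once per Subscribe call
    observer.OnNext(subscriptions);     // each subscriber sees its own run
    return Disposable.Empty;
});

source.Subscribe(i => Console.WriteLine("First: {0}", i));
source.Subscribe(i => Console.WriteLine("Second: {0}", i));

Console.WriteLine("Source was subscribed to {0} times", subscriptions);
```

Two Subscribe calls mean the body of the source runs twice, which is exactly the duplicated work the rest of this post tries to avoid.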
There are a few ways to solve this problem, but only one of them is actually correct. The first thing most people latch on to in IntelliSense is Publish(). Now that method looks useful. So I might try:
    var interval = Observable.Interval(TimeSpan.FromMilliseconds(500)).Publish();
    interval.Subscribe(i => Console.WriteLine("First: {0}", i));
    interval.Subscribe(i => Console.WriteLine("Second: {0}", i));
Now I get nothing at all. Great advertising there.
So what is actually happening here? Well, for one you might notice that interval is no longer an IObservable<long> but is now an IConnectableObservable<long>, which extends IObservable<long> with a single method: IDisposable Connect().
As it turns out, Publish is simply a convenience method for Multicast that supplies the parameter for you. Specifically, calling stream.Publish() is exactly the same as calling stream.Multicast(new Subject<T>()). Wait, a subject?
What Multicast does is create a concrete implementation of IConnectableObservable<T> that wraps the subject we give it and forwards the Subscribe method of the IConnectableObservable<T> to Subject<T>.Subscribe. Picture it as observers attached to the subject, with the input stream sitting off to the side.
You might have noticed that the input doesn’t go anywhere. That’s exactly why our simple call to Publish() earlier didn’t produce any results at all: the IConnectableObservable<T> hadn’t been fully wired up yet. To do that, we need to make a call to Connect(), which will subscribe our subject to our input.
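The wiring can be reproduced by hand, which may make the two steps easier to see. This is only a rough sketch of the idea, not the library’s actual implementation: the variable names are made up for illustration, and Observable.Range stands in for the input.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = Observable.Range(1, 3);    // illustrative input
var received = new List<int>();

// Step 1: what Subscribe on the connectable forwards to.
var subject = new Subject<int>();
subject.Subscribe(i => received.Add(i));

int beforeConnect = received.Count;     // nothing feeds the subject yet

// Step 2: roughly what Connect() does, subscribing the subject to the input.
IDisposable connection = source.Subscribe(subject);

Console.WriteLine("Before connect: {0}, after connect: {1}",
    beforeConnect, received.Count);
```

Until step 2 runs, observers are attached to a subject that nobody is pushing values into, which is the "nothing at all" result from the Publish() attempt.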
Connect() returns an IDisposable which we can use to cut off the input again. Keep in mind the downstream observers have no idea any of this is happening. When we disconnect, OnCompleted will not be fired.
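A small sketch of that lifecycle, using a Subject<int> as a controllable input so the timing is deterministic. The input, received and completed names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var input = new Subject<int>();         // illustrative, manually driven input
var published = input.Publish();

var received = new List<int>();
bool completed = false;
published.Subscribe(i => received.Add(i), () => completed = true);

input.OnNext(0);                        // not connected yet, so this is dropped

IDisposable connection = published.Connect();
input.OnNext(1);                        // flows through to the observer

connection.Dispose();                   // cut off the input
input.OnNext(2);                        // no longer reaches the observer

Console.WriteLine("Received: {0}, completed: {1}",
    string.Join(",", received), completed);
```

The observer only ever sees the value pushed while connected, and it is never told that the connection went away.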
Getting back to my example, the correct code looks like this:
    var interval = Observable.Interval(TimeSpan.FromMilliseconds(500)).Publish();
    interval.Subscribe(i => Console.WriteLine("First: {0}", i));
    interval.Subscribe(i => Console.WriteLine("Second: {0}", i));
    var connection = interval.Connect();
    // Later
    connection.Dispose();
It is very important to make sure all of your subscribers are set up before you call Connect(). You can think of Publish (or, really, Multicast) like a valve on a pipe. You want to be sure you have all your pipes together and sealed before you open it up, otherwise you’ll have a mess.
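Here is a short sketch of the mess in question: a subscriber that shows up after the valve is open simply misses whatever has already flowed past. As before, the Subject<int> input and the list names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var input = new Subject<int>();         // illustrative, manually driven input
var published = input.Publish();

var first = new List<int>();
var second = new List<int>();

published.Subscribe(i => first.Add(i)); // attached before the valve opens
published.Connect();

input.OnNext(1);

published.Subscribe(i => second.Add(i)); // attached too late for the value 1
input.OnNext(2);

Console.WriteLine("First: {0}", string.Join(",", first));
Console.WriteLine("Second: {0}", string.Join(",", second));
```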
A question that comes up fairly often is what to do when you do not control the number or timing of subscriptions. For instance, if I have a stream of USD/EUR market rates, there’s no need for me to keep that stream open if nobody is listening, but if someone is, I’d like to share that connection rather than create a new one.
This is where RefCount() comes in. RefCount() takes an IConnectableObservable<T> and returns an IObservable<T>, but with a twist. When RefCount gets its first subscriber, it automatically calls Connect() for you and keeps the connection open as long as anyone is listening; once the last subscriber disconnects, it will call Dispose on its connection token.
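The counting can be observed with a sketch like the following, where a hypothetical Observable.Create source records how often it is connected to and torn down. The counters and snapshot variables are there purely for illustration.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int connects = 0;
int disconnects = 0;

// Hypothetical source that only records connection and teardown.
var source = Observable.Create<int>(observer =>
{
    connects++;
    return Disposable.Create(() => disconnects++);
});

var shared = source.Publish().RefCount();

var a = shared.Subscribe(_ => { });     // first subscriber triggers Connect()
var b = shared.Subscribe(_ => { });     // second subscriber shares the connection
int connectsWithTwoSubscribers = connects;

a.Dispose();                            // one listener left, still connected
int disconnectsWithOneLeft = disconnects;

b.Dispose();                            // last listener gone, connection disposed

Console.WriteLine("Connects: {0}, disconnects: {1}", connects, disconnects);
```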
So now you might be wondering why I didn’t use RefCount() in my so-called “correct” implementation. I wouldn’t have had to call either Connect() or Dispose, and less is more, right? All that is true, but it comes at the cost of safety. Once I dispose my connection, my source no longer holds a reference to my object, which allows the GC to do what it does best. Often, these streams start to make their way outside of my class, which can create a long dependency chain of object references. That’s fine, but if I dispose an object in the middle, I want to make sure that object is now ready for collection, and if I use RefCount(), I simply can’t make that assertion, because I’d have to ensure every downstream subscriber had also disposed.
Another scenario that comes up is how to keep a record of things you’ve already received. For instance, I might make a call to find all tweets with the hashtag “#RxNet” with live updates. If I subscribe a second observer, I might expect all the previously found data to be sent again without making a new request to the server. Fortunately, we have Replay() for this. It literally has 15 overloads, which cover just about every permutation of windowing by count and/or time, and supplying an optional scheduler and/or selector. The parameterless call, however, just remembers everything. Replay is just like Publish in the sense that it also forwards a call to Multicast, but this time with a ReplaySubject<T>.
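As a quick sketch of the difference a buffer makes, the following compares the parameterless Replay() with the Replay(int bufferSize) overload for a subscriber that arrives late. The Subject<int> input and the list names are illustrative stand-ins for a real feed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var input = new Subject<int>();         // illustrative, manually driven input

var everything = input.Replay();        // remembers every value
var lastOnly = input.Replay(1);         // remembers only the most recent value
everything.Connect();
lastOnly.Connect();

input.OnNext(1);
input.OnNext(2);

// Both observers arrive after two values have already been sent.
var lateAll = new List<int>();
var lateLast = new List<int>();
everything.Subscribe(i => lateAll.Add(i));
lastOnly.Subscribe(i => lateLast.Add(i));

input.OnNext(3);

Console.WriteLine("Replay():  {0}", string.Join(",", lateAll));
Console.WriteLine("Replay(1): {0}", string.Join(",", lateLast));
```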
Now the temptation is to combine Replay() and RefCount to make caches of things for subscribers when they are needed. Let’s look at my Twitter example.
tweetService.FindByHashTag("RxNet").Replay().RefCount()
When the first observer subscribes, FindByHashTag will make a call (I assume this is deferred) to the network and start streaming data, and all is well. When the second observer subscribes, he gets all the previously found data and updates. Great! Now, let’s say both unsubscribe and a third observer then subscribes. He’s going to get all that previous data, and then the deferred call in FindByHashTag is going to be re-triggered and provide results that we might have already received from the replay cache! Instead, we should implement a caching solution that actually does what we expect and wrap it in an Observable.Create, or if we expect only fresh tweets, use Publish().RefCount() instead.
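The duplication is easy to reproduce in a sketch. Since tweetService is not shown in this post, the findByHashTag source below is a hypothetical stand-in that emits two fixed strings per request and never completes; the requests counter is likewise just for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int requests = 0;

// Hypothetical stand-in for tweetService.FindByHashTag("RxNet"):
// a deferred call that streams results and stays open.
var findByHashTag = Observable.Create<string>(observer =>
{
    requests++;
    observer.OnNext("tweet A");
    observer.OnNext("tweet B");
    return Disposable.Empty;
});

var cached = findByHashTag.Replay().RefCount();

var first = cached.Subscribe(_ => { });  // triggers the first request
var second = cached.Subscribe(_ => { }); // served from the replay buffer
first.Dispose();
second.Dispose();                        // last subscriber gone, RefCount disconnects

// The third observer gets the buffered tweets, then the request runs again.
var third = new List<string>();
cached.Subscribe(t => third.Add(t));

Console.WriteLine("Requests made: {0}", requests);
Console.WriteLine("Third observer saw: {0}", string.Join(", ", third));
```

The third observer ends up with each tweet twice: once from the replay buffer, and once more from the re-triggered request.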