Patterns with MEF and RX

Since I started using RX, events have played less and less of a role in my code. Just as LINQ has relegated for-loops to niche situations, RX is making events all but obsolete. What does this mean for the EventAggregator? Certainly, you can implement your own version using RX. But if you happen to be using MEF (preferably as your IoC container), you can get rid of even that.
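For reference, an RX-based aggregator can be very small. The following is only a sketch, assuming the System.Reactive package is referenced; RxEventAggregator, Publish and GetEvent are hypothetical names chosen for illustration, not part of any library:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical RX-based aggregator; all names here are illustrative.
public sealed class RxEventAggregator : IDisposable
{
    // Every published message flows through this single subject.
    private readonly Subject<object> _events = new Subject<object>();

    // Push a message to all current subscribers.
    public void Publish<TEvent>(TEvent message)
    {
        _events.OnNext(message);
    }

    // Expose only the messages of the requested type.
    public IObservable<TEvent> GetEvent<TEvent>()
    {
        return _events.OfType<TEvent>();
    }

    public void Dispose()
    {
        _events.Dispose();
    }
}
```

A subscriber would call something like aggregator.GetEvent<string>().Subscribe(...) and dispose the returned subscription when it is done. Messages of other types are filtered out by OfType.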

Introducing the Streams Pattern

First we need to create a “stream provider”. The purpose of this class is to establish a single place that “owns” the stream. I like to use the stream provider to establish default values and behavior:

public static class StreamNames 
{
  private const string Prefix = "Streams_";

  public const string Accounts = Prefix + "Accounts";
  public const string SelectedAccount = Prefix + "SelectedAccount";
}

// No need to export this type. An instance will be 
// shared between the two child exports.
public class AccountStreamProvider
{
  [ImportingConstructor]
  public AccountStreamProvider(IAccountService accountService)
  {
    // Query the account list once and replay it, so that subscribers
    // arriving after Connect() still receive it.
    var publishedAccounts = Observable
      .Defer(() => Observable.Return(accountService.GetAccounts()))
      .Replay(1);

    publishedAccounts.Connect();
    Accounts = publishedAccounts;

    SelectedAccount = new ReplaySubject<Account>(1);

    // Take the first account and set it as the initially selected account.
    Accounts
      .Take(1)
      .Select(Enumerable.FirstOrDefault)
      .Subscribe(SelectedAccount.OnNext);
  }

  [Export(StreamNames.Accounts)]
  public IObservable<IEnumerable<Account>> Accounts { get; set; }

  [Export(StreamNames.SelectedAccount)]
  [Export(StreamNames.SelectedAccount, typeof(IObservable<Account>))]
  public ISubject<Account> SelectedAccount { get; set; }
}

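Already the benefits of RX over the EventAggregator are showing: the provider picks a default selection, and anyone who subscribes later still receives the current value.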
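To see why the ReplaySubject matters here, compare it with a plain Subject when the subscriber shows up after the value was pushed. This is a minimal sketch, assuming the System.Reactive package is referenced; ReplayDemo and LateValue are hypothetical names used only for this illustration:

```csharp
using System;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    // Returns what a subscriber sees when it subscribes *after*
    // a value has already been pushed into the subject.
    public static string LateValue(ISubject<string> subject)
    {
        subject.OnNext("Checking"); // pushed before anyone subscribes

        string seen = null;
        using (subject.Subscribe(v => seen = v))
        {
            return seen;
        }
    }
}
```

Calling ReplayDemo.LateValue(new Subject<string>()) should come back empty-handed, because a plain Subject forgets values nobody was listening for. Passing new ReplaySubject<string>(1) instead should return the buffered value, which is the behavior a late-created view model relies on.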

Now we just need to get a reference to our exports in a relevant view model:

public static class Extensions
{
  public static void DisposeWith(this IDisposable source, CompositeDisposable disposables)
  {
    disposables.Add(source);
  }
}

[Export, PartCreationPolicy(CreationPolicy.NonShared)]
public class AccountViewModel : BindableBase, IDisposable
{
  private readonly Streams _streams;

  private readonly CompositeDisposable _disposables;

  [Export]
  public class Streams
  {
    [Import(StreamNames.Accounts)]
    public IObservable<IEnumerable<Account>> Accounts { get; set; }

    [Import(StreamNames.SelectedAccount)]
    public ISubject<Account> SelectedAccount { get; set; }
  }

  [ImportingConstructor]
  public AccountViewModel(Streams streams)
  {
    _streams = streams;

    _disposables = new CompositeDisposable();

    _streams
      .Accounts
      .Subscribe(a => Accounts = a)
      .DisposeWith(_disposables);

    _streams
      .SelectedAccount
      .Subscribe(a => SelectedAccount = a)
      .DisposeWith(_disposables);
  }

  private IEnumerable<Account> _accounts;
  public IEnumerable<Account> Accounts
  {
    get { return _accounts; }
    private set { SetProperty(ref _accounts, value, "Accounts"); }
  }

  private Account _selectedAccount;
  public Account SelectedAccount
  {
    get { return _selectedAccount; }
    private set { SetProperty(ref _selectedAccount, value, "SelectedAccount", OnSelectedAccountChanged); }
  }

  // This method is only called when _selectedAccount 
  // actually changes, so there's no indirect recursion. 
  private void OnSelectedAccountChanged()
  {
    _streams.SelectedAccount.OnNext(_selectedAccount);
  }

  public void Dispose()
  {
    _disposables.Dispose();
  }
}

By the way, I’m using BindableBase from my INPC template.

Introducing the Config Pattern

If you’ve been using RX for a while, you might have noticed that I forgot to put my setters on the dispatcher. Sure, I could rely on automatic dispatching to fix that problem, but if my queries got any more complicated, it’d be better for me to take care of the dispatch myself.

Of course, the problem with ObserveOnDispatcher is that it makes testing a huge pain. Fortunately, we can use MEF to get around that problem, too.

public class AccountViewModel : BindableBase, IDisposable
{
  private readonly Streams _streams;
  private readonly Config _config;

  private readonly CompositeDisposable _disposables;

  [Export]
  public class Streams
  {
    [Import(StreamNames.Accounts)]
    public IObservable<IEnumerable<Account>> Accounts { get; set; }

    [Import(StreamNames.SelectedAccount)]
    public ISubject<Account> SelectedAccount { get; set; }
  }

  [Export] 
  public class Config 
  {
    public virtual IScheduler DispatcherScheduler { get { return Scheduler.Dispatcher; } }
  }

  [ImportingConstructor]
  public AccountViewModel(Streams streams, Config config)
  {
    _streams = streams;
    _config = config;

    _disposables = new CompositeDisposable();

    _streams
      .Accounts
      .ObserveOn(_config.DispatcherScheduler)
      .Subscribe(a => Accounts = a)
      .DisposeWith(_disposables);

    _streams
      .SelectedAccount
      .ObserveOn(_config.DispatcherScheduler)
      .Subscribe(a => SelectedAccount = a)
      .DisposeWith(_disposables);
  }
  // ...
}

Since the DispatcherScheduler property is virtual, it’s easy to mock it out. Using Moq, all you need to do is:

Mock.Of<AccountViewModel.Config>(m => m.DispatcherScheduler == Scheduler.Immediate)
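If you'd rather not pull in a mocking library, a hand-written subclass does the same job. The sketch below assumes the System.Reactive package is referenced. Its Config class is a stand-in for AccountViewModel.Config (its default returns Scheduler.Default purely so the sample compiles without WPF), and ImmediateConfig and ConfigDemo are hypothetical names:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Stand-in for AccountViewModel.Config. The real default would
// return the dispatcher scheduler instead of Scheduler.Default.
public class Config
{
    public virtual IScheduler DispatcherScheduler
    {
        get { return Scheduler.Default; }
    }
}

// Hand-rolled test double that makes ObserveOn run synchronously.
public sealed class ImmediateConfig : Config
{
    public override IScheduler DispatcherScheduler
    {
        get { return Scheduler.Immediate; }
    }
}

public static class ConfigDemo
{
    // Pushes one value through ObserveOn and reports what the handler
    // has seen by the time OnNext returns.
    public static string ObserveOnce(Config config)
    {
        var stream = new Subject<string>();
        string seen = null;

        using (stream.ObserveOn(config.DispatcherScheduler).Subscribe(v => seen = v))
        {
            stream.OnNext("Savings");
            return seen;
        }
    }
}
```

With ImmediateConfig the handler runs inline, so a test can assert on the result right after OnNext without pumping a dispatcher or waiting on another thread.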