Reactive Extensions for Retlang Developers

Tuesday, July 20, 2010 10:18 PM

Most of the talk about Rx has been about the "Push LINQ" aspects of it, so I thought I'd have a bash at explaining it from a Retlang perspective instead.  Let's start with IObserver:

    public interface IObserver<in T>
    {
        void OnNext(T value);
        void OnError(Exception error);
        void OnCompleted();
    }

Now, in the same place Retlang just uses an Action<T> (or IPublisher<T>, depending on context).  If you want to catch exceptions, you implement IBatchExecutor and catch the exception yourself.  As for OnCompleted, there just isn't a straight parallel, really.

Now, let's see the equivalent of IFiber:

    public interface IScheduler
    {
        DateTimeOffset Now { get; }
        IDisposable Schedule(Action action);
        IDisposable Schedule(Action action, TimeSpan dueTime);
    }

Okay, so what's different?  Well, you can't "EnqueueAll" and schedulers can't keep track of their own subscriptions.  On the other hand, Schedule returns a Disposable: every scheduled action should be cancellable.  The scheduling by time is a little less directly usable, but fine.  It's nice to see that "Now" is abstracted out, but it seems to be a bit of a strange implementation detail.  Also, when you actually examine the code, you discover that CurrentThreadQueue, for instance, is hardwired to Scheduler.Now anyway, which renders it all a bit pointless.

So far, this is all pretty similar to Retlang.  But the equivalent of ISubscriber is rather unexpected:

    public interface IObservable<out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

Ignoring all of the overloads subscriber has, there's one big thing missing: you specify the listener, but not the fiber.  That's because an IObservable can include a thread.  The command for this is "observable.ObserveOn(scheduler)"*, which returns another observable.  (There's also observable.SubscribeOn(observer, scheduler).  As far as I can figure out, this causes the subscriptions to run against that scheduler, which isn't really what you want, but is likely the routine you'd try first if you were a Retlang user.)

Finally, while Channel in Retlang is a combination of IPublisher and ISubscribable, ISubject does the same thing in Rx: it's the combination of IObserver and IObservable and hence is the natural place to put a buffer.

Some Interesting Implementation Details

If you look at the implementation of CurrentThreadScheduler, you'll see it uses a priority queue rather than the Timer model Retlang uses.  This is actually really rather cool.  In particular, it means that you can actually schedule events to occur in the past.  It may seem like that's a stupid thing to do, but in practice it means that you can cause certain messages to be processed out of band, which is actually pretty powerful.  Imagine, for instance, a set of workers responding to HTTP connections.  This would allow you to prioritize messages from existing (keep-alive) connections.  (Let's be honest, this is a bit of a hack, but out of band messages can be useful.)

There's all sorts of simple and useful classes running around here: BooleanDisposable, CompositeDisposable.  It's a pity they aren't just in the .NET framework to begin with.  Of course, Microsoft give with one hand and take with the other.  PriorityQueue is marked internal, continuing Microsoft's long tradition of not letting you near the code.  Equally, AnonymousObserver, a simple class that takes three delegates and constructs the obvious IObservable, is marked internal, making writing your own Subscribe overloads just that tiny bit less convenient.

Different Sorts of Schedulers

As you can see, the basic building blocks for a Retlang-style structure are all there.  The various sorts of schedulers/fibers are also worth looking at:

  • Scheduler.Immediate: Pretty much the same as StubFiber.  Runs on the current thread and blocks if you schedule something in the future.
  • Scheduler.CurrentThread : Okay, I'm just perplexed by this one.  As far as I can tell, it behaves identically to Immediate, but the code is significantly more complex.  It's not thread-safe: you couldn't enqueue something from another thread the way you can with ThreadFiber; it's got this curious EnsureTrampoline function that I frankly don't understand.  I imagine this will become more clear at some point in the future when I understand Rx better.
  • Scheduler.DispatcherScheduler:  WPF support
  • ControlScheduler:  WinForms support
  • EventLoopScheduler:  ThreadFiber with fewer options.  Would it be so hard to put a Func<ThreadStart, Thread> into the constructor?
  • Scheduler.ThreadPoolScheduler: PoolFiber
  • TaskPool scheduler: for integration with the parallel tasks library.

This is all harder to figure out than it should be.  There's no source code download and no documentation worth speaking of.  My personal opinion: if they can open source ASP.NET MVC, why can't they open source this?

Technorati Tags: ,
Comments
No comments posted yet.
Something to add?

Talking sense? Talking rubbish? Something I'm missing? Let me know!

Fields denoted with a "*" are required.

 (will not be displayed)

 
Please add 7 and 4 and type the answer here:

Preview Your Comment