Reactive Extensions: Making a Pausable Observable

09 August 2010 19:08

As I've previously discussed, exactly when something starts in Rx is a bit of an issue.  Sometimes, for instance when dealing with network traffic, we need to do some work before we receive any more data.  We could simply buffer it, but that runs the risk of the long queue problem.  So, we need some way of telling an observable to start, stop and resume.  IConnectableObservable gives us exactly the interface that we want here: the Connect method represents Start/Resume, while disposing of its return value represents Pause.
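To make the Connect/Dispose idea concrete, here is a minimal sketch of a plain published observable being paused and resumed.  It assumes a current System.Reactive package (the namespaces differ from the 2010 builds), and PauseDemo and Run are names made up for this illustration.  Note what happens to the value produced while disconnected: Publish on its own simply drops it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class: not part of Rx, just an illustration.
public static class PauseDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var source = new Subject<int>();

        // Publish wraps the source in a connectable observable.
        IConnectableObservable<int> published = source.Publish();
        published.Subscribe(v => received.Add(v));

        // Connect == Start: values now flow through to subscribers.
        IDisposable connection = published.Connect();
        source.OnNext(1);

        // Disposing the connection == Pause.
        connection.Dispose();
        source.OnNext(2); // nobody is listening, so this value is lost

        // Connect again == Resume.
        connection = published.Connect();
        source.OnNext(3);

        connection.Dispose();
        return received;
    }

    public static void Main()
    {
        // Prints "1, 3": the value produced while paused never arrives.
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```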

Obviously, if extra values are produced, we need them buffered until we reconnect.  I had a couple of other requirements: to behave like Observable.While (because I'm asynchronously reading from a stream) and to allow multiple "connections" at once.

So, here's the code:

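public static class ObservableHelper {
    public static IConnectableObservable<TSource> WhileResumable<TSource>(Func<bool> condition, IObservable<TSource> source) {
        // Holds values that arrive while nobody is subscribed (i.e. while paused).
        // Also doubles as the lock object.
        var buffer = new Queue<TSource>();
        // Number of active subscriptions to the raw observable below.
        var subscriptionsCount = 0;
        // Returned from every subscription; disposing it decrements the count.
        // Note that this is a single instance shared by all subscriptions.
        var isRunning = Disposable.Create(() => {
            lock (buffer)
            {
                subscriptionsCount--;
            }
        });
        var raw = Observable.CreateWithDisposable<TSource>(subscriber => {
            lock (buffer)
            {
                subscriptionsCount++;
                if (subscriptionsCount == 1)
                {
                    // Going from zero to one subscription: replay anything buffered...
                    while (buffer.Count > 0) {
                        subscriber.OnNext(buffer.Dequeue());
                    }
                    // ...then repeat the source for as long as we are subscribed
                    // and the caller's condition holds.
                    Observable.While(() => subscriptionsCount > 0 && condition(), source)
                        .Subscribe(
                            // Buffer values that turn up after the last unsubscribe.
                            v => { if (subscriptionsCount == 0) buffer.Enqueue(v); else subscriber.OnNext(v); },
                            e => subscriber.OnError(e),
                            () => { if (subscriptionsCount > 0) subscriber.OnCompleted(); }
                        );
                }
            }
            return isRunning;
        });
        // Publish gives us Connect (start/resume) and a disposable (pause).
        return raw.Publish();
    }
}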

I'll reiterate my previous point about scoping of anonymous functions.  If you don't understand them completely, you're not going to get very far with Reactive Extensions.  The above code is short and achieves its aims, but we've done the following:

  • Created an anonymous disposable using a lambda.
  • Created an anonymous observable using another lambda that shares the disposable's scope.
  • Constructed a repeating observable within that (using another lambda).
  • Subscribed to that observable using lambda expressions that use the "raw" scope.

Equally, we have a chain of observables:

  • The source observable that is to be repeated.
  • The while observable that actually does the work.
  • The raw observable that starts when you subscribe and buffers when you unsubscribe.
  • The final result, which is a connectable version of the raw observable.

Constructions like this are extremely common in Rx (and Clojure, for that matter), but rare in C# code in general.  You'll also note, I haven't even touched on LINQ.

Comments
# re: Reactive Extensions: Making a Pausable Observable
Posted by Karli Watson on 10/12/2010 16:47
Thanks - solved a tricky problem that I was having without having to implement a complex custom buffering scheme!

One minor issue I had was that I had to move the definition of isRunning inside the subscriber delegate, else it only worked once. Maybe to do with how the rest of my code is using this helper though.
# re: Reactive Extensions: Making a Pausable Observable
Posted by Julian on 10/12/2010 18:27
Actually, you're probably dead on. I only needed the one, so the use case didn't come up. In any event I'm still spending more of my time with Retlang than Rx, simply because it's less complex.
# re: Reactive Extensions: Making a Pausable Observable
Posted by Ray Booysen on 03/10/2011 16:49
Hi Julian

Just a quick one, it looks like you're missing lock calls on your Observable.While actions. You're missing them around the Enqueue calls and potentially the subscriptionCount variable check
# re: Reactive Extensions: Making a Pausable Observable
Posted by Anderson Imes on 03/10/2011 23:10
They collapsed the CreateWithDisposable method into an overload of Observable.Create recently. Just in case anyone tries this code and wonders why it doesn't work.
# re: Reactive Extensions: Making a Pausable Observable
Posted by Julian on 08/10/2011 13:20
@Ray I think the locking is OK, since all of those calls are within a lock(buffer).

@Anderson Thanks, I never liked the name "createwithdisposable" anyway...