Rx
There are 5 entries for the tag Rx

As I've previous discussed, exactly when something starts in Rx is a bit of an issue.  Sometimes, for instance when dealing with network traffic, we need to do some work before we receive any more data.  We could simply buffer it, but that runs the risk of the long queue problem.  So, we need some way of telling an observable to start, stop and resume.  IConnectableObservable gives us exactly the interface that we want here: the Connect method represents Start/Resume, with disposing of its return value represents Pause. Obviously, if extra values are produced, we need them buffered until...

The run-on-subscribe model I described in the previous post on Rx is an important concept within the system.  So much so that there's a method Observable.Defer.  Defer takes a factory and creates a new observable based on the factory that runs when you subscribe.  It's basically a lazy create-on-subscribe.  However, it's worth bearing in mind that Defer only works if your subscription occurs before the observable produces a value.  Now, if you examine our previous code, you'll see that there was no sensible way of achieving that. This raises the interesting question of how FromAsyncPattern does its job, since...

When you subscribe to a channel in Retlang, you get an IUnsubscriber back.  The equivalent of this in Rx is just IDisposable.  This makes AnonymousDisposable is a fairly vital class in ReactiveExtensions. I even used it in the last post. It's a pity someone decided to mark it as internal (again).  So here's another implementation of it: public class AnonymousDisposable : IDisposable { private readonly Action _onDispose; public AnonymousDisposable(Action onDispose) { _onDispose = onDispose; } public void Dispose() ...

Since Reactive Extensions seems to have no documentation worth speaking of, I figure it's worth writing up stuff as I learn it.  One of the major differences between Retlang and Reactive Extensions is that Rx has a first order concept of a message source.  ISubscriber defines approximately the same interface as IObservable, but the only implementation is Channel, which is the equivalent of subject.  A fair number of methods within Rx are geared to producing sources to react to. So, let's consider the following extremely boring Rx source: public class NumberSource : IObservable<int> { Subject<int> underlying...

Most of the talk about Rx has been about the "Push LINQ" aspects of it, so I thought I'd have a bash at explaining it from a Retlang perspective instead.  Let's start with IObserver: public interface IObserver<in T> { void OnNext(T value); void OnError(Exception error); void OnCompleted(); } Now, in the same place Retlang just uses an Action<T> (or IPublisher<T>, depending on context).  If you want to catch exceptions,...