The following example creates an observable which wraps a task with multiple subscribers.
Note that the task is cancelled when the last subscriber is disposed.
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
namespace JetBlack.Reactive.TaskExamples
{
class MultipleSubscribers
{
public void Test()
{
var observable = Observable.Create<int>(
(observer, token) =>
Task.Factory.StartNew(() =>
{
var i = 0;
while (!(token.WaitHandle.WaitOne(500) || token.IsCancellationRequested))
observer.OnNext(i++);
Console.WriteLine("Task completed");
}));
var publisher = observable.Publish().RefCount();
var disposable1 = publisher.Subscribe(
x => Console.WriteLine("OnNext(1): {0}", x),
e => Console.WriteLine("OnError(1): {0}", e),
() => Console.WriteLine("OnCompleted(1)"));
var disposable2 = publisher.Subscribe(
x => Console.WriteLine("OnNext(2): {0}", x),
e => Console.WriteLine("OnError(2): {0}", e),
() => Console.WriteLine("OnCompleted(2)"));
Console.WriteLine("Press <ENTER> to stop the first subscriber");
Console.ReadLine();
Console.WriteLine("Disposing the first subscriber");
disposable1.Dispose();
Console.WriteLine("Press <ENTER> to stop the second subscriber");
Console.ReadLine();
Console.WriteLine("Disposing the second subscriber");
disposable2.Dispose();
Console.WriteLine("Press <ENTER> to quit");
Console.ReadLine();
}
}
}