
Creating an observable on a task with cancellation logic

The following code demonstrates how to create an observable that runs a task. Disposing the subscription cancels the token passed to the task, so the loop exits and the task completes.

 
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace JetBlack.Reactive.TaskExamples
{
    public class SingleSubscriber
    {
        public void Test()
        {
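            // Create passes in a CancellationToken that is cancelled when the subscription is disposed.
            var observable = Observable.Create<int>(
                (observer, token) =>
                    Task.Factory.StartNew(() =>
                        {
                            var i = 0;
                            // WaitOne returns true as soon as the token is cancelled, or false
                            // after the 500 ms timeout, so it doubles as the delay between values.
                            while (!(token.WaitHandle.WaitOne(500) || token.IsCancellationRequested))
                                observer.OnNext(i++);
                        }));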

            var disposable = observable.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                e => Console.WriteLine("OnError: {0}", e),
                () => Console.WriteLine("OnCompleted"));

            Console.WriteLine("Press <ENTER> to stop subscriptions");
            Console.ReadLine();

            Console.WriteLine("Disposing");
            disposable.Dispose();

            Console.WriteLine("Press <ENTER> to quit");
            Console.ReadLine();
        }
    }
}
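
The same idea can be sketched with async/await, so that no thread is blocked while waiting between values. This is only a possible variant, not part of the original example: the class name AsyncSingleSubscriber and the helper Ticks are hypothetical names introduced for illustration, and the sketch assumes the Observable.Create overload that takes an asynchronous delegate with a CancellationToken, together with Task.Delay(TimeSpan, CancellationToken).

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace JetBlack.Reactive.TaskExamples
{
    public static class AsyncSingleSubscriber
    {
        // Hypothetical helper (not in the original post): emits 0, 1, 2, ...
        // once per period until the subscription is disposed.
        public static IObservable<int> Ticks(TimeSpan period)
        {
            return Observable.Create<int>(async (observer, token) =>
            {
                var i = 0;
                try
                {
                    while (!token.IsCancellationRequested)
                    {
                        // Task.Delay throws if the token is cancelled, ending the wait early.
                        await Task.Delay(period, token);
                        observer.OnNext(i++);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected on disposal: swallow it so the task finishes normally.
                }
            });
        }
    }
}
```

It is subscribed to and disposed in the same way as the observable above, for example AsyncSingleSubscriber.Ticks(TimeSpan.FromMilliseconds(500)).Subscribe(x => Console.WriteLine("OnNext: {0}", x)). The loop condition tests the token rather than using while (true); a lambda body that can never reach its end may leave the compiler unable to choose between the Create overloads.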

Posted on Thursday, October 16, 2014 9:59 AM | Filed Under [ C# Reactive Extensions rx Observable ]
