StreamInsight and Reactive Framework Challenge

by Allan Mitchell 6 Feb 2011 23:09

In his blog post, Roman from the StreamInsight team asked whether we could create a Reactive Framework version of what he had done in that post using StreamInsight. For those who don't know, the Reactive Framework, or Rx to its friends, is a library for composing asynchronous and event-based programs using observable collections in the .NET Framework. Yes, there is some overlap between StreamInsight and the Reactive Extensions, but StreamInsight has more flexibility and power in its temporal algebra (windowing, alteration of event headers).

Well, here are two alternative ways of doing what Roman did.

The first example is a mix of StreamInsight and Rx:

var rnd = new Random();
// The period is drawn once (500 to 2999 ms), so every tick arrives at that same interval.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(rnd.Next(500, 3000)))
    // Replace each tick with a random payload between 0 and 299.
    .Select(i => rnd.Next(300));

Server s = Server.Create("Default");
Microsoft.ComplexEventProcessing.Application a = s.CreateApplication("Rx SI Mischung");

// Wrap each Rx value in a StreamInsight point event stamped with the current local time.
var inputStream = interval.ToPointStream(
    a,
    evt => PointEvent.CreateInsert(DateTimeOffset.Now, new { RandomValue = evt }),
    AdvanceTimeSettings.IncreasingStartTime,
    "Rx Sample");


var r = from evt in inputStream
        select new { runningVal = evt.RandomValue };


// Print only real events; CTI (current time increment) events carry no payload.
foreach (var x in r.ToPointEnumerable().Where(e => e.EventKind != EventKind.Cti))
{
    Console.WriteLine(x.Payload.ToString());
}

The next version, though, uses only the Reactive Extensions:

var rnd = new Random();
// The period is drawn once and reused for every tick.
Observable.Interval(TimeSpan.FromMilliseconds(rnd.Next(500, 3000)))
    .Select(i => rnd.Next(300))
    // Interval never completes on its own, so "Completed" is not expected to print here.
    .Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));

Console.ReadKey();
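
Both examples above pick the interval once, so the events arrive at a fixed (if randomly chosen) pace. If the intent is a different delay before each event, one possible sketch uses Observable.Generate with a time selector. This assumes the current System.Reactive package and its System.Reactive.Linq namespace; the RandomFeed class and its NextDelay and Create methods are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper class, not part of the original post or of Rx itself.
public static class RandomFeed
{
    // Picks a delay between 500 and 2999 ms, the same range used in the examples above.
    public static TimeSpan NextDelay(Random rnd)
    {
        return TimeSpan.FromMilliseconds(rnd.Next(500, 3000));
    }

    // Emits random values between 0 and 299, asking for a fresh delay before each one.
    public static IObservable<int> Create(Random rnd)
    {
        return Observable.Generate(
            0,                      // state: unused placeholder
            _ => true,              // keep going until the subscription is disposed
            state => state,         // state never changes
            _ => rnd.Next(300),     // payload for each event
            _ => NextDelay(rnd));   // delay before each event
    }
}
```

Subscribing works the same way as before, for example RandomFeed.Create(new Random()).Subscribe(Console.WriteLine); followed by Console.ReadKey(); to keep the process alive.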

 

These are very simple examples, but both technologies allow us to do a lot more. The ICEPObservable() design pattern was reintroduced in StreamInsight 1.1, and the more I use it the more I like it. Like the IEnumerable() pattern, it is very useful for demonstrating StreamInsight samples.
