CEP client with WPF Grid Data Control

CEP Client with WPF GridDataControl

Microsoft StreamInsight CEP platform is very powerful to develop Complex Event Processing (CEP) systems. There are several development models that we can follow. In this post we will use the IObservable / IObserver model using .NET Reactive extensions (Rx). Since this would be a real-time application, we would also be using F# async workflows to pull stock data.

image

F# async workflows are the coolest part of using F# in a real-time application. It allows writing concise code that

· Executes in parallel

· Expose to another .NET library with ease

I won’t go in detail about F# except for the async workflow used in this application. There is a three part series on using design patterns for F# async workflows, I have used the Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports the progress through events, modified version of AsyncWorker<> code is shown below,

type JobCompletedEventArgs<‘T>(job:int, result:’T) =

inherit EventArgs()

member x.Job with get() = job

member x.Result with get() = result

type AsyncWorker<‘T>(jobs: seq<Async<‘T>>) =

// This declares an F# event that we can raise

let allCompleted = new Event<‘T[]>()

let error = new Event<System.Exception>()

let canceled = new Event<System.OperationCanceledException>()

let jobCompleted = new Event<JobCompletedEventArgs<‘T>>()

let cancellationCapability = new CancellationTokenSource()

/// Start an instance of the work

member x.Start() =

// Capture the synchronization context to allow us to raise events back on the GUI thread

let syncContext = SynchronizationContext.CaptureCurrent()

// Mark up the jobs with numbers

let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1))

let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args

let work =

Async.Parallel

[ for (job,jobNumber) in jobs ->

async { let! result = job

syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<‘T>(jobNumber, result))

return result } ]

Async.StartWithContinuations

( work,

(fun res -> raiseEventOnGuiThread(allCompleted, res)),

(fun exn -> raiseEventOnGuiThread(error, exn)),

(fun exn -> raiseEventOnGuiThread(canceled, exn)),

cancellationCapability.Token)

/// Raised when a particular job completes

[<CLIEvent>]

member x.JobCompleted = jobCompleted.Publish

/// Raised when all jobs complete

[<CLIEvent>]

member x.AllCompleted = allCompleted.Publish

/// Raised when the composition is cancelled successfully

[<CLIEvent>]

member x.Canceled = canceled.Publish

/// Raised when the composition exhibits an error

[<CLIEvent>]

member x.Error = error.Publish

We have used [<CLIEvent>] attributes to mark these events for exposing to other .NET CLI languages. Since we are using Rx we need to have an event that inherits from System.EventArgs, JobCompletedEventArgs<T> does that here. The AsyncWorker is now ready to be used as a library for running parallel code.

Stock Quotes Reader

The Stock Quotes Reader defines a wrapper that does a request to the server (it would be yahoo finance here) and pull the stocks.

type StockAvailableEventArgs(stocks:string[]) =

inherit EventArgs()

member x.Stocks with get() = stocks

type StockQuotesReader(quotes:string) =

/// event raised for every job completed, this is easier to use in other CLI languages

let stockAvailableEvent = new Event<StockAvailableEventArgs>()

let httpLines (uri:string) =

async { let request = WebRequest.Create uri

use! response = request.AsyncGetResponse()

use stream = response.GetResponseStream()

use reader = new StreamReader(stream)

let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]

return lines }

// n – name, s – symbol, x – Stock Exchange, l1 – Last Trade, p2 – change in percent, h – high, l – low, o – open, p – previous close, v – volume

let yahooUri (quotes:string) =

let uri = String.Format("http://finance.yahoo.com/d/quotes.csv?s={0}&f=nsxl1hlopv", quotes)

uri

member x.GetStocks() =

let stocks = [httpLines(yahooUri quotes)]

stocks

member x.PullStocks() =

let stocks = x.GetStocks()

let worker = new AsyncWorker<_>(stocks)

worker.JobCompleted.Add(fun args ->

stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray))

)

worker.Start()

static member GetAsyncReader(quotes) =

let reader = new StockQuotesReader(quotes)

let stocks = reader.GetStocks()

let worker = new AsyncWorker<_>(stocks)

worker

[<CLIEvent>]

member x.StockAvailable = stockAvailableEvent.Publish

The above wrapper class does some interesting things,

· It has a async block code that yield returns a line of data based on the response stream.

· PullStocks will create async requests and raise the StockAvailable event whenever the async job is completed.

CEP Client

On the CEP client we would be using the below things,

· Syncfusion WPF GridDataControl – it works well with high speed data changes, keeping minimal CPU usage.

· Rx to create requests and update the ViewModel bound to the Grid.

Setup the application

The WPF application uses simple MV-VM by defining a StocksViewModel to hold stock data. The Stocks collection is bound to the Syncfusion WPF GridDataControl.

<syncfusion:GridDataControl Grid.Row="0" x:Name="grid"

NotifyPropertyChanges="True"

AutoPopulateRelations="False"

Width="Auto"

AllowGroup="True" ShowGroupDropArea="True"

AutoFocusCurrentItem="False"

ItemsSource="{Binding Model.Stocks}"

AllowEdit="False"

IsGroupsExpanded="True"

ShowAddNewRow="False">

</syncfusion:GridDataControl>

Using Rx to create requests

This real-time application requires real-time data that will be pulled over the wire for every 500 milliseconds. We would be making use of the IObservable to create a streaming request and repeat that over a time delay,

var stockReader = new StockQuotesReader(“MSFT+GOOG+INTL”);

var stockFeeds = Observable.Defer(() =>

{

stockReader.PullStocks();

var evt = from e in Observable.FromEvent<StockAvailableEventArgs>(stockReader, "StockAvailable")

select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() };

var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay));

return delayedEvt;

}).Repeat();

We now have a streaming real-time stock data pulled asynchronously over the web and shown on the Syncfusion GridDataControl.

clip_image004

If you want to get hold of the sample, send me a request.

Advertisements

6 thoughts on “CEP client with WPF Grid Data Control

  1. Hi!

    Cool example! However saying this is an example using Microsoft CEP (i.e. StreamInsight) is not totally true is it?!

    As far as I can see you are using Reactive Extensions framework (Rx), to get the data, and you are not using any of the StreamInsight stuff.

    Anyway, a really, really cool demo (especially the F# stuff). Wouldn’t mind to get the code if possible.

    Niels

    • Fahad says:

      Hi Niels,
      Thanks! If you look at the docs for MS StreamInsight, it has 3 different variations of exposing things, it doesn’t exactly use the ToCEPStream to convert the Observable, but then this is only a sample :), So used Rx since it has debugging + awful lot of APIs to use from. I will mail you the sample!

      -Fahad

      • Sure, I know of the three different models of SI, and I also know that IObservable is one of them – I am using it myself in production.

        However, IObservable in SI and IObservable in Rx are two different beasts – i.e., they do not have the same implementation (or methods / properties for that matter). That was all I wanted to say.

        The sample is still extremely cool!! And thanks a lot for the sample code!!!

        Niels

  2. Fahad says:

    Well, I have a conspiracy theory that Rx evolved out of SI or vice-versa :-), Erik Mijer team actually is a part of SQL team, thats what he says.

    Anyway, I read in the docs that SI’s IObservable was included in the CTP release because the .NET BCL was not having the required interfaces at that time, also Rx was not released too, I guess when the final release is out they will have it linked with .NET 4.0 and use Rx extensions to provide the extra power. May be I guessed a bit too much :)!

    Thanks for your inputs on the samples!

  3. […] Uncategorized on May 26, 2010 by Fahad My previous post had implementation of Rx + F# + Syncfusion WPF GridDataControl. This post is a continuation with a […]

  4. […] with my previous posts (long back, can’t find enough time these days), I ported the Stock ticker code base to WP7. The […]

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: