Category Archives: 1

CEP client with WPF Grid Data Control

CEP Client with WPF GridDataControl

Microsoft StreamInsight CEP platform is very powerful to develop Complex Event Processing (CEP) systems. There are several development models that we can follow. In this post we will use the IObservable / IObserver model using .NET Reactive extensions (Rx). Since this would be a real-time application, we would also be using F# async workflows to pull stock data.

image

F# async workflows are the coolest part of using F# in a real-time application. It allows writing concise code that

· Executes in parallel

· Expose to another .NET library with ease

I won’t go in detail about F# except for the async workflow used in this application. There is a three part series on using design patterns for F# async workflows, I have used the Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports the progress through events, modified version of AsyncWorker<> code is shown below,

type JobCompletedEventArgs<‘T>(job:int, result:’T) =

inherit EventArgs()

member x.Job with get() = job

member x.Result with get() = result

type AsyncWorker<‘T>(jobs: seq<Async<‘T>>) =

// This declares an F# event that we can raise

let allCompleted = new Event<‘T[]>()

let error = new Event<System.Exception>()

let canceled = new Event<System.OperationCanceledException>()

let jobCompleted = new Event<JobCompletedEventArgs<‘T>>()

let cancellationCapability = new CancellationTokenSource()

/// Start an instance of the work

member x.Start() =

// Capture the synchronization context to allow us to raise events back on the GUI thread

let syncContext = SynchronizationContext.CaptureCurrent()

// Mark up the jobs with numbers

let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1))

let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args

let work =

Async.Parallel

[ for (job,jobNumber) in jobs ->

async { let! result = job

syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<‘T>(jobNumber, result))

return result } ]

Async.StartWithContinuations

( work,

(fun res -> raiseEventOnGuiThread(allCompleted, res)),

(fun exn -> raiseEventOnGuiThread(error, exn)),

(fun exn -> raiseEventOnGuiThread(canceled, exn)),

cancellationCapability.Token)

/// Raised when a particular job completes

[<CLIEvent>]

member x.JobCompleted = jobCompleted.Publish

/// Raised when all jobs complete

[<CLIEvent>]

member x.AllCompleted = allCompleted.Publish

/// Raised when the composition is cancelled successfully

[<CLIEvent>]

member x.Canceled = canceled.Publish

/// Raised when the composition exhibits an error

[<CLIEvent>]

member x.Error = error.Publish

We have used [<CLIEvent>] attributes to mark these events for exposing to other .NET CLI languages. Since we are using Rx we need to have an event that inherits from System.EventArgs, JobCompletedEventArgs<T> does that here. The AsyncWorker is now ready to be used as a library for running parallel code.

Stock Quotes Reader

The Stock Quotes Reader defines a wrapper that does a request to the server (it would be yahoo finance here) and pull the stocks.

type StockAvailableEventArgs(stocks:string[]) =

inherit EventArgs()

member x.Stocks with get() = stocks

type StockQuotesReader(quotes:string) =

/// event raised for every job completed, this is easier to use in other CLI languages

let stockAvailableEvent = new Event<StockAvailableEventArgs>()

let httpLines (uri:string) =

async { let request = WebRequest.Create uri

use! response = request.AsyncGetResponse()

use stream = response.GetResponseStream()

use reader = new StreamReader(stream)

let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]

return lines }

// n – name, s – symbol, x – Stock Exchange, l1 – Last Trade, p2 – change in percent, h – high, l – low, o – open, p – previous close, v – volume

let yahooUri (quotes:string) =

let uri = String.Format("http://finance.yahoo.com/d/quotes.csv?s={0}&f=nsxl1hlopv", quotes)

uri

member x.GetStocks() =

let stocks = [httpLines(yahooUri quotes)]

stocks

member x.PullStocks() =

let stocks = x.GetStocks()

let worker = new AsyncWorker<_>(stocks)

worker.JobCompleted.Add(fun args ->

stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray))

)

worker.Start()

static member GetAsyncReader(quotes) =

let reader = new StockQuotesReader(quotes)

let stocks = reader.GetStocks()

let worker = new AsyncWorker<_>(stocks)

worker

[<CLIEvent>]

member x.StockAvailable = stockAvailableEvent.Publish

The above wrapper class does some interesting things,

· It has a async block code that yield returns a line of data based on the response stream.

· PullStocks will create async requests and raise the StockAvailable event whenever the async job is completed.

CEP Client

On the CEP client we would be using the below things,

· Syncfusion WPF GridDataControl – it works well with high speed data changes, keeping minimal CPU usage.

· Rx to create requests and update the ViewModel bound to the Grid.

Setup the application

The WPF application uses simple MV-VM by defining a StocksViewModel to hold stock data. The Stocks collection is bound to the Syncfusion WPF GridDataControl.

<syncfusion:GridDataControl Grid.Row="0" x:Name="grid"

NotifyPropertyChanges="True"

AutoPopulateRelations="False"

Width="Auto"

AllowGroup="True" ShowGroupDropArea="True"

AutoFocusCurrentItem="False"

ItemsSource="{Binding Model.Stocks}"

AllowEdit="False"

IsGroupsExpanded="True"

ShowAddNewRow="False">

</syncfusion:GridDataControl>

Using Rx to create requests

This real-time application requires real-time data that will be pulled over the wire for every 500 milliseconds. We would be making use of the IObservable to create a streaming request and repeat that over a time delay,

var stockReader = new StockQuotesReader(“MSFT+GOOG+INTL”);

var stockFeeds = Observable.Defer(() =>

{

stockReader.PullStocks();

var evt = from e in Observable.FromEvent<StockAvailableEventArgs>(stockReader, "StockAvailable")

select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() };

var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay));

return delayedEvt;

}).Repeat();

We now have a streaming real-time stock data pulled asynchronously over the web and shown on the Syncfusion GridDataControl.

clip_image004

If you want to get hold of the sample, send me a request.

Enabling Blend (/ VS designer) to identify ItemContainerStyle for custom control in Silverlight 3.0

When designing custom controls we would want the users to customize the appearance of the items that is added up in some derived form of ItemsControl. Say we have a property “ItemContainerStyle”, we would want that to be customized in Blend as it does for ListBox,

img

Lets say, we have a property called “MyButtonStyle”. To enable this property to be recognized by the designer, we would have to implement the StyleTypedPropertyAttribute in the implemented class,

[StyleTypedProperty(Property = "MyButtonStyle", StyleTargetType = typeof(MyButton))]
public class MyButtonItems: ItemsControl

That’s it. Hope this helps.

-Fahad

Dynamic LINQ Expressions – I

Generating dynamic LINQ expressions needs a bit of understanding on the IQueryable interface. Check out Matt’s series of posts on implementing the IQueryable interface –> LINQ Links. The strong drive for this post is that everything is strongly typed, of course we have the Dynamic LINQ library that comes in MSDN sample, but that comes with an over head of generating strings to perform operations. In many cases, strongly typed usage will result in controlling the flow more accurately.

Lets take a look at the extensions library that will be used for generating Expressions on a dynamic data source ( that implements IQueryable / IEnumerable ). In this post, we will look at generating OrderBy() query with different parameters defined in the Queryable class. Normally, there are helper classes that has implementation for generating LINQ expressions for IEnumerable and IQueryable interfaces. The Queryable class defines a wrapper thru which we can make calls to the IQueryProvider implemented for that source type (IQuerable / IEnumerable).

With our implementation,

  • We would go with making our objects AsQueryable().
  • Call the appropriate method from the Queryable class that defines the method.

OrderBy() query –

The OrderBy query generates a simple sort comparer with the property name that is present on the underlying IQueryable source,

/// <summary>
/// IQueryable expression for sorting in ascending.
/// </summary>
/// <param name="source"></param>
/// <param name="propertyName"></param>
public static IQueryable OrderBy(this IQueryable source, string propertyName)
{
    var sourceType = source.GetObjectType();
    ParameterExpression paramExpression = Expression.Parameter(sourceType, sourceType.Name);
    MemberExpression memExp = Expression.PropertyOrField(paramExpression, propertyName);
    LambdaExpression lambda = Expression.Lambda(memExp, paramExpression);
    return source.Provider.CreateQuery(
        Expression.Call(
            typeof(Queryable),
            "OrderBy",
            new Type[] { source.ElementType, lambda.Body.Type },
            source.Expression,
            lambda));
}

Taking a closer look at the functions,

Expression.Parameter – Defines the parameter on which the object is referred. Normally we would assign some parameter for the source in our LINQ expressions, Say for Northwind Orders table,

var orders = Orders.OrderBy( order => order.ShipCountry );

Like so, we can dynamically generate the above expression as,

var ordersQuerable = orders.AsQueryable();

var sortedOrders = ordersQueryable.OrderBy(“ShipCountry”);

You may want to perform multi-sorting on the underlying IEnumerable type, the other OrderBy related functions that have similar implementation are,

  • OrderByDescending
  • ThenBy
  • ThenByDescending

For building a big expression dynamically, all you have to do is loop thru the collection of sorted columns, and recursively build the expression by calling the OrderBy / OrderByDescending / ThenBy / ThenByDescending in some logical sequence.

Note: OrderByDescending / ThenByDescending will be generated only for IOrderedQueryable instance, So you need to maintain some kind of an instance that holds IOrderedQueryable from the return set of OrderBy / ThenBy.

Helper method:

This gets the underlying object type for the IQueryable source.

private static Type GetObjectType(this IQueryable source)
{
    var enumerable = source.GetEnumerator();
    var hasItem = enumerable.MoveNext();
    if (hasItem)
    {
        return enumerable.Current.GetType();
    }
    return null;
}

In the next posts, we will check out generating WHERE expression dynamically.

Hope this helps.

– Fahad

 

LINQ Data Table Extensions

Legacy applications which is to be migrated to a new platform (WPF, Silverlight etc.,) requires leveraging the underlying business layer too. We have IQueryable interface implemented for DataTable in System.Data.DataSetExtensions. There are couple of extension providers present,

  • DataTableExtensions
  • EnumerableRowCollectionExtensions

Data Table Extensions

This provides a list of functions to query with the DataTable. Most of them are casting functions,

  • AsDataView<T>
    • Accepts a EnumerableRowCollection and returns a DataView.
  • AsDataView
    • Accepts a DataTable and returns a DataView.
  • AsEnumerable
    • Accepts a DataTable and returns a EnumerableRowCollection, which is used for querying with DataRow collection.

Enumerable Row Collection Extensions

The LINQ provider is implemented for the DataRowCollection that is present in the DataTable.Rows property. The following are the LINQ extensions you can work with,

  • Select
  • OrderBy
  • OrderByDescending
  • ThenBy
  • ThenByDescending
  • Where
  • Cast

Select

Use the Select extension to get the query the required fields from the DataTable.

private static void SelectDataTable()
        {
            var dt = GetOrdersDataTable();
            var orders = dt.AsEnumerable().Select(o =>
                new
                {
                    OrderID = o.Field<int>("OrderID"),
                    CustomerID = o.Field<string>("CustomerID"),
                    EmployeeID = o.Field<int>("EmployeeID"),
                    OrderDate = o.Field<DateTime>("OrderDate"),
                    ShipCountry = o.Field<string>("ShipCountry")
                });
            foreach (var order in orders)
            {
                Console.WriteLine(string.Format("OrderID : {0} / CustomerID : {1} / EmployeeID : {2} 
/ OrderDate : {3} / ShipCountry : {4}", order.OrderID, order.CustomerID, 
order.EmployeeID, order.OrderDate, order.ShipCountry));
            }
        }

The Field<T> returns a strongly typed value from the underlying DataTable. You can also use SetField<T> 
to set the field value thru a strongly typed object.

OrderBy / OrderByDescending / ThenBy / ThenByDescending

Sort operations can be performed by using the above mentioned functions. Check out the code below,

 

private static void SortDataTable()
{
    var dt = GetOrdersDataTable();
    var orders = dt.AsEnumerable().OrderBy(r => r.Field<string>("ShipCountry"));
    var result = orders.ThenBy(r => r.Field<string>("CustomerID")).Select(o =>
        new
        {
            OrderID = o.Field<int>("OrderID"),
            CustomerID = o.Field<string>("CustomerID"),
            EmployeeID = o.Field<int>("EmployeeID"),
            OrderDate = o.Field<DateTime>("OrderDate"),
            ShipCountry = o.Field<string>("ShipCountry")
        });
    foreach (var order in result)
    {
        Console.WriteLine(string.Format("OrderID : {0} / CustomerID : {1} / EmployeeID : {2} 
/ OrderDate : {3} / ShipCountry : {4}", order.OrderID, order.CustomerID, order.EmployeeID, 
order.OrderDate, order.ShipCountry));
    }
}

Where

Filter operations require a predicate match to be passed. Check out the code below,
private static void WhereOperation()
{
    var dt = GetOrdersDataTable();
    var filteredOrders = dt.AsEnumerable().Where(o => o.Field<string>("ShipCountry") == "Brazil")
.Select(o =>
        new
        {
            OrderID = o.Field<int>("OrderID"),
            CustomerID = o.Field<string>("CustomerID"),
            EmployeeID = o.Field<int>("EmployeeID"),
            OrderDate = o.Field<DateTime>("OrderDate"),
            ShipCountry = o.Field<string>("ShipCountry")
        });
    foreach (var order in filteredOrders)
    {
        Console.WriteLine(string.Format("OrderID : {0} / CustomerID : {1} / EmployeeID : {2} 
/ OrderDate : {3} / ShipCountry : {4}", order.OrderID, order.CustomerID, 
order.EmployeeID, order.OrderDate, order.ShipCountry));
    }
}
With these functions we can easily translate our code to more typed collection and work 
with the legacy DataTable objects.
Note: The API documentation suggests that these API’s are used internally in the .NET FW and 
not intended to be used in our code directly :).
Hope this helps.
-Fahad 

Detect Google Chrome in ASP.NET AJAX

Google launched a new fast browser recently (currently in beta) – Chrome. Getting in short, with ASP.NET we do have some functionality for detecting browsers using the Sys.Browser class.

Extend the prototype as follows,

//Google chrome check
Sys.Browser.Chrome = {};
if (navigator.userAgent.indexOf(' Chrome/') > -1) {
    Sys.Browser.agent = Sys.Browser.Chrome;
    Sys.Browser.version = parseFloat(navigator.userAgent.match(/ Chrome\/(\d+\.\d+)/)[1]);
    Sys.Browser.name = 'Chrome';
    Sys.Browser.hasDebuggerStatement = true;
}
Once you have this included in your script file and loaded inside the page, you can check it out as follows,
if(Sys.Browser.agent == Sys.Browser.Chrome){
//Write code for Chrome specific browser
}
Hope this helps!!
-Fahad