Sometimes I just want to quickly try out a technical idea and hate having to go through the process of building entire solutions (I’m looking at you, BizTalk). Up until now, StreamInsight has also fallen into that category. For a new product, that’s a dicey place to be. Ideally, we should be able to try out a product, execute a scenario, and make a quick assessment. For StreamInsight, this is now possible through the use of LINQPad. This post will walk you through the very easy steps for getting components installed and using a variety of data sources to test StreamInsight queries. As a bonus, I’ll also show you how to consume an OData feed and execute StreamInsight LINQ queries against it.
Step 1: Install StreamInsight 1.1
You need the second release of StreamInsight in order to use the LINQPad integration. Grab the small installation bits for StreamInsight 1.1 from the Microsoft Download Center. If you want to run an evaluation version, you can. If you want to keep it around for a while, use a SQL Server 2008 R2 license key (found in the SQL Server installation media at x86\DefaultSetup.ini).
Step 2: Install LINQPad 4.0
You can run either a free version of LINQPad (download LINQPad here) or purchase a version that has built-in Intellisense.
Step 3: Add the LINQPad drivers for StreamInsight
When you launch LINQPad, you see an option to add a connection.
You’ll see a number of built-in drivers for LINQ-to-SQL and OData.
Click the View more drivers … button and you’ll see the new StreamInsight driver created by Microsoft.
The driver installs in about 200 milliseconds and then you’ll see it show up in the list of LINQPad drivers.
Step 4: Create new connection with the StreamInsight driver
Now we select that driver (if the window is still open, if not, back in LINQPad choose to Add connection) and click the Next button on the Choose Data Context wizard page. At this point, we are prompted with a StreamInsight Context Chooser window where we can select from either data sets provided by Microsoft, or a new context. I’ll pick the Default Context right now.
Step 5: Write a simple query and test it
At this point, we have a connection to the default StreamInsight context. Make sure to flip the query’s Language value C# Statements and the Database to StreamInsight: Default Context.
This default context doesn’t have an input data source, so we can create a simple collection of point events to turn into a stream for processing. Our first query retrieves all events where the Count is greater than four.
//define event collection var source = new[] { PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "ABC", Type="Customer", Count=4 }), PointEvent.CreateInsert(new DateTime(2010, 12, 2), new { ID = "DEF", Type="Customer", Count=9 }), PointEvent.CreateInsert(new DateTime(2010, 12, 3), new { ID = "GHI", Type="Partner", Count=5 }) }; //convert to stream var input = source.ToStream(Application,AdvanceTimeSettings.IncreasingStartTime); var largeCount = from i in input where i.Count > 4 select i; //emit results to LINQPad largeCount.Dump();
That query results in the output below. Notice that only two of the records are emitted.
To flex a bit more StreamInsight capability, I’ve created another query that creates a snapshot window over the three events (switched to the same day so as to have all point events in a single snapshot) and sum up the Count value per Type.
var source = new[] { PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "ABC", Type="Customer", Count=4 }), PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "DEF", Type="Customer", Count=9 }), PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "GHI", Type="Partner", Count=5 }) }; var input = source.ToStream(Application,AdvanceTimeSettings.IncreasingStartTime); var custSum = from i in input group i by i.Type into TypeGroups from window in TypeGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip) select new { Type = TypeGroups.Key, TypeSum = window.Sum(e => e.Count) }; custSum.Dump();
This query also results in two messages but notice that the new TypeSum value is an aggregation of all events with a matching Type.
In five steps (and hopefully about 8 minutes of your time), we got all the local components we needed and successfully tested a couple StreamInsight queries.
I could end with that, but hey, let’s try something more interesting. What if we want to use an existing OData source and run a query over that? Here are three additional bonus steps that let us flex LINQPad and StreamInsight a bit further.
Bonus Step #6: Create OData connection to Northwind items
Click the Add connection button in LINQPad and choose the WCF Data Services (OData) driver. Select OData as the provider, and put the Northwind OData feed (http://services.odata.org/Northwind/Northwind.svc) in the URI box and click OK. In LINQPad you’ll see all the entities that the Northwind OData feed exposes.
Let’s now execute a very simple query. This query looks through all Employee records and emits the employee ID, hire date and country for each employee.
var emps = from e in Employees orderby e.HireDate ascending select new { HireDate = (DateTime)e.HireDate, EmpId = e.EmployeeID, Country = e.Country }; emps.Dump();
The output of this service looks like this:
Bonus Step #7: Add ability to do StreamInsight queries over Northwind data
What if we want to look at employee hires by country over a specific window of time? We could do this doing a straight LINQ query, but where’s the fun in that? In seriousness, you can imagine some interesting uses of real-time analytics of employee data, but I’m not focusing on that here.
LINQPad only allows one data context at a time, so in order to use both the OData feed AND StreamInsight queries, we have to do a bit of a workaround. The spectacular Mark Simms has written an in depth post explaining this. I’ll do the short version here.
Right-click the LINQPad query tab that has the OData query and choose Query Properties. We need to add additional references to the StreamInsight dlls. Click Add on the Additional References tab and find/select the Microsoft.ComplexEventProcessing.dll and Microsoft.ComplexEventProcessing.Observable.dll (if you can’t see them, make sure to check the Show GAC Assemblies box).
Switch over to the Additional Namespace Imports tab and hand-enter the namespaces we need for our query.
Now we’re ready to build a query that leverages StreamInsight LINQ constructs against the OData source.
Bonus Step #8: Write StreamInsight query against Northwind data
I went ahead and “cloned” the previous query to start fresh but still copy the references and imports that we previously defined.
Below the previous query, I instantiated a StreamInsight “server” object to host our query. Then I defined a StreamInsight application that contains the query. Next up, I converted the OData results into a CEP stream. After that, I created a StreamInsight query that leverages a Tumbling Window that emits a count of hires by country for each 60 day window. Finally, I spit out the results to LINQPad.
var emps = from e in Employees orderby e.HireDate ascending select new { HireDate = (DateTime)e.HireDate, EmpId = e.EmployeeID, Country = e.Country }; //define StreamInsight server using (Server siServer = Server.Create("RSEROTERv2")) { //create StreamInsight app Application empApp = siServer.CreateApplication("demo"); //map odata query to the StreamInsight input stream var empStream = emps.ToPointStream(empApp, s => PointEvent.CreateInsert(s.HireDate, s), AdvanceTimeSettings.IncreasingStartTime); var counts = from f in empStream group f by f.Country into CountryGroup from win in CountryGroup.TumblingWindow(TimeSpan.FromDays(60), HoppingWindowOutputPolicy.ClipToWindowEnd) select new { EmpCountry = CountryGroup.Key, Count = win.Count() }; //turn results into enumerable var sink = from g in counts.ToPointEnumerable() where g.EventKind == EventKind.Insert select new { WinStart = g.StartTime, Country = g.Payload.EmpCountry, Count = g.Payload.Count}; sink.Dump(); }
The output of the query looks like the image below.
Conclusion
There you have it. You can probably perform the first five steps in under 10 minutes, and these bonus steps in another 5 minutes. That’s a pretty fast, and low investment, way to get a taste for a powerful product.
Thanks Richard!
I’ve been struggling to wrap my mind around si (and rx) and this is a great help!
Tom