5 Quick Steps For Trying Out StreamInsight with LINQPad

Sometimes I just want to quickly try out a technical idea and hate having to go through the process of building entire solutions (I’m looking at you, BizTalk).  Up until now, StreamInsight has also fallen into that category.  For a new product, that’s a dicey place to be.  Ideally, we should be able to try out a product, execute a scenario, and make a quick assessment.  For StreamInsight, this is now possible through the use of LINQPad.  This post will walk you through the very easy steps for getting components installed and using a variety of data sources to test StreamInsight queries.  As a bonus, I’ll also show you how to consume an OData feed and execute StreamInsight LINQ queries against it.

Step 1: Install StreamInsight 1.1

   You need the second release of StreamInsight in order to use the LINQPad integration.  Grab the small installation bits for StreamInsight 1.1 from the Microsoft Download Center.  If you want to run an evaluation version, you can.  If you want to keep it around for a while, use a SQL Server 2008 R2 license key (found in the SQL Server installation media at x86\DefaultSetup.ini).

Step 2: Install LINQPad 4.0

  You can run either a free version of LINQPad (download LINQPad here) or purchase a version that has built-in Intellisense. 

Step 3:  Add the LINQPad drivers for StreamInsight

When you launch LINQPad, you see an option to add a connection.

2010.12.22si01

You’ll see a number of built-in drivers for LINQ-to-SQL and OData.

2010.12.22si02

Click the View more drivers … button and you’ll see the new StreamInsight driver created by Microsoft.

2010.12.22si03

The driver installs in about 200 milliseconds and then you’ll see it show up in the list of LINQPad drivers.

2010.12.22si04

Step 4: Create new connection with the StreamInsight driver

Now we select that driver (if the window is still open, if not, back in LINQPad choose to Add connection) and click the Next button on the Choose Data Context wizard page.  At this point, we are prompted with a StreamInsight Context Chooser window where we can select from either data sets provided by Microsoft, or a new context.  I’ll pick the Default Context right now.

2010.12.22si05

Step 5: Write a simple query and test it

At this point, we have a connection to the default StreamInsight context.  Make sure to flip the query’s Language value C# Statements and the Database to StreamInsight: Default Context.

This default context doesn’t have an input data source, so we can create a simple collection of point events to turn into a stream for processing.  Our first query retrieves all events where the Count is greater than four. 

//define event collection
var source = new[]
{
   PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "ABC", Type="Customer", Count=4 }),
   PointEvent.CreateInsert(new DateTime(2010, 12, 2), new { ID = "DEF", Type="Customer", Count=9 }),
   PointEvent.CreateInsert(new DateTime(2010, 12, 3), new { ID = "GHI", Type="Partner", Count=5 })
};

//convert to stream
var input = source.ToStream(Application,AdvanceTimeSettings.IncreasingStartTime);

var largeCount = from i in input
       where i.Count > 4
       select i;

//emit results to LINQPad
largeCount.Dump();

That query results in the output below.  Notice that only two of the records are emitted.

2010.12.22si07

To flex a bit more StreamInsight capability, I’ve created another query that creates a snapshot window over the three events (switched to the same day so as to have all point events in a single snapshot) and sum up the Count value per Type.

var source = new[]
{
   PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "ABC", Type="Customer", Count=4 }),
   PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "DEF", Type="Customer", Count=9 }),
   PointEvent.CreateInsert(new DateTime(2010, 12, 1), new { ID = "GHI", Type="Partner", Count=5 })
};

var input = source.ToStream(Application,AdvanceTimeSettings.IncreasingStartTime);

var custSum = from i in input
          group i by i.Type into TypeGroups
          from window in TypeGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
          select new { Type = TypeGroups.Key, TypeSum = window.Sum(e => e.Count) };

custSum.Dump();

This query also results in two messages but notice that the new TypeSum value is an aggregation of all events with a matching Type.

2010.12.22si08

In five steps (and hopefully about 8 minutes of your time), we got all the local components we needed and successfully tested a couple StreamInsight queries.

I could end with that, but hey, let’s try something more interesting.  What if we want to use an existing OData source and run a query over that?  Here are three additional bonus steps that let us flex LINQPad and StreamInsight a bit further.

Bonus Step #6: Create OData connection to Northwind items

Click the Add connection button in LINQPad and choose the WCF Data Services (OData)  driver.  Select OData as the provider, and put the Northwind OData feed (http://services.odata.org/Northwind/Northwind.svc) in the URI box and click OK. In LINQPad you’ll see all the entities that the Northwind OData feed exposes.

2010.12.22si09

Let’s now execute a very simple query.  This query looks through all Employee records and emits the employee ID, hire date and country for each employee.

var emps = from e in Employees
	     orderby e.HireDate ascending
	     select new {
	        HireDate = (DateTime)e.HireDate,
	        EmpId = e.EmployeeID,
	        Country = e.Country
	};

 emps.Dump();

The output of this service looks like this:

2010.12.22si12

Bonus Step #7: Add ability to do StreamInsight queries over Northwind data

What if we want to look at employee hires by country over a specific window of time?  We could do this doing a straight LINQ query, but where’s the fun in that?  In seriousness, you can imagine some interesting uses of real-time analytics of employee data, but I’m not focusing on that here.

LINQPad only allows one data context at a time, so in order to use both the OData feed AND StreamInsight queries, we have to do a bit of a workaround.  The spectacular Mark Simms has written an in depth post explaining this.  I’ll do the short version here.

Right-click the LINQPad query tab that has the OData query and choose Query Properties.  We need to add additional references to the StreamInsight dlls.  Click Add on the Additional References tab and find/select the Microsoft.ComplexEventProcessing.dll and Microsoft.ComplexEventProcessing.Observable.dll (if you can’t see them, make sure to check the Show GAC Assemblies box).

2010.12.22si10

Switch over to the Additional Namespace Imports tab and hand-enter the namespaces we need for our query.

2010.12.22si11

Now we’re ready to build a query that leverages StreamInsight LINQ constructs against the OData source.

Bonus Step #8: Write StreamInsight query against Northwind data

I went ahead and “cloned” the previous query to start fresh but still copy the references and imports that we previously defined. 

Below the previous query, I instantiated a StreamInsight “server” object to host our query.  Then I defined a StreamInsight application that contains the query.  Next up, I converted the OData results into a CEP stream.  After that, I created a StreamInsight query that leverages a Tumbling Window that emits a count of hires by country for each 60 day window.  Finally, I spit out the results to LINQPad.

var emps = from e in Employees
    	     orderby e.HireDate ascending
	     select new {
		HireDate = (DateTime)e.HireDate,
		EmpId = e.EmployeeID,
		Country = e.Country
	     };

//define StreamInsight server
using (Server siServer = Server.Create("RSEROTERv2"))
{
	//create StreamInsight app
	Application empApp = siServer.CreateApplication("demo");

	//map odata query to the StreamInsight input stream
	var empStream = emps.ToPointStream(empApp, s => PointEvent.CreateInsert(s.HireDate, s), AdvanceTimeSettings.IncreasingStartTime);

	var counts = from f in empStream
	     	       group f by f.Country into CountryGroup
		       from win in CountryGroup.TumblingWindow(TimeSpan.FromDays(60), HoppingWindowOutputPolicy.ClipToWindowEnd)
		       select new { EmpCountry = CountryGroup.Key, Count = win.Count() };

	//turn results into enumerable
	var sink  = from g in counts.ToPointEnumerable()
		     where g.EventKind == EventKind.Insert
		     select new { WinStart = g.StartTime, Country = g.Payload.EmpCountry, Count = g.Payload.Count};

	sink.Dump();
}

The output of the query looks like the image below.

Conclusion

There you have it.  You can probably perform the first five steps in under 10 minutes, and these bonus steps in another 5 minutes.  That’s a pretty fast, and low investment, way to get a taste for a powerful product.

Author: Richard Seroter

Richard Seroter is Director of Developer Relations and Outbound Product Management at Google Cloud. He’s also an instructor at Pluralsight, a frequent public speaker, the author of multiple books on software design and development, and a former InfoQ.com editor plus former 12-time Microsoft MVP for cloud. As Director of Developer Relations and Outbound Product Management, Richard leads an organization of Google Cloud developer advocates, engineers, platform builders, and outbound product managers that help customers find success in their cloud journey. Richard maintains a regularly updated blog on topics of architecture and solution design and can be found on Twitter as @rseroter.

2 thoughts

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.