Microsoft StreamInsight is a powerful tool for event stream processing and monitoring complex events. To be sure, StreamInsight is not designed to be a message routing engine. It’s primarily a foundation for real-time business intelligence where events are run through (temporal) queries and new insight is discovered. To have a useful event-driven architecture (EDA), we want to be able to tap into the event cloud and siphon off the events flowing past. StreamInsight has an interesting decoupled model that lets us publish events and have any number of targets tap into that event stream and do something else with it. In this blog post, I’ll investigate this idea further.
What’s the Goal?
Todd Biske has recently written about the challenges of implementing an EDA and mentions the challenge of getting applications to share events. That particular problem will be the topic of future posts and demonstrations. Here, I’m looking to address the problem of how multiple applications receive events disseminated by other applications/devices/people/etc. I want to publish a stream of events and let multiple different event targets operate against it.
For instance, I may publish an event to StreamInsight whenever a customer makes a website or call center inquiry about a product. I can then produce two distinct standing queries which operate on that raw event stream and could emit a subset of events to an interested system, correlate these events with an additional event stream, or include the events in a temporal aggregation. In this way, I have a published event stream and allow multiple additional queries to execute against it.
Setting It Up
In reality, this is quite straightforward and utilizes standard StreamInsight behavior. In this example, I have an input adapter that receives events from my call center. In reality, this adapter just generates random call center event every so often and feeds them into the StreamInsight engine. I also chose to use the Standalone StreamInsight server (vs. embedded one) so that my queries are owned by a centrally managed service.
Let’s see some code. First off, I connect to my standalone instance of StreamInsight.
using (Server server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://SERVER:80/StreamInsight/RSEROTERv2"))) { }
Next up, I create an Application to host my StreamInsight queries.
Application myApp; //create new application on the server myApp = server.CreateApplication("CallCenterEvents");
After this, I create an input stream from my “Call Center” adapter.
var inputStream = CepStream.Create("input", typeof(CallCenterAdapterFactory), config, EventShape.Point);
At this point, I can write a very simple LINQ statement that emits every event from the stream. As this is the initial query on the adapter, I’m not filtering or aggregating content in case a downstream event consumer wants to start with the raw stream.
var allEvents = from ev in inputStream select ev;
I can now turn this statement into a standing query to deploy to the server. However, notice that this query does NOT have an output adapter assigned to it. Rather, I’m emitting events into the ether. If no one is pulling the events off of this query, they simply get dropped. This differs from a BizTalk Server model where any message in the bus that doesn’t find a subscriber will throw an error.
var allQueryUnbound = allEvents.ToQuery(myApp, "All Events", string.Empty, EventShape.Point, StreamEventOrder.FullyOrdered);
Since I’m using the standalone StreamInsight model, I don’t even have to start the query at this point. Just running this code deploys the (stopped) query to StreamInsight.
I can go ahead and start this query, and it runs perfectly fine. At this point however, I have no listeners on this event stream, and the events just fall out.
Let’s go ahead and consume this stream from two different StreamInsight queries. In a different .NET application (signifying a different event consumer coming online at a later date), I connect to my StreamInsight standalone instance.
using (Server server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://SERVER:80/StreamInsight/RSEROTERv2"))) {}
I acquire a reference to my application container, and then pull out the specific query that I’m interested in tapping into and convert it to a usable event stream.
var myApp = server.Applications["CallCenterEvents"]; var allEventsQuery = myApp.Queries["All Events"]; var allEventsStream = allEventsQuery.ToStream();
Next, I write a LINQ query that takes the output of that stream and applies a filter to it. In this case, I only want the call center events that are related to a customer complaint.
var complaints = from e in allEventsStream where e.RequestType == "Customer Complaint" select e;
Finally, I build up a StreamInsight query and use the sample “Tracer” adapter to write the output to a file.
var complaintsQuery = complaints.ToQuery( "Filtered Query", string.Empty, typeof(TracerFactory), new TracerConfig { DisplayCtiEvents = false, SingleLine = true, TraceName = @"C:\TEMP\Output_ComplaintsOnly.txt", TracerKind = TracerKind.File }, EventShape.Point, StreamEventOrder.FullyOrdered);
After building and deploying this, I added one more separate Visual Studio.NET project signifying yet another event target. This code is similar to the previous, except that THIS query does an aggregation on the initial event stream. Here, I have a hopping window that builds up a count of events over 30 second intervals and moves along the timeline every 1 second.
var countByType = from ev in allEventsStream group ev by ev.RequestType into typeGroup from win in typeGroup.HoppingWindow( TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(1), HoppingWindowOutputPolicy.ClipToWindowEnd) select new CallEventSummary { RequestType = typeGroup.Key, TotalRequests = win.Count() };
With all of my queries deployed, I can now see three total queries in my StreamInsight application.
I started the “All Events” query which creates the connection to my source adapter and starts processing events. Next, I started my “Filtered Query” which taps into the first event stream and discards any events that aren’t customer complaints. Finally, I started the “Rolling Count Query” which also listens on the first event stream and does some temporal aggregations. Now, with the queries started, I can view the “Published Streams” and see two “subscribers” on the initial stream.
On the file system, I have two files created by the two event targets. The first contains all the individual events for customer complaints. The second contains counts of event types.
Conclusion
If we are going to get a lot of traction evangelizing an event driven architecture, we have to make the event stream easy to tap into. Microsoft StreamInsight has a pretty clean model for chaining together or aggregating event streams and hopefully the tooling will get better for managing and discovering these streams.
One thought