Using StreamInsight 2.1 and IEnumerable to Process SQL Server Data

The StreamInsight team recently released a major new version of their Complex Event Processing engine, and I’ve finally had a chance to start playing around with it. StreamInsight 2.1 introduced a new programming model that elevates IEnumerable, IObservable and IObserver as event sources and sinks and de-emphasizes the traditional adapter model. Before these objects can truly replace adapters, we need to prove that they can interact with sources and sinks as capably as adapters do. My first test of that inspired this post: retrieving data (events) stored in a Microsoft SQL Server database.

Before we get started: my complete Visual Studio project is available on my GitHub. Feel free to browse it, fork it, suggest changes or make fun of it.

First, I have a SQL Server database. Let’s assume that server logs are loaded into this database and analyzed later. For each log event, I store an ID, server name, event level (“Information”, “Warning”, “Error”) and timestamp.


Fortunately, the .NET Framework makes it relatively easy to get an IEnumerable from a SQL Server result set. To write a good LINQ query, I also wanted the results in a strongly typed collection, so I took advantage of the useful Translate method on the LINQ to SQL DataContext class. First, I defined a class that maps to the database table.

public class ServerEvent
{
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

In the method defined below (“GetEvents()”), I connect to my database, execute a command, and return a strongly typed IEnumerable collection.

using System.Collections.Generic;
using System.Data;
using System.Data.Linq;          // DataContext (LINQ to SQL)
using System.Data.SqlClient;

private static IEnumerable<ServerEvent> GetEvents()
{
   //define connection string
   string connString = "Data Source=.;Initial Catalog=DemoDb;Integrated Security=SSPI;";

   //define DataContext object, which is used later to translate results to objects
   DataContext dc = new DataContext(connString);

   //get and open the connection owned by the DataContext
   SqlConnection conn = (SqlConnection)dc.Connection;
   conn.Open();

   //return all events stored in the SQL Server table, oldest first,
   //so that point events reach StreamInsight in timestamp order
   SqlCommand command = new SqlCommand("select ID, ServerName, Level, Timestamp from ServerEvent order by Timestamp", conn);

   //get the database results; the connection closes when the reader is closed
   SqlDataReader dataReader = command.ExecuteReader(CommandBehavior.CloseConnection);

   //use Translate to turn the reader stream into an enumerable of my custom object type
   //(the result can only be enumerated once)
   return dc.Translate<ServerEvent>(dataReader);
}
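Translate matches result columns to ServerEvent properties by name. For anyone curious what that involves, or working without LINQ to SQL, here is a minimal sketch of the same idea using a plain IDataReader and a C# iterator. ReadServerEvents and SampleTable are hypothetical names of my own, an in-memory DataTable stands in for the SQL Server table so the sketch runs anywhere, and the columns are assumed to be non-null.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public class ServerEvent
{
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

public static class ServerEventReader
{
    // Hypothetical helper: lazily maps each row of an open reader to a ServerEvent,
    // looking columns up by name (assumes none of the columns contain NULL).
    public static IEnumerable<ServerEvent> ReadServerEvents(IDataReader reader)
    {
        // Disposing the reader when enumeration ends also closes the connection
        // if the reader was opened with CommandBehavior.CloseConnection.
        using (reader)
        {
            int id = reader.GetOrdinal("ID");
            int server = reader.GetOrdinal("ServerName");
            int level = reader.GetOrdinal("Level");
            int timestamp = reader.GetOrdinal("Timestamp");

            while (reader.Read())
            {
                yield return new ServerEvent
                {
                    Id = reader.GetInt32(id),
                    ServerName = reader.GetString(server),
                    Level = reader.GetString(level),
                    Timestamp = reader.GetDateTime(timestamp)
                };
            }
        }
    }

    // Hypothetical in-memory stand-in for the ServerEvent table.
    public static DataTable SampleTable()
    {
        var table = new DataTable("ServerEvent");
        table.Columns.Add("ID", typeof(int));
        table.Columns.Add("ServerName", typeof(string));
        table.Columns.Add("Level", typeof(string));
        table.Columns.Add("Timestamp", typeof(DateTime));
        table.Rows.Add(1, "WEB01", "Information", new DateTime(2012, 8, 3, 9, 0, 0));
        table.Rows.Add(2, "WEB02", "Error", new DateTime(2012, 8, 3, 9, 1, 30));
        return table;
    }
}
```

Passing the dataReader from GetEvents() to ReadServerEvents instead of dc.Translate should behave similarly: rows are read lazily, and the enumerable can only be walked once because it consumes the reader.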

Now let’s take a peek at the StreamInsight code. After creating an embedded server and application (see the GitHub code for the full source), I defined my event source with DefineEnumerable, which is new in StreamInsight 2.1, and converted it to a point stream with ToPointStreamable. The stream invokes my GetEvents() method and turns each IEnumerable entry into a new point event (via CreateInsert) whose timestamp comes from the data itself.

//define the (point event) source by creating an enumerable from the GetEvents operation
 var source = app.DefineEnumerable<ServerEvent>(() => GetEvents()).
         ToPointStreamable<ServerEvent, ServerEvent>(
               e => PointEvent.CreateInsert<ServerEvent>(e.Timestamp, e), 
               AdvanceTimeSettings.StrictlyIncreasingStartTime);

After that, I defined my initial query. This is nothing more than a pass-through query and doesn’t highlight anything unique to StreamInsight. Baby steps first!

//write LINQ query against event stream
var query = from ev in source
            select ev;

Next, I have my event sink, or output. This uses an IObserver (created with the Reactive Extensions Observer.Create helper) that writes each output event to the console window.

//create observer as sink and write results to console
 var sink = app.DefineObserver(() =>
                   Observer.Create<ServerEvent>(x => Console.WriteLine(x.ServerName + ": " + x.Level)));
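Observer.Create comes from the Reactive Extensions (Rx) library and simply wraps a delegate in an IObserver<T>. To make that contract explicit, here is a sketch of an equivalent hand-written observer. ServerEventWriter is a hypothetical name, and it writes to a TextWriter rather than straight to the console so it can be exercised on its own.

```csharp
using System;
using System.IO;

public class ServerEvent
{
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

// Hypothetical observer doing the same job as
// Observer.Create<ServerEvent>(x => Console.WriteLine(x.ServerName + ": " + x.Level)).
public class ServerEventWriter : IObserver<ServerEvent>
{
    private readonly TextWriter _output;

    public ServerEventWriter(TextWriter output)
    {
        _output = output;
    }

    // Called once per output event, using the same "ServerName: Level" format as the post.
    public void OnNext(ServerEvent value)
    {
        _output.WriteLine(value.ServerName + ": " + value.Level);
    }

    // Called if the stream fails; report the error instead of swallowing it silently.
    public void OnError(Exception error)
    {
        _output.WriteLine("Query failed: " + error.Message);
    }

    // Called when the stream ends.
    public void OnCompleted()
    {
        _output.WriteLine("Done.");
    }
}
```

Plugging it into the post’s code would presumably look like app.DefineObserver(() => new ServerEventWriter(Console.Out)), though I haven’t verified how StreamInsight handles that expression.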

Finally, I bind the query to the sink and run it.

//bind the query to the sink
using (IDisposable proc = query.Bind<ServerEvent>(sink).Run("MyProcess"))
{
       Console.WriteLine("Press [Enter] to close the application.");
       Console.ReadLine();
}

When I run the application, I can see each event printed out.


Let’s try something more complicated: a query that uses both groups and windows and highlights the value of processing this data with StreamInsight. In this third query (you can view the second one in the source code), I group events by their level (e.g. “Error”) and create three-minute tumbling windows. The result should be a breakdown by event level, with a count of occurrences in each window.

var query3 = from ev in source
             group ev by ev.Level into levelgroup
             from win in levelgroup.TumblingWindow(TimeSpan.FromMinutes(3))
             select new EventSummary
             {
                 EventCount = win.Count(),
                 EventMessage = levelgroup.Key
             };
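To sanity-check what query3 should produce, here is an offline approximation in plain LINQ to Objects: events are bucketed into fixed, non-overlapping windows and counted per level. This is a sketch, not StreamInsight’s semantics. The window alignment (multiples of the window size counted from DateTime.MinValue, so three-minute windows start at :00, :03, :06 and so on) is my assumption, the EventSummary shape here (including the added WindowStart property and the int count) is assumed rather than copied from the project, and StreamInsight emits results incrementally as application time advances rather than all at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class ServerEvent
{
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

// Assumed result shape; WindowStart is added only to make the output readable.
public class EventSummary
{
    public DateTime WindowStart { get; set; }
    public string EventMessage { get; set; }
    public int EventCount { get; set; }
}

public static class TumblingWindowSketch
{
    // Groups events by (window start, level) and counts each group,
    // ordered by window and then by level name.
    public static List<EventSummary> CountByLevel(IEnumerable<ServerEvent> events, TimeSpan windowSize)
    {
        return events
            .GroupBy(e => new
            {
                // Floor the timestamp to the start of the window that contains it.
                WindowStart = new DateTime(e.Timestamp.Ticks - e.Timestamp.Ticks % windowSize.Ticks),
                e.Level
            })
            .OrderBy(g => g.Key.WindowStart)
            .ThenBy(g => g.Key.Level, StringComparer.Ordinal)
            .Select(g => new EventSummary
            {
                WindowStart = g.Key.WindowStart,
                EventMessage = g.Key.Level,
                EventCount = g.Count()
            })
            .ToList();
    }
}
```

For example, Error events at 9:00 and 9:01, a Warning at 9:02 and another Error at 9:04 yield two Errors and one Warning in the 9:00 window and one Error in the 9:03 window.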

When I run the application again, I see each grouping and its count. Imagine using this data in real time to detect an emerging trend and proactively prevent a widespread outage.


I have a lot more to learn about how the new object model in StreamInsight 2.1 works, but it looks promising. I previously built a SQL Server StreamInsight adapter that polled a database (for more real-time results), and would love to figure out a way to make that happen with IObservable.
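One way to approximate that polling adapter with the new model might be an enumerable that keeps going: it asks for rows newer than the last timestamp it has seen (a “high-water mark”), yields them, sleeps, and repeats. The sketch below assumes that approach; Poll, fetchSince and maxPolls are hypothetical names, and fetchSince stands in for a parameterized SQL query so the sketch runs without a database. I haven’t verified how StreamInsight behaves when a DefineEnumerable source blocks between polls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class ServerEvent
{
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

public static class PollingSource
{
    // Hypothetical polling source. fetchSince stands in for a query such as
    //   select ID, ServerName, Level, Timestamp from ServerEvent
    //   where Timestamp > @since order by Timestamp
    // maxPolls bounds the loop so the sketch terminates; a real source would poll until cancelled.
    public static IEnumerable<ServerEvent> Poll(
        Func<DateTime, IEnumerable<ServerEvent>> fetchSince,
        TimeSpan interval,
        int maxPolls)
    {
        DateTime highWaterMark = DateTime.MinValue;

        for (int poll = 0; poll < maxPolls; poll++)
        {
            foreach (var ev in fetchSince(highWaterMark).OrderBy(e => e.Timestamp))
            {
                // Advance the mark so the next poll only returns newer rows.
                if (ev.Timestamp > highWaterMark)
                {
                    highWaterMark = ev.Timestamp;
                }
                yield return ev;
            }

            // Wait before polling again (skipped after the final poll).
            if (poll < maxPolls - 1)
            {
                Thread.Sleep(interval);
            }
        }
    }
}
```

Two trade-offs are worth noting. Because the enumerator blocks, the next poll happens only after the previous batch has been handed off, so polls never overlap. And a strict “greater than” timestamp mark will miss rows committed later with a timestamp at or before the mark, so an increasing ID column may be a safer key.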

Download StreamInsight 2.1, take a walk through the new product team samples, and let me know if you come up with cool new ways to pull and push data into this engine!

Author: Richard Seroter

Richard Seroter is Director of Developer Relations and Outbound Product Management at Google Cloud. He’s also an instructor at Pluralsight, a frequent public speaker, the author of multiple books on software design and development, and a former InfoQ.com editor plus former 12-time Microsoft MVP for cloud. As Director of Developer Relations and Outbound Product Management, Richard leads an organization of Google Cloud developer advocates, engineers, platform builders, and outbound product managers that help customers find success in their cloud journey. Richard maintains a regularly updated blog on topics of architecture and solution design and can be found on Twitter as @rseroter.

18 thoughts

  1. So this is going to get the data in one big gulp. In previous versions of SI, I had an input adapter with a loop that called into SQL every minute for more data (simply doing a WHERE timestamp > the-last-time-I-called). The adapter injected the events.

    Clearly that won’t happen here, since in this new model, you return an enumerable.

    So what’s the canonical way to say, “watch this table on a SQL server and stream in new rows as they’re added?”

  2. Well, I found one way to do it as somewhat of a hybrid between the old way and the new way.

    If you look at the 2.1 examples, there’s a solution called ComposingQueriesNoAdapter. I copied the classes in generator.cs, broke out the PointGenerator class and replaced its generic type parameter with a strong type corresponding to the output of my SQL query.

    I then rewrote the code in PopulateEvent() to make my SQL call and return a single point event.

    Since this point generator can be called with a time interval (see the base event generation class in generator.cs), this means my SQL call is triggered over and over.

    As I said, this returns a single event. So it’s not as elegant as using IEnumerable, which would give a batch of events, but I wonder whether batching is such a good thing. Consider: if you use IEnumerable and your first call returns 10,000 rows, what happens if you’re not done processing them before the next timed call?

    Using IObservable and doing one event at a time, you don’t run that risk.

    Alas, I’m not yet conversant enough to know whether this presents a temporal problem. When one call takes too long, the next single event will fire and be processed, but out of order. Does that matter, or do the timestamps on the events make that okay?

    Fun, huh?

    1. @Christopher: I’m trying to implement what you explain here, because I need some code that polls the database every few minutes, but I don’t completely understand what you’re saying.
      Do you have some sample code that you could share?
      Thanks in advance!

  3. Following up on Christopher’s first post: is there a way to stream in data as it flows into a SQL Server database table? Thank you.

    1. Hi Tamta. I haven’t tried yet. There’s the option to use the classic adapter model and poll a database table, wire up an IObservable that pulls changes, or figure out a way to push the data out of SQL Server to a listening endpoint (e.g. triggers).

  4. Surely one would use SQL Server’s Service Broker or query notifications to push new rows to the application, then let StreamInsight process those messages as it will? This avoids polling, which can be wasteful, and allows real-time reaction to events if that is what’s required.

  5. I tried the scenario below for polling a database and getting the results as ServerEvent payloads (TPayload). The sample’s comment says:

    /* Replace the Observable.Interval with your logic to poll the database and
    convert the messages to instances of TPayload. It just needs to be a class
    that implements the IObservable interface. */

    So I created a ServerEvent observable like this:

    IObservable<ServerEvent> sr = new ServerEventObservable("Data Source=.;Initial Catalog=SqlOutputTest;Integrated Security=SSPI;", new TimeSpan(0, 0, 1));

    var observable = app.DefineObservable(() => sr);

    However, it gives the following error at query.Bind(observer).Run():
    Cannot serialize value of type 'System.IObservable`1[Seroter.StreamInsight.SqlEnumerable.ServerEvent]'.

    Can you please help, or am I missing something?

  6. Thanks, Rich. This is a very good article & worked for me in the 1st run.

    I am wondering how to get fresh data from SQL using StreamInsight (basically, poll the SQL Server Data Source).

    I have a table which gets updated at the rate of 300 TPS (Transactions Per Second). So, how do I retrieve this data, transform it using SI queries & push it to a destination (maybe MQ or another DB) as & when it is available with near-zero latency?

    1. Good question. I honestly haven’t found a great way yet. Ideally you get an IObservable hooked up so that it polls/pushes data into the StreamInsight engine. Obviously you need some way to quickly push data out of your database or poll aggressively!

      1. Thanks for your reply, Richard. I was wondering whether it is possible to open a dedicated stream between SQL and SI (with SI running as a Windows service) and have another Windows service set up as a client of the first one.
        The first service (the SI server service) would then push data to the second (the SI client service) as it becomes available.

        We plan to consider polling only as a “last resort”. When you say push data out of the SQL DB, are you suggesting that I use replication or a trigger? Isn’t there a way to have SI do this for me?

        Thanks.

        1. Hi Venkat. There’s nothing native in SI that would do this for you. Your proposed idea (having a service that receives data from SQL Server and then pushes to SI) is probably the best one. The question is how to setup the first service. You might want to look at something like Service Broker (external activation) to send notifications out.

  7. Hi,

    I’m a newbie, and while running the code I get an error that says: ‘Microsoft.ComplexEventProcessing.Application’ does not contain a definition for ‘DefineEnumerable’ and no extension method ‘DefineEnumerable’ accepting a first argument of type ‘Microsoft.ComplexEventProcessing.Application’ could be found (are you missing a using directive or an assembly reference?).
    I have added the references for StreamInsight. What could I be doing wrong?

  8. AFAIK, External Activation in Service Broker is implemented as an .EXE application (not as a Windows service) with a quite challenging configuration setup. Unfortunately, pushing data out of an RDBMS still requires a BLACK BELT level of skill and remains a “Holy Grail” of event-driven architecture. Once that is done, converting SSSB messages to stream events seems a less sophisticated exercise.

  9. Richard, is there any chance you could provide an example of polling a DB for new data and adding those values to an event source? It’s still a mystery to me!
