Sending StreamInsight Events to BizTalk Through New Web (SOAP/REST) Adapter

One StreamInsight usage scenario frequently discussed by the product team involves sending a subset of events (or aggregated complex events) to the Enterprise Service Bus for additional processing and distribution.  As I’ve mentioned before, StreamInsight doesn’t come with any out-of-the-box adapters.  So if you want to make this usage scenario a reality, it’s up to you to figure out how to do it.  In this post, I hope to give you a head start (and code) toward making this happen.  I’ve built a StreamInsight web adapter which lets StreamInsight send either SOAP or REST-style messages to an endpoint. We can use this adapter to send messages to BizTalk, or any web endpoint.  Buckle up, this is a long one.

Designing the Adapter

In the StreamInsight SDK you’ll find some solid examples of StreamInsight adapters that you can use as a template to build your own.  I’ve built a few so far myself and demonstrate how to build an MSMQ publication adapter in my new book.  But I hadn’t built a consumer adapter yet, so I had to think about the right design strategy.

The first design choice was whether to build a typed or untyped adapter.  While typed adapters are easier to craft since you are building to a known data payload, you don’t get any reuse out of the adapter.  So, the first (easy) decision was to build an untyped adapter that could send any payload to a web endpoint.

The second consideration was how to call the downstream web endpoint.  I decided to use the System.Net.HttpWebRequest object to publish the payload and not try to do an IOC pattern with proxy classes.  By using this mechanism, I can apply the same code to call a SOAP endpoint or invoke various HTTP verbs on a RESTful endpoint.

Finally, I had to decide how to actually convert the StreamInsight events to the expected XML payload of my web endpoints.  I figured that leveraging XSLT was a solid plan.  I can take the inbound event, and via a runtime configuration property, apply an XML transformation stylesheet to the event and produce output that my web endpoint requires.

Ok, with all of these considerations in place, let’s build the adapter.  Note that you are completely allowed to disagree with any of the choices above and modify my adapter to fit your needs.

Building the Adapter

First off, I built the adapter’s configuration object.  These are the settings that we apply at runtime when we bind a StreamInsight query to an adapter.  Consider this to be reference data that we don’t want to hardcode into our adapter.

public struct WebOutputConfig
{
    public string XslPath { get; set; }
    public string ServiceAddress { get; set; }
    public string HttpMethod { get; set; }
    public string SoapAction { get; set; }
    public bool IsSoap { get; set; }
}

Note that my configuration accepts the path to an XSLT file, the URL of the target service, the HTTP method to apply, a flag indicating whether we are calling a SOAP endpoint, and if so, the SOAP Action value.

Next I create my actual adapter class.  It inherits from the untyped PointOutputAdapter class.

public class WebPointOutput : PointOutputAdapter
{
    //store reference to CEP event
    private CepEventType bindTimeEventType;
    private string serviceAddress;
    private string httpMethod;
    private string soapAction;
    private bool isSoap;
    private XslCompiledTransform consumerXform;

    public WebPointOutput(WebOutputConfig configInfo, CepEventType eventType)
    {
        this.bindTimeEventType = eventType;
        this.serviceAddress = configInfo.ServiceAddress;
        this.httpMethod = configInfo.HttpMethod;
        this.soapAction = configInfo.SoapAction;
        this.isSoap = configInfo.IsSoap;

        //load up transform
        consumerXform = new XslCompiledTransform(false);
        consumerXform.Load(configInfo.XslPath);
    }
}

The adapter stores internal references to the configuration values it received and the constructor instantiates the XSL transformation object using the XSL path passed into the adapter.

Before writing the primary operation which calls the service, we need a helper function which takes the key/value pairs from the CEP event and creates a dictionary out of them.  We will later convert this dictionary into a generic XML structure that we’ll apply our XSLT against.

private Dictionary<string, string> GetCepEventFields(PointEvent currentEvent)
{
    Dictionary<string, string> cepFields = new Dictionary<string, string>();

    //walk the fields in ordinal order and capture each name/value pair
    for (int ordinal = 0; ordinal < bindTimeEventType.FieldsByOrdinal.Count; ordinal++)
    {
        CepEventTypeField evtField = bindTimeEventType.FieldsByOrdinal[ordinal];
        cepFields.Add(evtField.Name, currentEvent.GetField(ordinal).ToString());
    }
    return cepFields;
}

See above that I loop through all the fields in the event and add each one (name and value) to a dictionary object.
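
To make the shape of that generic XML structure concrete, here is a minimal standalone sketch that runs without StreamInsight. The class name IntermediaryXmlSketch, the helper ToIntermediaryXml, and the sample values (such as "Widget") are assumptions for illustration; only the Root/Property/Name/Value layout and the field names come from this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

public static class IntermediaryXmlSketch
{
    // Turns name/value pairs into
    // <Root><Property><Name/><Value/></Property>...</Root>.
    public static XDocument ToIntermediaryXml(IDictionary<string, string> fields)
    {
        return new XDocument(
            new XElement("Root",
                fields.Select(f => new XElement("Property",
                    new XElement("Name", f.Key),
                    new XElement("Value", f.Value)))));
    }

    public static void Main()
    {
        // Hypothetical sample values standing in for a dequeued event's fields.
        var fields = new Dictionary<string, string>
        {
            { "EvtType", "Product Complaint" },
            { "EvtProd", "Widget" },
            { "EvtCount", "4" }
        };

        // Prints the indented intermediary document.
        Console.WriteLine(ToIntermediaryXml(fields));
    }
}
```

Because every event collapses into the same Property list, a stylesheet only needs to know the field names, not the .NET type behind the event.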

Now we can build our primary function which takes the StreamInsight event and calls the web endpoint.  After the code snippet, I’ll comment on a few key points.

private void ConsumeEvents()
{
    //create new point event
    PointEvent currentEvent = default(PointEvent);
    try
    {
        while (true)
        {
            //honor a stop request before dequeuing anything else
            if (AdapterState.Stopping == AdapterState)
            {
                Stopped();
                return;
            }

            //nothing left in the queue; signal readiness and exit until resumed
            if (DequeueOperationResult.Empty == Dequeue(out currentEvent))
            {
                Ready();
                return;
            }

            //only publish insert events and ignore CTIs
            if (currentEvent.EventKind == EventKind.Insert)
            {
                // ** begin service call
                //convert CEP message to XML for transformation
                XDocument intermediaryDoc = new XDocument(
                    new XElement("Root",
                        GetCepEventFields(currentEvent).Select(field => new XElement("Property",
                            new XElement("Name", field.Key),
                            new XElement("Value", field.Value)
                        ))));

                //transform CEP event fields to output format
                XDocument returnDoc = new XDocument();
                using (XmlWriter writer = returnDoc.CreateWriter())
                {
                    consumerXform.Transform(intermediaryDoc.CreateReader(), (XsltArgumentList)null, writer);
                }

                //call service
                HttpWebRequest req = (HttpWebRequest)HttpWebRequest.Create(serviceAddress);
                req.Method = httpMethod;
                req.ContentType = "text/xml";
                if (isSoap)
                    req.Headers.Add("SOAPAction", soapAction);

                //the using block closes the request stream for us
                using (Stream reqStream = req.GetRequestStream())
                {
                    var bytes = Encoding.UTF8.GetBytes(returnDoc.ToString());
                    reqStream.Write(bytes, 0, bytes.Length);
                }

                //dispose the response so the underlying connection is released for the next event
                using (var resp = (HttpWebResponse)req.GetResponse())
                {
                }
            }

            // Every received event needs to be released.
            ReleaseEvent(ref currentEvent);
        }
    }
    catch (AdapterException e)
    {
        System.IO.File.WriteAllText(
            @"C:\temp\" + System.Guid.NewGuid().ToString() + "_eventerror.txt", "Error: " + e.ToString());
    }
}

First, notice that I do NOT emit CTI events.  Next see that I use a bit of LINQ to take the results of the event-to-dictionary conversion and create an XML document (XDocument) consisting of name/value pairs.  I then take this “intermediary XML” and pass it through an XslCompiledTransform using whichever XSLT was provided during adapter configuration.  The resulting XML is then streamed to the web endpoint via the HttpWebRequest object.  There are probably performance improvements that can be made here, but hey, it’s a proof-of-concept!
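
One thing worth knowing about this service call: HttpWebRequest.GetResponse throws a WebException when the endpoint answers with an error status or cannot be reached, and the catch block above only handles AdapterException. The sketch below shows one possible way to turn such a failure into a readable message. The class WebFailureSketch and its Describe helper are hypothetical names, not part of the adapter, and the failure in Main is simulated so that no network is needed.

```csharp
using System;
using System.IO;
using System.Net;

public static class WebFailureSketch
{
    // Summarizes a failed call: transport status, message, and
    // (when the server actually answered) the HTTP status and body.
    public static string Describe(WebException ex)
    {
        string summary = ex.Status + ": " + ex.Message;

        var httpResponse = ex.Response as HttpWebResponse;
        if (httpResponse == null)
        {
            // No response at all, for example a refused connection.
            return summary;
        }

        using (httpResponse)
        using (var reader = new StreamReader(httpResponse.GetResponseStream()))
        {
            // A SOAP fault or error page usually explains what went wrong.
            return summary + " [HTTP " + (int)httpResponse.StatusCode + "] " + reader.ReadToEnd();
        }
    }

    public static void Main()
    {
        // Simulated transport failure with no response attached.
        var simulated = new WebException("Unable to connect", WebExceptionStatus.ConnectFailure);
        Console.WriteLine(Describe(simulated));
    }
}
```

In the adapter, a helper like this could be called from an additional catch (WebException) block so that the error body ends up in the same log file as adapter errors.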

The final piece of this adapter is to fill in the required “start” and “resume” operations.

public override void Resume()
{
    new Thread(this.ConsumeEvents).Start();
}

public override void Start()
{
    new Thread(this.ConsumeEvents).Start();
}

protected override void Dispose(bool disposing)
{
    base.Dispose(disposing);
}

Finally, I have to create an adapter factory which spins up my adapter when the StreamInsight query gets started up.  Since we are using an untyped adapter, there isn’t any logic needed to pick the “right” output adapter.

public class WebOutputFactory : IOutputAdapterFactory<WebOutputConfig>
 {
     public OutputAdapterBase Create(WebOutputConfig configInfo, EventShape eventShape, CepEventType cepEventType)
     {
         OutputAdapterBase adapter = default(OutputAdapterBase);
         adapter = new WebPointOutput(configInfo, cepEventType);

         return adapter;
     }
 }

With that, we have a complete StreamInsight consumer adapter.

Using the Adapter

How can we use this fancy, new adapter?  In one scenario, we can use StreamInsight to process a high volume of events, filter out the “noise”, and amplify events of specific interest.  Or, we can empower StreamInsight to look for trends within the stream over a particular time duration and share these complex events whenever one is encountered.

For this post, I’ll show the latter example.  I have a StreamInsight application which generates call center events every half second and sends them to an embedded StreamInsight server.   I do some aggregation over a window of time and if a complex event is detected, the web adapter is called and BizTalk receives the message for further processing.  Note that nothing prevents me from substituting WCF Services or Azure-based services for BizTalk in this case.  Well, except for security which I have NOT added to my adapter.  Didn’t figure out a clean way to store and send credentials yet.

BizTalk Setup

Let’s set up the BizTalk application that StreamInsight will publish to.  First I created a simple schema that represents the event data I want BizTalk to receive.

In real life I’d add an orchestration or two to process the event data, but this post is already ginormous and you all get the point.  So, let’s jump right to exposing this schema as part of a BizTalk service contract.  I walked through the BizTalk WCF Publishing Wizard and produced a one-way service that takes in my CallThresholdEvent message.

Once the service was created, I built the requisite receive port/location and a send port which subscribes to the CallThresholdEvent message.

All we need now is the right XSLT to transform the CEP event message to the WCF service contract message format.  How do we get that? The easiest way to get the correct XML is to invoke the service in the WCF Test Client and steal the SOAP payload it builds to call the service.  I pointed the WCF Test Client to my endpoint and invoked the service.

Once I confirmed that the service worked (and emitted a file from the send port), I switched the view from “formatted” to “xml” and could view the XML that was sent across the wire.

I took the “request” XML and created a new XSLT file with this request structure created in the root template.

<xsl:template match="*">
    <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
      <s:Header>
        <!--<Action s:mustUnderstand="1" xmlns="http://schemas.microsoft.com/ws/2005/05/addressing/none">PublishThresholdEvent</Action>-->
      </s:Header>
      <s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
        <CallThresholdEvent xmlns="http://BizTalkEventProcessor">
          <ProductName xmlns="">
            <xsl:value-of select="Property[Name = 'EvtProd']/Value"/>
          </ProductName>
          <CallCategory xmlns="">
            <xsl:value-of select="Property[Name = 'EvtType']/Value"/>
          </CallCategory>
          <OccuranceCount xmlns="">
            <xsl:value-of select="Property[Name = 'EvtCount']/Value"/>
          </OccuranceCount>
          <TimeReceived xmlns=""></TimeReceived>
        </CallThresholdEvent>
      </s:Body>
    </s:Envelope>
  </xsl:template>

Note that you should NOT send the Action header as WCF takes care of that and the service endpoint barfs with an HTTP 500 if you send it.  It also takes roughly 96 hours to figure out that this is the problem.  Consider yourself warned.
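
Since a malformed payload is so painful to debug at the endpoint, it can help to run the stylesheet against a hand-built intermediary document before wiring it into the adapter. Below is a minimal sketch of such a check. The stylesheet is a trimmed-down version of the template above (one field only), loaded from a string rather than a file, and the class XsltCheckSketch, its Apply helper, and the "Widget" value are assumptions for illustration.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Xml;
using System.Xml.Linq;
using System.Xml.Xsl;

public static class XsltCheckSketch
{
    // Trimmed version of the SOAP template, kept inline for the sketch.
    public const string Stylesheet = @"<xsl:stylesheet version=""1.0"" xmlns:xsl=""http://www.w3.org/1999/XSL/Transform"">
  <xsl:template match=""*"">
    <s:Envelope xmlns:s=""http://schemas.xmlsoap.org/soap/envelope/"">
      <s:Body>
        <CallThresholdEvent xmlns=""http://BizTalkEventProcessor"">
          <ProductName xmlns=""""><xsl:value-of select=""Property[Name = 'EvtProd']/Value""/></ProductName>
        </CallThresholdEvent>
      </s:Body>
    </s:Envelope>
  </xsl:template>
</xsl:stylesheet>";

    // Compiles the stylesheet text and applies it to the input document.
    public static XDocument Apply(string xslt, XDocument input)
    {
        var transform = new XslCompiledTransform();
        using (XmlReader xsltReader = XmlReader.Create(new StringReader(xslt)))
        {
            transform.Load(xsltReader);
        }

        var output = new XDocument();
        using (XmlWriter writer = output.CreateWriter())
        {
            transform.Transform(input.CreateReader(), (XsltArgumentList)null, writer);
        }
        return output;
    }

    public static void Main()
    {
        // Hand-built intermediary document with a hypothetical product name.
        XDocument input = XDocument.Parse(
            "<Root><Property><Name>EvtProd</Name><Value>Widget</Value></Property></Root>");

        XDocument soap = Apply(Stylesheet, input);
        Console.WriteLine(soap);

        // ProductName sits in no namespace because of xmlns="" in the template.
        Console.WriteLine(soap.Descendants("ProductName").Single().Value);
    }
}
```

Comparing the printed envelope with the request XML captured from the WCF Test Client is a quick way to spot namespace mistakes before any HTTP call is made.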

At this point, I have all I need in BizTalk to call the service successfully.

StreamInsight Setup

The first query in my StreamInsight application performs an aggregation of events over a “tumbling” window.

var inputStream = CepStream<CallCenterRequestEventType>.Create("input", typeof(CallCenterAdapterFactory), config, EventShape.Point);

 var callTypeCount =
          from w in inputStream
          group w by new { w.RequestType, w.Product } into appGroup
          from x in appGroup.TumblingWindow(
                  TimeSpan.FromSeconds(15),
                  HoppingWindowOutputPolicy.ClipToWindowEnd)
           select new EventTypeSummary
           {
               EvtType = appGroup.Key.RequestType,
               EvtProd = appGroup.Key.Product,
               EvtCount = x.Count()
            };

In the query above, I take the call center event input stream and put the incoming events into groups based on the event type (e.g. “Info Request”, “Product Complaint”, “Account Change”) and product the customer is calling about.  I base these groups on a tumbling window that lasts 15 seconds.  This means that the window is flushed every 15 seconds and started fresh.  I then take the output of the window grouping and put it into a new, known type named EventTypeSummary.  If I use an anonymous type here instead, I get a “System.IndexOutOfRangeException: Index was outside the bounds of the array” error.

I next take the result of the first query and make it the input into a second query.  This one looks at any groups emitted by the first query and filters them based on criteria my ESB is interested in.

var callTypeThreshold =
            from summary in callTypeCount
            where summary.EvtCount > 3 && summary.EvtType == "Product Complaint"
            select summary;

Above, I am looking for any “summary events” where the call type is a product complaint and there have been more than 3 of them for a specific product (during a given window).
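
If the windowing semantics are hard to picture, the following sketch reproduces the same two steps (count per 15-second window and group, then filter on the threshold) over an in-memory list using plain LINQ to Objects. This is not the StreamInsight API and ignores everything the engine does around CTIs and event ordering; the types Call and Summary, the helper names, and the sample data are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TumblingWindowSketch
{
    public sealed class Call
    {
        public DateTime Time;
        public string RequestType;
        public string Product;
    }

    public sealed class Summary
    {
        public DateTime WindowStart;
        public string EvtType;
        public string EvtProd;
        public int EvtCount;
    }

    // Buckets each call into a fixed, non-overlapping window and counts
    // the calls per (window, request type, product) combination.
    public static List<Summary> Summarize(IEnumerable<Call> calls, TimeSpan window)
    {
        return calls
            .GroupBy(c => new
            {
                WindowStart = new DateTime(c.Time.Ticks - (c.Time.Ticks % window.Ticks)),
                c.RequestType,
                c.Product
            })
            .Select(g => new Summary
            {
                WindowStart = g.Key.WindowStart,
                EvtType = g.Key.RequestType,
                EvtProd = g.Key.Product,
                EvtCount = g.Count()
            })
            .ToList();
    }

    // Same predicate as the second query in the post.
    public static List<Summary> OverThreshold(IEnumerable<Summary> summaries)
    {
        return summaries
            .Where(s => s.EvtCount > 3 && s.EvtType == "Product Complaint")
            .ToList();
    }

    public static void Main()
    {
        var t0 = new DateTime(2010, 7, 8, 9, 0, 0);
        var calls = new List<Call>();

        // Four complaints in the first window, two in the second.
        foreach (int offset in new[] { 1, 4, 8, 12, 16, 20 })
        {
            calls.Add(new Call { Time = t0.AddSeconds(offset), RequestType = "Product Complaint", Product = "Widget" });
        }
        calls.Add(new Call { Time = t0.AddSeconds(3), RequestType = "Info Request", Product = "Widget" });

        foreach (Summary s in OverThreshold(Summarize(calls, TimeSpan.FromSeconds(15))))
        {
            Console.WriteLine(s.WindowStart.ToString("HH:mm:ss") + " " + s.EvtProd + " " + s.EvtCount);
        }
    }
}
```

With this sample data only the first window's group of four "Widget" complaints passes the filter; the two complaints in the second window and the info request are dropped, which is the kind of noise reduction the real queries perform before anything reaches the web adapter.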

Before I register my query, I need to define the StreamInsight adapter configuration for my web endpoint.  Recall above that we defined a structure to hold parameters that we will pass into the adapter at runtime.

var webAdapterBizTalkConfig = new WebOutputConfig()
 {
    HttpMethod = "POST",
    IsSoap = true,
    ServiceAddress = "http://localhost/BizTalkEventProcessingService/BizTalkEventProcessingService.svc",
    SoapAction = "PublishThresholdEvent",
    XslPath = @"[path]\CallCenterEvent_To_BizTalkSoapService.xslt"
  };

Above, you’ll see the service address pointing to my BizTalk-generated WCF endpoint, the SOAP action for my service, and a pointer to the XSLT that I created to transform the CEP event to a SOAP payload.
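
For comparison, here is a sketch of what a REST-style configuration might look like next to the SOAP one, together with a small helper that mirrors how the adapter prepares its request. The struct is a copy of WebOutputConfig so the snippet compiles on its own. The REST URL, the PUT verb, the second XSLT file name, and the BuildRequest helper are hypothetical, and the [path] placeholder is left as it appears in this post. No request is actually sent.

```csharp
using System;
using System.Net;

// Copy of the configuration struct from earlier in the post.
public struct WebOutputConfig
{
    public string XslPath { get; set; }
    public string ServiceAddress { get; set; }
    public string HttpMethod { get; set; }
    public string SoapAction { get; set; }
    public bool IsSoap { get; set; }
}

public static class ConfigSketch
{
    // Mirrors the adapter's request setup without sending anything.
    public static HttpWebRequest BuildRequest(WebOutputConfig config)
    {
        var req = (HttpWebRequest)WebRequest.Create(config.ServiceAddress);
        req.Method = config.HttpMethod;
        req.ContentType = "text/xml";
        if (config.IsSoap)
        {
            req.Headers.Add("SOAPAction", config.SoapAction);
        }
        return req;
    }

    public static void Main()
    {
        var soapConfig = new WebOutputConfig
        {
            HttpMethod = "POST",
            IsSoap = true,
            ServiceAddress = "http://localhost/BizTalkEventProcessingService/BizTalkEventProcessingService.svc",
            SoapAction = "PublishThresholdEvent",
            XslPath = @"[path]\CallCenterEvent_To_BizTalkSoapService.xslt"
        };

        // Hypothetical REST endpoint: no SOAP action, different verb and stylesheet.
        var restConfig = new WebOutputConfig
        {
            HttpMethod = "PUT",
            IsSoap = false,
            ServiceAddress = "http://localhost/hypothetical/callevents",
            XslPath = @"[path]\CallCenterEvent_To_RestPayload.xslt"
        };

        foreach (WebOutputConfig config in new[] { soapConfig, restConfig })
        {
            HttpWebRequest req = BuildRequest(config);
            bool hasSoapAction = req.Headers["SOAPAction"] != null;
            Console.WriteLine(req.Method + " " + req.RequestUri + " SOAPAction header: " + hasSoapAction);
        }
    }
}
```

The only differences between the two calls live in configuration, which is the reuse that the untyped adapter and the HttpWebRequest choice were meant to buy.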

Finally, I register the query and start it.

var allQuery = callTypeThreshold.ToQuery(
                         myApp,
                         "Threshold Events",
                         string.Empty,
                         typeof(WebOutputFactory),
                         webAdapterBizTalkConfig,
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

You can see that I pass in my web adapter factory type and the adapter configuration properties defined earlier.

The Result

When all this is in place, I start up my StreamInsight application, begin generating events, and can observe BizTalk messages getting written to disk.

In this post we saw how I can link StreamInsight with BizTalk Server through a WCF channel.  You can grab the source code for the StreamInsight Web Adapter here. I’ve done some basic testing of the adapter against both RESTful and SOAP services, but there are great odds that you’ll find something I missed.  However, it hopefully gives you a great head start when building a StreamInsight solution that emits events to web endpoints.

Author: Richard Seroter

Richard Seroter is Director of Developer Relations and Outbound Product Management at Google Cloud. He’s also an instructor at Pluralsight, a frequent public speaker, the author of multiple books on software design and development, and a former InfoQ.com editor plus former 12-time Microsoft MVP for cloud. As Director of Developer Relations and Outbound Product Management, Richard leads an organization of Google Cloud developer advocates, engineers, platform builders, and outbound product managers that help customers find success in their cloud journey. Richard maintains a regularly updated blog on topics of architecture and solution design and can be found on Twitter as @rseroter.

11 thoughts

  1. This is exactly what I need but I have some troubles since StreamInsight has been renewed.
    I suppose instead of ToQuery, I have to use an observer but I don’t see how to make an observer of the type webOutputAdapter.
    Can you help me with this?

      1. In the end I did the following to get it to work:
        1) create an observer with a function to call instead of a class or factory or …
        2) in that method just call the service I added with “Add service Reference” in the regular way.

        It worked perfectly with a subject and different sources and sinks but I’m not sure if it’s the right way to get it done.
