Data Stream Processing with Amazon Kinesis and .NET Applications

Amazon Kinesis is a new data stream processing service from AWS that makes it possible to ingest and read high volumes of data in real time. That description may sound vaguely familiar to those who followed Microsoft’s attempts to put their complex event processing (CEP) engine StreamInsight into the Windows Azure cloud as part of “Project Austin.” There are two major differences between them: Kinesis doesn’t have the stream query aspects of StreamInsight, and Amazon actually SHIPPED their product.

Kinesis looks pretty cool, and I wanted to try out a scenario where I have (1) a Windows Azure Web Site that generates data, (2) Amazon Kinesis processing data, and (3) an application in the CenturyLink Cloud that reads the data stream.

What is Amazon Kinesis?

Kinesis provides a managed service that handles the intake, storage, and transportation of real-time streams of data. Each stream can handle nearly unlimited data volumes, because capacity grows with the number of shards: users provision shards, which are the means for scaling the stream's capacity up (and down). All the data that comes into a Kinesis stream is replicated across AWS availability zones within a region, which provides a great high availability story. Additionally, multiple sources can write to a stream, and a stream can be read by multiple applications.

Data remains available in the stream for up to 24 hours, and within that window applications (readers) can pull shard records from several starting points: a given sequence number, the oldest record, or the latest record. The Kinesis Client Library uses DynamoDB to store application state (like checkpoints). You can interact with Kinesis via the provided REST API or via platform SDKs.
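Those starting points correspond to the shard iterator types a reader passes to GetShardIterator: AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER (a given sequence number), TRIM_HORIZON (the oldest record still retained), and LATEST (only records that arrive after the iterator is created). As a minimal sketch, a reader that saves checkpoints might choose among them like this. ChooseStartingPosition is a hypothetical helper that only returns the iterator type name and sequence number; it does not call AWS.

```csharp
using System;

// Hypothetical helper: decide where a reader should start reading a shard.
// The returned names match Kinesis ShardIteratorType values; nothing here calls AWS.
static (string IteratorType, string SequenceNumber) ChooseStartingPosition(
    string lastCheckpoint, bool replayRetainedData)
{
    // A saved checkpoint means "resume just after the last record I finished".
    if (!string.IsNullOrEmpty(lastCheckpoint))
        return ("AFTER_SEQUENCE_NUMBER", lastCheckpoint);

    // No checkpoint: replay everything still retained (oldest first),
    // or skip the backlog and read only new records.
    if (replayRetainedData)
        return ("TRIM_HORIZON", null);
    return ("LATEST", null);
}

var start = ChooseStartingPosition(null, replayRetainedData: true);
Console.WriteLine($"{start.IteratorType} {start.SequenceNumber}");
```

Resuming with AFTER_SEQUENCE_NUMBER (rather than AT_SEQUENCE_NUMBER) avoids reprocessing the record that was already checkpointed.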

What DOESN’T Kinesis do? It doesn’t have any sort of adapter model, so it’s up to the developer to build producers (writers) and applications (readers). There is a nice client library for Java that has a lot of built-in logic for application load balancing and such. But for the most part, this is still a developer-oriented service for building big data processing solutions.

Setting up Amazon Kinesis

First off, I logged into the AWS console and located Kinesis in the navigation menu.

I’m then given the choice to create a new stream.

Next, I need to choose the initial number of shards for the stream. I can either put in the number myself, or use a calculator that helps me estimate how many shards I’ll need based on my data volume.
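A similar estimate is easy to reproduce by hand. As a rough sketch, assuming the per-shard limits AWS documents (1 MB/sec and 1,000 records/sec of writes, 2 MB/sec of reads; check the current Kinesis limits before relying on them), the stream needs as many shards as the most demanding of those constraints. EstimateShards is a hypothetical helper; it treats 1 MB as 1,024 KB and assumes every consuming application reads the full stream.

```csharp
using System;

// Assumed per-shard limits (as documented by AWS; verify current values).
const double WriteMBPerShard = 1.0;         // MB/sec in
const double WriteRecordsPerShard = 1000.0; // records/sec in
const double ReadMBPerShard = 2.0;          // MB/sec out

// Hypothetical estimator: the stream needs enough shards to satisfy
// whichever limit is hit first. Assumes every consuming app reads every record.
int EstimateShards(double avgRecordKB, double recordsPerSec, int consumerApps)
{
    double incomingMB = avgRecordKB * recordsPerSec / 1024.0;
    double outgoingMB = incomingMB * consumerApps;

    double byWriteBytes = incomingMB / WriteMBPerShard;
    double byWriteRecords = recordsPerSec / WriteRecordsPerShard;
    double byReadBytes = outgoingMB / ReadMBPerShard;

    double needed = Math.Max(byWriteBytes, Math.Max(byWriteRecords, byReadBytes));
    return Math.Max(1, (int)Math.Ceiling(needed));
}

// 1 KB orders at 3,000/sec, read by 2 applications
Console.WriteLine(EstimateShards(avgRecordKB: 1, recordsPerSec: 3000, consumerApps: 2)); // prints 3
```

In that example the record-count limit (3,000 / 1,000) is what drives the answer, even though the byte rates alone would fit in fewer shards.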

After a few seconds, my managed Kinesis stream is ready to use. For a given stream, I can see available shards, and some CloudWatch metrics related to capacity, latency, and requests.

I now have an environment for use!

Creating a data producer

Now I was ready to build an ASP.NET web site that publishes data to the Kinesis endpoint. The AWS SDK for .NET already includes Kinesis objects, so there's no reason to make this more complicated than it has to be. My ASP.NET site has NuGet packages that reference JSON.NET (for JSON serialization), AWS SDK, jQuery, and Bootstrap.

The web application is fairly basic. It’s for ordering pizza from a global chain. Imagine sending order info to Kinesis and seeing real-time reactions to marketing campaigns, weather trends, and more. Kinesis isn’t a messaging engine per se; it’s designed for collecting and analyzing data. Here, I’m collecting some simplistic data in a form.

When clicking the “order” button, I build up the request and send it to a particular Kinesis stream. First, I added the following “using” statements:

using Newtonsoft.Json;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using System.IO;
using System.Text;

The button click event has the following (documented) code. Notice a few things. My AWS credentials are stored in the web.config file, and I pass an AmazonKinesisConfig to the client constructor. Why? I need to tell the client library which AWS region my Kinesis stream is in so that it can build the proper request URL. Note that I set a few properties on the actual put request object. First, I set the stream name. Second, I added a partition key, which Kinesis uses to place the record in a given shard. It’s a way of putting “like” records in a particular shard.

protected void btnOrder_Click(object sender, EventArgs e)
{
    //generate unique order id
    string orderId = System.Guid.NewGuid().ToString();

    //build up the CLR order object; an ISO 8601 UTC timestamp parses the same way for every consumer
    Order o = new Order() { Id = orderId, Source = "web", StoreId = storeid.Text, PizzaId = pizzaid.Text, Timestamp = DateTime.UtcNow.ToString("o") };

    //convert to byte array in prep for adding to stream
    byte[] oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(o));

    //create config that points to AWS region
    AmazonKinesisConfig config = new AmazonKinesisConfig();
    config.RegionEndpoint = Amazon.RegionEndpoint.USEast1;

    //create stream object to add to Kinesis request, plus a client that pulls creds
    //from web.config and takes in Kinesis config (both disposed when done)
    using (MemoryStream ms = new MemoryStream(oByte))
    using (AmazonKinesisClient client = new AmazonKinesisClient(config))
    {
        //create put request
        PutRecordRequest requestRecord = new PutRecordRequest();
        //list name of Kinesis stream
        requestRecord.StreamName = "OrderStream";
        //give partition key that is used to place record in particular shard
        requestRecord.PartitionKey = "weborder";
        //add record as memorystream
        requestRecord.Data = ms;

        //PUT the record to Kinesis
        PutRecordResponse responseRecord = client.PutRecord(requestRecord);

        //show shard ID and sequence number to user
        lblShardId.Text = "Shard ID: " + responseRecord.ShardId;
        lblSequence.Text = "Sequence #: " + responseRecord.SequenceNumber;
    }
}
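Kinesis treats each record's data as an opaque blob of bytes, so the shape of the payload is entirely up to the producer and consumers. The producer code turns the order into UTF-8 JSON bytes, and the consumer later in this post reverses that with Encoding.UTF8.GetString. A minimal round-trip sketch follows; it swaps in System.Text.Json for JSON.NET so it runs without NuGet packages, and the anonymous object is a stand-in for the post's Order class, which isn't shown.

```csharp
using System;
using System.IO;
using System.Text;
using System.Text.Json;

// Producer side: serialize an order-like object to UTF-8 JSON bytes.
// The anonymous object is a stand-in for the post's Order class.
var order = new
{
    Id = Guid.NewGuid().ToString(),
    Source = "web",
    StoreId = "42",
    PizzaId = "7",
    Timestamp = DateTime.UtcNow.ToString("o")
};
byte[] payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order));

// PutRecordRequest.Data takes a MemoryStream wrapping these bytes.
using var data = new MemoryStream(payload);

// Consumer side: the same steps in reverse, as GetShardRecords does with r.Data.
string json = Encoding.UTF8.GetString(data.ToArray());
using JsonDocument doc = JsonDocument.Parse(json);
string storeId = doc.RootElement.GetProperty("StoreId").GetString();

Console.WriteLine($"{payload.Length} bytes of record data, StoreId = {storeId}");
```

Because Kinesis never inspects the bytes, producers and consumers have to agree on the encoding and schema themselves.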

With the web application done, I published it to a Windows Azure Web Site. This is super easy to do with Visual Studio 2013, and within a few seconds my application was there.

Finally, I submitted a bunch of records to Kinesis by adding pizza orders. Notice the shard ID and sequence number that Kinesis returns from each PUT request.
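The shard ID in each response is determined by the partition key: Kinesis takes the MD5 hash of the key as a 128-bit integer and routes the record to the shard whose hash key range contains it. Because this producer always sends the partition key "weborder", every order would land in the same shard even if the stream had several. The sketch that follows shows that mapping. EvenRanges, ShardIndexFor, and the sample keys are hypothetical; the sketch assumes UTF-8 key bytes and shards that split the hash key space evenly, as a freshly created stream does. Real code should read each shard's HashKeyRange from DescribeStream rather than assume the layout.

```csharp
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Security.Cryptography;
using System.Text;

// Map a partition key to its 128-bit hash key: the MD5 digest of the key
// (assumed UTF-8 bytes), read as an unsigned big-endian integer.
static BigInteger HashKeyFor(string partitionKey)
{
    using MD5 md5 = MD5.Create();
    byte[] digest = md5.ComputeHash(Encoding.UTF8.GetBytes(partitionKey));
    return new BigInteger(digest, isUnsigned: true, isBigEndian: true);
}

// Hypothetical helper: split the 2^128 key space evenly across shards.
static List<(BigInteger Start, BigInteger End)> EvenRanges(int shardCount)
{
    BigInteger keySpace = BigInteger.One << 128;
    var ranges = new List<(BigInteger Start, BigInteger End)>();
    for (int i = 0; i < shardCount; i++)
    {
        BigInteger start = keySpace * i / shardCount;
        BigInteger end = keySpace * (i + 1) / shardCount - 1;
        ranges.Add((start, end));
    }
    return ranges;
}

// Find the shard whose inclusive hash key range contains the key's hash.
static int ShardIndexFor(string partitionKey, List<(BigInteger Start, BigInteger End)> ranges)
{
    BigInteger h = HashKeyFor(partitionKey);
    for (int i = 0; i < ranges.Count; i++)
        if (h >= ranges[i].Start && h <= ranges[i].End) return i;
    throw new InvalidOperationException("Hash ranges do not cover the key space.");
}

var fourShards = EvenRanges(4);
foreach (string key in new[] { "weborder", "store-17", "store-42" })
    Console.WriteLine($"{key} -> shard index {ShardIndexFor(key, fourShards)}");
```

Using something like the store ID as the partition key would spread orders across shards while still keeping each store's orders together.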

Creating a Kinesis application (record consumer)

To realistically read data from a Kinesis stream, there are three steps. First, you need to describe the stream in order to find out its shards. If I want a fleet of servers to run this application and read the stream, I’d need a way for each application to claim a shard to work on. The second step is to retrieve a “shard iterator” for a given shard. The iterator points to the place in the shard where I want to start reading data. Recall from above that I can start at a specific sequence number, at the oldest record still in the shard (TRIM_HORIZON), or at the latest point (LATEST), which returns only records added after the iterator is created. The third and final step is to get the records from a particular iterator. Part of the result set of this operation is a “next iterator” value. In my code, if I find another iterator value, I once again call the “get records” operation to pull any records from that iterator position.

Here’s the total code block, documented for your benefit.

private static void ReadFromKinesis()
{
    //create config that points to Kinesis region
    AmazonKinesisConfig config = new AmazonKinesisConfig();
    config.RegionEndpoint = Amazon.RegionEndpoint.USEast1;

    //create new client object
    AmazonKinesisClient client = new AmazonKinesisClient(config);

    //Step #1 - describe stream to find out the shards it contains
    DescribeStreamRequest describeRequest = new DescribeStreamRequest();
    describeRequest.StreamName = "OrderStream";

    DescribeStreamResponse describeResponse = client.DescribeStream(describeRequest);
    List<Shard> shards = describeResponse.StreamDescription.Shards;
    foreach (Shard s in shards)
    {
        Console.WriteLine("shard: " + s.ShardId);
    }

    //grab the only shard ID in this stream
    string primaryShardId = shards[0].ShardId;

    //Step #2 - get iterator for this shard
    GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
    iteratorRequest.StreamName = "OrderStream";
    iteratorRequest.ShardId = primaryShardId;
    iteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;

    GetShardIteratorResponse iteratorResponse = client.GetShardIterator(iteratorRequest);
    string iterator = iteratorResponse.ShardIterator;

    Console.WriteLine("Iterator: " + iterator);

    //Step #3 - get records in this iterator
    GetShardRecords(client, iterator);

    Console.WriteLine("All records read.");
    Console.ReadLine();
}

private static void GetShardRecords(AmazonKinesisClient client, string iteratorId)
{
    string iterator = iteratorId;

    //an open shard always returns a next iterator, so recursing until it is null never ends;
    //loop instead and stop once a call comes back empty (fine for this demo, though Kinesis
    //can occasionally return an empty batch before the end of the data)
    while (iterator != null)
    {
        //create request
        GetRecordsRequest getRequest = new GetRecordsRequest();
        getRequest.Limit = 100;
        getRequest.ShardIterator = iterator;

        //call "get" operation to pull up to 100 records from this position
        GetRecordsResponse getResponse = client.GetRecords(getRequest);
        //retrieve records
        List<Record> records = getResponse.Records;

        //print out each record's data value
        foreach (Record r in records)
        {
            //pull out (JSON) data in this record
            string s = Encoding.UTF8.GetString(r.Data.ToArray());
            Console.WriteLine("Record: " + s);
            Console.WriteLine("Partition Key: " + r.PartitionKey);
        }

        if (records.Count == 0)
        {
            break;
        }

        //get reference to next iterator for this shard
        iterator = getResponse.NextShardIterator;

        //pause between calls to stay under the per-shard read limit (5 GetRecords calls per second)
        System.Threading.Thread.Sleep(1000);
    }
}
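The reader code grabs shards[0], which is fine for this single-shard stream. For the fleet-of-servers scenario described earlier, each worker needs its own set of shards. The Java client library handles this with leases stored in DynamoDB; as a much simpler, hypothetical alternative, workers could split a sorted list of shard IDs round-robin. ShardsForWorker and the shard IDs below are illustrative only, and this static scheme does not handle a worker failing or shards being split and merged.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical static assignment: worker `workerIndex` of `workerCount` owns every
// shard whose position in the sorted list, modulo workerCount, equals its index.
static List<string> ShardsForWorker(IEnumerable<string> shardIds, int workerIndex, int workerCount)
{
    return shardIds
        .OrderBy(id => id, StringComparer.Ordinal) // every worker sees the same order
        .Where((id, position) => position % workerCount == workerIndex)
        .ToList();
}

// Shard IDs as DescribeStream might return them (sample values).
var shardIds = new[] { "shardId-000000000000", "shardId-000000000001", "shardId-000000000002" };
for (int w = 0; w < 2; w++)
    Console.WriteLine($"worker {w}: {string.Join(", ", ShardsForWorker(shardIds, w, 2))}");
```

Each worker would then run the iterator and get-records steps only for its own shards.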

Now I had a working Kinesis application that could run anywhere. Clearly it’s easy to run this on AWS EC2 servers (and the SDK does a nice job with retrieving temporary credentials for apps running within EC2), but there’s a good chance that cloud users have a diverse portfolio of providers. Let’s say I love the application services from AWS, but like the server performance and management capabilities from CenturyLink. In this case, I built a Windows Server in the CenturyLink Cloud to run my Kinesis application.

With my server ready, I ran the application and saw my shards, my iterators, and my data records.

Very cool and pretty simple. Don’t forget that each data consumer has some work to do to parse the stream, find the (partition) data they want, and perform queries on it. You can imagine loading this into an Observable and using LINQ queries on it to aggregate data. Regardless, it’s very nice to have a durable stream processing service that supports replays and multiple readers.
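As a sketch of that kind of aggregation, here is plain LINQ to Objects over a batch of decoded payloads, counting orders per store. It swaps an in-memory list in for an Observable (Reactive Extensions would be a separate package), uses System.Text.Json to read the StoreId field, and the sample payloads are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Made-up sample payloads, shaped like the JSON the consumer prints.
var payloads = new List<string>
{
    "{\"Id\":\"a\",\"Source\":\"web\",\"StoreId\":\"17\",\"PizzaId\":\"3\"}",
    "{\"Id\":\"b\",\"Source\":\"web\",\"StoreId\":\"42\",\"PizzaId\":\"3\"}",
    "{\"Id\":\"c\",\"Source\":\"web\",\"StoreId\":\"17\",\"PizzaId\":\"5\"}"
};

// Read the StoreId field from one payload, disposing the parsed document.
static string StoreIdOf(string json)
{
    using JsonDocument doc = JsonDocument.Parse(json);
    return doc.RootElement.GetProperty("StoreId").GetString();
}

// Count orders per store: project to StoreId, group, then count each group.
Dictionary<string, int> ordersPerStore = payloads
    .Select(p => StoreIdOf(p))
    .GroupBy(storeId => storeId)
    .ToDictionary(g => g.Key, g => g.Count());

foreach (var pair in ordersPerStore)
    Console.WriteLine($"store {pair.Key}: {pair.Value} order(s)");
```

With Rx, the same Select/GroupBy pipeline could run continuously over records as each GetRecords batch arrives.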

Summary

The “internet of things” is here, and companies that can quickly gather and analyze data will have a major advantage. Amazon Kinesis is an important service to that end, but don’t think of it as something that ONLY works with other applications in the AWS cloud. We saw here that you could have all sorts of data producers running on devices, on-premises, or in other clouds. The Kinesis applications that consume data can also run virtually anywhere. The modern architect recognizes that composite applications are the way to go, and hopefully this helped you understand another service that’s available to you!

Author: Richard Seroter

Richard Seroter is Director of Developer Relations and Outbound Product Management at Google Cloud. He’s also an instructor at Pluralsight, a frequent public speaker, the author of multiple books on software design and development, and a former InfoQ.com editor plus former 12-time Microsoft MVP for cloud. As Director of Developer Relations and Outbound Product Management, Richard leads an organization of Google Cloud developer advocates, engineers, platform builders, and outbound product managers that help customers find success in their cloud journey. Richard maintains a regularly updated blog on topics of architecture and solution design and can be found on Twitter as @rseroter.
