Amazon Kinesis is a new data stream processing service from AWS that makes it possible to ingest and read high volumes of data in real time. That description may sound vaguely familiar to those who followed Microsoft’s attempts to put their CEP engine StreamInsight into the Windows Azure cloud as part of “Project Austin.” Two major differences between the two: Kinesis doesn’t have the stream query aspects of StreamInsight, and Amazon actually SHIPPED their product.
Kinesis looks pretty cool, and I wanted to try out a scenario where I have (1) a Windows Azure Web Site that generates data, (2) Amazon Kinesis processing the data, and (3) an application in the CenturyLink Cloud that reads the data stream.
What is Amazon Kinesis?
Kinesis provides a managed service that handles the intake, storage, and transportation of real-time streams of data. Each stream can handle nearly unlimited data volumes. Users set up shards, which are the means for scaling up (and down) the capacity of the stream. All the data that comes into a Kinesis stream is replicated across AWS availability zones within a region. This provides a great high availability story. Additionally, multiple sources can write to a stream, and a stream can be read by multiple applications.
Data is available in the stream for up to 24 hours, meaning that applications (readers) can pull shard records using multiple schemes: from a given sequence number, from the oldest available record, or from the latest record. The Kinesis client library uses DynamoDB to store application state (like checkpoints). You can interact with Kinesis via the provided REST API or via platform SDKs.
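To make those read schemes concrete, here’s how they map to shard iterator types on a request in the .NET SDK. This is just a sketch; the full request setup appears in the consumer code later in this post.

//the read positions map to shard iterator types on the request
GetShardIteratorRequest req = new GetShardIteratorRequest();
req.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;             //oldest record still in the stream
//req.ShardIteratorType = ShardIteratorType.LATEST;                 //only records added from now on
//req.ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER;     //start at a given sequence number
//req.ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;  //start just after a given sequence number
//(for the sequence-number types, you also set StartingSequenceNumber on the request)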
What DOESN’T Kinesis do? It doesn’t have any sort of adapter model, so it’s up to the developer to build producers (writers) and applications (readers). There is a nice client library for Java that has a lot of built-in logic for application load balancing and such. But for the most part, this is still a developer-oriented solution for building big data processing solutions.
Setting up Amazon Kinesis
First off, I logged into the AWS console and located Kinesis in the navigation menu.
I’m then given the choice to create a new stream.
Next, I need to choose the initial number of shards for the stream. I can either put in the number myself, or use a calculator that helps me estimate how many shards I’ll need based on my data volume.
After a few seconds, my managed Kinesis stream is ready to use. For a given stream, I can see available shards, and some CloudWatch metrics related to capacity, latency, and requests.
I now have an environment for use!
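As an aside, the same setup can be scripted instead of clicking through the console. Here’s a minimal sketch using the .NET SDK; the stream name and shard count are placeholders, and I’m assuming the synchronous client of this SDK era.

//a sketch of creating the stream programmatically instead of via the console
AmazonKinesisConfig config = new AmazonKinesisConfig();
config.RegionEndpoint = Amazon.RegionEndpoint.USEast1;
AmazonKinesisClient client = new AmazonKinesisClient(config);

//request a new stream with a single shard
CreateStreamRequest createRequest = new CreateStreamRequest();
createRequest.StreamName = "OrderStream";
createRequest.ShardCount = 1;
client.CreateStream(createRequest);

//the stream takes a few seconds to become ACTIVE; poll until it is
DescribeStreamRequest describeRequest = new DescribeStreamRequest();
describeRequest.StreamName = "OrderStream";
while (client.DescribeStream(describeRequest).StreamDescription.StreamStatus.ToString() != "ACTIVE")
{
    System.Threading.Thread.Sleep(1000);
}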
Creating a data producer
Now I was ready to build an ASP.NET web site that publishes data to the Kinesis endpoint. The AWS SDK for .NET already includes Kinesis objects, so there’s no reason to make this more complicated than it has to be. My ASP.NET site has NuGet packages that reference JSON.NET (for JSON serialization), the AWS SDK, jQuery, and Bootstrap.
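If you’re recreating this, those packages can be pulled in from the NuGet Package Manager Console. The package IDs below are the common ones from that era, so treat this as a sketch:

PM> Install-Package AWSSDK
PM> Install-Package Newtonsoft.Json
PM> Install-Package jQuery
PM> Install-Package bootstrap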
The web application is fairly basic: it’s for ordering pizza from a global chain. Imagine sending order info to Kinesis and seeing real-time reactions to marketing campaigns, weather trends, and more. Kinesis isn’t a messaging engine per se; it’s for collecting and analyzing data. Here, I’m collecting some simplistic data in a form.
When clicking the “order” button, I build up the request and send it to a particular Kinesis stream. First, I added the following “using” statements:
using Newtonsoft.Json;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using System.IO;
using System.Text;
The button click event has the following (documented) code. Notice a few things. My AWS credentials are stored in the web.config file, and I pass an AmazonKinesisConfig to the client constructor. Why? I need to tell the client library which AWS region my Kinesis stream is in so that it can build the proper request URL. Also note that I set a few properties on the put request object itself. First, I set the stream name. Second, I added a partition key, which is used to place the record in a given shard. It’s a way of putting “like” records in a particular shard.
protected void btnOrder_Click(object sender, EventArgs e)
{
    //generate unique order id
    string orderId = System.Guid.NewGuid().ToString();

    //build up the CLR order object
    Order o = new Order()
    {
        Id = orderId,
        Source = "web",
        StoreId = storeid.Text,
        PizzaId = pizzaid.Text,
        Timestamp = DateTime.Now.ToString()
    };

    //convert to byte array in prep for adding to stream
    byte[] oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(o));

    //create stream object to add to Kinesis request
    using (MemoryStream ms = new MemoryStream(oByte))
    {
        //create config that points to AWS region
        AmazonKinesisConfig config = new AmazonKinesisConfig();
        config.RegionEndpoint = Amazon.RegionEndpoint.USEast1;

        //create client that pulls creds from web.config and takes in Kinesis config
        AmazonKinesisClient client = new AmazonKinesisClient(config);

        //create put request
        PutRecordRequest requestRecord = new PutRecordRequest();

        //list name of Kinesis stream
        requestRecord.StreamName = "OrderStream";

        //give partition key that is used to place record in particular shard
        requestRecord.PartitionKey = "weborder";

        //add record as memorystream
        requestRecord.Data = ms;

        //PUT the record to Kinesis
        PutRecordResponse responseRecord = client.PutRecord(requestRecord);

        //show shard ID and sequence number to user
        lblShardId.Text = "Shard ID: " + responseRecord.ShardId;
        lblSequence.Text = "Sequence #: " + responseRecord.SequenceNumber;
    }
}
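For reference, “pulls creds from web.config” means the SDK reads the access key and secret from appSettings. With the SDK of that era the entries looked something like the sketch below; the key names are from the legacy SDK, so yours may differ.

<appSettings>
  <!-- legacy AWS SDK for .NET credential keys; placeholders only -->
  <add key="AWSAccessKey" value="YOUR_ACCESS_KEY"/>
  <add key="AWSSecretKey" value="YOUR_SECRET_KEY"/>
</appSettings>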
With the web application done, I published it to a Windows Azure Web Site. This is super easy to do with Visual Studio 2013, and within a few seconds my application was there.
Finally, I submitted a bunch of records to Kinesis by adding pizza orders. Notice the shard ID and sequence number that Kinesis returns from each PUT request.
Creating a Kinesis application (record consumer)
To realistically read data from a Kinesis stream, there are three steps. First, you need to describe the stream in order to find out the shards. If I want a fleet of servers to run this application and read the stream, I’d need a way for each application to claim a shard to work on. The second step is to retrieve a “shard iterator” for a given shard. The iterator points to a place in the shard where I want to start reading data. Recall from above that I can start with the latest unread records, oldest records, or at a specific point in the shard. The third and final step is to get the records from a particular iterator. Part of the result set of this operation is a “next iterator” value. In my code, if I find another iterator value, I once again call the “get records” operation to pull any records from that iterator position.
Here’s the total code block, documented for your benefit.
private static void ReadFromKinesis()
{
    //create config that points to Kinesis region
    AmazonKinesisConfig config = new AmazonKinesisConfig();
    config.RegionEndpoint = Amazon.RegionEndpoint.USEast1;

    //create new client object
    AmazonKinesisClient client = new AmazonKinesisClient(config);

    //Step #1 - describe stream to find out the shards it contains
    DescribeStreamRequest describeRequest = new DescribeStreamRequest();
    describeRequest.StreamName = "OrderStream";

    DescribeStreamResponse describeResponse = client.DescribeStream(describeRequest);
    List<Shard> shards = describeResponse.StreamDescription.Shards;

    foreach (Shard s in shards)
    {
        Console.WriteLine("shard: " + s.ShardId);
    }

    //grab the only shard ID in this stream
    string primaryShardId = shards[0].ShardId;

    //Step #2 - get iterator for this shard
    GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
    iteratorRequest.StreamName = "OrderStream";
    iteratorRequest.ShardId = primaryShardId;
    iteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;

    GetShardIteratorResponse iteratorResponse = client.GetShardIterator(iteratorRequest);
    string iterator = iteratorResponse.ShardIterator;

    Console.WriteLine("Iterator: " + iterator);

    //Step #3 - get records in this iterator
    GetShardRecords(client, iterator);

    Console.WriteLine("All records read.");
    Console.ReadLine();
}

private static void GetShardRecords(AmazonKinesisClient client, string iteratorId)
{
    //create request
    GetRecordsRequest getRequest = new GetRecordsRequest();
    getRequest.Limit = 100;
    getRequest.ShardIterator = iteratorId;

    //call "get" operation and get everything in this shard range
    GetRecordsResponse getResponse = client.GetRecords(getRequest);

    //get reference to next iterator for this shard
    string nextIterator = getResponse.NextShardIterator;

    //retrieve records
    List<Record> records = getResponse.Records;

    //print out each record's data value
    foreach (Record r in records)
    {
        //pull out (JSON) data in this record
        string s = Encoding.UTF8.GetString(r.Data.ToArray());
        Console.WriteLine("Record: " + s);
        Console.WriteLine("Partition Key: " + r.PartitionKey);
    }

    //on an open shard, NextShardIterator is only null once the shard is closed,
    //so also stop when a call comes back empty to avoid polling forever
    if (null != nextIterator && records.Count > 0)
    {
        //if there's another iterator and data is still flowing, call operation again
        GetShardRecords(client, nextIterator);
    }
}
Now I had a working Kinesis application that can run anywhere. Clearly it’s easy to run this on AWS EC2 servers (and the SDK does a nice job with retrieving temporary credentials for apps running within EC2), but there’s a good chance that cloud users have a diverse portfolio of providers. Let’s say I love the application services from AWS, but like the server performance and management capabilities from CenturyLink. In this case, I built a Windows Server to run my Kinesis application.
With my server ready, I ran the application and saw my shards, my iterators, and my data records.
Very cool and pretty simple. Don’t forget that each data consumer has some work to do to parse the stream, find the (partition) data they want, and perform queries on it. You can imagine loading this into an Observable and using LINQ queries on it to aggregate data. Regardless, it’s very nice to have a durable stream processing service that supports replays and multiple readers.
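As a sketch of that idea with plain LINQ (assuming the Order class from the producer and the records list from the consumer code above; requires using System.Linq):

//deserialize each record back into the Order type used by the producer
List<Order> orders = records
    .Select(r => JsonConvert.DeserializeObject<Order>(Encoding.UTF8.GetString(r.Data.ToArray())))
    .ToList();

//aggregate: count orders per store
var ordersPerStore = orders
    .GroupBy(o => o.StoreId)
    .Select(g => new { StoreId = g.Key, OrderCount = g.Count() });

foreach (var row in ordersPerStore)
{
    Console.WriteLine(row.StoreId + ": " + row.OrderCount + " orders");
}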
Summary
The “internet of things” is here, and companies that can quickly gather and analyze data will have a major advantage. Amazon Kinesis is an important service to that end, but don’t think of it as something that ONLY works with other applications in the AWS cloud. We saw here that you could have all sorts of data producers running on devices, on-premises, or in other clouds. The Kinesis applications that consume data can also run virtually anywhere. The modern architect recognizes that composite applications are the way to go, and hopefully this helped you understand another service that’s available to you!
Hi, thanks for this post. I just installed the AWS .NET SDK in Visual Studio, but it doesn’t have the Kinesis libraries (Amazon.Kinesis and Amazon.Kinesis.Model). Do you know what I should do? Could you please share the project files as well?
Bests,
Frank
Hi Frank, did you install the NuGet package for your project? I just did, and it appears the Kinesis classes are there.
Thanks, it is working.
Does this example use any KCL classes? The KCL needs Java installed to keep the MultiLangDaemon running. And why is it significant to use the KCL?
Why is there no step that saves a checkpoint to DynamoDB after getting the records?
How would I implement this in a WPF application?
You’re asking how you’d process this in a WPF app?
Yes… My application is a desktop app that mostly works in offline mode. What I need is for records to be captured locally while offline and synced once the app is back online. Does it do that?