A core component of modern data processing systems is a streaming architecture, such as Apache Kafka or Amazon Kinesis. Streams allow data to be processed in real time, while making consumers of data completely independent of data producers. At Eyeview, we use Amazon Kinesis because having a streaming-based architecture allows us to react to consumers’ online behavior within seconds while processing billions of events each day.
Here, we’ll walk you through how we set up Kinesis with our system, ways to optimize the process and important things to look out for along the way.
First Steps with Kinesis
To start using the power of Kinesis, you first need to create a stream in the AWS console. Kinesis is composed of streams, each of which can grow and shrink independently of other streams. Each stream is composed of shards, and a stream’s capacity is determined by the number of its shards. Amazon charges for the stream’s defined capacity rather than the actual usage, so one of the challenges is how to scale Kinesis up as data grows, while keeping the cost in check.
One way to understand shards is to think of each stream as a highway with a predefined number of lanes. As the number of cars (data items) grows or shrinks, we ask Amazon to construct or deconstruct highway lanes (shards) to create a better traffic flow. Although Amazon creates new shards much faster than construction companies create highway lanes, it still takes a few minutes, and, most importantly, new shards are not immediately available. Kinesis creates new shards by splitting existing shards, so consumers start reading a new shard only once all data has been read from its parent shard.
Adding lanes is not enough, though; we have to ensure that new traffic is well distributed across lanes. This is where partition keys come into play. When pushing data to Kinesis, each item includes the payload as well as a partitioning key. The partitioning key defines the lane (shard) to which each car (data item) goes. Each shard has a defined range of keys it accepts and for each data item we assign a key. Based on the shards’ ranges and the items’ keys, Kinesis decides which data item will be pushed to which shard. Here’s a code example of pushing a data item with a partition key to a Kinesis stream:
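The push itself goes through the AWS SDK, whose exact call depends on the SDK version in use. The routing step it triggers can be modeled with the Java standard library alone. The following is a minimal sketch, not Kinesis's actual implementation: it assumes the partition key is hashed as UTF-8 bytes and that the shards split the 128-bit hash space into equal, contiguous ranges. `ShardRouter`, its methods, and the key `user-42` are hypothetical names used only for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Stdlib-only model of how a partition key maps to a shard; it does not call AWS. */
final class ShardRouter {
    /** Size of the hash key space: MD5 yields 128-bit values, 0 .. 2^128 - 1. */
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the partition key (assumed UTF-8), read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 keeps the value non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
    }

    /** Index of the owning shard, assuming shardCount equal, contiguous ranges. */
    static int shardIndexFor(String partitionKey, int shardCount) {
        BigInteger rangeSize = KEY_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        return Math.min(index, shardCount - 1); // the last shard absorbs the remainder
    }

    public static void main(String[] args) {
        String key = "user-42"; // hypothetical partition key
        System.out.println(key + " -> shard " + shardIndexFor(key, 4));
    }
}
```

Because the shard is chosen from the hash rather than the raw key, two items with the same partition key always land in the same lane, while similar-looking keys can land far apart.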
If either the shards’ ranges or the data’s keys are not well distributed, hotspots form in the data pipeline and some lanes get more traffic than others. Since Kinesis imposes limits at the shard level, poorly distributed items lead to frequent errors, specifically “ProvisionedThroughputExceededException”, which occurs when a shard receives more data than its capacity allows.
Picking a Good Partition Is Key
The simplest way to ensure well-distributed keys is to generate a random key. The size of the key should be 16 bytes or more; the actual key Kinesis uses to distribute the data is an MD5 hash of the provided key. A simple implementation would be to use UUID.randomUUID().toString() as the partition key.
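To see how evenly random keys spread, the sketch below generates UUID partition keys and counts how many fall into each shard. It is a local simulation under stated assumptions (UTF-8 key bytes, equal hash ranges per shard), not a call to Kinesis, and `RandomKeySpread` is a hypothetical helper name.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

/** Local simulation of random partition keys; assumes equal hash ranges per shard. */
final class RandomKeySpread {
    /** Counts how many of recordCount random UUID keys land in each of shardCount ranges. */
    static int[] countPerShard(int recordCount, int shardCount) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
        BigInteger rangeSize = BigInteger.ONE.shiftLeft(128).divide(BigInteger.valueOf(shardCount));
        int[] counts = new int[shardCount];
        for (int i = 0; i < recordCount; i++) {
            String partitionKey = UUID.randomUUID().toString();
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, digest);
            // Clamp so the top of the hash space falls into the last shard.
            int shard = Math.min(hash.divide(rangeSize).intValue(), shardCount - 1);
            counts[shard]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = countPerShard(100_000, 4);
        for (int shard = 0; shard < counts.length; shard++) {
            System.out.println("shard " + shard + ": " + counts[shard] + " records");
        }
    }
}
```

With enough records, each shard should receive close to an equal share; the exact counts differ from run to run because the keys are random.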
In some cases, however, a random key is not the best solution. If you’re planning to join multiple streams when reading them from Kinesis, then use the same partition key for both datasets to ensure the same reader processes both items. Another use case that requires using a custom key is aggregating items by a specific dimension. For example, if you want to count all events per user in memory, use the user ID as the key to ensure all events of a given user will be read by the same reader.
When using a custom key, you have to calculate the distribution of keys. If the number of data items per key varies widely, plan the stream’s capacity based on the maximum number of items per key rather than the average. Alternatively, use random keys, and instead of aggregating in memory, use a global cache that all readers can access, such as ElastiCache, DynamoDB or Redis.
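The difference between planning on the average and planning on the maximum can be made concrete with a little arithmetic. The sketch below is one possible reading of that advice, not an official sizing formula. It assumes a per-shard write limit of 1,000 records per second; `KeyCapacityPlanner` and the sample user IDs are hypothetical.

```java
import java.util.Map;

/** Rough shard-count estimates from per-key traffic; a planning aid, not an AWS formula. */
final class KeyCapacityPlanner {
    /** Assumed per-shard write limit, in records per second. */
    static final long SHARD_RECORDS_PER_SECOND = 1_000;

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Optimistic estimate: total traffic spread perfectly evenly across shards. */
    static long shardsForAverage(Map<String, Long> recordsPerSecondByKey) {
        long total = recordsPerSecondByKey.values().stream().mapToLong(Long::longValue).sum();
        return ceilDiv(total, SHARD_RECORDS_PER_SECOND);
    }

    /** Conservative estimate: treat every key as if it were as busy as the busiest key. */
    static long shardsForMax(Map<String, Long> recordsPerSecondByKey) {
        long max = recordsPerSecondByKey.values().stream().mapToLong(Long::longValue).max().orElse(0);
        return ceilDiv(max * recordsPerSecondByKey.size(), SHARD_RECORDS_PER_SECOND);
    }

    /** One key always maps to one shard, so a key above the shard limit cannot fit anywhere. */
    static boolean hasUnservableKey(Map<String, Long> recordsPerSecondByKey) {
        return recordsPerSecondByKey.values().stream().anyMatch(v -> v > SHARD_RECORDS_PER_SECOND);
    }

    public static void main(String[] args) {
        // Hypothetical traffic: one user is far busier than the others.
        Map<String, Long> load = Map.of("user-a", 50L, "user-b", 60L, "user-c", 900L);
        System.out.println("average-based shards: " + shardsForAverage(load)); // 2
        System.out.println("max-based shards:     " + shardsForMax(load));     // 3
        System.out.println("key over shard limit: " + hasUnservableKey(load)); // false
    }
}
```

If `hasUnservableKey` returns true, no number of shards helps, because adding shards never splits the traffic of a single key. That is the case where random keys plus a shared cache become the practical option.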
Ensuring Balanced Ranges
In addition to well-distributed partition keys, we have to make sure the shards’ assigned ranges are not skewed. When a new stream is created, shard ranges are assigned uniformly, but as we scale the stream up and down (by adding and removing shards), this may no longer be the case. If a scaling operation fails, it might leave the shards’ ranges unbalanced.
Periodically verify the shards’ ranges by calling the “describeStream” API and checking the key ranges for each shard. Note that it is a paginated call and it includes both open and closed shards. Keep calling describeStream until no more pages are available and filter the results to open shards only. Here’s sample code for retrieving the open shards:
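The sketch below covers the filtering and balance check only; fetching and paging through describeStream depends on the SDK version and is left out. It assumes that a shard is open when its sequence number range has no ending sequence number, and that hash key ranges are inclusive. `ShardRangeCheck`, `ShardInfo`, and the sample shard IDs are hypothetical stand-ins for the SDK's own types.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stdlib-only check of shard ranges; it works on data already fetched from describeStream. */
final class ShardRangeCheck {
    /** Hypothetical holder for the per-shard fields this check needs. */
    static final class ShardInfo {
        final String shardId;
        final BigInteger startingHashKey;
        final BigInteger endingHashKey;    // inclusive
        final String endingSequenceNumber; // null while the shard is open

        ShardInfo(String shardId, BigInteger start, BigInteger end, String endingSequenceNumber) {
            this.shardId = shardId;
            this.startingHashKey = start;
            this.endingHashKey = end;
            this.endingSequenceNumber = endingSequenceNumber;
        }

        BigInteger rangeSize() {
            return endingHashKey.subtract(startingHashKey).add(BigInteger.ONE);
        }
    }

    /** Keeps only shards that are still open (no ending sequence number). */
    static List<ShardInfo> openShards(List<ShardInfo> allShards) {
        List<ShardInfo> open = new ArrayList<>();
        for (ShardInfo shard : allShards) {
            if (shard.endingSequenceNumber == null) {
                open.add(shard);
            }
        }
        return open;
    }

    /** Ratio of the widest open range to the narrowest; 1.0 means perfectly balanced. */
    static double skew(List<ShardInfo> openShards) {
        if (openShards.isEmpty()) {
            throw new IllegalArgumentException("no open shards");
        }
        BigInteger min = openShards.get(0).rangeSize();
        BigInteger max = min;
        for (ShardInfo shard : openShards) {
            min = min.min(shard.rangeSize());
            max = max.max(shard.rangeSize());
        }
        return new BigDecimal(max).divide(new BigDecimal(min), 4, RoundingMode.HALF_UP).doubleValue();
    }

    /** Made-up data: a closed parent and two open children split unevenly (1/4 and 3/4). */
    static List<ShardInfo> sampleShards() {
        BigInteger last = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        BigInteger quarter = BigInteger.ONE.shiftLeft(126);
        return Arrays.asList(
                new ShardInfo("shard-0", BigInteger.ZERO, last, "100"),
                new ShardInfo("shard-1", BigInteger.ZERO, quarter.subtract(BigInteger.ONE), null),
                new ShardInfo("shard-2", quarter, last, null));
    }

    public static void main(String[] args) {
        List<ShardInfo> open = openShards(sampleShards());
        System.out.println(open.size() + " open shards, skew " + skew(open));
    }
}
```

A skew well above 1.0 signals that one open shard covers a much larger slice of the hash space than another, so it will receive proportionally more traffic even when the keys are random.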
Scaling Streams on the Client Side
One way to auto-scale streams up and down is to use the Kinesis Scaling Utils provided by AWS (https://github.com/awslabs/amazon-kinesis-scaling-utils). CloudWatch metrics can be used to determine when the streams have to grow or shrink.
However, there are two important limitations to this method:
- Versions prior to 9.5.x support scaling based on either write capacity or read capacity, but not both.
- Client-side scaling is implemented by consecutively calling “splitShard” and “mergeShards” until reaching the number of desired shards with balanced ranges. Since the full scaling operation is not atomic, it could fail in the middle, leaving the shards unbalanced.
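Each step of a client-side scale-up has to pick the hash key at which a shard is split. A minimal sketch of that calculation follows, assuming inclusive hash key ranges and an even split. `SplitPlanner` is a hypothetical helper, and the actual “splitShard” request is not shown.

```java
import java.math.BigInteger;

/** Computes where to split a shard's hash key range into two halves; no AWS calls are made. */
final class SplitPlanner {
    /**
     * Starting hash key of the second child when [start, end] is split evenly.
     * The first child then covers [start, result - 1] and the second [result, end].
     */
    static BigInteger newStartingHashKey(BigInteger start, BigInteger end) {
        if (start.compareTo(end) >= 0) {
            throw new IllegalArgumentException("range must contain at least two hash keys");
        }
        BigInteger size = end.subtract(start).add(BigInteger.ONE);
        return start.add(size.shiftRight(1)); // start + size / 2
    }

    public static void main(String[] args) {
        BigInteger start = BigInteger.ZERO;
        BigInteger end = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        System.out.println("split the full range at " + newStartingHashKey(start, end));
    }
}
```

Doubling a stream this way takes one split request per existing shard, and each request can fail independently. That is why a partially completed run can leave some shards twice as wide as others.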
At Eyeview, these limitations with client-side scaling left us with unbalanced shards more often than expected, which caused unnecessary delays. The underlying reason was that the scaling library calls the “describeStream” API many times and the AWS API limit is 10 calls per second for the entire AWS account. Our data architecture is composed of many data streams, which caused scaling operations to fail midway and leave streams unbalanced.
We were able to fix this issue once AWS released a new API that can scale streams up and down automatically on the server side. This ensures that the operation does not fail midway and shards’ ranges stay balanced. At the time of writing, the limits for this API call are a maximum of two calls per rolling 24-hour window, scaling up to at most double the current capacity, and scaling down to at most half the current capacity. If your needs are within the above limits, we recommend using this API. Here’s a sample call from the command line:
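The sketch below checks a target shard count against the doubling and halving limits described above and assembles the corresponding AWS CLI call (`aws kinesis update-shard-count`). It only builds the command as a list of arguments and does not run it. The stream name `my-stream`, the class name, and the rounding of the lower bound for odd shard counts are assumptions, and the two-calls-per-day limit is not modeled.

```java
import java.util.List;

/** Validates a target shard count and builds the matching AWS CLI command (not executed). */
final class UpdateShardCountCommand {
    /** True if target is within [half of current, rounded up; double of current]. */
    static boolean isAllowedTarget(int current, int target) {
        int minTarget = Math.max(1, (current + 1) / 2); // assumption: half, rounded up
        int maxTarget = current * 2;
        return target >= minTarget && target <= maxTarget;
    }

    /** Arguments for the server-side scaling call via the AWS CLI. */
    static List<String> build(String streamName, int current, int target) {
        if (!isAllowedTarget(current, target)) {
            throw new IllegalArgumentException(
                    "target " + target + " is outside the allowed range for " + current + " shards");
        }
        return List.of(
                "aws", "kinesis", "update-shard-count",
                "--stream-name", streamName,
                "--target-shard-count", String.valueOf(target),
                "--scaling-type", "UNIFORM_SCALING");
    }

    public static void main(String[] args) {
        // Prints: aws kinesis update-shard-count --stream-name my-stream
        //         --target-shard-count 8 --scaling-type UNIFORM_SCALING (on one line)
        System.out.println(String.join(" ", build("my-stream", 4, 8)));
    }
}
```

Growing a stream to more than double its size therefore takes several calls, which is where the daily call limit starts to matter.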
Partition key: Choose a random partition key unless you need to aggregate or join streams in memory. In that case, you’ll have to either remove keys that carry far more data than average or plan the capacity of shards and data processing applications based on the maximum data per key.
Scaling: Use the new server-side API unless you’re planning to scale streams up and down quickly or more than double in capacity. If you’re using the client-side library, monitor the process and ensure that shards’ ranges are balanced. If a scaling operation fails, keep trying until it succeeds.
By utilizing Kinesis and automatically scaling its capacity up and down, we are able to process millions of events within seconds and send them to multiple data stores while keeping cost in check.