Engineering / August 20, 2019

From Data Lake to NoSQL Table

Intro

Eyeview is a video advertising company that specializes in delivering relevant ads to users across their devices in real time. As with any data-driven company, the quality of our data determines the power of our algorithms and, ultimately, the effectiveness of our ads. Part of the challenge we face is maintaining data integrity across the many services that use it. In this post, we will focus on one such challenge as it relates to audience data – a dataset that is critical for accurate ad targeting.

The Challenge of Many Hops and Data Integrity:

Audience data comes to us from external partners. It travels across five different services before it reaches its most important destination (Aerospike). There, it is accessed during real-time bidding to match a user with an audience in a matter of milliseconds. The services handling the data are:

  1. S3: distributed object storage provided by AWS
  2. Spark: our big data lake
  3. AWS Kinesis: a data bus used to persist and share data via streams
  4. AWS KCL apps: data enrichment and dispatching services built on top of Kinesis
  5. Aerospike: a scalable key-value store used for real-time bidding

Each data hop presents unique challenges in terms of checking integrity and ensuring that no data loss occurs. Here, we will describe an issue we found in one of these interactions – namely, between Spark and Kinesis – and the innovative approach we took to solve it.

For context, the path of audience data is shown below:

The Challenge of Dealing with AWS Kinesis:

Sending data to Kinesis is not straightforward due to the number of limits it imposes on the sender. The limits are expressed in terms of the rate of batches sent, the rate of bytes sent, and the maximum number of entries and bytes per batch. Violating these limits results in rejection of the sent data via an exception. Depending on the capacity of the stream (expressed in units of parallelism called shards), the effective limits can vary significantly. To deal with this, we developed a component called the Kinesis Buffered Client (KBC), which uses buffering and throttling to enforce the rate limits. KBC uses a common worker pool pattern based on Java's executor framework: a queue for requests and a pool of thread-based workers taking items off the queue and processing them in the background.

The design is shown below:

Each thread monitors how quickly its buffer fills up in terms of bytes and number of entries. If the buffer fills too quickly, the thread waits out the remaining time until it is allowed to send, and then sends its records to the Kinesis server. If the buffer is not yet full but enough time has passed to allow sending, the thread automatically sends out whatever it has in the buffer. The client adding records to KBC won't block unless the main request queue gets full. So, in a sense, throttling controls exist at two levels: the main thread adding items to KBC, and the worker threads buffering and sending data out. This is a complex design, but it allows large bursts of incoming records to be absorbed without blocking the main client (at the expense of using more memory, of course).
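To make the buffering and throttling concrete, here is a minimal sketch of what a single KBC worker thread might look like. The names (BufferingWorker, flushIntervalMs, the send callback) are illustrative assumptions of ours, not the actual KBC code:

import java.util.concurrent.{BlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of one KBC worker: it drains the shared request queue
// into a local buffer and flushes either when the buffer hits its entry/byte
// limits or when enough time has passed with the buffer only partially full.
class BufferingWorker(
    queue: BlockingQueue[Array[Byte]],   // main KBC request queue
    maxEntries: Int,                     // Kinesis batch entry limit
    maxBytes: Long,                      // Kinesis batch byte limit
    flushIntervalMs: Long,               // minimum time between sends
    send: Seq[Array[Byte]] => Unit       // hands a full batch to Kinesis
) extends Runnable {

  override def run(): Unit = {
    val buffer    = ArrayBuffer.empty[Array[Byte]]
    var bytes     = 0L
    var lastFlush = System.currentTimeMillis()

    def flush(): Unit = if (buffer.nonEmpty) {
      val elapsed = System.currentTimeMillis() - lastFlush
      // The buffer filled too quickly: wait out the rest of the interval.
      if (elapsed < flushIntervalMs) Thread.sleep(flushIntervalMs - elapsed)
      send(buffer.toList)
      buffer.clear()
      bytes = 0L
      lastFlush = System.currentTimeMillis()
    }

    while (!Thread.currentThread().isInterrupted) {
      val record = queue.poll(flushIntervalMs, TimeUnit.MILLISECONDS)
      if (record != null) {
        buffer += record
        bytes += record.length
        if (buffer.size >= maxEntries || bytes >= maxBytes) flush()
      } else {
        // Enough time has passed without filling the buffer: send what we have.
        flush()
      }
    }
  }
}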

Leaking Data:

KBC is used in two places in our system – a Java service receiving tracking events (HTTP requests representing stages of video playback) and our data ingestion Spark jobs. While KBC works perfectly in the tracking service (which is stateful and super sensitive to data loss), it posed a major issue in a stateless environment like Spark. The data loss we observed was up to 70% in certain cases. We discovered that the issue was not in any single component, but in the interaction between KBC and Spark. Spark code interacts with KBC via partition closures (Scala functions distributed across Spark worker nodes). Each closure executes on a separate thread and gets access to a part of a distributed data set (called a DataFrame).

An example of how this looks is shown below:
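The original post showed this Spark code as an image; the sketch below is our reconstruction of the same idea. KinesisBufferedClient is a stand-in for the KBC facade, and the S3 path and stream name are made-up placeholders:

import java.util.concurrent.LinkedBlockingQueue
import org.apache.spark.sql.SparkSession

// Stand-in for the KBC facade: submit() only enqueues a record; background
// worker threads (elided here) drain the queue and send batches to Kinesis.
class KinesisBufferedClient(streamName: String) {
  private val queue = new LinkedBlockingQueue[Array[Byte]]()
  def submit(record: Array[Byte]): Unit = queue.put(record)
}

object AudienceIngestJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("audience-ingest").getOrCreate()

    // Audience files land in S3 and are read into a DataFrame.
    val audiences = spark.read.parquet("s3://some-bucket/audience-data/")

    audiences.rdd.foreachPartition { rows =>
      // One KBC instance per partition closure, running on a Spark executor.
      val kbc = new KinesisBufferedClient("audience-stream")
      rows.foreach(row => kbc.submit(row.mkString(",").getBytes("UTF-8")))
      // The closure returns as soon as the last record is buffered; nothing
      // waits for KBC's worker threads, so anything left in the buffers is lost.
    }
  }
}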

TL;DR: The code above will exit before the data in all of the buffers is sent out to Kinesis. This is because the client does not block waiting for results after submitting data into KBC; therefore, once all of the S3 data has been read and buffered, whatever remains in the buffers is lost when the closure exits.

Detailed Explanation: The closure code, which reads data from S3 and submits it to KBC, completes as soon as the last batch of items has been buffered on the main KBC request queue. When the closure completes, the worker threads launched during its execution stop their processing. This seems like a violation of the JVM specification, which says that any non-daemon thread will continue to run within a JVM until it finishes its work. However, that is not the behavior we observed. The only way to guarantee that a thread would complete its work was to add an explicit wait for the results. The following example in Scala reproduces the two behaviors in Spark (with and without the explicit wait).
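The Scala example referenced here was also shown as an image in the original post. A hedged reconstruction of the same experiment is below: the first closure fires work at an executor service and returns immediately, while the second collects the futures and blocks on them before the closure completes. The object name and the println payload are ours:

import java.util.concurrent.{Callable, Executors, TimeUnit}
import org.apache.spark.sql.SparkSession

object ClosureThreadDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("closure-thread-demo").getOrCreate()
    val data  = spark.range(0, 1000)

    // Behavior 1: fire and forget. The closure completes as soon as the tasks
    // are queued; tasks still pending on the pool may never run to completion.
    data.rdd.foreachPartition { ids =>
      val pool = Executors.newFixedThreadPool(4)
      ids.foreach { id =>
        pool.submit(new Runnable {
          override def run(): Unit = println(s"sending $id")
        })
      }
      // no wait here
    }

    // Behavior 2: explicit wait. Collect the futures and block until every
    // task has finished before letting the closure return.
    data.rdd.foreachPartition { ids =>
      val pool = Executors.newFixedThreadPool(4)
      val futures = ids.map { id =>
        pool.submit(new Callable[Unit] {
          override def call(): Unit = println(s"sending $id")
        })
      }.toList
      futures.foreach(_.get())           // explicit wait for every pending task
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}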

The Next Generation Solution:

To ensure that the client is blocked for just enough time to comply with the limits, we introduced two changes: rate limiting and futures. Rate limiting is a mechanism that blocks a caller whenever the rate of activity exceeds a given limit. We picked a simple and intuitive algorithm called token bucket, which is commonly used in the networking domain. A Java timer incrementing an atomic integer was all that was needed to generate tokens. For every new request, KBC either consumes a token (by decrementing the atomic integer) or blocks until a new token is generated. Futures gave us a simple way to wait for the status of sent data even when the rate limits weren't violated. The KBC design became much simpler: the rate limiter focuses on throttling against the given limits, the worker code focuses on sending batches of data to Kinesis, and the facade focuses on buffering requests to optimize the Kinesis batch record size while delegating throttling and sending to the rate limiter and the worker respectively.
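The post does not show the rate limiter itself, but the description above translates into a short sketch along these lines. The class name, bucket size, and the 5 ms back-off are assumptions of ours, not the production values:

import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger

// Token bucket sketch: a timer adds tokens at a fixed rate (capped at the
// bucket size), and callers either consume one or block until one is available.
class TokenBucketRateLimiter(tokensPerSecond: Int, maxTokens: Int) {
  private val tokens = new AtomicInteger(maxTokens)
  private val timer  = new Timer("token-refill", true) // daemon timer thread

  timer.scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = {
      var done = false
      while (!done) {
        val current = tokens.get()
        done = current >= maxTokens || tokens.compareAndSet(current, current + 1)
      }
    }
  }, 0L, math.max(1L, 1000L / tokensPerSecond))

  // Block the caller until a token is available, then consume it.
  def acquire(): Unit = {
    while (true) {
      val current = tokens.get()
      if (current > 0 && tokens.compareAndSet(current, current - 1)) return
      Thread.sleep(5) // back off briefly until the timer generates another token
    }
  }
}

A caller simply invokes acquire() before each send; if the rate is within the limit, a token is available and the call returns immediately, otherwise the caller blocks until the timer refills the bucket.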

The new design is shown below:

This is the main logic of the Facade within KBC:
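The facade code itself appeared as an image in the original post; the sketch below captures the logic described above, reusing the hypothetical TokenBucketRateLimiter from earlier. The sendBatch callback stands in for the worker that issues the actual Kinesis putRecords call:

import java.util.concurrent.{Callable, Executors, Future}
import scala.collection.mutable.ArrayBuffer

// Facade sketch: buffer records to build a well-sized Kinesis batch, let the
// rate limiter gate each send, and hand back a Future so callers can wait.
class KbcFacade(
    rateLimiter: TokenBucketRateLimiter,   // throttles batch sends
    sendBatch: Seq[Array[Byte]] => Unit,   // worker logic: one Kinesis putRecords call
    maxEntries: Int,
    maxBytes: Long
) {
  private val workers = Executors.newFixedThreadPool(4)
  private val buffer  = ArrayBuffer.empty[Array[Byte]]
  private var bytes   = 0L

  // Buffer a record; when the batch is full, flush it and return the Future.
  def submit(record: Array[Byte]): Option[Future[Unit]] = synchronized {
    buffer += record
    bytes += record.length
    if (buffer.size >= maxEntries || bytes >= maxBytes) Some(flush()) else None
  }

  // Send whatever is buffered; callers block on the returned Future when they
  // need a guarantee that the data actually reached Kinesis.
  def flush(): Future[Unit] = synchronized {
    val batch = buffer.toList
    buffer.clear()
    bytes = 0L
    workers.submit(new Callable[Unit] {
      override def call(): Unit = {
        rateLimiter.acquire()              // block until we are within the limits
        if (batch.nonEmpty) sendBatch(batch)
      }
    })
  }
}

With this shape, a Spark partition closure can call flush() at the end of the partition and block on the returned futures before returning, which is exactly the explicit wait the original design was missing.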

 

The Proof is in the Pudding

After we launched the newly improved KBC, data loss came down to under 1%. We then migrated this component to all of our Spark ingestion jobs with great results, improving the stability of our jobs and the consistency of our data. This project was a win not only for the business but also for us as engineers, because of all the exciting things we learned along the way. If you enjoy working on distributed systems and architecting data pipelines, come join us!

 
Alex Belyansky
