Engineering / August 22, 2019

Brand Safety with Spark Streaming and Delta Lake

Eyeview serves data-driven online video ads for its clients. Brands know the importance of ad placement and of ensuring their ads are not placed next to unfavorable content. The Internet Advertising Bureau (IAB) defines brand safety as keeping a brand’s reputation safe when it advertises online; in practice, this means avoiding placing ads next to inappropriate content. The content of a webpage determines the segments of that page’s URL (e.g., if CNN.com has news about the presidential election, then politics can be one of the segments of that page).

Brand Safety at Eyeview

The diagram below shows how Eyeview implemented the concept of brand safety and the challenges we faced.

Eyeview’s cluster of real-time bidders needs to know whether a URL is brand safe in order to decide whether to serve an ad. It gets this information from Aerospike (our real-time database, with latency close to 10ms), but to persist this information in Aerospike we defined an offline solution that loads the segment information. When our cluster of bidders gets a bid request, the URL of that bid request is dumped into S3; the cluster receives close to 500k requests per second. Dumping that many URLs into S3 every second, even after deduping (so that an HTTP call is not made for the same URL multiple times in the same timeframe), creates the massive problem of processing a large number of small files. Many developers in the big data world face this problem in Spark or Hadoop environments. Here, I’ll list the challenges that can arise while batch processing a large number of small files.

Challenge #1: Underutilized Resources

Reading a small file (~KB in size) with a 1TB cluster underutilizes the cluster’s resources (seen in the photo below); it is like cutting bread with a sword instead of a butter knife.

Challenge #2: Manual Checkpointing

It is important to perform manual checkpointing to track which files have been processed and which have not. This can be extremely tedious when files fail or need to be reprocessed, and it may not scale as the data size grows very large.

Challenge #3: Parquet Table Issues

Let’s assume we somehow managed to process this large number of small files without caching/persisting the data on the cluster, writing directly to a Parquet table via Spark. We would then end up writing too many small files to the table. The problem with Parquet is that continuous appends to the table are too slow, so we leveraged overwrite mode to save the data, which ended up creating millisecond-level partitions in the table.
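To make the problem concrete, here is a hypothetical sketch of that write pattern (the DataFrame name, path, and timestamp key are illustrative, not our production code):

// Hypothetical sketch of the pattern described above: each batch is saved with
// overwrite mode into its own timestamp-keyed directory, so the table accumulates
// millisecond-level partitions full of small files.
val batchTs = System.currentTimeMillis()
urlSegmentsDf
  .write
  .mode("overwrite")   // appends to the Parquet table were too slow
  .parquet(s"s3://example-bucket/brand-safety/url-segments/ts=$batchTs/")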

Challenge #4: No Concurrent Reads on Parquet Tables

The latency of the offline job became so high that it was continuously writing data to the Parquet tables, which meant no other job could query those tables; Parquet also does not work well with a very large number of partitions.

Spark Streaming and Delta Lake

To solve the above challenges we introduced two new technologies: Spark Streaming and Delta Lake. Most sources that generate a large number of small files can be converted from batch processing to stream processing, and Spark Streaming helped solve the first two challenges. Instead of the cluster of bidders writing files containing the URLs to S3, we started sending the URLs directly to a Kinesis stream. This way we no longer had a large number of small files; all the data is in the stream, which lets us utilize our Spark resources efficiently.
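As a rough sketch of the producer side (the stream name, region, and partition-key choice are assumptions for illustration, not Eyeview’s bidder code), a bidder can publish each URL to the Kinesis stream with the AWS SDK:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest

// Hypothetical producer-side sketch: publish a bid-request URL to the Kinesis stream.
val kinesis = AmazonKinesisClientBuilder.standard().withRegion("us-east-1").build()

def publishUrl(url: String): Unit = {
  val request = new PutRecordRequest()
    .withStreamName("brand-safety-urls")       // assumed stream name
    .withPartitionKey(url.hashCode.toString)   // spread URLs across shards
    .withData(ByteBuffer.wrap(url.getBytes(StandardCharsets.UTF_8)))
  kinesis.putRecord(request)
}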

By connecting Spark Streaming with Kinesis streams we no longer need to do manual checkpointing, and since Spark Streaming is inherently fault-tolerant we don’t have to worry about failures and reprocessing of files. The code snippet below reads the data from the Kinesis stream.
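A minimal sketch of that read, assuming the Kinesis source available on Databricks and the stream name and region used above (both are illustrative assumptions):

// Read bid-request URLs from the Kinesis stream as a streaming DataFrame.
val kinesisStream = spark.readStream
  .format("kinesis")                           // Databricks Kinesis source
  .option("streamName", "brand-safety-urls")   // assumed stream name
  .option("region", "us-east-1")
  .option("initialPosition", "TRIM_HORIZON")   // start from the oldest available records
  .load()

// Kinesis delivers binary payloads; decode them back into URL strings.
val urls = kinesisStream
  .selectExpr("CAST(data AS STRING) AS url", "approximateArrivalTimestamp")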

The other two challenges, with the Parquet table, were solved by introducing a new table format, Delta Lake. Delta Lake supports ACID transactions, which means we can concurrently and reliably read from and write to the table, and Delta tables are very efficient with continuous appends. A Delta Lake table is both a batch table and a streaming source and sink. The code below shows persisting the data into Delta Lake. This also let us remove the millisecond partitions; see the code below for reference (partitions go down only to the hour level).
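A sketch of such a streaming write, continuing from the urls DataFrame in the previous sketch (the checkpoint location, table path, and derived date/hour columns are assumptions for illustration):

import org.apache.spark.sql.functions.{col, date_format, hour}

// Derive coarse partition columns so partitioning stops at the hour level,
// then continuously append the stream into a Delta table.
urls
  .withColumn("date", date_format(col("approximateArrivalTimestamp"), "yyyy-MM-dd"))
  .withColumn("hour", hour(col("approximateArrivalTimestamp")))
  .writeStream
  .format("delta")
  .outputMode("append")                        // Delta handles continuous appends efficiently
  .option("checkpointLocation", "s3://example-bucket/checkpoints/brand-safety/")
  .partitionBy("date", "hour")                 // hour-level partitions instead of millisecond
  .start("s3://example-bucket/delta/brand-safety-urls/")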


Conclusion and Results

By default, our cluster of bidders decides not to bid if it does not have segment information for a URL. This results in less advertising traffic, which has a substantial monetary impact. The latency of the old architecture was so high that many URLs ended up filtered out – i.e., no ads. By switching the process to Spark Streaming and Delta Lake, we decreased the number of bid calls filtered out by 50%! Once we had moved the architecture from batch processing to a streaming solution, we were able to reduce the cluster size of the Spark jobs, significantly reducing the cost of the solution. More impressively, we now require only one job to handle all the brand safety providers, which further reduced costs.

Rahul Jain
