Data Pipelines in Production — Managing Batch and Stream Processing with Airflow and Spark

Jenny Ching
9 min readNov 28, 2022

--

As year end approaching, thus marking my first year at my current company. The past sixth months I’ve started as a Spark newbie and learning the necessary skills to productionize our data pipelines on the job.

While my previous focus was more on modeling pipelines, I was oblivious to how to make the data that the model was trained on comes in a more manageable and reproducible way. So I volunteered to pick up work on the data engineering side, and begun my six month journey as a data engineer.

In this post, I want to share my learnings of data engineering tools and building, and optimizing data pipelines. I will walk through the pieces of setting up data pipelines with Spark and Airflow, combined with materials from the Designing Data-Intensive Applications book, Chapter 10 & 11.

Data Pipelines

So why do we need data pipelines? Let’s start with common use case, where we want to process some log files and get all the error and messages. If the file is small, this can be done with Unix commands and read/write parsed outputs to local files. As the data grows, this approach will not scale.

There are two use cases where we can apply data processing pipelines:

Batch processing systems (offline systems):
A batch processing system takes a large amount of input data, runs a job to process it, and produces some output data. Jobs often takes time to process, so no user is waiting for the job to finish. Instead, batch jobs are scheduled to run periodically (houly, daily, weekly etc.) offline.

Stream processing systems (near-real-time systems):
Stream processing is somewhere between online and offline/batch processing. Like a batch processing system, a stream processor consumes inputs and produces outputs. However, a stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. This difference allows stream processing systems to have lower latency than the equivalent batch systems.

Introducing MapReduce

MapReduce, a batch processing algorithm was invented to help scale the data processing pipelines. A single MapReduce job is like a single Unix process: take one or more inputs and produces one or more outputs. That means, running a MapReduce job normally does not modify the input and does not have any side effects other than producing the output. The output files are written once, in a sequential fashion.

The main difference from pipelines of Unix commands is that MapReduce can parallelize a computation across many machines, without you having to write code to explicitly handle the parallelism. The mapper and reducer only operate on one record at a time; they don’t need to know where their input is coming from or their output is going to, so the framework can handle the complexities of moving data between machines.

There are two parts to the MapReduce algorithm:

Mapper:
The mapper is called once for every input record, and its job is to extract the key and value from the input record. For each input, it may generate any number of key-value pairs (including none). It does not keep any state from one input record to the next, so each record is handled independently.

Reducer:
The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and calls the reducer with an iterator over that collection of values. The reducer can produce output records.

From Hadoop to Spark

The earliest implementation of MapReduce is Hadoop. Although it is nowadays replaced by Spark because of performance while Spark keeps things mostly in memory and saving the network of file reading and writing. Other parts of Hadoop — yarn is replaced by k8s; HDFS replaced by cloud file systems (such as S3), making Hadoop no longer people’s choice.

In contrast, Spark is currently a must-have tool for processing large datasets. This technology has become the leading choice for many business applications in data engineering. The momentum is supported by managed services such as Databricks, which reduce part of the costs related to the purchase and maintenance of a distributed computing cluster.

In this post, we will focus on using Spark (Flink is another common choice, but since my experience so far is with Spark, we will go with that).

Entering Spark

A Spark system is consisted of Spark driver (master), spark executors (worker), and the orchestration mechanism between them. A part of data engineer’s job is to tinker with the Spark configurations, tuning cpu & memory to allow Spark perform shuffle jobs more efficiently and utilize the memory sufficiently.

  • Data partitions: this will be the number of tasks that Spark can process in parallel. Spark’s default number is 200, so if you see the shuffle tasks shows 200 on Spark UI, meaning the data might not be distributed evenly, you’d need to configure repartitioning (say 800 tasks) to speed up
  • Executor cores: one core performs one task, so say if you have 800 tasks and 50 instances, you’ll need 16 cores on each instance to process all those tasks
  • Memory: since Spark computes everything in memory, it is obvious that the system relies on memory allocated in each Spark executor. It is not desirable if the Spark executor experiences OOM, it will spill the results to disk and potentially slow down the processing. Leaving out too much unused memory is also bad, meaning we are wasting memory while we could have further speed up the jobs with same computing resources.

Spark Under the Hood

So how does Spark actually works? It a contains the mapper and reducer as we mentioned earlier.

In most cases, the application code that should run in the map task is not yet present on the machine that is assigned the task of running it, so the Spark first copies the code (e.g., JAR files in the case of Scala Spark program) to the appropriate machines. It then starts the map task and begins reading the input (from Hive in our case), passing one record at a time to the mapper callback, resulting in key-value pairs.

The reduce side of the computation is also partitioned. While the number of map tasks is determined by the number of input file blocks, the number of reduce tasks is configured by the job author (it can be different from the number of map tasks). To ensure that all key-value pairs with the same key end up at the same reducer, the framework uses a hash of the key to determine which reduce task should receive a particular key-value pair.

The key-value pairs must be sorted, but the dataset is likely too large to be sorted with a conventional sorting algorithm on a single machine. Instead, the sorting is performed in stages. First, each map task partitions its output by reducer, based on the hash of the key. Each of these partitions is kept in the mapper machine’s memory.

Whenever a mapper finishes reading its input and generate its sorted output, the Spark driver notifies the reducers that they can start fetching the output from that mapper for their partition. The process of partitioning by reducer, sorting, and copying data partitions from mappers to reducers is known as the shuffle.

The reduce task takes the key-value pairs from the mappers and merges them together, preserving the sort order. Thus, if different mappers produced records with the same key, they will be adjacent in the merged reducer input.

The reducer is called with a key and an iterator that sequentially scans over all records with the same key (which may in some cases not all fit in memory). These output records can be written to a S3 location where each partition is written as one file in the same location.

Orchestrating with Airflow

The range of problems you can solve with a single Spark job is limited. Thus, it is very common for Spark jobs to be chained together into workflows, such that the output of one job becomes the input to the next job.

To handle these dependencies between job executions, workflow schedulers such as Airflow are developed. Airflow as the scheduler have management features that are useful when maintaining a large collection of batch jobs. Workflows consisting of 50 to 100 Spark jobs are common when building recommendation systems.

We tipically create a Airflow DAG with the Spark operator, with params of where to pull the Spark JAR file and which Spark cluster to run the job. Airflow will kick off the job and monitor the job status. Once the job succeeds, it will trigger the next step in the DAG we specified.

Introducing Stream Processing

The second use case we mention earlier for applying data processing pipelines is stream processing, also known as near real-time processing.

In batch processing, a file is written once and then potentially read by multiple jobs. Analogously, in streaming terminology, an event is generated once by a producer and then potentially processed by multiple consumers In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream.

The patterns for partitioning and parallelization in stream processors are also very similar to those in batch jobs, as basic mapping operations such as transforming and filtering records also work the same.

The one crucial difference to batch jobs is that a stream never ends. This difference has many implications: sorting does not make sense with an unbounded dataset, and so sort-merge joins cannot be used. Fault-tolerance mechanisms must also change: with a batch job that has been running for a few minutes, a failed task can simply be restarted from the beginning, but with a stream job that has been running for several years, restarting from the beginning after a crash may not be ideal.

Stream Processing Use Cases

Stream processing has long been used for monitoring purposes, where an organization wants to be alerted if certain things happen. For example:

  • Fraud detection systems: determine if the usage patterns of a credit card have unexpectedly changed, and block the card if it is likely to have been stolen.
  • Personalization: takes user realtime data and process those to provide personalized service, such as pinning a recent clicked item. It requires data processed in a high-volume, low-latency and streaming fashion.
  • Trading systems: examine price changes in a financial market and execute trades according to specified rules.
  • Manufacturing systems: monitor the status of machines in a factory, and quickly identify the problem if there is a malfunction.

These kinds of applications require quite sophisticated pattern matching and correlations. In the following section, I will focus on the personalization use case, which is one of the projected that I worked on. My task was to take advantage of Spark streaming’s stateful aggregation of user data streams such as impressions and click events to surface users’ latest clicked item.

Personalization with Spark batch and Streaming

After the user clicks on an item they like, imagine he/she shops around and came back 30 seconds later, the item they clicked on showed up on the front page. Data showed that it is more likely that the user will purchase the item they just licked a moment ago.

My task is to engineer that whole process, with the help of both Spark batch processing and Spark streaming.

Spark batch processing

Spark batch processing is first used to bootstrapped the initial state for the streaming job so at the time we start running the streaming job, we already have an idea of what people clicked on in the past 7 days. The workflow is as following:

  • Trigger the Scala Spark app to run daily scheduled on Airflow
  • In the the Scala Spark app, compute the latest clicked property id of each user for the past 7 days query below:
# group each user's clicks and sort by time of the clicks with latest on top,
# then get the top one row, which is the latest click we want
top_k = 1
df_click
.withColumn("latest_visit", expr("row_number() over (partition by user_id order by timestamp desc)"))
.where(s"""latest_visit <= ${top_k}""")
  • Save the computed Dataframe as parquet files to S3 bucket for streaming jobs to read as initial state

Spark Streaming

Now we have our batch job giving us data to start with, we can start the Spark streaming job. Unlike batch jobs that have longer cadence to trigger, Spark streaming job will trigger every 20 seconds, do the processing (stateful aggregation in our personalization use case), and write out the results. The workflow is as following:

  • Trigger the streaming job with 7 days as the water mark window
  • Read in the parquet files in S3 bucket that batch job wrote to and convert the Dataframe into a Dataset and perform groupByKey transform it to a KeyValueGroupedDataset
  • Consume Kafka streaming inputs from the click stream topic and apply filtering to process the data
  • Trigger Spark streaming mini-batch every 20 seconds to perform flatMapGroupsWithState on the user id to update their latest click and expire the clicks that are outside of the 7 days water mark window
  • Write out results to Redis cache for the online services to surface that latest click
Visualization of Spark streaming

There is still a lot of nitty gritty in the data engineering world that I am still exploring. For example, how to handle the skewed data (first thing is you need to spot where it happens from information Spark UI tells you); how does different queries impact the Spark performance? To learn more practical stuff, I recommend this book and blog.

--

--

No responses yet