This post is a continuation of a two-part series. In the first part, we delved into Apache Flink‘s internal mechanisms for checkpointing, in-flight data buffering, and handling backpressure. We covered these concepts in order to understand how buffer debloating and unaligned checkpoints allow us to enhance performance for specific conditions in Apache Flink applications.
In Part 1, we introduced and examined how to use buffer debloating to improve in-flight data processing. In this post, we focus on unaligned checkpoints. This feature has been available since Apache Flink 1.11 and has received many improvements since then. Unaligned checkpoints help, under specific conditions, to reduce checkpointing time for applications suffering temporary backpressure, and can be now enabled in Amazon Managed Service for Apache Flink applications running Apache Flink 1.15.2 through a support ticket.
Even though this feature might improve performance for your checkpoints, if your application is constantly failing because of checkpoints timing out, or is suffering from having constant backpressure, you may require a deeper analysis and redesign of your application.
As discussed in Part 1, Apache Flink checkpointing allows applications to record state in case of failure. We’ve already discussed how checkpoints, when triggered by the job manager, signal all source operators to snapshot their state, which is then broadcasted as a special record called a checkpoint barrier. This process achieves exactly-once consistency for state in a distributed streaming application through the alignment of these barriers.
Let’s walk through the process of aligned checkpoints in a standard Apache Flink application. Remember that Apache Flink distributes the workload horizontally: each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks based on its parallelism.
The alignment of checkpoint barriers is crucial for achieving exactly-once consistency in Apache Flink applications during checkpoint runs. To recap, when a job manager triggers a checkpoint, all sub-tasks of source operators receive a signal to initiate the checkpoint process. Each sub-task independently snapshots its state to the state backend and broadcasts a special record known as a checkpoint barrier to all outgoing streams.
When an application operates with a parallelism higher than 1, multiple instances of each task—referred to as sub-tasks—enable parallel message consumption and processing. A sub-task can receive distinct partitions of the same stream from different upstream sub-tasks, such as after a stream repartitioning with
rebalance operations. To maintain exactly-once consistency, all sub-tasks must wait for the arrival of all checkpoint barriers before taking a snapshot of the state. The following diagram illustrates the checkpoint barriers flow.
This phase is called checkpoint alignment. During alignment, the sub-task stops processing records from the partitions from which it has already received barriers, as shown in the following figure.
However, it continues to process partitions that are behind the barrier.
When barriers from all upstream partitions have arrived, the sub-task takes a snapshot of its state.
Then it broadcasts the barrier downstream.
The time a sub-task spends waiting for all barriers to arrive is measured by the checkpoint Alignment Duration metric, which can be observed in the Apache Flink UI.
If the application experiences backpressure, an increase in this metric could lead to longer checkpoint durations and even checkpoint failures due to timeouts. This is where unaligned checkpoints become a viable option to potentially enhance checkpointing performance.
Unaligned checkpoints address situations where backpressure is not just a temporary spike, but results in timeouts for aligned checkpoints, due to barrier queuing within the stream. As discussed in Part 1, checkpoint barriers can’t overtake regular records. Therefore, significant backpressure can slow down the movement of barriers across the application, potentially causing checkpoint timeouts.
The objective of unaligned checkpoints is to enable barrier overtaking, allowing barriers to move swiftly from source to sink even when the data flow is slower than anticipated.
Building on what we saw in Part 1 concerning checkpoints and what aligned checkpoints are, let’s explore how unaligned checkpoints modify the checkpointing mechanism.
Upon emission, each source’s checkpoint barrier is injected into the stream flowing across sub-tasks. It travels from the source output network buffer queue into the input network buffer queue of the subsequent operator.
Upon the arrival of the first barrier in the input network buffer queue, the operator initially waits for barrier alignment. If the specified alignment timeout expires because not all barriers have reached the end of the input network buffer queue, the operator switches to unaligned checkpoint mode.
The alignment timeout can be set programmatically by
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30)), but modifying the default is not recommended in Apache Flink 1.15.
The operator waits until all checkpoint barriers are present in the input network buffer queue before triggering the checkpoint. Unlike aligned checkpoints, the operator doesn’t need to wait for all barriers to reach the queue’s end, allowing the operator to have in-flight data from the buffer that hasn’t been processed before checkpoint initiation.
After all barriers have arrived in the input network buffer queue, the operator advances the barrier to the end of the output network buffer queue. This enhances checkpointing speed because the barrier can smoothly traverse the application from source to sink, independent of the application’s end-to-end latency.
After forwarding the barrier to the output network buffer queue, the operator initiates the snapshot of in-flight data between the barriers in the input and output network buffer queues, along with the snapshot of the state.
Although processing is momentarily paused during this process, the actual writing to the remote persistent state storage occurs asynchronously, preventing potential bottlenecks.
The local snapshot, encompassing in-flight messages and state, is saved asynchronously in the remote persistent state store, while the barrier continues its journey through the application.
When to use unaligned checkpoints
Remember, barrier alignment only occurs between partitions coming from different sub-tasks of the same operator. Therefore, if an operator is experiencing temporary backpressure, enabling unaligned checkpoints may be beneficial. This way, the application doesn’t have to wait for all barriers to reach the operator before performing the snapshot of state or moving the barrier forward.
Temporary backpressure could arise from the following:
- A surge in data ingestion
- Backfilling or catching up with historical data
- Increased message processing time due to delayed external systems
Another scenario where unaligned checkpoints prove advantageous is when working with exactly-once sinks. Utilizing the two-phase commit sink function for exactly-once sinks, unaligned checkpoints can expedite checkpoint runs, thereby reducing end-to-end latency.
When not to use unaligned checkpoints
Unaligned checkpoints won’t reduce the time required for savepoints (called snapshots in the Amazon Managed Service for Apache Flink implementation) because savepoints exclusively utilize aligned checkpoints. Furthermore, because Apache Flink doesn’t permit concurrent unaligned checkpoints, savepoints won’t occur simultaneously with unaligned checkpoints, potentially elongating savepoint durations.
Unaligned checkpoints won’t fix any underlying issue in your application design. If your application is suffering from persistent backpressure or constant checkpointing timeouts, this might indicate data skewness or underprovisioning, which may require improving and tuning the application.
Using unaligned checkpoints with buffer debloating
One alternative for reducing the risks associated with an increased state size is to combine unaligned checkpoints with buffer debloating. This approach results in having less in-flight data to snapshot and store in the state, along with less data to be used for recovery in case of failure. This synergy facilitates enhanced performance and efficient checkpoint runs, leading to smaller checkpointing sizes and faster recovery times. When testing the use of unaligned checkpoints, we recommend doing so with buffer debloating to prevent the state size from increasing.
Unaligned checkpoints are subject to the following limitations:
- They provide no benefit for operators with a parallelism of 1.
- They only improve performance for operators where barrier alignment would have occurred. This alignment happens only if records are coming from different sub-tasks of the same operator, for example, through repartitioning or
- Operators receiving input from multiple sources or participating in joins might not experience improvements, because the operator would be receiving data from different operators in those cases.
- Although checkpoint barriers can surpass records in the network’s buffer queue, this won’t occur if the sub-task is currently processing a message. If processing a message takes too much time (for example, a flat-map operation emitting numerous records for each input record), barrier handling will be delayed.
- As we have seen, savepoints always use aligned checkpoints. If the savepoints of your applications are slow due to barrier alignment, unaligned checkpoints will not help.
- Additional limitations affect watermarks, message ordering, and broadcast state in recovery. For more details, refer to Limitations.
Considerations for implementing unaligned checkpoints:
- Unaligned checkpoints introduce additional I/O to checkpoint storage
- Checkpoints encompass not only operator state but also in-flight data within network buffer queues, leading to increased state size
We offer the following recommendations:
- Consider enabling unaligned checkpoints only if both of the following conditions are true:
- Checkpoints are timing out.
- The average checkpoint Async Duration of any operator is more than 50% of the total checkpoint duration for the operator (sum of Sync Duration + Async Duration).
- Consider enabling buffer debloating first, and evaluate whether it solves the problem of checkpoints timing out.
- If buffer debloating doesn’t help, consider enabling unaligned checkpoints along with buffer debloating. Buffer debloating mitigates the drawbacks of unaligned checkpoints, reducing the amount of in-flight data.
- If unaligned checkpoints and buffer debloating together don’t improve checkpoint alignment duration, consider testing unaligned checkpoints alone.
Finally, but most importantly, always test unaligned checkpoints in a non-production environment first, running some comparative performance testing with a realistic workload, and verify that unaligned checkpoints actually reduce checkpoint duration.
This two-part series explored advanced strategies for optimizing checkpointing within your Amazon Managed Service for Apache Flink applications. By harnessing the potential of buffer debloating and unaligned checkpoints, you can unlock significant performance improvements and streamline checkpoint processes. However, it’s important to understand when these techniques will provide improvements and when they will not. If you believe your application may benefit from checkpoint performance improvement, you can enable these features in your Amazon Managed Service For Apache Flink version 1.15 applications. We recommend first enabling buffer debloating and testing the application. If you are still not seeing the expected outcome, enable buffer debloating with unaligned checkpoints. This way, you can immediately reduce the state size and the additional I/O to state backends. Lastly, you may try using unaligned checkpoints by itself, bearing in mind the considerations we’ve mentioned.
With a deeper understanding of these techniques and their applicability, you are better equipped to maximize the efficiency of checkpoints and mitigate the effect of backpressure in your Apache Flink application.
About the Authors
Lorenzo Nicora works as Senior Streaming Solution Architect helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.
Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.