10 traps to know before using Flink

Adopting a new framework always brings many surprises. When you spend a few days to troubleshoot why the service is running abnormally, it turns out that it is only because of the wrong usage of a certain function or the lack of some simple configurations.

At Contentsquare[1], we need to continuously upgrade data processing tasks to meet the demanding requirements of more and more data. This is why we decided to migrate hourly Spark tasks for session [2] processing to Flink [3] streaming services. In this way, we can take advantage of Flink's more robust processing capabilities, provide more real-time data to users, and provide historical data. But it's not easy, our team has been working on it for a year. At the same time, we also encountered some surprising problems, this article will try to help you avoid these pitfalls.

1. Load tilt caused by parallelism setting

Let's start with a simple question: when investigating the subtasks of a job in the Flink UI, you may encounter strange situations like the following regarding the amount of data processed by each subtask.

The workload of each subtask is not balanced

This shows that each subtask's operator does not receive the same number of Key Groups, which represent a fraction of all possible keys. If one operator receives 1 Key Group and another operator receives 2, the second subtask will likely need to do twice as much work. Looking at Flink's code, we can find the following functions:

Its purpose is to distribute all Key Groups to actual operators. The total number of Key Groups is determined by the maxParallelism parameter, and the number of operators is the same as parallelism. The biggest problem here is the default value of maxParallelism, which by default is equal to operatorParallelism + (operatorParallelism / 2) [4]. If we set parallelism to 10, then maxParallelism is 15 (the lower limit of the actual maximum concurrency value is 128, and the upper limit is 32768, here is just for convenience). In this way, according to the above function, we can calculate which operators will be assigned to which Key Groups.

In the default configuration, some operators are allocated two Key Groups, and some operators are allocated only one Key Group

Solving this problem is very easy: when setting the degree of concurrency, also set a value for maxParallelism, and this value is a multiple of parallelism. This will make the load more balanced and facilitate future expansion.

2. Pay attention to the importance of mapWithState & TTL

When dealing with data containing infinitely many keys, it is important to take into account the keyed state retention policy (via TTL timer to clean up unused data after a given time). The term "unlimited" is a bit misleading here, because if you're dealing with keys encoded in 128 bits, there will be a limit to the maximum number of keys (equal to 2 to the power of 128). But that's a huge number! You probably won't be able to store that many values in state, so it's best to consider your keyspace to be unbounded, and new keys to appear over time.

If your keyed state is contained in one of Flink's default windows, it will be safe: even if TTL is not used, a clear timer will be registered when processing the window's elements, which will call the clearAllState function, and delete The state associated with this window and its metadata.

If you want to use the Keyed State Descriptor [5] to manage the state, you can easily add a TTL configuration to ensure that the number of keys in the state does not increase without limit.

However, you'll probably want to use the simpler mapWithState method, which gives you access to the valueState and hides the complexity of the operation. While this is fine for testing and data with a small number of keys, it can cause problems in production with an infinite number of keys. Since the state is hidden from you, you cannot set a TTL, and by default no TTL is configured. That's why it's worth considering doing some extra work, like declaring something like a RichMapFunction , which will give you more control over the lifecycle of your state.

3. Restore and repartition from checkpoint

When using large states, it is necessary to use incremental checkpointing (incremental checkpointing). In our case, the full state of the task was about 8TB, and we configured checkpoints to be done every 15 minutes. Since checkpointing is incremental, we can only manage to send about 100GB of data to object storage every 15 minutes, which is much faster and less network intensive. This works great for fault tolerance, but we also need to retrieve state when updating tasks. A common approach is to create a savepoint for a running job that contains the entire state in a portable format.

However, in our case, savepoints can take hours to complete, making each release a long and cumbersome process. Instead, we decided to use Retained Checkpoints [6] . With this parameter set, we can speed up releases by restoring state from a previous job's checkpoint without having to trigger lengthy savepoints!

Also, although savepoints are more portable than checkpoints, you can still use preserved checkpoints to change a job's partitioning (it may not work for all types of jobs, so it's best to test it). This is exactly the same as repartitioning from a savepoint, but without Flink's lengthy process of redistributing data between TaskManagers. When we tried to do this, it took about 8 hours to complete, which was not sustainable. Fortunately, since we're using the RocksDB state backend, we can add more threads in this step to speed it up. This is done by increasing the following two parameters from 1 to 8:

Using reserved checkpoints, and increasing the number of threads allocated to RocksDB transfers, can reduce publish and repartition times by a factor of 10!

4. Increase logging in advance

This point may seem obvious, but it's also easy to forget. When developing a job, keep in mind that it will run for a long time and may process unexpected data. When this happens, you will need as much information as possible to investigate what happened without having to go back through the same data again to reproduce the problem.

Our task is to bring events together and combine them according to specific rules. Some of these rules perform okay most of the time, but take a long time when there is data skew. When we found a task stuck for 3 hours and had no idea what it was doing. It seems that only one TaskManager's CPU is working properly, so we suspect that specific data is causing our algorithm to perform poorly.

After finally processing the data, everything went back to normal, but we don't know where to start checking! That's why we've added some preventative logging for these cases: while processing the window, we measure the time spent. Whenever it takes more than 1 minute to calculate the window, we record all possible data. This was very helpful in understanding exactly what skew was causing the performance hit, and when this happened again we were able to pinpoint part of the reason why the merge process was slow. It may indeed take several hours if duplicate data is received. Of course, it's important not to log information too much, as this can slow down performance. So try to find a threshold where information is only displayed in exceptional cases.

5. How to find out what a stuck job is actually doing

Investigating the above issues also led us to realize that we needed to find an easy way to locate the currently running piece of code when the job was suspected to be stuck. Fortunately, there is an easy way to do this! First, you will need to configure the TaskManagers' JMX to accept remote monitoring. In a Kubernetes deployment, we can connect to JMX in three steps:

First, add this property to our flink-conf.yaml
Then, forward the local port 1099 to the port in the TaskManager's pod
Finally, open jconsole

This allows you to easily view the target TaskManager's information on the JVM. For stuck jobs, we targeted the only TaskManager running and profiled the running threads:

JConsole shows us what each thread is currently doing

Digging deeper, we can see that all threads are waiting except one (highlighted in the screenshot above). This allows us to quickly find out which method call the job is stuck in and fix it easily!

6. Risks of migrating data from one state to another

Depending on your actual situation, you may need to keep two different state descriptors with different semantics. For example, we accumulate events for ongoing sessions through the WindowContent state, and then move processed sessions into a ValueState called HistoricalSessions. The second state is kept for a few days in case it is needed later, until the TTL expires and discards it.

The first test we did worked fine: we could send additional data to the already processed session, which would create a new window for the same key. During the processing of the window, we fetch data from the HistoricalSessions state to merge the new data with the old session, and the resulting session is an enhanced version of the historical session, which is what we expect.

We've had memory issues a few times while doing this. After a few tests, we learned that OOM only happens when sending old data to Flink (i.e. sending data with a timestamp earlier than its current watermark). This led us to discover a big problem in the current way of handling: when old data is received, Flink merges it with the old window, while the data of the old window is still in the WindowContent state (this can be achieved by setting AllowedLateness). The result window is then merged with the HistoricalSessions content, which also contains the old data. What we end up with is duplicate events, and after a few events are received in the same session, each event will have thousands of duplicates, resulting in OOM.

The solution to this problem is quite simple: we want WindowContent to be cleared automatically before moving its contents to the second state. We use Flink's PurgingTrigger for this purpose, which sends a message to clear the contents of the state when the window fires.

7. Reduce VS Process

As mentioned above, our use of Flink relies on accumulating data for a given key and merging all this data together. This can be done in two ways:

Store the data in a ListState container, wait for the session to end, and merge all the data together when the session ends

Use ReducingState as each new event arrives, merging it with the previous one
Whether to use the first or second state depends on the function you run on the WindowedStream: a process call using ProcessWindowFunction will use ListState, while a reduce call using ReduceFunction will use ReducingState.

The advantages of ReducingState are obvious: instead of storing all the data before window processing, it is continuously aggregated in a single record. This usually results in a smaller state, depending on how much data is discarded during the reduce operation. For us, it offers little improvement in terms of storage, as the size of the state is negligible compared to the 7 days of data we store for historical sessions. Instead, we noticed a performance improvement by using ListState!

The reason is: successive reduce operations need to deserialize and serialize the data each time a new event arrives. This can be seen in the add function of RocksDBReducingState[7], which calls getInternal[8], which causes the data to be deserialized.

However, when updating the values in ListState using RocksDB, we can see that no serialization is happening [9]. This is thanks to the merging operation of RocksDB, which allows Flink to append data without deserialization.

In the end, we chose the ListState approach because of the performance gains that help reduce latency with minimal impact on storage.

8. Don't trust the input data!

Never assume your input will be what you expect it to be. Various unknown situations can arise, such as your tasks receiving skewed data, duplicate data, unexpected spikes, invalid records... Always think the worst and protect your assignments from these.

Let's quickly define a few key terms for later use:

"Page View (PV) Events" are the main messages we receive. It is triggered when a visitor loads the URL on the client side along with information like userId, sessionNumber and pageNumber

A "session" represents the sum of all interactions a user has without leaving the site. They are calculated by Flink by aggregating PV events and other information

To protect our tasks, we have added pre-filtering as much as possible. The rule we must abide by is to filter out invalid data as early as possible in the stream to avoid unnecessary expensive operations in the middle and late stages. For example, we have a rule that for a given session, no more than 300 PV events can be sent. Each PV event is marked with an incrementing page number to indicate its position within the session. When we receive more than 300 PV events in a session, we can filter them by:

Count the number of PV events when a given window expires

Discard events with page numbers greater than 300

The first option seems more reliable since it doesn't depend on the value of the page number, but we have to accumulate 300+ PV events in the state before we can exclude them. In the end we chose the second option, which eliminates bad data as it enters Flink.

In addition to these stateless filters, we also need to exclude data based on the metrics associated with each key. For example, the maximum size in bytes per session is set to 4MB. This number was chosen for business reasons and to help address a limitation of the state of RocksDB in Flink. In fact, if the value of the RocksDB API used by Flink exceeds 2^31 bytes [10], then it will fail. So if you use a ListState as explained above, you need to make sure you never accumulate too much data.

When you only have information about newly consumed events, it's impossible to know the current size of the session, which means we can't use the same tricks as we did with page numbers. All we do is store the metadata for each key (i.e. each session) in RocksDB in a separate ValueState . This metadata is used and updated after the keyBy operator, but before windowing. This means we can protect RocksDB from accumulating too much data in its ListState because based on this metadata we know when to stop accepting values for a given key!

9. The dangers of event timing

Event-time processing is great in most cases, but you have to keep in mind: if your method of processing late-arriving data is time-consuming, there may be some bad consequences. This problem is not directly related to Flink. When an external component is writing data to a Kafka topic while Flink is consuming data from this topic, if there is a problem with this external component, the data will arrive late. Specifically, when this component consumes some partitions more slowly than others.

This component (called Asimov) is a simple Akka streaming program that reads a Kafka topic, parses the JSON data, converts it to protobuf, and pushes it to another Kafka topic so that Flink can process the protobuf. The input to Asimov should be in order within each partition, but since the partitions are not mapped one-to-one with the output topic, some out-of-ordering may occur when Flink finally processes the message. This is fine, because Flink can support out-of-order by delaying watermarks.

The problem is that when Asimov reads one partition slower than the others: this means that Flink's watermark will advance with the fastest Asimov input partition (not Flink's input, since all partitions advance normally), and the slower Partitions of will emit records with older timestamps. This will eventually cause Flink to treat these records as late! This may be fine, but in our job we use specific logic to handle late records that need to fetch data from RocksDB and generate additional messages to Perform downstream updates. This means that every time Asimov falls behind on several partitions for some reason, Flink needs to do more work.

In a topic with 128 partitions, only 8 partitions accumulated latency, resulting in late arrival of data in Flink

We found two workarounds for this issue:

We can partition Asimov's input topic in the same way as its output topic (by userId). This means that when Asimov lags behind a few partitions, the corresponding partitions in the Flink input also lag behind, causing the watermark to advance more slowly:

We decided not to do this because the problem would still exist if we had late arriving data before Asimov, forcing us to partition each topic in the same way. But this cannot be done in many cases.

Another solution relies on batching late events: if we can defer the processing of late events, we can ensure that at most one update is produced per session, rather than one update per event.

We can implement the second solution by using a custom trigger to avoid triggering the window when a late event arrives. As you can see in the default EventTimeTrigger implementation, the late event does not register a timer under certain circumstances. In our scenario, we register a timer anyway, and the window doesn't fire immediately. Because our business requirements allow batching updates this way, we can ensure that we don't generate hundreds of expensive updates when there is a delay upstream.

10. Avoid storing everything in Flink

Let's end our discussion with some general points: If your data is large and not accessed very often, it is better to store it outside of Flink. When designing a job, you want all the required data to be available directly on the Flink nodes (either in RocksDB or in-memory). Of course, this makes working with this kind of data way faster, but it adds a lot of cost to your job when the data is large. This is because Flink's state is not replicated, so losing a node requires a full recovery from a checkpoint. Checkpointing itself is also expensive if you regularly need to write hundreds of gigabytes of data to checkpoint storage.

If access to state is a critical part of your performance requirements, it is definitely worth storing it in Flink. But if you can live with the extra latency, storing it in an external database with replication and fast access to a given record will save you a lot of headaches. For our use case, we chose to keep the WindowContent state in RocksDB, but we moved the HistoricalSessions data into Aerospike [11]. This makes our Flink jobs faster and easier to maintain due to the smaller state. We even benefit from the fact that the remaining data stored in Flink is small enough to fit in memory, which saves us from using RocksDB and local SSDs.

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us