The Kafka Metric You're Not Using: Stop Counting Messages, Start Measuring Time

Jul 16, 2024
Aratz Manterola Lasa
HN Disclaimer: WarpStream sells a drop-in replacement for Apache Kafka built directly on-top of object storage.

Consumer groups are the backbone of data consumption in Kafka. Consumer groups are logical groupings of consumers who work together to read data from topics, dividing the workload by assigning partitions to individual group members. Each group member then reads messages from its assigned partitions independently. Consumer groups also keep track of consumption progress by storing offset positions for every topic partition that the group is consuming. This ensures that when a member leaves the group (because it was terminated or crashed), a new member can pick up where the last one left off without interruption.

Depiction of a Kafka consumer group. Consumers read from their respective partitions, and commit their progress (as Kafka offsets) back to the cluster.

Consumer groups are great, but monitoring them can be a challenge. Specifically, it can be tricky to determine if your consumers are keeping up with the incoming data stream (i.e., are they “lagging”) and, if not, why. In this post, we'll explain why the usual way of measuring consumer group lag (using Kafka offsets) isn't always the best and show you an alternative approach that makes it much easier to monitor and troubleshoot them.

The most common way to monitor consumer groups is to alert on the delta between the maximum offset of a topic partition (i.e., the offset of the most recently produced message) and the maximum offset committed by the consumer group for that same topic partition. We’ll call this metric “offset lag.”

Offset lag is the delta between the committed offset and the offset of the last produced record for each topic-partition.

Consumer groups track their own progress using Kafka offsets, so intuitively, it makes sense to reuse the same mechanism to monitor whether they’re keeping up. High offset lag indicates that your consumers can't keep up with the incoming data, necessitating action like increasing the number of consumers, partitions, or both. In addition, the rate of change of consumer group lag is an important early indicator of potential problems and a good indicator that attempts to mitigate observed increases in lag are working.

The Problem with Consumer Offset Lag

Tracking consumer group offset lag can be a really useful way to monitor an individual Kafka consumer. However, converting offset lag into a value that is meaningful to humans or that can be compared with other workloads is difficult.

 Let’s use a concrete example to make this more clear. Imagine you're an infrastructure engineer responsible for managing your company's data streaming platform. In a recent incident, one team's consumer application fell so far behind that customer data was delayed for hours. No monitors were fired, and you only discovered the issue when some of your (rightfully angry!) customers complained.

As a remediation item, you've been tasked with ensuring that all Kafka consumers are monitored, so alarms will go off if any consumers fall “too far” behind.

Great! We just learned about the concept of offset lag, so you can create a monitor on the offset lag metric and group by consumer group name, right? All you have to do is pick the offset lag "threshold" beyond which the monitor should fire.

You run the query in a dashboard to see the current values, and you are shocked to find that the current offset lag for your various consumer groups varies wildly, from 10 (no extra zeros!) to 12 million. You freeze in panic. “Are we having an incident right now!?

Two different consumer groups with wildly varying offset lag.

After some investigation and talking to other teams, you realize this is normal. Some of these consumer groups naturally have much higher throughput than others, so their baseline offset lag is higher because there’s more data “in-flight” at any given moment. Other consumer groups process data in large batches, accumulating large amounts of offset lag, consuming it all at once, and then repeating that process.

Every team's use case makes sense in isolation, but now you're stuck. How in the world will you pick one threshold that makes sense for all of these different workloads? You could pick different thresholds for each workload, but even then, you'll probably get woken up in the middle of the night with false alarms when some of these workloads grow in throughput and their baseline offset lag increases, even if the actual consumers are keeping up just fine.

Time Lag: A More Intuitive Metric

To overcome the limitations of offset-based lag, the Kafka community has introduced a more intuitive metric called "time lag”. While intuitive, this concept wasn't immediately available in Open Source Kafka's native tooling. Companies like Agoda and AppsFlyer recognized its value and developed their own solutions, with Agoda notably sharing their insights in a blog post that inspired many in the community (including us!). Since then, tools like Burrow and kafka-lag-exporter have emerged, offering simple ways to calculate the time lag as part of their Kafka monitoring tools.

Imagine once again that you’re an infrastructure engineer, and you’re in the middle of an incident where one of your consumer groups has fallen behind. Your customers are asking you how delayed their data will be. They're likely to look at you with a blank stare if you tell them: "your data is delayed 30 million offsets", but they'll understand immediately if you tell them the maximum data delay is 17 minutes.

Time lag is calculated using the following function:

<span class="codeinlinemb">Time Lag = CurrentTime - LastTimeConsumedOffsetWasLatest</span>

Where <span class="codeinline">LastTimeConsumedOffsetWasLatest</span> is defined as the moment when the last consumed message was also the most recently produced message.

Let's illustrate that with an example. Imagine a Kafka topic where:

  • The latest produced message has offset 15 and was generated at 3:15 PM.
  • A consumer group processes messages up to offset 10 by 3:20 PM.
  • The message with offset 11 was produced at 3:10 PM.

In this scenario, <span class="codeinline">LastTimeConsumedOffsetWasLatest</span> is 3:10 PM. This is because at 3:09:59 PM, offset 10 was still the latest message on the topic. However, at 3:10 PM, offset 11 was produced, meaning the consumer started to fall behind at that exact moment. So we round this up to 3:10 PM.

Therefore, at 3:20 PM, the time lag is calculated as:

<span class="codeinlinemb">Time Lag = 3:20 PM - 3:10 PM = 10 minutes</span>

This means the consumer group is 10 minutes behind the most recent message.

Another way to put it: “time lag” is the time elapsed since the next-message-to-be-consumed was produced. This definition is simple but also deceptively elegant: by making it relative to the current time, the metric keeps increasing if there are any unprocessed records, even if producers and consumers have stopped entirely. It acts as an alarm, alerting you to unprocessed messages even when the system appears idle. 

An Integrated Approach to Time Lag Calculation

While monitoring time lag can be a game-changer, accessing this metric isn't always straightforward. If you search online resources, you'll find the primary method involves third-party tooling, making it very easy to calculate this metric, like Burrow. These tools are great; they really make monitoring trivial. However, Burrow is yet another piece of software that has to be deployed, maintained, and troubleshooted.

At WarpStream, we like to make things easy. Asking our users to install third-party tooling just to know if their consumer applications were caught up didn't sit right with us. So, we decided to build time lag measurement directly into WarpStream so that all our users would benefit from it out of the box. 

This is probably a good time to briefly review WarpStream’s architecture. If you’re not familiar with it, WarpStream is a drop-in replacement for Apache Kafka built directly on top of object storage. WarpStream has many different architectural differences from Apache Kafka, but the one most relevant to the current topic is that in addition to separating computing from storage, WarpStream also separates data from metadata.

WarpStream architecture diagram. Agents (stateless thick proxies) run in the customers cloud account, and the metadata store runs in WarpStream's cloud account.

Customers’ raw data is stored exclusively in their own S3 buckets, accessible only to them. Meanwhile, WarpStream Cloud stores metadata in a highly available, quadruply-replicated metadata store.

The fact that WarpStream stores all of the cluster’s metadata in a centralized metadata store makes calculating time lag (relatively) straightforward. Unlike Apache Kafka, we don’t have to read or load any raw records or their headers; we can just query the timestamp index in the metadata store directly. This has the added benefit that it doesn’t rely on potentially unreliable record header timestamps (the client can set custom timestamps in the records). Instead, WarpStream maintains its own accurate timestamps in the metadata store and uses optimized data structures for time-based searches.

There was one challenge we had to solve, though: metrics are published by the Agents (data plane), which run in the customer’s environment and expose metrics via a Prometheus endpoint. However, the time lag calculation was running in WarpStream’s cloud control plane, so we needed a mechanism to make the time lag metrics the control plane generated available as Prometheus metrics in the Agents.

To solve this, we came up with a very simple solution: leverage WarpStream's existing job queueing system. WarpStream's architecture includes a centralized scheduler on the control plane that orchestrates various operational tasks. Agents, deployed within the customer's environment, regularly poll this scheduler to receive and execute tasks, including functions like data compaction and object storage cleanup. Leveraging this existing infrastructure, we introduced a new job dedicated to calculating time lag metrics. This job runs on the control plane, periodically computing the metrics and making them accessible for the agents to retrieve during their polling cycles, who then emit them. We liked this solution because it’s simple and allows us to provide more metrics easily in the future.

We leverage this metadata to provide the <span class="codeinline">warpstream_consumer_group_estimated_lag_very_coarse_do_not_use_to_measure_e2e_seconds</span> metric. Why such a weird name? As you’ll see later in our more detailed explanation, this value is coarse-grained and imprecise. For example, while the actual end-to-end latency for a workload may be 500ms, this metric could report that the consumer group time lag is as high as 5 seconds. We wanted to clarify that while this metric is valuable for monitoring and alerting, it should not be used for benchmarking.

This metric is an approximation, so it's not perfect, but it's great for getting a general idea of how things are going and catching bigger problems. If an incident happens and someone tries to use this metric to explain a one-millisecond delay, they're using the wrong tool for the job. We want people to feel comfortable setting alerts for more substantial delays (e.g., several minutes) because this metric excels at that. Think of it as a coarse-grained tool for catching big problems, not a fine-tuned instrument for performance tuning.

Offset lag for the blue consumer is more than 20x higher, but time lag is less than 2x higher.

The graph above showcases the difference between offset lag and time lag for two consumer groups. One group has a much larger offset lag of 20,000, while the other has a smaller lag of a few hundred. However, when we switch to the time lag, we see a different picture: both groups have very similar lags of 2 and 4.5 seconds. This shows how offset lag alone can be misleading and how time lag provides a more understandable overview of consumer group health.

Imagine trying to set alerts based on these metrics. With time lag, a single alert threshold (e.g., 2 minutes) could easily cover both consumer groups. With offset lag, you'd need to set different thresholds for each, carefully considering the nature of each workload and potentially missing alerts for the group with the "smaller" lag.

Behind the Scenes: The Mechanics of Time Lag Metrics

Having established the benefits of time lag over offset lag, let's delve into the technical implementation. Understanding this implementation will also show how WarpStream is able to calculate the time lag we introduced earlier: <span class="codeinline">Time Lag = CurrentTime - LastTimeConsumedOffsetWasLatest.</span>

WarpStream continuously tracks when messages are produced, associating each message offset with its corresponding timestamp. This data is stored internally in a way that allows us to efficiently query for offsets based on timestamps. To optimize storage, we aggregate this data into minute-level intervals. For each minute, we record the earliest offset produced (baseOffset) and the total number of offsets produced (offsetCount), effectively creating a compact time-series representation of message production.

When we need to know <span class="codeinline">LastTimeConsumedOffsetWasLatest</span> for a specific offset consumed by a consumer group, we use this index:

  1. We first locate the relevant minute-level interval that contains that offset. 
  2. Within that interval, we divide the time by the offsetCount to estimate how frequently messages were produced within that time range.
  3. Using the production rate and the offset's position within the interval, we calculate an estimated timestamp for when that specific message was produced. This gives us the <span class="codeinline">LastTimeConsumedOffsetWasLatest</span>, which we then subtract from the current time to obtain the time lag.

As mentioned earlier, a dedicated background job within WarpStream's control plane periodically calculates each cluster's time lag and other relevant metrics for every consumer group. This involves querying the committed offsets for every consumer group and partition and then utilizing the timestamp index to compute the corresponding time lag values. These calculated values are subsequently transmitted to the WarpStream Agents operating within the customer's environment. And finally the Agents expose these time lag metrics via their Prometheus endpoint, under the name <span class="codeinline">warpstream_consumer_group_estimated_lag_very_coarse_do_not_use_to_measure_e2e_seconds</span>

However, keeping a record of every minute would consume excessive storage space for clusters with many topic-partitions and high (or infinite) retention. To address this, we merge the index entries periodically. This involves merging multiple entries into one, updating the baseOffset and offsetCount, and introducing an additional field called minuteCount to keep track of the number of minutes the compacted entry represents.

This merging does sacrifice some timestamp precision, but we prioritize the most recent entries, ensuring we maintain their original accuracy untouched. Older entries are the only ones subject to merging. We prioritize recent entries because the more recent an offset is, the more crucial it is for consumers to have precise lag information. If a consumer is 10 minutes behind, a 30-second difference isn't a major concern. But for a consumer who's only 1 minute behind, that level of precision becomes much more important. In this way, we balance optimizing storage efficiency and maintaining the level of precision that matters most for effective monitoring.

Now, it’s clear why this metric is an approximation designed for monitoring and alerting, not precise benchmarking. The "very coarse" part of the metric's name highlights a few key limitations:

  • Interpolation: The metric is calculated by interpolating at least 1-minute level entries in the timestamp index, which can introduce inaccuracies compared to the true message production time.
  • Committed Offsets: The metric relies on committed offsets, which may not always reflect the most up-to-date consumption progress. Consumers can commit offsets at varying intervals, either immediately after processing a message or after processing an entire batch. This leads to potential discrepancies between the committed offset and the actual latest consumed message.

These factors make the metric less suitable for precise performance measurements but perfectly adequate for identifying significant delays in consumer group processing. Moreover, the utility of the timestamp index extends beyond just calculating the time lag. It also enables internal Kafka APIs to query for offsets based on specific timestamps, which is useful for features like time-based data retrieval and analysis.

Depiction of timestamp index compaction and subsequent interpolation.

In conclusion, monitoring Kafka consumer groups doesn’t need to be a guessing game. By shifting the focus from the message counts (offset lag) to time (time lag), understanding how consumers perform becomes trivial. With Warpstream’s built-in time lag metrics, this insight is readily available, ensuring you can monitor and react timely in case your data pipeline consumers start to fall behind. 

Create a free WarpStream account and start streaming with $400 in free credits.
Author
Aratz Manterola Lasa
Software Engineer
Return To Blog
Return To Blog