Kafka as a KV Store: deduplicating millions of keys with just 128 MiB of RAM

Mar 4, 2024
Manu Cupcic

Compacted Topics

A huge part of building a drop-in replacement for Apache Kafka® was implementing support for compacted topics. The primary difference between a “regular” topic in Kafka and a “compacted” topic is that Kafka will asynchronously delete records from compacted topics that are not the latest record for a specific key within a given partition. Also, Kafka will not allow records to be produced to a compacted topic without an associated key.

In other words, compacted topics have a “retention policy” that is not based on the size of the topic, or how old the records are, but whether or not an individual record is the latest version of a given key. If we squint even more, another way to think about compacted topics is that they convert the Produce operation from one that appends a record to the end of a topic-partition, to one that appends a record to the end of a topic-partition and allows previous records with that same key to be removed from the log (asynchronous upsert).
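For example, here’s roughly how a compacted topic could be created with the kafka-python admin client (the client library, broker address, and topic name below are illustrative choices, not anything required by this post):

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the cluster (the broker address is a placeholder).
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# "cleanup.policy=compact" is the topic setting that enables log compaction.
admin.create_topics([
    NewTopic(
        name="resource-state",
        num_partitions=3,
        replication_factor=1,
        topic_configs={"cleanup.policy": "compact"},
    )
])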

The primary use-case for this feature is to represent the state of a set of “resources” within a Kafka topic. The key is the “unique identifier” of that resource, and more recent records represent a more recent state of that resource. Systems that need to know the overall state of some (or all) resources can subscribe to the topic, read all the messages, and derive whatever materialized views they need.
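As a rough sketch of that pattern (again using kafka-python purely for illustration, with placeholder topic and broker names), a consumer can rebuild a materialized view by reading the topic from the beginning and keeping only the latest value per key:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "resource-state",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",  # read the topic from the very beginning
    enable_auto_commit=False,
)

view = {}  # key -> latest value observed so far (the materialized view)
for message in consumer:
    view[message.key] = message.value
    # In a real system you would stop (or switch to live tailing) once the
    # consumer has caught up to the end of the partition.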

Of course, you could just use a regular topic instead of a compacted one to accomplish this. A consumer that reads the full history of a regular topic would always arrive at the same final “materialized view” as a consumer of the equivalent compacted topic. However, with a regular topic you’d have to configure infinite retention, which would require an ever-increasing amount of disk space.

In addition, consumers that wanted to generate a materialized view by reading the topic from the beginning would have to process O(n) records where n is the number of operations that have been performed on the dataset since it was first created. Given enough time, it would become impossible to restart or add new consumers at all! On the other hand, using a compacted topic ensures that disk usage and consumer restart time are O(n) where n is the cardinality of the unique resources in the system. A much more tenable scaling factor for many workloads.

Compacted topics are widely used within the Kafka ecosystem:

  1. Apache Kafka itself stores consumer group offsets in a compacted topic.
  2. Kafka Streams uses compacted topics as the default “state store” implementation.
  3. Kafka Connect uses compacted topics to track offsets.
  4. Schema Registry uses compacted topics to store schema state.
  5. Debezium uses compacted topics to store CDC entries.

In the best case scenario (when you have a finite number of resources that are active at any point in time), this keeps the size of a compacted topic bounded, even with infinite retention.

Here is an example of what actually happens during log compaction:

Before / After compaction of an Apache Kafka log segment.

The compaction process scans for keys that are duplicates, and keeps only the last record for these keys. For example, K1 was found at offsets 0, 2 and 3 and only the record at offset 3 was kept in the log after compaction finished. The records at offset 0 and 2 were omitted from the log altogether, making it use less space on disk. On the other hand K6 was found only once, at offset 11, and so it was kept.

Note that a record’s offset never changes, so the offsets themselves have not been compacted to a tighter range: the remaining record with key K1 still has offset 3, the record with key K6 still has offset 11, etc. This means log compaction introduces gaps in “offset space” within a given partition, and that there are offsets that do not have a corresponding record anymore. This is fine, Kafka consumer clients know to expect this, and if you seek to an offset that doesn’t have a record anymore, the client will just continue reading at the next record that still exists.
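To make that concrete with a toy example (the keys and offsets below mirror the diagram above; K2 is an extra made-up key), here’s the surviving set of records computed in a few lines of Python:

# A toy log: (offset, key) pairs in append order.
log = [(0, "K1"), (1, "K2"), (2, "K1"), (3, "K1"), (11, "K6")]

# The latest offset seen for each key wins.
latest = {key: offset for offset, key in log}

# A record survives compaction only if it is the latest one for its key.
compacted = [(offset, key) for offset, key in log if latest[key] == offset]
print(compacted)  # [(1, 'K2'), (3, 'K1'), (11, 'K6')] -- offsets keep their gaps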

Why compacted topics are challenging to implement

That all sounds straightforward enough, but how is it actually implemented and what guarantees can we expect? If you read the Apache Kafka documentation carefully, you’ll notice some hints that this feature is not as simple as it seems to implement.

The first hint is that the guarantees provided by compacted topics are less robust than what you might have expected. For example, there is no guarantee that the log compaction process will actually delete older versions of records in the log. In fact, never deleting any duplicate records would be a completely valid implementation of compacted topics according to the documented guarantees. But of course, users probably wouldn’t be very happy if that’s how it actually worked!

The second hint is that the settings contain advanced configuration options to tune memory usage:

  • log.cleaner.dedupe.buffer.size is for controlling how much memory is used by the thread that runs log compaction.
  • log.cleaner.io.buffer.load.factor is for controlling how full the deduplication buffer is allowed to get.

The reason that Kafka does not guarantee deduplication, and has advanced configuration to tune exactly how much memory is used, is that running log compaction “perfectly” would require either:

  1. An unbounded amount of memory
  2. An out-of-core shuffle

Let’s dive into the details of a hypothetical workload to understand why. Assume we have a topic with a single partition and log compaction enabled. This partition contains 1 billion records and each record is 1000 bytes long. This amounts to 1 TB of uncompressed data. In addition, each record has one key that is taken at random from a set of 1 million 36-character UUIDs.

Let us also assume that we are OK with the log compaction system using 128 MB of memory per active compaction, but not more.
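Spelling out the arithmetic behind those numbers (using MiB for the memory budget, as in the title):

num_records = 1_000_000_000      # 1 billion records in the partition
record_size = 1_000              # bytes per record
num_keys    = 1_000_000          # unique 36-character UUID keys
budget      = 128 * 1024 * 1024  # 128 MiB compaction memory budget

print(num_records * record_size)  # 1,000,000,000,000 bytes, i.e. ~1 TB of data
print(budget)                     # 134,217,728 bytes of memory to work with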

Depiction of the size of an individual record in a log segment.

In the case where the cardinality of the keys is low, the work of the log compaction system is easy and can be performed using a “single pass” algorithm. The algorithm “just” builds a mapping from record keys to records and record offsets. When it sees a more recent record for a given key, it overwrites the previous entry, since the previous record will not need to be kept. It also tracks the insertion order so that it can output the retained records in the correct order.

Here’s a very simple version:

# Naive single-pass compaction: buffer the latest record for every key in memory.
input_records = new_segment_iterator(segment_path)

records = {}    # key -> latest record seen so far for that key
inserted = []   # (offset, key) for every record, in log order

for record in input_records:
    records[record.key] = record
    inserted.append((record.offset, record.key))

output_segment = new_segment_writer(output_path)
for offset, key in inserted:
    # Only keep the record if it is still the latest one for its key.
    if records[key].offset == offset:
        output_segment.write(records[key])

However, in the general case, this requires an unbounded amount of memory. In our example where there are a million unique keys, the hash map will contain a million different records (the last one for each key), each 1,000 bytes long. That means we’d need at least 1 GB of RAM to deduplicate all of the records in memory for this workload. With the 128 MB budget we set in our example, we would only manage to deduplicate a tiny portion of the records.

As we’ll see in the following sections, there’s a better version of this algorithm that dramatically reduces the memory requirements by eliminating the need to store the entire record value in-memory, as well as replacing the in-memory keys with in-memory hashes. However, even with these improvements, the space complexity of the algorithm remains O(n) with the number of unique keys in the workload.

The only way to avoid the memory constraint entirely would be to take a page out of the database literature and use an external sort, but that would make the algorithm much more expensive, trading memory usage for disk space amplification.

This is why Apache Kafka does not guarantee that it can actually deduplicate records. If two records share a key, but there are too many distinct keys in the log between them, it is impossible to deduplicate with a fixed amount of memory without performing an expensive external sort.

How it works in Apache Kafka

The way Apache Kafka solves this problem is smarter than the naive implementation we outlined above.

First, Kafka chooses to read the whole dataset twice in two different passes to reduce the memory footprint. Instead of building a mapping of key → <record, latest_offset>, it builds a mapping of just key → <latest_offset> in the first pass, and then uses this mapping in the second pass to determine which records actually need to be copied into the new segment. This reduces memory usage dramatically, especially when keys are small and values are large as is common in many workloads.

# First pass: record only the latest offset for every key.
input_records = new_segment_iterator(segment_path)
offsets = {}
for record in input_records:
    offsets[record.key] = record.offset

# Second pass: re-read the segment and copy only the latest record per key.
input_records = new_segment_iterator(segment_path)
output_segment = new_segment_writer(output_path)
for record in input_records:
    if record.offset == offsets[record.key]:
        output_segment.write(record)

This algorithm is more CPU and IO intensive, because Kafka has to read and deserialize each record twice (once in each pass of the algorithm), but the in-memory buffer now only needs to store the keys and their latest offsets. In our example, each key is 36 bytes long and each offset is 8 bytes, so the 128 MB buffer can guarantee deduplication across almost 3 million unique keys, a huge improvement over the previous implementation, which could only guarantee deduplication across ~129,000 unique keys with the same amount of memory.

However, this implementation still struggles in scenarios where the keys themselves are large. So Apache Kafka does not store the keys themselves in this map, but uses a 16-byte hash of the keys (and an 8-byte offset) instead. The hashing algorithm that is used is MD5, and it has a low-enough probability of hash collision that Kafka considers two keys equal if their hashes are equal.

With this, a 128 MB buffer can deduplicate across more than 5 million keys in a single partition regardless of how large the keys or records are. Not bad!
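Here’s the back-of-the-envelope math behind those numbers (a rough sketch; the exact figures depend on per-entry overhead and on whether you count in MB or MiB, which is why they don’t line up to the digit with the numbers above):

budget = 128 * 1000 * 1000  # the 128 MB deduplication buffer

# Naive approach: the full 1,000-byte record plus its 36-byte key per entry.
print(budget // (1000 + 36))  # ~120,000 unique keys

# First improvement: only the 36-byte key and an 8-byte offset per entry.
print(budget // (36 + 8))     # ~2.9 million unique keys

# Second improvement: a 16-byte MD5 of the key plus an 8-byte offset per entry.
print(budget // (16 + 8))     # ~5.3 million unique keys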

How it works in WarpStream

WarpStream also uses a two-pass algorithm that stores hashes instead of keys. However, there was one additional challenge that we had to solve with WarpStream: Unlike Apache Kafka, WarpStream cannot (cheaply) seek back to the beginning of a file.

In Kafka, a segment file is stored on the local disk. It is possible to open it, read all of the data for the first pass, seek back to the beginning of the file and then re-process all of the data during the second pass.

WarpStream, on the other hand, operates directly on files in commodity object storage. That means “seeking” around different offsets in files is expensive because each “seek” requires paying for at least one GET request. If we had naively reused the exact same sequential algorithm that Apache Kafka does, we’d have to perform 2 GET requests for each tuple of <input_file, topic_partition> participating in a compaction. For workloads with thousands of topic-partitions this would have been prohibitively expensive.
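To put rough numbers on that (the file and partition counts below are made-up illustrative values, not measurements from a real workload):

input_files = 16     # files participating in one compaction
partitions  = 1_000  # topic-partitions with data in each file

# Seeking per topic-partition: two GETs per <input_file, topic_partition> pair.
print(2 * input_files * partitions)  # 32,000 GET requests

# The approach described below: exactly two GETs per input file.
print(2 * input_files)               # 32 GET requests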

As a side note, this is one of the reasons that retrofitting tiered storage into existing systems like Apache Kafka is so difficult. Much of Kafka’s existing architecture assumes that seeking within a file is a cheap and fast operation. Once segments have been offloaded to object storage, this assumption is no longer true. For this reason, most Kafka tiered storage implementations do not support compacted topics at all.

Luckily, there is a big difference between WarpStream’s storage engine and Apache Kafka’s tiered storage implementation that makes the GET request problem easier to solve. Specifically, unlike Apache Kafka which creates per topic-partition segment files, WarpStream’s storage engine creates files containing records from many different topic-partitions. This is important even outside the context of compacted topics because opening and writing to a new file incurs a significant cost when using object storage. That’s why for normal Produce requests, the WarpStream Agents will write a single file every 250 ms (or 8MiB, whichever comes first) that contains data from all the different topic-partitions that it receives batches of data for within that time frame.

High level depiction of a WarpStream segment file that contains data for many different topic-partitions.

When we were implementing support for compacted topics in WarpStream, we realized that what we really needed was a way to perform the two-phase algorithm without having to constantly “seek” backwards to the beginning of a given topic-partition in the file to begin phase 2 once we had completed phase 1. The answer turned out to be quite simple: instead of seeking backwards constantly (performing new GET requests), we would just read all of the data twice like Apache Kafka does. The trick was to do so using only two GET requests that create two readable streams at the beginning of each file, and treat both streams as distinct pointers that are advanced independently. Specifically, the first phase of the algorithm for a given topic-partition always uses the first input stream, and the second phase of the algorithm always uses the second.

Let’s walk through the algorithm step by step now. Note that in reality multiple input files would participate in a compaction, but we’ll pretend there is only one to simplify the explanation and diagrams.

First, WarpStream will open two readable streams for each of the input files. The first pass of the algorithm reads the first stream to build a map from each record's key hash to that key's latest offset. The second pass reads the other stream and looks up every key in that map to determine whether each successive record should be written to the compacted output file.

Next, the phase 1 stream will be advanced until it has read all of the records and generated the in-memory map for the next topic-partition in the file (in this case, topic a partition 0).

Now that the offset map has been built, the phase 2 stream can be advanced, and as it advances it will consult the map built in phase 1 to determine which of the records it reads should be written to the output file.

Once phase 2 is complete, both stream pointers will be at the beginning of the next topic-partition and the process repeats until all topic-partitions in the file have been processed and the compaction is complete. Of course, this may seem like it's almost exactly the same thing as seeking or opening new streams per topic-partition. And that would be true if we were programming against an SSD, but when the backing storage is an object store the unit economics are completely different. Instead of paying for potentially thousands or tens of thousands of object storage GET requests, we pay for exactly 2 GET requests per input file regardless of how many partitions are present.
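Here’s a condensed sketch of that two-stream loop (open_object_stream, read_partition, and output_writer are hypothetical stand-ins for WarpStream’s internal APIs; the real implementation also handles multiple input files, batching, and error handling):

import hashlib

def compact_file(file_path, partitions_in_file, output_writer):
    # open_object_stream and read_partition are hypothetical helpers used only
    # for illustration. Each stream is a sequential reader over the same object
    # storage file, and opening it costs exactly one GET request.
    phase1_stream = open_object_stream(file_path)
    phase2_stream = open_object_stream(file_path)

    for partition in partitions_in_file:
        # Phase 1: advance the first stream through this topic-partition's
        # records and remember the latest offset for each key hash.
        latest_offsets = {}
        for record in phase1_stream.read_partition(partition):
            key_hash = hashlib.md5(record.key).digest()  # 16-byte hash
            latest_offsets[key_hash] = record.offset

        # Phase 2: advance the second stream over the same records and copy
        # only the ones that are still the latest version of their key.
        for record in phase2_stream.read_partition(partition):
            key_hash = hashlib.md5(record.key).digest()
            if record.offset == latest_offsets[key_hash]:
                output_writer.write(partition, record)

Because both streams only ever move forward, each input file is read exactly twice, sequentially, no matter how many topic-partitions it contains.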

This approach isn’t just a trick to side-step object storage API costs either. Ask anyone who has ever built or operated an object store and they’ll tell you that the hardest part to maintain and scale is the metadata layer. Our two-pointer algorithm puts the same amount of load on the object store data plane as the seek approach would, but it results in 100s or 1,000s of times fewer of the metadata operations that are required to initiate each GET request.

While implementing support for compacted topics in WarpStream was a bit more difficult and complex than implementing it in Apache Kafka, there is an unexpected benefit. There are no special caveats about how compacted topics in WarpStream interact with “tiered storage” like there are in many other Kafka implementations. Compacted topics in WarpStream always store all of their data in object storage, and behave exactly the same way regardless of how “old” the data is or where it’s being stored.

Conclusion

There’s a lot more that went into implementing support for compacted topics in WarpStream than what we covered in this post. For example, we had to create a system to keep compacted and uncompacted topic data separated from each other at the file level, even though they’re initially mixed when the Agents first create ingestion files. A lot of work also went into our compaction planner to select which files to compact together and fine tune the trade-offs between write and read amplification. We even had to modify our file format to use roaring bitmaps to account for the fact that there might be holes in the offset space between records for record-batches that belonged to compacted topics. Unfortunately, this post is already long enough so those topics (no pun intended!) will have to be the subject for a different post.

The latest version of WarpStream now has full support for compacted topics. If you’d like to learn more about WarpStream check out our documentation, and please contact us or book a demo!
