The Case for Shared Storage

Nov 19, 2024
Richard Artoul
HN Disclaimer: WarpStream sells a drop-in replacement for Apache Kafka built directly on top of object storage.

Introduction

In this post, I’ll start off with a brief overview of “shared nothing” vs. “shared storage” architectures in general. This discussion will be a bit abstract and high-level, but the goal is to share with you some of the guiding philosophy that ultimately led to WarpStream’s architecture. We’ll then quickly transition to discussing the trade-offs between the two architectures more specifically in the context of data streaming and WarpStream; this is the WarpStream blog after all!

Shared Nothing

The term “shared nothing” was first introduced as a distributed systems architecture in which nodes share “nothing”, where “nothing” was defined (in practice) as either memory or storage. The goal with shared-nothing architectures is to improve performance and scalability by minimizing contention and coordination overhead. The reasoning for this is simple: if contention and coordination are minimized, then the system should scale almost linearly as nodes are added, since each additional node provides significant additional capacity, and doesn’t incur (much) additional overhead on the existing nodes.

The most common way that shared-nothing architectures are implemented is by sharding or partitioning the data model. This is almost definitionally true: in order for nodes in the system to avoid excessive coordination, each node must only process a subset of the data, otherwise every request would inevitably involve interacting with every node. In fact, the relationship between shared nothing and sharded architectures is so strong that the terms can be used almost interchangeably. Some people will still refer to a sharded distributed system as leveraging a “shared nothing” architecture, but more commonly they’ll just describe the system as “sharded” or “partitioned”.

Today, the term “shared nothing” is usually reserved for a more specific flavor of sharded distributed system where sharding happens at the CPU level instead of at the node level. Specifically, the term is often used to describe systems that leverage a process-per-core or thread-per-core model where each core of the machine acts as its own logical shard / partition with zero (or very minimal) cross-CPU communication. This architecture is usually implemented with an event-loop-based framework that runs on each CPU using processor affinity (CPU pinning). A popular example of this is the C++ Seastar library, which is used by databases like ScyllaDB.

Shared-nothing architectures have a lot of benefits, primarily that they scale (almost) infinitely for perfectly shardable workloads. Of course, the primary downside of shared-nothing architectures is that they’re susceptible to hotspotting if the workload doesn’t shard well. For example, if you write records to a sharded KV store like Redis or Cassandra, but 90% of the records have the same partition key, then scaling the cluster beyond the maximum throughput of a single node will be impossible because the entire cluster will be bottlenecked by the node(s) responsible for the hot partition key.
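To make the hotspotting example concrete, here is a minimal sketch in Python. It is not how Redis or Cassandra actually place keys; the md5-based hash, the tenant names, and the shard counts are all assumptions chosen for illustration. The point is only that adding shards does nothing for the shard that owns the hot key.

```python
import hashlib
from collections import Counter


def shard_for(key: str, num_shards: int) -> int:
    """Map a partition key to a shard with a stable hash (md5 is an arbitrary choice)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def hottest_shard_share(keys: list[str], num_shards: int) -> float:
    """Fraction of all records that land on the busiest shard."""
    counts = Counter(shard_for(k, num_shards) for k in keys)
    return max(counts.values()) / len(keys)


# Hypothetical workload: 90% of records share one partition key.
skewed = ["hot-tenant"] * 9_000 + [f"tenant-{i}" for i in range(1_000)]

for num_shards in (4, 16, 64):
    share = hottest_shard_share(skewed, num_shards)
    print(f"{num_shards:>2} shards: busiest shard receives {share:.0%} of records")
```

However many shards are added, the busiest shard still receives at least 90% of the records, so the cluster as a whole cannot go faster than that one shard.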

This problem is particularly acute for systems that take “shared nothing” to its logical extreme with CPU-level sharding. The reason for this is simple: in a system where sharding happens at the node level, the maximum potential throughput of a single shard is the maximum throughput of a single node which can be increased with vertical scaling, whereas if sharding happens at the CPU level, the maximum potential throughput is bound by the maximum throughput of a single core.

Because of all this, heat management (the process of trying to keep every shard evenly balanced) is the defining problem that shared-nothing distributed systems must solve.

Shared Storage

Shared storage systems take a very different approach. Instead of sharding at the node level or CPU level, they shard at the storage level using remote storage. In practice, this is usually accomplished by using a remote storage system that is implemented as a shared-nothing architecture (like commodity object storage), and combining it with a centralized metadata store.

The metadata store acts as a central point of coordination (the exact opposite of a shared-nothing architecture), which enables the compute nodes in the system to behave as one logical system while still performing work independently. In terms of what the metadata is, that varies a lot from one shared storage system to another, but in general, the primary responsibility of the metadata layer is to serve as a strongly consistent source of truth about what data exists in the system, and where it is located. In addition, it is the metadata layer’s responsibility to guarantee the overall correctness of a system that behaves in a highly distributed manner: ensuring that operations are performed atomically/transactionally, resolving conflicts, preventing duplicates, etc.

This technique is commonly referred to as “separation of storage and compute”, but a phrase I’ve found to be more useful is “separation of data from metadata”. What does this mean? Well, compare and contrast a shared-nothing distributed log-structured merge-tree (LSM) like Cassandra, with a shared storage distributed LSM like a modern data lake.

In Cassandra, there are $REPLICATION_FACTOR nodes that are responsible for all the data for a given partition key. When we want to interact with that data, we must route our requests to the nodes responsible for that key no matter what, and then consult the metadata stored on those nodes to find the data that we want to process (if it exists). With this architecture, the maximum throughput of a partition key will always be bound by the maximum throughput of a Cassandra node.

In a modern data lake, the metadata store introduces a layer of indirection between the sharding scheme (i.e., the user-facing data model) and the storage layer. It doesn’t matter at all which storage node(s) the data is stored on, because its location is tracked and indexed in the metadata store. As a result, we can pick a sharding key for the storage layer that shards perfectly, like a UUID or strong hash function. In distributed LSM terms, this means we could write all of the records to the system with the same partitioning key, and still evenly distribute the load across all of the storage nodes in the system.

For example, in the diagram above, imagine the client is constantly writing to the same key: “key1”. In a shared-nothing architecture, all of this traffic will be routed to the same storage node and overload it. In a shared storage architecture, the layer of indirection created by the intermediary compute layer and centralized metadata store results in the load being evenly distributed across the storage nodes.
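The indirection described here can be sketched in a few lines of Python. This is a toy model under stated assumptions, not any real data lake's implementation: the `MetadataStore` class, the `write` helper, and the choice of a random UUID as the storage-level sharding key are all hypothetical.

```python
import uuid
from collections import Counter, defaultdict


class MetadataStore:
    """Toy centralized index: user-facing key -> list of (storage node, object id)."""

    def __init__(self) -> None:
        self._index: dict[str, list[tuple[int, str]]] = defaultdict(list)

    def record(self, key: str, node: int, object_id: str) -> None:
        self._index[key].append((node, object_id))

    def locate(self, key: str) -> list[tuple[int, str]]:
        return list(self._index[key])


def write(meta: MetadataStore, key: str, num_nodes: int) -> int:
    """Place the object by a random UUID, not by the user's key, then index it."""
    object_id = uuid.uuid4()
    node = object_id.int % num_nodes
    meta.record(key, node, str(object_id))
    return node


# Every write uses the same user-facing key, yet placement is spread across nodes.
meta = MetadataStore()
placements = Counter(write(meta, "key1", num_nodes=8) for _ in range(8_000))
print("records per storage node:", sorted(placements.items()))
print("objects indexed for key1:", len(meta.locate("key1")))
```

Because placement is decided by the random object ID, the writes to “key1” spread roughly evenly across the storage nodes, while the metadata store remains the single place that knows where each object lives.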

This results in a very different set of trade-offs from shared-nothing architectures: the system will not scale infinitely, even with a perfect sharding/partitioning key, because the centralized metadata store is a (potential) bottleneck. However, the problem of hotspotting disappears almost entirely because as you can see in the diagram above, we can balance writes against the storage nodes however we want, whenever we want. In fact, not only does hotspotting become a non-issue, but the system also gains the ability to shift load around the cluster almost instantaneously.

This is the killer feature that explains why almost every modern data lake / warehouse is implemented as a shared storage architecture instead of a shared-nothing one: the ability to choose at query time whether to recruit one CPU or 10,000 to process an individual request is what enables all of the performance and functionality that defines the modern data landscape.

Of course, while this architecture solves the hotspotting problem, it’s not without trade-offs. If heat management is the defining problem for shared-nothing systems, then metadata scaling is the defining problem for shared storage systems. We’ll discuss this problem more later in the WarpStream Metadata Scalability section.

One Final Tradeoff: Flexibility vs. Latency

The split between shared nothing and shared storage architectures is not a hard boundary; many systems lie somewhere in the middle and include aspects of both. But in general, highly transactional systems (like Postgres) tend to lean toward shared-nothing architectures, whereas highly analytical systems (like Snowflake) tend to lean toward shared storage architectures. The reason for this is primarily the inherent trade-offs around flexibility and latency.

Transactional systems forgo flexibility to reduce latency. For example, relational databases require that you define your schemas and indexes up front, that your data is (mostly) structured, that you pre-size your database instances to the amount of expected load, and that you think hard about what types of queries your application will need to run up front. In exchange, they will happily serve tens of thousands of concurrent queries with single-digit milliseconds latency.

Analytical systems take the exact opposite approach. You can run whatever query you want, whenever you want, regardless of the existing schemas. You can also recruit as much hardware as you want at a moment's notice to accelerate the queries, even thousands of cores for just a few minutes, and you don’t have to think about what types of queries you want to run up front. However, your data lake / warehouse will almost never complete any queries in single-digit milliseconds. Even double-digit milliseconds query execution time is rare for analytical databases in practice, except for the easiest workloads.

The details and intuitions behind why shared nothing architectures can provide much lower latency than shared storage architectures are beyond the scope of this blog post, but here’s a simple intuition: Since shared storage architectures involve so much more coordination, they tend to do a lot of batching to improve throughput; this results in higher latency.

Apache Kafka and Other Data Streaming Systems

OK, let’s get more specific and talk about the data streaming landscape. Apache Kafka is a classic shared-nothing distributed system that uses node-level sharding to scale. The primary unit of sharding in Kafka is a topic-partition, and scaling is handled by balancing topic-partitions across brokers (nodes).

This means that Apache Kafka can handle imbalances in the throughput (either read or write) of individual topic-partitions reasonably well, but the maximum throughput of a single topic-partition will always be bound by the maximum throughput of a single broker. This is obvious if we go back to the diagram from earlier:

The bigger the machine we can get Apache Kafka to run on, the more resilient it will be to variation in individual topic-partition throughput. That said, while some imbalance can be tolerated, in general, the topic-partitions in a Kafka cluster need to be well balanced across the brokers in order for the cluster to scale properly. They also need to be balanced across multiple dimensions (throughput, requests per second, storage, etc.).

As discussed earlier, the trade-offs with this approach are clear: Apache Kafka clusters can scale linearly and (almost) infinitely as long as additional brokers and partitions are added. However, topic-partitions must be balanced very carefully across various dimensions, adding or removing capacity takes a long time (especially if you use very large brokers!), and there are hard limits on the maximum throughput of individual topic-partitions, especially in an already-busy cluster.

Of course, Apache Kafka isn’t the only technology in the data streaming space, but in practice, almost all of the other data streaming systems (AWS Kinesis, Azure Event Hubs, AWS MSK, etc.) use a similar shared-nothing architecture and as a result experience similar tradeoffs.

In fact, for a long time, shared-nothing was widely considered to be the correct way to build data streaming systems, to the point where some of the newest entrants to the data streaming space leaned even further into the shared-nothing architecture by leveraging libraries like Seastar (C++) to do CPU-level sharding of topic-partitions. This enables lower latency in some scenarios, but exacerbates all of Apache Kafka’s topic-partition balancing issues since the maximum throughput of a single partition is now bound by the maximum throughput of a single core instead of a single broker.

Unless you need microsecond-level performance, the trade-offs of using CPU-level sharding for data streaming workloads are simply not worth it. Another thing I won’t dwell on, but will point out quickly is that while it’s tempting to think that tiered storage could help here, in practice it doesn’t.

WarpStream’s Shared Storage Architecture

With WarpStream, we took a different approach. Instead of doubling down on the shared-nothing architecture used by other data streaming systems, we decided to take a page out of the data warehousing playbook and build WarpStream from the ground up with a shared storage architecture instead of a shared-nothing architecture.

Instead of Kafka brokers, WarpStream has “Agents”. Agents are stateless Go binaries (no JVM!) that speak the Kafka protocol, but unlike a traditional Kafka broker, any WarpStream Agent can act as the “leader” for any topic, commit offsets for any consumer group, or act as the coordinator for the cluster. No Agent is special, so auto-scaling them based on CPU usage or network bandwidth is trivial. In other words, WarpStream is the shared storage alternative to Apache Kafka’s shared nothing architecture.

WarpStream can still provide all the exact same abstractions that Kafka does (topics, partitions, consumer groups, ordering within a topic-partition, transactions, etc) even though the Agents are stateless and there are no leaders, because it uses a centralized metadata store that acts as the logical leader for the entire cluster. For example, two Agents can concurrently flush files to object storage that contain batches of data for the same topic-partition, but consumers will still consume the batches in a deterministic order because the metadata store will determine the order of the batches in the two different files relative to each other when the files are committed to the metadata store.
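A toy model may help show how commit-time sequencing can yield a deterministic order. This is a sketch under assumptions, not WarpStream's actual metadata store: the `MetadataStore` and `BatchRef` classes, the `commit_file` method, and the file names are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class BatchRef:
    file_id: str       # object-storage file that holds the batch
    base_offset: int   # first offset assigned to the batch
    record_count: int


@dataclass
class MetadataStore:
    """Toy sequencer: offsets are assigned when a file is committed, not when it is written."""
    next_offset: dict = field(default_factory=dict)  # (topic, partition) -> next free offset
    log: dict = field(default_factory=dict)          # (topic, partition) -> ordered BatchRefs

    def commit_file(self, file_id: str, batches: list) -> None:
        """batches is a list of ((topic, partition), record_count) pairs held in the file."""
        for tp, record_count in batches:
            base = self.next_offset.get(tp, 0)
            self.log.setdefault(tp, []).append(BatchRef(file_id, base, record_count))
            self.next_offset[tp] = base + record_count


store = MetadataStore()
tp = ("orders", 0)
# Two hypothetical Agents flushed files concurrently; the commit order decides the log order.
store.commit_file("agent-b/file-7", [(tp, 3)])
store.commit_file("agent-a/file-2", [(tp, 5)])

for ref in store.log[tp]:
    print(ref)
```

In this model the Agents never coordinate with each other. Whichever file is committed first gets the lower offsets, and every consumer that reads the log sees the same order.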

Because WarpStream relies on remote storage, it is a higher latency data streaming system than Apache Kafka. In practice, we’ve found that it's real-time enough (P99 latency in the hundreds of milliseconds) not to matter for the vast majority of use cases. And in exchange for this higher latency, WarpStream gains a lot of other benefits. 

We’ve written about many of those benefits before in previous posts (like this one on our zero disks architecture), so we won’t repeat them here. Instead, today I’d like to focus on one specific benefit that is usually overlooked: heat management and topic-partition limits.

In Apache Kafka, a topic-partition is a “real” thing. Somewhere in the cluster there is a broker that is the leader for that topic-partition, and it is the only broker in the cluster that is allowed to process writes for that topic-partition. No matter what you do, the throughput of that topic-partition will always be bound by the free capacity of that specific broker.

In WarpStream, topic-partitions are much more virtualized – so much so that you could configure a WarpStream cluster with a single topic-partition and write 10GiB/s to it across a large number of Agents. Consuming the data in a reasonable manner would be almost impossible, but you’d have no trouble writing it.

The reason this is possible is because WarpStream has a shared storage architecture that separates storage from compute, and data from metadata. In WarpStream, any Agent can handle writes or reads for any topic-partition, therefore the maximum throughput of a topic-partition is not bound by the maximum throughput of any single Agent, let alone a single core.

Obviously, there are not many use cases for writing 10GiB/s to a single topic-partition, but it turns out that having a data streaming system with effectively no limits on the throughput of individual topic-partitions is really useful, especially for multi-tenant workloads. 

For example, consider an Apache Kafka cluster that is streaming data for a multi-tenant workload where tenants are mapped to specific topic-partitions in some deterministic manner. A tenant typically doesn’t write more than 50MiB/s of data at peak, but every once in a while one of the tenants temporarily bursts 10x to 500 MiB/s.

With a traditional shared-nothing Apache Kafka cluster, every Broker in the cluster would always require an additional 450MiB/s of spare capacity (in terms of CPU, networking, and disk). This would be extremely inefficient and difficult to pull off in practice.

Contrast that with WarpStream where the additional 450MiB/s would be automatically spread across all of the available Agents so you would only need 450MiB/s of spare capacity at the cluster level instead of the node level which is much easier (and cheaper) to accomplish. In addition, since the WarpStream Agents are stateless, they’ll auto-scale when the overall cluster load increases, so you won’t have to worry about manual capacity planning.
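The capacity arithmetic behind this comparison can be worked through in Python. The 50MiB/s and 500MiB/s figures come from the example above; the cluster size of 12 nodes and the assumption that only one tenant bursts at a time are mine, added purely for illustration.

```python
def spare_capacity_mib_s(num_nodes: int, burst_mib_s: float, shared: bool) -> float:
    """Total headroom the cluster must hold in reserve to absorb one tenant's burst.

    shared=False: the burst lands on a single node, and any node might be the one,
                  so every node reserves the full burst.
    shared=True:  the burst is spread across all nodes, so the reserve is pooled.
    """
    return burst_mib_s if shared else burst_mib_s * num_nodes


baseline, peak = 50, 500   # MiB/s, from the example above
burst = peak - baseline    # 450 MiB/s of extra traffic
nodes = 12                 # hypothetical cluster size

print("per-node reservation, total:", spare_capacity_mib_s(nodes, burst, shared=False), "MiB/s")
print("pooled reservation, total:  ", spare_capacity_mib_s(nodes, burst, shared=True), "MiB/s")
```

Under these assumptions the per-node approach reserves 5,400MiB/s across the cluster to absorb a 450MiB/s burst, while the pooled approach reserves 450MiB/s in total.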

But how does this work in practice while remaining within the confines of the Kafka protocol? Since any WarpStream Agent can handle writes or reads for any topic-partition, WarpStream doesn’t try to balance partitions across brokers as Kafka does. Instead, WarpStream balances connections across Agents. 

When a Kafka client issues a Metadata request to a WarpStream cluster to determine which Agent is the “leader” for a specific topic-partition, the WarpStream control plane consults the service discovery system and returns a Metadata response with a single Agent (one that has lower overall utilization than the other Agents in the cluster) as the leader for all of the topic-partitions that the client requested.

WarpStream’s load balancing strategy looks more like a traditional load balancer than like Apache Kafka’s, which results in a full mesh of connections.
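As a rough sketch, the Metadata response behavior described above might look like the following. This is not WarpStream's control plane code: the `Agent` class, the utilization numbers, and the "pick the least utilized Agent" policy are simplifying assumptions, and the real selection logic may differ.

```python
from dataclasses import dataclass


@dataclass
class Agent:
    name: str
    utilization: float  # 0.0 to 1.0, as reported by a hypothetical service discovery system


def metadata_response(agents: list, requested: list) -> dict:
    """Return one lightly loaded Agent as the "leader" for every requested topic-partition."""
    leader = min(agents, key=lambda a: a.utilization)
    return {tp: leader.name for tp in requested}


agents = [Agent("agent-1", 0.72), Agent("agent-2", 0.31), Agent("agent-3", 0.55)]
partitions = [("orders", 0), ("orders", 1), ("payments", 0)]

print(metadata_response(agents, partitions))
```

Since every requested topic-partition maps to the same Agent, a client that follows this response opens one connection instead of one per partition leader.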

Another way to think about this is that with Apache Kafka, the “processing power” of the cluster is assigned to individual partitions and divided amongst all the Brokers when a rebalance happens (which can take hours, or even days to perform), whereas with WarpStream the “processing power” of the cluster is assigned to individual connections and divided amongst all the Agents on the fly based on observable load. “Rebalancing” happens continuously, but since it’s just connections being rebalanced, not partitions or data, it happens in seconds/minutes instead of hours/days.

This has a number of benefits:

  1. It balances the overall cluster utilization for both produce and fetch across all the Agents equally regardless of how writes / reads are distributed across different topic-partitions.
  2. Each Kafka client ends up connected to roughly one Agent, instead of creating a full mesh of connections like it would with Apache Kafka. This makes it much easier to scale WarpStream to workloads with a very high number of client connections. In other words, WarpStream clusters scale more like a traditional load balancer than a Kafka cluster.
  3. The Kafka clients will periodically issue background Metadata requests to refresh their view of the cluster, so the client connections are continuously rebalanced in the background.
  4. Load balancing connections is an almost instantaneous process that doesn’t require copying or re-replicating data, whereas rebalancing partitions in Apache Kafka can take hours or even days to complete.

WarpStream Metadata Scalability

There’s still one final point to discuss: metadata scalability. We mentioned earlier in the shared storage section that the defining problem for shared storage systems is scaling the metadata layer to high-volume use cases. Since the metadata store is centralized and shared by the entire system, it’s the most likely component to become the limiting factor for an individual cluster.

In terms of what the metadata is for WarpStream, I mentioned earlier in the shared storage section that the metadata layer’s primary responsibility is keeping track of what data exists in the system, and where it can be located. WarpStream’s metadata store is no different: its primary responsibility is to keep track of all the different batches for every topic-partition, as well as their relative ordering. This ensures that consumers can read a topic-partition’s batches in the correct order, even if those batches are spread across many different files. This is how WarpStream recreates Apache Kafka’s abstraction of an ordered log.

How WarpStream solves the metadata layer scalability problem warrants its own blog post, but I’ll share a few key points briefly:

  1. Depending on the data model of the system being implemented, the metadata store itself may be amenable to sharding. This is interesting because it further solidifies the idea that the line between shared nothing and shared storage systems is blurry: a shared storage system may be implemented with dependencies on a shared nothing system, and vice versa.
  2. Good design that incorporates batching and ensures that the ratio of $DATA_PLANE_BYTES / $CONTROL_PLANE_BYTES is high minimizes the amount of work that the metadata store has to perform relative to the data plane. A ratio of 1,000 ensures that the metadata store will scale comfortably to large workloads, and a ratio of 10,000 or higher means the metadata store will likely never be the bottleneck in the first place even if it runs on a single CPU.
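To get a feel for what those ratios mean, the arithmetic can be sketched as follows. The 1GiB/s data-plane rate is a hypothetical figure chosen for round numbers, and the helper name is mine.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def control_plane_bytes_per_s(data_plane_bytes_per_s: float, ratio: float) -> float:
    """Metadata traffic implied by a given $DATA_PLANE_BYTES / $CONTROL_PLANE_BYTES ratio."""
    return data_plane_bytes_per_s / ratio


data_rate = 1 * GIB  # hypothetical 1 GiB/s of data-plane traffic
for ratio in (1_000, 10_000):
    meta_rate = control_plane_bytes_per_s(data_rate, ratio)
    print(f"ratio {ratio:>6,}: metadata store sees ~{meta_rate / MIB:.2f} MiB/s")
```

At a ratio of 1,000, a full 1GiB/s of data traffic translates into roughly 1MiB/s of metadata traffic, and at 10,000 it is about a tenth of that.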

To make this more concrete, consider the following real WarpStream cluster. At peak, the cluster handles roughly 4.5GiB/s of traffic:

At this peak, the metadata store for this cluster is less than 10% utilized. This implies that with no further changes, this workload could scale another 10x to over 40 GiB/s in write throughput before the metadata store became a bottleneck. This is a real customer workload, not a benchmark, running with our default metadata store settings, with no special tuning or optimizations to handle this particular workload.
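The extrapolation here is a simple linear projection, sketched below. It assumes metadata store utilization grows in proportion to write throughput and nothing else changes, which is an idealization.

```python
def projected_ceiling_gib_s(observed_gib_s: float, utilization: float) -> float:
    """Throughput at which the metadata store would hit 100%, assuming linear scaling."""
    if not 0 < utilization <= 1:
        raise ValueError("utilization must be in (0, 1]")
    return observed_gib_s / utilization


# Figures from the text: ~4.5 GiB/s at peak, metadata store under 10% utilized.
print(f"{projected_ceiling_gib_s(4.5, 0.10):.1f} GiB/s")
```

Because the observed utilization is below 10%, the projected figure is a lower bound under this linear assumption.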

Of course, in reality there are many different factors that impact the metadata store utilization besides write throughput: the number of Kafka clients, how they’re configured, the number of topic-partitions that are being written to or read from, etc.

But in practice, we’ve never encountered a workload that came even close to the theoretical limits of our metadata store. The highest metadata store utilization we’ve ever observed across any of our clusters currently sits at 30%, and that’s a single WarpStream cluster that serves hundreds of applications, more than 10,000 clients, and has nearly 40,000 topic-partitions. In addition, this particular customer onboarded to WarpStream after several failed attempts to scale their workload with alternative systems (not Apache Kafka) that use CPU-level shared-nothing architectures. These systems should have scaled better than WarpStream in theory, but in practice were plagued by heat management issues that made it impossible for them to keep up with the demands of this workload.

Conclusion

I’ll end with this: shared-nothing architectures are incredibly attractive for their theoretical scaling properties. But actually realizing those benefits requires finding a natural sharding key that’s very regular, or deploying an incredible amount of effort to face the heat management problem. In the real world, where it’s hard to keep all your clients very well-behaved, hoping the sharding key is going to keep your workload very balanced is often unrealistic. To make things worse, it often needs to be balanced across multiple dimensions like write throughput, read throughput, storage size, etc.

Shared storage architectures, on the other hand, have a lower theoretical scale ceiling, but in practice they are often much easier to scale than their shared nothing counterparts. The reason for this is simple, but not obvious: shared storage systems separate data from metadata which introduces a layer of abstraction between the user-facing domain model and the physical sharding used by the storage engine. As a result, it is possible to choose at runtime how much of the resources we allocate to storing or retrieving data for a particular key, rather than forcing us to choose it when we create the cluster topology. This solves the heat management problem in a very simple way. 

In exchange for this massive benefit, shared storage architectures usually incur a higher latency penalty and have to figure out how to scale their centralized metadata stores. While scaling the metadata layer seems daunting at first, especially since sharding is often impractical, it turns out that often the metadata problem can be made so small that it doesn’t need to be sharded in the first place.

Shared storage architectures are not the answer to every problem. But they’re so much more flexible and easier to manage than shared-nothing architectures that they should probably be the default for all but the most latency-sensitive workloads. For example, as we outlined earlier in the WarpStream section, the ability to leverage the abstraction of Kafka without ever having to deal with topic-partition balancing or per-partition limits is a huge improvement for the end-user. In addition, with modern cloud storage technologies like S3 Express One Zone and even DynamoDB, the latency penalty just isn’t that high.

Author
Richard Artoul
Co-Founder