Multiple Regions, Single Pane of Glass

Jun 20, 2024
Emmanuel Pot
HN Disclaimer: WarpStream sells a drop-in replacement for Apache Kafka built directly on top of object storage.

A common problem when building infrastructure-as-a-service products is the need to provide highly available and isolated resources in many different regions, while also having the overall product present as a “single pane of glass” to end-users. Unfortunately, these two requirements stand in direct opposition to each other. Ideally, regional infrastructure is, well, regional, with zero inter-regional dependencies. On the other hand, users really don’t want to have to sign into multiple accounts / websites to manage infrastructure spread across many different regions.

When we were first designing how we would expand WarpStream’s cloud control planes from a single region to many, we searched around for good content on the topic and didn’t find much. Many different infrastructure companies have solved this problem, but very few have blogged about it, so we decided to write about the approach we took, and perhaps more importantly, some of the approaches we didn’t take.

Let’s start by briefly reviewing WarpStream’s architecture by tracing the flow of a single request through the system. An operation usually begins with a customer’s Kafka client issuing a Kafka protocol message to the Agents, say a Metadata request. Since Kafka Metadata requests don’t interact with raw topic data like Produce and Fetch do, they can be handled solely by the WarpStream control plane. So when the WarpStream Agents receive a Kafka Metadata request, they just proxy it directly to the control plane.

WarpStream Agents deployed in customer cloud account, sending metadata requests to WarpStream's Metadata Store.

The request will hit a load balancer, and then one of WarpStream’s “Gateway” nodes. The Gateway node’s job is to perform light authentication and authorization (basically, verify the request’s API key and map it to the correct customer / virtual cluster), and then forward the request to the Metadata Store for this customer’s cluster.
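To make that flow concrete, here is a minimal Go sketch of the Gateway’s role. The types and function names are hypothetical illustrations, not WarpStream’s actual code.

```go
// Hypothetical sketch of the Gateway's job: validate the request's API key,
// map it to a tenant / virtual cluster, and forward the proxied Kafka request
// to that cluster's Metadata Store.
package gateway

import (
	"context"
	"errors"
)

type VirtualCluster struct {
	TenantID  string
	ClusterID string
}

// MetadataStoreClient forwards a request to the Metadata Store instance that
// backs a particular virtual cluster.
type MetadataStoreClient interface {
	Forward(ctx context.Context, vc VirtualCluster, req []byte) ([]byte, error)
}

type Gateway struct {
	// keysByAPIKey is filled from (cached) platform data; see the caching
	// discussion later in the post.
	keysByAPIKey  map[string]VirtualCluster
	metadataStore MetadataStoreClient
}

var errInvalidAPIKey = errors.New("unknown or revoked API key")

// HandleKafkaRequest performs light authentication/authorization and then
// routes the request to the right virtual cluster's Metadata Store.
func (g *Gateway) HandleKafkaRequest(ctx context.Context, apiKey string, req []byte) ([]byte, error) {
	vc, ok := g.keysByAPIKey[apiKey]
	if !ok {
		return nil, errInvalidAPIKey
	}
	return g.metadataStore.Forward(ctx, vc, req)
}
```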

Based on this, it’s already clear that WarpStream’s control plane has to deal with two very different types of data:

  1. Platform data: everything that users can control from our web console and APIs: users, clusters, API keys, SASL credentials, etc. This data is persisted in a primary Aurora database that runs in us-east-1 and changes very infrequently.
  2. Cluster metadata: all the metadata that enables WarpStream to present the abstraction of Kafka on top of a low-level primitive like commodity object storage. For example, the Metadata Store keeps track of all the topic-partitions (and offsets) that are contained within every file stored in the user’s object storage bucket.

These two different types of data have very different requirements. The cluster metadata is in the critical path of every Kafka operation (both writes and reads), and therefore must be strongly consistent, extremely durable, highly available, and have low latency. As a result, we run every instance of the Metadata Store in a single region, whichever region is closest to the user’s WarpStream Agents. We also run each instance of the Metadata Store quadruply replicated across three availability zones, and we never replicate this metadata across multiple regions (for now). 

The requirements for the platform data, on the other hand, look completely different. This data changes infrequently, and slightly stale data is of no consequence (eventual consistency is fine). While platform data like API keys is technically required in the critical path, it is trivially cacheable for arbitrarily long periods of time, so it is not really in the critical path. Also, unlike the cluster metadata, some of the platform data needs to be available in multiple regions for the service to function as a single pane of glass.

When we were evaluating how to add support for additional regions to WarpStream, there wasn’t much to think about for the virtual cluster Metadata Stores. We would just run dedicated instances in more regions, and users would connect their Agents to whichever region was closest, since most (but not all) WarpStream clusters run in a single region anyway.

The platform data (like API keys) is a different story. We could have used the same approach we did with the Metadata Store for the platform data by running a dedicated (and fully isolated) Aurora instance in every region, but that would have resulted in a poor user experience. Every region would have presented to users as a fully independent “website”, and users who wanted to run clusters in multiple regions would have had to maintain different WarpStream accounts, re-invite their teams, configure billing multiple times, etc., which is not what we wanted.

Hub and Spoke

When we looked at these requirements, the architecture that seemed like the best candidate was a “hub and spoke” model. The us-east-1 region that hosts our Aurora cluster would be the primary “hub” region that hosts the WarpStream UI and all of our “infrastructure as code” APIs for creating/destroying virtual clusters. All the other regions would be “spokes” that run fully independent and isolated versions of WarpStream’s Metadata Store, but not the Aurora database that stores the “platform data”.

Three spoke regions running fully isolated Metadata Stores powered by platform data replicated from the hub region.

CRUD operations to create and destroy virtual clusters would always be routed to the hub region, but actual customer WarpStream clusters and their Agents would only ever interact with a single “spoke” region and have no cross regional dependencies.
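As a rough illustration of the routing rule (the region names and operation kinds below are just examples, not actual WarpStream internals):

```go
// Hub-and-spoke routing in a nutshell: platform CRUD always lands in the hub
// region, while everything the Agents do stays in their local spoke region.
package main

import "fmt"

type opKind int

const (
	opPlatformCRUD    opKind = iota // create/delete clusters, manage API keys, billing, ...
	opClusterMetadata               // Kafka Metadata, offsets, and other Agent traffic
)

const hubRegion = "us-east-1" // hosts the web console, APIs, and the primary Aurora database

// regionFor picks which control plane region should serve an operation for
// Agents deployed in agentRegion.
func regionFor(op opKind, agentRegion string) string {
	if op == opPlatformCRUD {
		return hubRegion // single pane of glass
	}
	return agentRegion // zero cross-regional dependencies in the critical path
}

func main() {
	fmt.Println(regionFor(opPlatformCRUD, "ap-southeast-1"))    // us-east-1
	fmt.Println(regionFor(opClusterMetadata, "ap-southeast-1")) // ap-southeast-1
}
```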

This approach would give us the best of both worlds: a single pane of glass where WarpStream customers could manage clusters in any region, while still keeping regions independent from each other such that a failure in one region (including the hub region) would never cause a failure in any other region. The one caveat with this approach is that any unavailability of Aurora in the primary hub region would prevent customers from creating new clusters in all regions, but existing clusters would continue working just fine. We felt like this was an acceptable trade-off.

However, this architecture did present a conundrum for us. In order for our product to present as a single pane of glass, some of the data in our primary region (like whether a virtual cluster exists, whether an API key was valid, etc) had to be made available in all of our spoke regions.

Hub region can read the platform data from the primary Aurora database, but where do the spoke regions read the platform data from?

But we also needed to avoid creating any critical path inter-regional dependencies. Whatever we ended up doing, we had to ensure that under no circumstances could the failure of a single region ever impact clusters running in different regions.

Easier said than done!

Option 1: Multi-Region Aurora

The first option we considered was to leverage AWS Aurora’s native multi-region functionality. Specifically, AWS Aurora supports spawning read replicas in other regions. There are limits on how many additional regions can contain read replicas, and this approach would only work on AWS (so we’d need a different solution for multi-cloud), but we thought it could be a good enough stop-gap to scale from a single region to a handful in the short term without much engineering work. We also really liked the idea of offloading the tricky problem of replicating a subset of our platform data to the AWS Aurora team.

Multi-region AWS Aurora cluster with the primaries in the hub region, and read replicas in the spoke regions.

Unfortunately, when we investigated further, we discovered that any unavailability of the primary Aurora region could result in unavailability of the secondary regions’ read replicas. If that ever happened to us, we’d end up in a terrible situation: all of our spoke regions, and their associated clusters / Metadata Stores, would keep running (thanks to in-memory caches, more on those below), but restarting or deploying the control plane nodes would cause an incident because the dropped in-memory caches could not be refilled.

It turns out that Aurora’s multi-region functionality is designed for a completely different use case: quickly failing over to a secondary region when the primary region fails. That’s useful, but we wanted a solution that would never require manual intervention, so we ruled this option out.

We briefly considered migrating to a different database with better multi-region availability support like CockroachDB or Spanner, but we had no previous experience with these technologies and migrating all of our platform data to a brand new database technology felt like overkill.

Option 2: Smart (and durable) Caches

Luckily, the platform data (like which virtual clusters exist and which API keys are valid) changes infrequently. So another approach we considered was to query the source of truth (our us-east-1 Aurora database) from all spoke regions, and then cache that data aggressively. For example, the first time a Gateway node encountered an API key that it had not seen before, it would query Aurora in us-east-1 to determine if it was valid, and then cache the result in-memory.

ap-southeast-1 spoke region querying the AWS Aurora cluster in us-east-1 to fill its in-memory caches.

This approach was appealing because it would require only minimal code changes, and it took advantage of a strategy we were already employing within the primary region to be resilient against Aurora failures: in-memory caching. The Gateway nodes were already using a custom “smart” cache (internally referred to as the “loading cache”) that would fit the bill perfectly. This cache employs a number of tricks to make it suitable for critical use cases like this (a simplified sketch follows the list):

  1. It automatically deduplicates cache fills. This eliminates the thundering herd problem.
  2. It incorporates negative caching as a first class concept, so if the Gateway nodes keep receiving requests for API keys that don’t exist anymore, they don’t keep querying Aurora over and over again.
  3. It limits the concurrency of cache fills so that a flood of requests with new and unique API keys results in the cache being filled at a continuous (and manageable) rate instead of flooding Aurora with queries.
  4. It implements asynchronous background refreshes (again, with limited concurrency) so that changes in Aurora (like invalidating an API key) are eventually reflected back into the state of the in-memory caches. This ensures that in normal circumstances when Aurora is available, invalidating an API key is reflected within seconds, but in rare circumstances where Aurora is unavailable the API gateway nodes can keep running more or less indefinitely, as long as they aren’t restarted.
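To make those properties a bit more tangible, here is a heavily simplified Go sketch of such a loading cache. It is illustrative only (WarpStream’s real implementation is internal), and it compresses the four properties above into a handful of lines: deduplicated fills, negative caching, a semaphore bounding fill concurrency, and a sequential background refresh loop.

```go
// Illustrative loading cache: not WarpStream's actual implementation.
package cache

import (
	"context"
	"sync"
	"time"
)

type entry struct {
	value any
	found bool // false => negative cache entry: the key is known not to exist
}

type LoadingCache struct {
	mu       sync.Mutex
	entries  map[string]entry
	inflight map[string]chan struct{} // deduplicates concurrent fills per key
	fillSem  chan struct{}            // bounds how many fills can hit the database at once
	load     func(ctx context.Context, key string) (any, bool, error)
}

func New(maxConcurrentFills int, load func(context.Context, string) (any, bool, error)) *LoadingCache {
	return &LoadingCache{
		entries:  map[string]entry{},
		inflight: map[string]chan struct{}{},
		fillSem:  make(chan struct{}, maxConcurrentFills),
		load:     load,
	}
}

// Get returns the cached value for key, filling the cache on a miss. "Not
// found" results are cached too (negative caching), so repeated lookups of
// revoked API keys don't keep querying the database.
func (c *LoadingCache) Get(ctx context.Context, key string) (any, bool, error) {
	for {
		c.mu.Lock()
		if e, ok := c.entries[key]; ok {
			c.mu.Unlock()
			return e.value, e.found, nil
		}
		if done, ok := c.inflight[key]; ok {
			c.mu.Unlock()
			select { // another goroutine is already filling this key; wait for it
			case <-done:
				continue
			case <-ctx.Done():
				return nil, false, ctx.Err()
			}
		}
		done := make(chan struct{})
		c.inflight[key] = done
		c.mu.Unlock()

		c.fillSem <- struct{}{} // limit fill concurrency
		value, found, err := c.load(ctx, key)
		<-c.fillSem

		c.mu.Lock()
		delete(c.inflight, key)
		if err == nil {
			c.entries[key] = entry{value: value, found: found}
		}
		c.mu.Unlock()
		close(done)
		return value, found, err
	}
}

// RefreshLoop re-loads cached keys in the background (sequentially, so the
// refresh itself is concurrency-limited). If the source of truth is down,
// the refresh fails and stale entries keep being served.
func (c *LoadingCache) RefreshLoop(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		c.mu.Lock()
		keys := make([]string, 0, len(c.entries))
		for k := range c.entries {
			keys = append(keys, k)
		}
		c.mu.Unlock()
		for _, k := range keys {
			if value, found, err := c.load(ctx, k); err == nil {
				c.mu.Lock()
				c.entries[k] = entry{value: value, found: found}
				c.mu.Unlock()
			}
		}
	}
}
```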

This smart caching strategy had served us well within our primary region, but ultimately we decided that it wasn’t an acceptable solution to our multi-region data replication problem. A failure of the primary Aurora database in us-east-1 wouldn’t immediately impair the other regions, but it would leave us unable to deploy or restart any of the control planes in our other regions until the availability of the Aurora database was restored. In other words, this approach suffered from the same problem as the Aurora read replicas approach.

Briefly, we considered extending our existing loading cache implementation to be durable so that we could restart control plane nodes, even when the primary Aurora database was down, without losing the data that had already been cached.

ap-southeast-1 spoke region querying the AWS Aurora cluster in us-east-1 to fill its in-memory caches, but then persisting the cached data to a local DynamoDB instance so that the Gateway nodes can still be restarted safely even if the primary Aurora cluster is unavailable.
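For illustration, the durable variant we sketched out would have looked roughly like this. The KV interface below is a hypothetical stand-in for DynamoDB, and none of these names come from our actual codebase.

```go
// Rejected idea, sketched for illustration: write every successful cache fill
// through to a regional durable KV store, and fall back to it when the primary
// Aurora database can't be reached.
package durablecache

import "context"

// DurableKV is a stand-in for a regional store like DynamoDB.
type DurableKV interface {
	Put(ctx context.Context, key string, value []byte) error
	Get(ctx context.Context, key string) (value []byte, ok bool, err error)
}

// fillWithDurableFallback queries the source of truth first and persists the
// result; if the source is unavailable, it serves whatever was last persisted.
// The fallback branch is the rarely exercised code path we didn't want to
// depend on.
func fillWithDurableFallback(
	ctx context.Context,
	key string,
	kv DurableKV,
	loadFromAurora func(ctx context.Context, key string) ([]byte, error),
) ([]byte, error) {
	value, err := loadFromAurora(ctx, key)
	if err == nil {
		_ = kv.Put(ctx, key, value) // best-effort write-through
		return value, nil
	}
	if cached, ok, kvErr := kv.Get(ctx, key); kvErr == nil && ok {
		return cached, nil // stale but durable copy
	}
	return nil, err
}
```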

However, we decided against that approach as well because it didn’t feel very stable. The way the system functioned when the primary Aurora database was available would be completely different than the way it functioned when the Aurora database was unavailable, and we didn’t like the idea of relying heavily on an infrequently exercised code path for such critical functionality.

Ultimately we decided that while the loading cache was great for caching data within a region, it was not an acceptable solution for replicating data across regions.

Option 3: Push-Based Replication (we chose this one)

Both of the models we considered previously were “pull based” models. Instead, we decided to pursue a “push based” approach using a technique we’d learned at previous jobs called “contexts”. A Context is a bundle of metadata with the following characteristics:

  1. Its values change slowly (if at all).
  2. The metadata is associated with specific clusters or tenants.
  3. The metadata needs to be made available on a large number of machines in a highly available manner.
  4. Availability is always favored over consistency, i.e. we’d rather use values that are several hours old than have the system fail entirely.

For example, one of the contexts we created is called the “cluster context”, and it contains the following fields (sketched as a struct after this list):

  1. The cluster’s ID
  2. The cluster’s name
  3. The ID of the tenant (customer) the cluster belongs to
  4. A few additional internal fields that are required for the Metadata Store to begin processing requests
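In code, a cluster context could be as simple as a small serializable struct. The field names below are illustrative, not our exact schema.

```go
// A "cluster context" as it might be stored in each spoke region's KV store,
// keyed by something like "cluster/<cluster-id>". Field names are illustrative.
package contexts

type ClusterContext struct {
	ClusterID   string `json:"cluster_id"`
	ClusterName string `json:"cluster_name"`
	TenantID    string `json:"tenant_id"`
	// ... plus the handful of internal fields the Metadata Store needs before
	// it can begin processing requests.
}
```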

Building the contexts was straightforward. We wrote a job that scans the Aurora database every 10 minutes, builds the contexts, and then writes them as individual key-value pairs to a durable KV store in the relevant spoke regions.
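A rough sketch of that publisher job, with hypothetical names (the SpokeStore interface stands in for whatever durable KV store each spoke region uses):

```go
// Context publisher: periodically rebuild all contexts from the platform
// database and push them, as key-value pairs, to a durable KV store in every
// spoke region. All names are illustrative.
package publisher

import (
	"context"
	"time"
)

type ContextKV struct {
	Key   string // e.g. "cluster/<cluster-id>"
	Value []byte // serialized context (cluster ID, name, tenant ID, ...)
}

// SpokeStore is a durable KV store in a single spoke region.
type SpokeStore interface {
	Put(ctx context.Context, key string, value []byte) error
}

type Publisher struct {
	buildContexts func(ctx context.Context) ([]ContextKV, error) // scans the Aurora database
	spokes        []SpokeStore                                   // one per spoke region
}

// Run performs a full scan-and-publish on every tick (every 10 minutes in our
// case), so any missed or failed incremental update heals itself automatically.
func (p *Publisher) Run(ctx context.Context, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		_ = p.publishAll(ctx) // errors are retried on the next tick
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

func (p *Publisher) publishAll(ctx context.Context) error {
	contexts, err := p.buildContexts(ctx)
	if err != nil {
		return err
	}
	for _, spoke := range p.spokes {
		for _, c := range contexts {
			if err := spoke.Put(ctx, c.Key, c.Value); err != nil {
				return err
			}
		}
	}
	return nil
}
```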

Context publisher replicating context from the hub regions to the spoke regions.

Of course, we pride ourselves on the fact that a new WarpStream cluster can be created in under a second, so forcing users to wait 10 minutes after creation before their clusters were usable wasn’t acceptable. Solving this was easy, though: when a new cluster is created (or any operation is performed that could result in a context being created or an existing one mutated), we submit an asynchronous request to the same job service to trigger an update for that specific context immediately.

This gives us the best of both worlds. Changes to the contexts (like a new cluster being created, or an API key being revoked) are reflected in their associated spoke regions almost instantaneously, but in the worst case scenario where we forget to issue the async update request in some code path (or it fails for some reason), the issue will automatically resolve itself within a few minutes. In other words, this approach is fast in the happy path, and self-healing in the non-happy path. Simple and easy to reason about.
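Continuing the publisher sketch above, the fast path is just a targeted publish that the CRUD code paths fire asynchronously right after a mutation commits (again, the names are illustrative):

```go
// Fast path: push a single freshly built context to every spoke region
// immediately, without waiting for the next full scan.
package publisher

import "context"

// PublishOne is called asynchronously by the CRUD code paths (e.g. right after
// a new cluster row is committed to Aurora). The periodic full scan in Run is
// the safety net if this call is missed or fails.
func (p *Publisher) PublishOne(ctx context.Context, c ContextKV) error {
	for _, spoke := range p.spokes {
		if err := spoke.Put(ctx, c.Key, c.Value); err != nil {
			return err
		}
	}
	return nil
}
```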

The primary downside of this approach is that it was a lot more work to implement. But we think it was worth it for a few reasons:

  1. We truly have zero inter-regional dependencies in the critical path. Instead, the primary region pushes updates to the spoke regions proactively, but the spoke regions never query the primary region or create any external connections. In fact, the spoke regions aren’t even aware of the hub region in any meaningful way. This makes reasoning about availability, reliability, and failure modes easy. We know the failure of one region will never impact other regions, because no region takes a dependency on another region, so it can’t have any impact by definition.
  2. The context framework we created is broadly useful. For example, in the future we’ll use it to build out support for our own feature flagging system without taking on any additional external dependencies.

With this setup, we have been able to deploy our control plane in three additional regions around the world, and we’re ready to spin up more as our customers’ needs grow.

Author
Emmanuel Pot
Software Engineer