At the Wix server infrastructure group we work daily on a set of tools that make the lives of our backend developers easier and help them deliver their code to production faster and more safely.
Wix handles huge traffic: more than 500 billion HTTP requests and 30 billion Kafka messages daily, served by over 2,500 microservices across multiple data centers around the globe. And these numbers continue to grow.
At this scale, caching is no longer a “nice to have” - it’s a must.
Because of Wix’s massive growth over a short period of time, different developers created different caching solutions to help them reduce the number of HTTP requests and heavy DB queries. But these libraries didn’t cover all the needed features and were more of a “halfway” solution. On top of that, different groups used different libraries, so it became unhealthy for the organization to keep going like this - we needed an internal caching solution as part of our server infrastructure.
Our team (the Data Streams team in the Server Infrastructure group) took on this task, taking advantage of our event-driven Kafka expertise.
The Requirements
The requirements for the caching solution were:
Minimal latency - the cache solution should be as close as possible to zero latency on CRUD operations (~10ms P99 for 2-level cache put and get operations).
Multi-regional data consistency - data should be consistent across regions. If an item is modified in one region, the corresponding items in other regions should eventually be updated as well.
Infinite DB storage - the DB storage should not become a bottleneck in the future; scale should be part of the design, as a large-scale solution from day one.
Handling an increasing dataset - the caching solution should be designed to handle 10X and 100X the existing traffic and events.
Why not use an existing solution like Redis or Memcached?
The major reason we decided not to use an existing caching solution is the potential size of the storage: a single table can grow to terabytes, so it’s not a good idea to store that amount of data in RAM.
Redis, for example, is an in-memory store, which means all of its data is stored in the server’s RAM and is available only as long as the server is running. In-memory cache data is temporary by definition, so it doesn’t fit our needs: we expect our data (for example, a permission object per app that should never expire as long as the app exists) to be stored for a long time, with no TTL by default.
Our first caching solution: Kafka compact topics
Architecture: the plan was to use an in-memory KV (key-value) store based on Kafka’s compact topics (Kafka is widely used across Wix’s backend).
In general, a compacted Kafka topic means that each message is saved to the topic with a key, and Kafka retains the latest value for each key - older values of the same key eventually get removed by compaction.
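For context, compaction is just a topic-level setting (cleanup.policy=compact). Here is a hedged sketch of creating such a topic with the Kafka AdminClient - the broker address, topic name, partition and replication counts are illustrative:

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.apache.kafka.common.config.TopicConfig

object CreateCompactedTopic extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "kafka:9092")
  val admin = AdminClient.create(props)

  // cleanup.policy=compact tells Kafka to keep (at least) the latest value per key
  val topic = new NewTopic("kv-compacted-topic", 12, 3.toShort)
    .configs(Map(TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT).asJava)

  admin.createTopics(List(topic).asJava).all().get()
  admin.close()
}
```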
The idea was to create a consumer that would consume the compact topic with offset.reset = earliest (meaning, from the beginning of the topic) at pod startup and use the consumed messages as in-memory KV values. This solution missed one KPI - handling the increasing dataset. We didn’t think through the full consequences of “consume the compact topic with offset.reset = earliest at pod startup”. On a low-scale topic everything works: a pod starts its initialization process, our KV storage solution starts, the consumer consumes the topic from the beginning, and a KV store is built from the consumed messages.
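To make that concrete, here is a simplified sketch of such a warm-up consumer (not the actual Wix library code - the topic name, config values and value types are illustrative): each pod subscribes with a fresh consumer group, replays the compacted topic from the beginning, and materializes it into an in-memory map.

```scala
import java.time.Duration
import java.util.Properties
import scala.collection.concurrent.TrieMap
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object CompactedTopicKvStore extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "kafka:9092")
  // A fresh group id per pod, so every pod replays the whole topic for itself
  props.put("group.id", s"kv-warmup-${java.util.UUID.randomUUID()}")
  props.put("auto.offset.reset", "earliest") // start from the beginning of the topic
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List("kv-compacted-topic").asJava)

  // The in-memory KV store built from the compacted topic
  val kvStore = TrieMap.empty[String, String]

  while (true) {
    consumer.poll(Duration.ofMillis(500)).asScala.foreach { record =>
      if (record.value() == null) kvStore.remove(record.key()) // tombstone = delete
      else kvStore.put(record.key(), record.value())
    }
  }
}
```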
All is good, we are happy.
What would happen on a high-scale topic? A pod starts its initialization process, the compact topic consumer consumes the topic from the beginning - and we have an issue.
Consuming a high-scale topic from the beginning is not a fast task (at all!), and when a topic has hundreds of thousands of events or more across its partitions, it takes a long time to consume each and every message (even when using a batch consumer).
With such a solution we could slow down our users’ pods’ initialization process by a couple of minutes! Think about your auto-scaler solution - and now add a couple of minutes to it.
While a solution like that could have been OK a few years ago, it just doesn't fit Wix's current scale. Back to the drawing board.
Our 2nd caching solution: WixCache library
Architecture: design our very own event-driven, in-memory caching library, based on LRU eviction and write-through policies, with a configurable warm-up strategy and cross-region data consistency.
Our new architecture design is based on 3 parts:
A cross-region DB - how to have consistent data across regions?
The cache itself - monitoring, eviction policies, cache warm-ups, and pretty much all of the caching library itself.
Pods’ in-memory cached value updating - notify all pods in each region about the CRUD operations they need to apply to their in-memory cache values to stay aligned with the DB values.
Full caching library solution architecture:
Let’s break this solution into pieces and understand them one by one.
1st Part: Cross-region DB
The first part is to create DB replication across regions, so that each CRUD operation that happens in one region is reflected in the other regions - achieving cross-region data consistency.
Chosen DB - AWS DynamoDB.
There are several reasons why we chose DynamoDB - it is managed by AWS, has no size limitations, offers proven performance and scalability, is schemaless, etc. But the most useful feature for us is the cross-region replication (called “Global tables” in AWS), which is so easy to use that you don’t even need to write a single line of code. All you have to do is open the DynamoDB AWS console, navigate to the “Global tables” section and create a replica:
Choose the region you want to replicate this global table to, and the replication will start in the background, all of it managed on the AWS side.
You can even monitor the replication lag (on CloudWatch) and autoscale accordingly.
Now we need to find a way to consume the CDC events out of DynamoDB and use them in each region.
What is a CDC (Change Data Capture) event?
DynamoDB (like many other DBs) can expose a log of its own change events for external services to listen to.
On each DB operation, DynamoDB creates an internal event that an external service can consume and then decide how to process.
Each CDC event basically means that some CRUD operation happened on the DynamoDB side, and now a corresponding action needs to happen in the in-memory cache library as well, for it to stay aligned with the DB values.
These CDC events are the actions that the caching library needs to perform on its in-memory cached values.
We’ve formatted the CDC event stream internally to be something like this:
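Conceptually, such a formatted event carries the source table, the type of change, the key and the new value. Here is a hypothetical sketch (field and type names are illustrative, not Wix’s actual schema):

```scala
// Illustrative only - the real internal schema may differ
sealed trait CdcOperation
object CdcOperation {
  case object Insert extends CdcOperation
  case object Modify extends CdcOperation
  case object Remove extends CdcOperation
}

final case class CdcEvent(
  table: String,             // the DynamoDB table the change came from
  operation: CdcOperation,   // what happened in the DB
  key: String,               // the cache key that changed
  newValue: Option[String],  // the new value (empty for Remove)
  timestampMs: Long          // event time, useful for ordering
)
```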
We’ve successfully created replication between regions, and we now know how to keep a pod’s in-memory cached data aligned with its DB (we will close the circle in part 3).
2nd Part: The cache itself
The 2nd part is all about the library that will be used as a resource in each pod across regions.
This library implements simple CRUD operations on in-memory cached values, logging, metrics, eviction and population strategies, CI/CD, and more.
The cache library should do 2 things:
Create the CDC events topic consumer - the library creates a consumer for the CDC events topic to handle all the necessary operations on its in-memory values, in order to stay consistent with DynamoDB.
Service’s operations - basically the cache library itself and its operations: eviction, data consistency policies, logs and metrics, and of course the implementation of the CRUD operations for the hosted service to use when cache operations are needed (a sketch follows the list below):
localPut - used by the Kafka consumer handler; updates the in-memory cache data only.
Put - used for the service’s put requests; updates the in-memory cache value and also writes the value to the DB.
Delete - deletes an item from the cache and from DynamoDB (an action that replication will propagate to the other regions as well).
Get - first tries to get the value from the in-memory cache (zero latency); on a cache miss, fetches the value from DynamoDB and stores it in the in-memory cache for later use.
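Here is a minimal sketch of how those four operations might look, assuming a hypothetical write-through cache backed by a concurrent in-memory map and a thin DynamoDB wrapper. This is an illustration, not the actual WixCache implementation - eviction, metrics, warm-up and TTL handling are omitted:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical thin wrapper around the DynamoDB table backing this cache
trait KeyValueStore[V] {
  def get(key: String): Option[V]
  def put(key: String, value: V): Unit
  def delete(key: String): Unit
}

class InMemoryWriteThroughCache[V](db: KeyValueStore[V]) {
  private val inMemory = TrieMap.empty[String, V]

  // Used by the CDC-topic consumer: the DB already changed, so update memory only
  def localPut(key: String, value: V): Unit = inMemory.put(key, value)

  // Used by the hosted service: update memory and write through to DynamoDB
  def put(key: String, value: V): Unit = {
    inMemory.put(key, value)
    db.put(key, value)
  }

  // Remove from memory and from DynamoDB; global-table replication removes it elsewhere
  def delete(key: String): Unit = {
    inMemory.remove(key)
    db.delete(key)
  }

  // Serve from memory when possible; on a miss, fall back to DynamoDB and cache the result
  def get(key: String): Option[V] =
    inMemory.get(key).orElse {
      val fromDb = db.get(key)
      fromDb.foreach(v => inMemory.put(key, v))
      fromDb
    }
}
```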
I left other implementation details as open questions because it’s up to your organization to decide what the best fit is. There are many cache algorithms out there and there is no right/wrong answer. It’s about configuring the cache to have the best hit ratio possible. You can read more about it in this blog post.
We now have a consumer that consumes CDC events and performs the necessary operations, as well as an API for the service to invoke the cache operations.
3rd Part: Pods’ in-memory cached value updating
The missing part is the connection between the DynamoDB CDC event stream and the cache’s CDC Kafka topic. All that’s left for us to do is find a way to consume those DynamoDB CDC events and route them into the caching library’s topic. As long as this part is missing, data is not consistent across regions.
Let’s think about the following scenario, we have:
2 regions: region A and region B
1 service
1 pod per service in each region
1 item with value: {“test” : 4}
What will happen if we get an update request in region A setting “test” = 5? The service will update the in-memory cached value and write the updated value to the DB, the cross-region replication will take action and the updated value will reach region B’s DB - and that is it.
All pods in the same region will provide the same value because they all consume the same CDC topic (each pod with its own random Kafka consumer group, so every pod receives every message), so their cache memory stays aligned. But pods in region A will not provide the same value as pods in region B, because the pods in region B never got the updated value of test=5 - they will still respond with test=4, meaning the data between regions is not consistent.
The following solution will explain how we handled this issue.
Sync service - to fetch CDC events:
The purpose of this sync service is to fetch CDC events from the “AWS world” and produce them into the “Wix internal world” as internal Kafka messages. The sync service listens to DynamoDB’s CDC event stream and produces the events to Kafka topics as formatted messages.
In AWS DynamoDB you have two change-stream options available to you:
Kinesis data stream - Amazon Kinesis Data Streams for DynamoDB captures item-level changes in a table and replicates those changes to a Kinesis data stream, which you can then consume from.
DynamoDB stream - captures item-level changes in a table and writes those changes to a DynamoDB stream, which you can access through the DynamoDB Streams API.
The more native solution, and the one AWS recommends, is the Kinesis one (option 1), and that is exactly what we did, following the AWS documentation.
As part of the implementation, we created a service that consumes the CDC events of DynamoDB tables and produces them to internal Kafka topics - one topic per DynamoDB table.
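For illustration, here is a rough sketch of such a loop using the AWS SDK v2 Kinesis client and a plain Kafka producer. The stream, shard and topic names are made up, and shard discovery, checkpointing and error handling are omitted - a production service would more likely build on the Kinesis Client Library:

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest, ShardIteratorType}

object CdcSyncService extends App {
  val kinesis = KinesisClient.create()

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", "kafka:9092")
  producerProps.put("key.serializer", classOf[StringSerializer].getName)
  producerProps.put("value.serializer", classOf[StringSerializer].getName)
  val producer = new KafkaProducer[String, String](producerProps)

  // Hypothetical stream / shard / topic names, for illustration only
  var iterator = kinesis.getShardIterator(
    GetShardIteratorRequest.builder()
      .streamName("dynamodb-cdc-app-permissions")
      .shardId("shardId-000000000000")
      .shardIteratorType(ShardIteratorType.LATEST)
      .build()
  ).shardIterator()

  while (true) {
    val response = kinesis.getRecords(
      GetRecordsRequest.builder().shardIterator(iterator).limit(100).build())
    response.records().asScala.foreach { record =>
      val cdcJson = record.data().asUtf8String() // the raw DynamoDB change event
      producer.send(new ProducerRecord("cache-cdc-app-permissions", record.partitionKey(), cdcJson))
    }
    iterator = response.nextShardIterator()
    Thread.sleep(200) // simple throttle between polls
  }
}
```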
Dispatcher service:
The dispatcher is responsible for routing each CDC event to the right Kafka topic, where the caching library consumes it and updates its internal in-memory values.
It holds a routing table - a map from DynamoDB table to the topic its events should be produced to.
Routing table example:
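For illustration, such a routing table could be as simple as a map (the table and topic names below are made up):

```scala
// Maps each DynamoDB table to the Kafka topic its CDC events should be produced to
// (table and topic names are illustrative)
object CacheCdcRoutes {
  val routingTable: Map[String, String] = Map(
    "app-permissions" -> "cache-cdc-app-permissions",
    "site-properties" -> "cache-cdc-site-properties",
    "member-settings" -> "cache-cdc-member-settings"
  )
}
```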
All existing routing tables are stored somewhere (a Kafka compact topic, MySQL, or another storage solution), and we created some very basic APIs to handle operations on the routing table (like adding a new route or deleting an existing one).
Using the routing table, the dispatcher knows which topic each CDC event should be produced to, so that the right caching library consumes it and performs the right action on its data.
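A hedged sketch of that lookup, reusing the routingTable map from the example above (the event fields and producer wiring are illustrative, not the actual dispatcher code):

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Dispatcher {
  // tableName, key and cdcJson would come from the consumed CDC event
  def dispatch(tableName: String, key: String, cdcJson: String,
               producer: KafkaProducer[String, String]): Unit =
    CacheCdcRoutes.routingTable.get(tableName) match {
      case Some(topic) => producer.send(new ProducerRecord(topic, key, cdcJson))
      case None        => () // no route registered for this table - log it and alert
    }
}
```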
Conclusion and closing thoughts
That is it! This is how we created a cross-region, near-zero-latency, event-driven, in-memory caching solution for the whole company to use - one that scales to a practically unlimited number of records.
Getting caching right takes a lot of time and effort and cannot be achieved in one go. A lot of questions come up in the process: What are the best warm-up strategies? How do you achieve a good hit rate? LRU or another eviction policy? How many items should the in-memory cache hold by default? And what default configuration should the cache use?
None of those questions has just one right answer. For that reason, I highly recommend that you don’t optimize parts of your code when it’s not truly necessary. Only when latency or costs reach a critical point should you start thinking about a cache.
This post was written by Tal Kot