Migrating Terabytes Of Data From MySQL To Cassandra With No Downtime

by James Brown

Over the 10-year lifetime of our company, EasyPost has grown in leaps and bounds — from zero customers to hundreds of thousands; from measuring our volume in minutes per shipment to hundreds of shipments per second; from megabytes of data to hundreds of terabytes of online (OLTP) data and petabytes of offline/analytics (OLAP) data. This post will talk about how we've swapped out the storage system for some of our largest kinds of data with no downtime or user impact.

Background

EasyPost is a proud believer in boring technology; our original application stack was Ruby on Rails and MySQL (technically, Percona Distribution for MySQL). As we've grown from hundreds of labels per day to hundreds of labels per second and from megabytes to gigabytes to terabytes to petabytes of data, we've carefully selected technologies that enable this scaling without introducing too much complexity or spending too much of our risk budget. For data storage, this has included vertical partitioning, where we've broken different features out into different codebases and different storage solutions, and horizontal partitioning, where we've divided up some datasets across multiple servers.

For some of our largest and least-relational kinds of data, we've been working on migrating from MySQL to Apache Cassandra. Cassandra is a "wide-column" database which is conceptually similar to Google's BigTable and Amazon's Dynamo (and is, in fact, based on the original Dynamo paper). Cassandra's data model is highly scalable, and allows you to increase storage capacity by adding more nodes, without any substantial performance or availability cost.

Cassandra's data model consists of a series of tables containing rows and typed columns, some of which make up the primary key; these terms are familiar from a traditional relational database like MySQL, but the similarity ends there. In Cassandra, each primary key is broken down into a partition key and a clustering key; the partition key determines which servers will hold the data for that row, and the clustering key is used to order those rows in a sorted set within the partition. Every other cell in a given row has an existence of its own and can be written, read, or even deleted independently.

In Cassandra, each partition key is consistently hashed and assigned to some subset of the nodes (exactly which subset depends on whether your cluster is using vnodes or the simpler single-token ring); because of this, it's very important to limit each query to a single partition key to avoid expensive whole-cluster scans.
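To make that placement concrete, here's a toy sketch of token-ring routing (illustrative only; the MD5-based hash and single-token-per-node ring are simplifications of Cassandra's Murmur3 partitioner and vnodes):

```ruby
require 'digest'

# Toy token ring: each node owns one token; a partition key is hashed
# onto the ring and replicated to the first N nodes at or after its token.
class TokenRing
  def initialize(nodes, replication_factor: 2)
    @rf = replication_factor
    # Assign each node a deterministic token by hashing its name.
    @ring = nodes.map { |n| [token_for(n), n] }.sort_by(&:first)
  end

  def token_for(key)
    Digest::MD5.hexdigest(key.to_s)[0, 8].to_i(16)
  end

  # Replicas for a partition key: walk the ring starting at the first
  # token at or after the key's hash, wrapping around if necessary.
  def replicas(partition_key)
    t = token_for(partition_key)
    start = @ring.index { |tok, _| tok >= t } || 0
    @ring.rotate(start).first(@rf).map(&:last)
  end
end

ring = TokenRing.new(%w[node-a node-b node-c])
# The same partition key always routes to the same replica set, which is
# why single-partition queries are cheap and scatter-gather queries are not.
```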

In this post, we'll talk about our first Cassandra migration, an ongoing migration, and our future plans.

Case Study: Events

A big part of the EasyPost API is our support for Webhook Events. Customers who configure webhook URIs on their account will receive an HTTP POST from EasyPost when certain actions occur (such as when a tracked parcel changes its location, or when a Batch is purchased). This allows our customers to build reactive systems without having to poll our API. In addition to sending these webhook events, we also have APIs and a UI for querying historical events; this allows our customers to build and improve their webhook endpoints by looking at what events they've received recently and how their servers responded. Our events also err on the side of eventually-reliable delivery: webhook events that do not receive an HTTP 200 response from the customer are retried several times with increasing backoff, up to a couple of days later. This feature is very popular with our users, and (at the very instant this post is being written) we're sending upward of 400 webhook events per second; storing these events (and our customers' response metadata) for three weeks means that we're storing more than half a billion rows.
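A retry schedule like that can be sketched as capped exponential backoff (the attempt count and intervals below are invented for illustration, not our production values):

```ruby
# Illustrative webhook retry schedule: each failed delivery waits twice
# as long as the last, capped at one day between attempts.
def retry_delays(attempts: 8, base: 30, cap: 24 * 3600)
  (0...attempts).map { |n| [base * 2**n, cap].min }
end

delays = retry_delays
# Early retries come quickly (seconds apart) to paper over transient
# failures; later ones spread out so a customer outage lasting hours
# still gets a delivery attempt once their endpoint recovers.
```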

This was feasible on MySQL (with some clever data representation, it took around 7TB, spread across several servers), but we decided to move it to Cassandra to improve availability, simplify our maintenance logic, and serve as a demonstration of Cassandra at scale in our real environment.

Data Modeling and Schema Design

When using Cassandra, there are two main questions you have to ask up-front:

  • What is the data you need to store?
  • What are the operations you need to perform on that data?

Because secondary indices tend to be very expensive in Cassandra, it's important to carefully model your data based not on classical, size-focused, database normal forms, but instead based on the queries you want to perform. For our events system, the relevant operations were relatively simple:

  1. Enqueue a new event
  2. Record an attempt to send an event to a given webhook
  3. Retrieve all send attempts for an event
  4. Page through recent events for a given user

Our schema for events themselves ended up looking roughly like the following:

  
CREATE TABLE events.events (
    user_id bigint,
    bucket text,
    uuid timeuuid,
    description ascii,
    event_priority int,
    request_body blob,
    status ascii,
    updated_at timestamp,
    PRIMARY KEY ((user_id, bucket), uuid)
) WITH CLUSTERING ORDER BY (uuid DESC)
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS', 'max_threshold': '32', 'min_threshold': '4', 'unchecked_tombstone_compaction': 'true'}
  

Each event is uniquely identified by a TIMEUUID (a version 1 UUID, although using a synthetic node identifier instead of a MAC address) and a user ID (every request in our system always knows which user is relevant). In order to support relatively efficient pagination of events by user, the partition key is actually (user_id, bucket), where bucket is a truncated timestamp extracted from the UUID. The bucket width is tuned experimentally based on load in the system and is currently approximately 10 minutes, which means that each 10-minute window of events for a given user is stored in a single partition.

Here we're using the wonderful Time Window Compaction Strategy, which causes data to be stored on disk grouped by a given time window (in this case, by day) and makes deleting expired data extremely cheap. This allows us to retain a few weeks of data very cheaply. Note in particular the use of the 'unchecked_tombstone_compaction' parameter; this allows TWCS to compact individual SSTables as soon as a given fraction of their data (by default, 20%) has expired, and is basically required if you're going to run read repairs on TWCS-compacted tables.

Our send attempts (which we call payloads) have a similar schema:

  
CREATE TABLE events.payloads (
    user_id bigint,
    bucket text,
    uuid timeuuid,
    event_uuid timeuuid,
    request_body blob,
    request_headers blob,
    request_url text,
    response_body blob,
    response_code int,
    response_headers blob,
    total_time int,
    PRIMARY KEY ((user_id, bucket), uuid)
) WITH CLUSTERING ORDER BY (uuid DESC)
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS', 'max_threshold': '32', 'min_threshold': '4', 'unchecked_tombstone_compaction': 'true'}
  

Now on to our other use cases. In order to identify the payloads for a given event, we use a separate table which is written at the same time as a payload:

  
CREATE TABLE events.payload_event_index (
    user_id bigint,
    event_uuid timeuuid,
    payload_uuid timeuuid,
    PRIMARY KEY ((user_id, event_uuid), payload_uuid)
) WITH CLUSTERING ORDER BY (payload_uuid ASC)
  

There are a couple of other ways we could have approached this:

  1. A Secondary Index on the event_uuid column of the payloads table would be the traditional approach; data in a secondary index is always stored on the same node as the underlying data, so querying a secondary index always requires querying all nodes in the datacenter. We decided this was an unacceptable performance cost.
  2. A Materialized View is effectively a new table which Cassandra builds and maintains internally; this solution would be effectively identical to what we've built, and we originally considered using MVs. Unfortunately, there are a number of stability and correctness issues with the materialized view system, and it was disabled by default in Cassandra 4. We are hopeful that a future release of Apache Cassandra will squash these bugs.

Given that querying payloads by event is relatively infrequent in our system, we elected to simply maintain a second table. Inserts into this table could potentially get out-of-sync if a client crashes after writing to the main table but before writing to the index table (since we are not using logged batches for these writes), but in practice this does not occur.
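As a sketch, the payload write path touches both tables in sequence; the statement texts, method name, and execute interface below are assumptions (modeled loosely on the DataStax Ruby driver), not our actual code:

```ruby
# Write a payload and its event -> payload index entry as two separate,
# unlogged writes. If the process dies between the two statements, the
# index can lag the payloads table; as noted above, in practice this
# does not occur, and the cost of logged batches is avoided.
INSERT_PAYLOAD = "INSERT INTO events.payloads (user_id, bucket, uuid, event_uuid) VALUES (?, ?, ?, ?)"
INSERT_INDEX   = "INSERT INTO events.payload_event_index (user_id, event_uuid, payload_uuid) VALUES (?, ?, ?)"

def record_payload(session, user_id:, bucket:, payload_uuid:, event_uuid:)
  session.execute(INSERT_PAYLOAD, arguments: [user_id, bucket, payload_uuid, event_uuid])
  session.execute(INSERT_INDEX,   arguments: [user_id, event_uuid, payload_uuid])
end

# A fake session is enough to see the write ordering:
FakeSession = Struct.new(:log) do
  def execute(stmt, arguments:)
    log << stmt
  end
end
```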

Migrating to Cassandra

Concurrently with developing our schema, we started working on the nitty-gritty of migrating our events system to use Cassandra. Our events system was originally developed in Ruby using Sinatra and the Sequel Object-Relational Mapping system, with a fairly traditional model-view-controller design. In order to support swapping out the backend, we first refactored this application to use the Repository Pattern.

First, we centralized all accesses for each object into new MySQL Repositories which encapsulated all of the MySQL logic, and we removed all of the places where MySQL operations were being directly carried out on ORM objects from controllers. This also entailed building new Entities, which are simple immutable data structures.

Next, we built new Cassandra Repositories supporting all of the same operations and returning the same entities.

Finally, we built unified repositories which called into both the MySQL and Cassandra repositories. We used these to initially dual-write, and eventually dual-read. These emitted logs into our central logging system (Apache Kafka) any time an inconsistency was detected.
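A minimal sketch of such a unified repository (class and method names are invented, and the real system logs mismatches to Kafka rather than calling an injected logger):

```ruby
# Unified repository: read from both backends, keep serving the MySQL
# result as the source of truth, and report any mismatch for inspection.
class UnifiedEventRepository
  def initialize(mysql_repo, cassandra_repo, mismatch_logger)
    @mysql  = mysql_repo
    @cassandra = cassandra_repo
    @logger = mismatch_logger
  end

  def find(user_id, event_id)
    primary   = @mysql.find(user_id, event_id)
    secondary = @cassandra.find(user_id, event_id)
    @logger.call(user_id: user_id, event_id: event_id) if primary != secondary
    primary # customers keep seeing the existing backend while we compare
  end
end

# Example doubles for exercising the comparison path:
FakeRepo = Struct.new(:row) do
  def find(_user_id, _event_id)
    row
  end
end
```

The key property is that a Cassandra bug surfaces as a log line, never as a wrong answer to a customer.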

We initially enabled dual-write mode for a fraction of traffic, to ensure that our Cassandra clusters were working in production and met our performance goals; as we got more comfortable, we ramped up to 100% dual-writing and, after a few days, enabled dual-reading. This revealed some bugs, which we were able to fix with no customer impact.

Because we only retain data for a few weeks, we were able to simply run in dual-write mode until all of the data was in Cassandra, and then stop writing to MySQL — no backfill was necessary!

Future Work

The next project for moving data into Cassandra is addresses; we record billions of Address Objects for our customers. We have a few goals for improving our address system:

  • Improve storage scalability — hundreds of terabytes of address data shouldn't be a problem
  • Improve write throughput — tens of thousands of writes per second are the minimum goal
  • Improve security — addresses are PII, so everything should be encrypted with application-controlled keys

While this project is still in-progress, we anticipate using a schema similar to the following:

  
CREATE TABLE addresses.addresses (
    user_id bigint,
    time_bucket bigint,
    id timeuuid,
    city text,
    country text,
    zip text,
    name blob,
    [other fields]
    PRIMARY KEY ((user_id, time_bucket), id)
) WITH CLUSTERING ORDER BY (id ASC)
  

Each of the BLOB fields is stored end-to-end encrypted using application-controlled keys and cryptographic primitives from NaCl / libsodium: in particular, the hard-to-misuse crypto_secretbox primitive.
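In Ruby, a real implementation would use libsodium through a binding such as the rbnacl gem; as a dependency-free sketch of the same authenticated-encryption pattern, here is the equivalent flow using the standard library's OpenSSL AES-256-GCM (an illustration, not our actual code):

```ruby
require 'openssl'

# Authenticated encryption of a PII field with an application-held key.
# Mirrors the crypto_secretbox shape: a random nonce plus a ciphertext
# that includes an authentication tag, packed into one blob column.
def encrypt_field(key, plaintext)
  cipher = OpenSSL::Cipher.new('aes-256-gcm').encrypt
  cipher.key = key
  iv = cipher.random_iv # 12-byte nonce, unique per write
  cipher.auth_data = ''
  ciphertext = cipher.update(plaintext) + cipher.final
  iv + cipher.auth_tag + ciphertext # nonce (12) + tag (16) + ciphertext
end

def decrypt_field(key, blob)
  iv, tag, ciphertext = blob[0, 12], blob[12, 16], blob[28..]
  cipher = OpenSSL::Cipher.new('aes-256-gcm').decrypt
  cipher.key = key
  cipher.iv = iv
  cipher.auth_tag = tag
  cipher.auth_data = ''
  cipher.update(ciphertext) + cipher.final # raises if tampered with
end
```

Because the database only ever sees the packed blob, a compromised Cassandra node (or backup) yields no readable addresses without the application-held key.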

Many of our applications today rely heavily on joins using 64-bit BIGINT keys; keep your eyes open for a future blog post talking about some of our strategies for preserving those relations while moving to systems like Cassandra that don't have easy auto-increment ID generation.

Are you interested in working on problems like these and building the future of logistics APIs? Join us at EasyPost!