
Migrating Terabytes Of Data From MySQL To Cassandra With No Downtime

by James Brown

Over the 10-year lifetime of our company, EasyPost has grown in leaps and bounds — from zero customers to hundreds of thousands; from measuring our volume in minutes per shipment to hundreds of shipments per second; from megabytes of data to hundreds of terabytes of online (OLTP) data and petabytes of offline/analytics (OLAP) data. This post will talk about how we've swapped out the storage system for some of our largest kinds of data with no downtime or user impact.


EasyPost is a proud believer in boring technology; our original application stack was Ruby on Rails and MySQL (technically, Percona Distribution for MySQL). As we've grown from hundreds of labels per day to hundreds of labels per second, and from megabytes to gigabytes to terabytes to petabytes of data, we've carefully selected technologies that enable this scaling without introducing too much complexity or spending too much of our risk budget. For data storage, this has included vertical partitioning, where we've broken different features out into different codebases and different storage solutions, and horizontal partitioning, where we've divided some datasets across multiple servers.

For some of our largest and least-relational kinds of data, we've been working on migrating from MySQL to Apache Cassandra. Cassandra is a "wide-column" database which is conceptually similar to Google's BigTable and Amazon's Dynamo (and is, in fact, based on the original Dynamo paper). Cassandra's data model is highly scalable, and allows you to increase storage capacity by adding more nodes, without any substantial performance or availability cost.

Cassandra's data model consists of a series of tables containing rows and typed columns, some of which make up the primary key; these are similar terms to those in a traditional relational database like MySQL, but the similarity ends there. In Cassandra, each primary key is broken down into a partition key and a clustering key; the partition key determines which servers will hold the data for that row, and the clustering key is used to order those rows in a sorted set on each server. Every other cell in a given row has an existence of its own and can be written, read, or even deleted independently.

In Cassandra, each partition key is consistently hashed and assigned to some subset of the nodes (exactly which set depends on whether your cluster is using vnodes or the simpler token-ring system); because of this, it's very important to limit your queries to a single partition key at a time to avoid expensive whole-cluster scans.
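The routing described above can be sketched with a toy token ring. Note that real Cassandra uses the Murmur3 partitioner, vnodes, and pluggable replication strategies; the node names, hash function, and replica count here are all simplified for illustration:

```ruby
require 'digest'

# Toy token ring: hash a partition key onto a ring of nodes to see which
# replicas own it. Illustrative only -- not Cassandra's actual algorithm.
NODES = %w[node-a node-b node-c node-d].freeze
REPLICATION_FACTOR = 2

def token_for(partition_key)
  # Take the first 8 bytes of an MD5 digest as an unsigned 64-bit token.
  Digest::MD5.digest(partition_key).unpack1('Q>')
end

def replicas_for(partition_key)
  start = token_for(partition_key) % NODES.size
  # Walk the ring "clockwise", taking RF consecutive nodes.
  (0...REPLICATION_FACTOR).map { |i| NODES[(start + i) % NODES.size] }
end
```

A query scoped to one partition key touches only the replicas returned by `replicas_for`; a query that spans many partition keys has to fan out across the whole ring, which is why single-partition access patterns matter so much.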

In this post, we'll talk about our first Cassandra migration, an ongoing migration, and our future plans.

Case Study: Events

A big part of the EasyPost API is our support for Webhook Events. Customers who configure webhook URIs on their account will receive an HTTP POST from EasyPost when certain actions occur (such as when a tracked parcel changes its location, or when a Batch is purchased). This allows our customers to build reactive systems without having to poll our API. In addition to sending these webhook events, we also have APIs and a UI for querying historical events; this allows our customers to build and improve their webhook endpoints by looking at what events they've received recently and how their servers responded. Our events also err on the side of eventually-reliable delivery: webhook events that do not receive an HTTP 200 response from the customer are retried several times with increasing backoff, up to a couple of days later. This feature is very popular with our users, and we're (at the very instant this post is being written) sending upward of 400 webhook events per second; storing these events (and our customers' response metadata) for three weeks means that we're storing more than a half-billion rows.
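The retry behavior can be sketched as a backoff schedule. EasyPost has not published its actual intervals, so the base delay, doubling factor, and two-day cap below are assumptions for illustration:

```ruby
# Hypothetical retry schedule for failed webhook deliveries: exponential
# backoff starting at 30 seconds, doubling each attempt, capped so no
# retry is scheduled more than roughly two days out. The real intervals
# are not published; these numbers are illustrative.
BASE_DELAY = 30            # seconds
MAX_DELAY  = 2 * 24 * 3600 # two days, in seconds

def retry_delays(attempts)
  (0...attempts).map { |n| [BASE_DELAY * (2**n), MAX_DELAY].min }
end
```

With these numbers, `retry_delays(3)` yields `[30, 60, 120]`, and the schedule saturates at the two-day cap after about fourteen attempts.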

This was feasible on MySQL (with some clever data representation, it took around 7TB, spread across several servers), but we decided to move it to Cassandra to improve availability, simplify our maintenance logic, and serve as a demonstration of Cassandra at scale in our real environment.

Data Modeling and Schema Design

When using Cassandra, there are two main questions you have to ask up-front:

  • What is the data you need to store?
  • What are the operations you need to perform on that data?

Because secondary indices tend to be very expensive in Cassandra, it's important to carefully model your data based not on classical, size-focused database normal forms, but instead based on the queries you want to perform. For our events system, the relevant operations were relatively simple:

  1. Enqueue a new event
  2. Record an attempt to send an event to a given webhook
  3. Retrieve all send attempts for an event
  4. Page through recent events for a given user

Our schema for events themselves ended up looking roughly like the following:

CREATE TABLE events.events (
    user_id        bigint,
    bucket         text,
    uuid           timeuuid,
    description    ascii,
    event_priority int,
    request_body   blob,
    status         ascii,
    updated_at     timestamp,
    PRIMARY KEY ((user_id, bucket), uuid)
) WITH CLUSTERING ORDER BY (uuid DESC)
  AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy',
                    'compaction_window_size': '1',
                    'compaction_window_unit': 'DAYS',
                    'max_threshold': '32',
                    'min_threshold': '4',
                    'unchecked_tombstone_compaction': 'true'};

Each event is uniquely identified by a TIMEUUID (a version 1 UUID, although using a synthetic node identifier instead of a MAC address) and a user ID (every request in our system always knows what the relevant user is). In order to support relatively-efficient pagination of events by user, the partition key is actually (user_id, bucket), where bucket is a truncated timestamp extracted from the UUID. The bucket width is tuned experimentally based on load in the system and is currently approximately 10 minutes (which means that each 10-minute window for a given user will be stored in a single partition).
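Given the schema above, the bucket column can be derived by truncating the event's timestamp to the window width. The 10-minute width comes from the text, but the string format of the bucket value is an assumption for illustration:

```ruby
# Compute the partition bucket for an event: truncate the event's
# timestamp to a fixed window (~10 minutes, per the text) and render it
# as text, matching the `bucket text` column. The exact string format
# used in production is an assumption.
BUCKET_WIDTH = 600 # seconds

def bucket_for(time, width = BUCKET_WIDTH)
  truncated = Time.at((time.to_i / width) * width).utc
  truncated.strftime('%Y%m%d%H%M')
end

bucket_for(Time.utc(2023, 5, 1, 12, 34, 56)) # => "202305011230"
```

Any two events for the same user within the same 10-minute window share a bucket, and therefore land in the same partition, which is what makes paging through a user's recent events cheap.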

Here we're using the wonderful Time Window Compaction Strategy, which causes data to be stored on disk grouped by a given time window (in this case, by day) and makes deleting expired data extremely cheap. This allows us to retain a few weeks of data very inexpensively. Note in particular the use of the 'unchecked_tombstone_compaction' parameter; this allows TWCS to compact individual SSTables as soon as a given fraction of their data (by default, 20%) has expired, and is basically required if you're going to run read repairs on TWCS-compacted tables.

Our send attempts (which we call payloads), have a similar schema:

CREATE TABLE events.payloads (
    user_id          bigint,
    bucket           text,
    uuid             timeuuid,
    event_uuid       timeuuid,
    request_body     blob,
    request_headers  blob,
    request_url      text,
    response_body    blob,
    response_code    int,
    response_headers blob,
    total_time       int,
    PRIMARY KEY ((user_id, bucket), uuid)
) WITH CLUSTERING ORDER BY (uuid DESC)
  AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy',
                    'compaction_window_size': '1',
                    'compaction_window_unit': 'DAYS',
                    'max_threshold': '32',
                    'min_threshold': '4',
                    'unchecked_tombstone_compaction': 'true'};

Now on to our other use cases. In order to identify the payloads for a given event, we use a separate table which is written at the same time as a payload:

CREATE TABLE events.payload_event_index (
    user_id      bigint,
    event_uuid   timeuuid,
    payload_uuid timeuuid,
    PRIMARY KEY ((user_id, event_uuid), payload_uuid)
) WITH CLUSTERING ORDER BY (payload_uuid ASC);

There are a couple of other ways we could have approached this:

  1. A Secondary Index on the event_uuid column of the payloads table would be the traditional approach; however, data in a secondary index is always stored on the same node as the underlying data, so querying a secondary index always requires querying all nodes in the datacenter. We decided this was an unacceptable performance cost.
  2. A Materialized View is effectively a new table which Cassandra builds and maintains internally; this solution would be effectively identical to what we've built, and we originally considered using MVs. Unfortunately, there are a number of stability and correctness issues with the materialized view system, and it was disabled by default in Cassandra 4. We are hopeful that a future release of Apache Cassandra will squash these bugs.

Given that querying payloads by event is relatively infrequent in our system, we elected to simply maintain a second table. Inserts into this table could potentially get out-of-sync if a client crashes after writing to the main table but before writing to the index table (since we are not using logged batches for these writes), but in practice this does not occur.
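The write path for payloads and the index table can be sketched in memory. A real implementation would issue two CQL INSERTs through the Cassandra driver; the repository shape and method names here are assumptions:

```ruby
# In-memory sketch of the payload + index-table write path. Real code
# would issue two CQL INSERTs via the Cassandra driver; the class and
# method names are hypothetical.
class PayloadRepository
  def initialize
    @payloads = {} # (user_id, bucket, uuid) => row
    @event_index = Hash.new { |h, k| h[k] = [] } # (user_id, event_uuid) => payload uuids
  end

  def insert(user_id:, bucket:, uuid:, event_uuid:, row:)
    # Write the main table first, then the index. If the client crashes
    # between the two writes, the index is missing an entry (the failure
    # mode described in the text), but we never index a payload that
    # doesn't exist.
    @payloads[[user_id, bucket, uuid]] = row
    @event_index[[user_id, event_uuid]] << uuid
  end

  def payload_uuids_for_event(user_id:, event_uuid:)
    @event_index[[user_id, event_uuid]]
  end
end
```

The ordering of the two writes is the important design choice: writing the main table first means a crash can only ever produce a dangling payload, never a dangling index entry.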

Migrating to Cassandra

Concurrently with developing our schema, we started working on the nitty-gritty of migrating our events system to use Cassandra. Our events system was originally developed in Ruby using Sinatra and the Sequel Object-Relational Mapping library, with a fairly traditional model-view-controller design pattern. In order to support swapping out the backend, we first refactored this application to use the Repository Pattern.

First, we centralized all accesses for each object into new MySQL Repositories which encapsulated all of the MySQL logic, and we removed all of the places where MySQL operations were being directly carried out on ORM objects from controllers. This also entailed building new Entities, which are simple immutable data structures.

Next, we built new Cassandra Repositories supporting all of the same operations and returning the same entities.

Finally, we built unified repositories which called into both the MySQL and Cassandra repositories. We used these to initially dual-write, and eventually dual-read. These emitted logs into our central logging system (Apache Kafka) any time an inconsistency was detected.
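The unified repository described in the last three steps might look roughly like this; the class and method names are assumptions, and in the real system the mismatch logs flowed into Kafka rather than a local logger:

```ruby
require 'logger'

# Sketch of a unified repository used during migration: write to both
# backends, read from both, and log any mismatch. Names are hypothetical.
class UnifiedEventRepository
  def initialize(mysql_repo, cassandra_repo, logger: Logger.new($stdout))
    @mysql = mysql_repo
    @cassandra = cassandra_repo
    @logger = logger
  end

  def create(event)
    # Dual-write: every event goes to both stores.
    @mysql.create(event)
    @cassandra.create(event)
  end

  def find(id)
    # Dual-read: compare the two backends and flag disagreements.
    primary = @mysql.find(id)
    shadow  = @cassandra.find(id)
    @logger.warn("event #{id}: backends disagree") if primary != shadow
    primary # MySQL stays authoritative until cutover
  end
end
```

Returning the MySQL result while merely logging discrepancies is what made it safe to find and fix Cassandra-side bugs with no customer impact.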

We initially enabled dual-write mode for a fraction of traffic, to ensure that our Cassandra clusters were working in production and met our performance goals; as we got more comfortable, we ramped up to 100% dual-writing and, after a few days, enabled dual-reading. This revealed some bugs, which we were able to fix with no customer impact.

Due to the fact that we only retain data for a few weeks, we were able to simply run in dual-write mode until all of the data was in Cassandra, and then stop writing to MySQL — no backfill was necessary!

Future Work

The next project for moving data into Cassandra is addresses; we record billions of Address Objects for our customers. We had a few goals for improving our address system:

  • Improve storage scalability — hundreds of terabytes of address data shouldn't be a problem
  • Improve write throughput — tens of thousands of writes per second are the minimum goal
  • Improve security — addresses are PII, so everything should be encrypted with application-controlled keys

While this project is still in-progress, we anticipate using a schema similar to the following:

CREATE TABLE addresses.addresses (
    user_id     bigint,
    time_bucket bigint,
    id          timeuuid,
    city        text,
    country     text,
    zip         text,
    name        blob,
    [other fields]
    PRIMARY KEY ((user_id, time_bucket), id)
) WITH CLUSTERING ORDER BY (id ASC);

Each of the BLOB fields is stored end-to-end encrypted using application-controlled keys and cryptographic primitives from NaCl / libsodium: in particular, the hard-to-misuse crypto_secretbox primitive.
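The encrypt-before-write shape can be sketched as follows. Since the libsodium bindings (e.g. the RbNaCl gem) may not be installed everywhere, this sketch substitutes Ruby's stdlib OpenSSL with AES-256-GCM as a stand-in for crypto_secretbox; the envelope layout (nonce + tag + ciphertext packed into one blob column) is likewise an assumption for illustration:

```ruby
require 'openssl'

# Stand-in for libsodium's crypto_secretbox using stdlib OpenSSL
# (AES-256-GCM): a fresh random nonce per field, authenticated
# encryption, and nonce + tag + ciphertext packed into one blob.
def encrypt_field(key, plaintext)
  cipher = OpenSSL::Cipher.new('aes-256-gcm').encrypt
  cipher.key = key
  nonce = cipher.random_iv # 12 random bytes
  ciphertext = cipher.update(plaintext) + cipher.final
  nonce + cipher.auth_tag + ciphertext
end

def decrypt_field(key, blob)
  nonce, tag, ciphertext = blob[0, 12], blob[12, 16], blob[28..]
  cipher = OpenSSL::Cipher.new('aes-256-gcm').decrypt
  cipher.key = key
  cipher.iv = nonce
  cipher.auth_tag = tag
  cipher.update(ciphertext) + cipher.final # raises if tampered with
end
```

Because the database only ever sees opaque blobs, a compromise of the storage layer alone doesn't expose PII; the keys live with the application.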

Many of our applications today rely heavily on joins using 64-bit BIGINT keys; keep your eyes open for a future blog post talking about some of our strategies for preserving those relations while moving to systems like Cassandra that don't have easy auto-increment ID generation.
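While the details will have to wait for that post, one widely used approach (not necessarily EasyPost's) is Snowflake-style generation: packing a millisecond timestamp, a worker ID, and a per-millisecond sequence number into a single 64-bit integer. The epoch and bit widths below are illustrative:

```ruby
# One common way (not necessarily EasyPost's) to keep 64-bit BIGINT keys
# without auto-increment: Snowflake-style IDs composed of a millisecond
# timestamp (42 bits), a worker id (10 bits), and a per-millisecond
# sequence (12 bits). Epoch and bit widths are illustrative.
class SnowflakeIds
  EPOCH_MS = 1_288_834_974_657 # Twitter's epoch; any fixed epoch works

  def initialize(worker_id)
    @worker_id = worker_id & 0x3FF # 10 bits
    @sequence = 0
    @last_ms = -1
  end

  def next_id(now_ms = (Time.now.to_f * 1000).to_i)
    if now_ms == @last_ms
      @sequence = (@sequence + 1) & 0xFFF # 12 bits
    else
      @sequence = 0
      @last_ms = now_ms
    end
    ((now_ms - EPOCH_MS) << 22) | (@worker_id << 12) | @sequence
  end
end
```

IDs generated this way still fit in a BIGINT column, still sort roughly by creation time, and require no central sequence server, which makes them a natural fit for a masterless store like Cassandra.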

Are you interested in working on problems like these and building the future of logistics APIs? Join us at EasyPost!