
Why Use Memory When You Can Use S3?


Learn about our novel approach to working around the limits of memory when processing massive data sets.

Our platform gives marketers the ability to engage with their customers via personalized messages across both email and SMS. Segmentation, the ability to deliver different messages to different groups of people based on a variety of attributes, is key to ensuring that the right person gets the right message. Here’s an example: a shoe company can send a coupon to a segment of customers in New York who bought running shoes last year but haven’t made a purchase this year.

In our legacy segmentation system, the different data attributes were stored in different databases. In the example above, customer location lived in one database, purchase history in another, and so on. Our system had to reach into each database, run queries, and then combine the results with boolean “and”/“or” logic (set intersection and union). The problem was that our system wasn’t distributed, so these set operations had to happen in memory on a single instance per request. We did have many instances handling requests, but that didn’t solve everything: if several giant clients’ segment calculations happened to land on the same instance, it could run out of memory.

We knew there were distributed systems built to handle data problems like this, but adopting one would have been a huge lift. As I mentioned, this was our legacy system, and a big rebuild was already in the works; we just needed something to keep the legacy system scaling in the meantime. Memory wasn’t the only scaling problem, either: we also wanted segment calculations to be fast.

Of course, we didn't want to scale vertically and simply run our services on oversized, expensive instances. AWS might have appreciated our high usage, but we had to keep an eye on our costs.

Solution: stream all the things

Part I: The Interview Question

There’s an interview question about how to find the intersection or union of two very large datasets. There are many answers, but we took our lessons from the merge sort algorithm. In short, the problem gets much easier if both datasets are already sorted, which is easy for us because the source data comes from databases that can return results in sorted order. To find the union of two sorted datasets, you can use the “merge” step of merge sort, which iterates through both datasets at the same time and “zips” them together in sorted order. To find the intersection of two sorted datasets, you can use a modified “merge” step that iterates through both datasets at the same time looking for common values. Both operations run in linear time, O(n+m), where n and m are the sizes of the two datasets, and because the data is streamed they need only a constant amount of working memory.

Union Pseudocode

public void union(Iterator<Long> left, Iterator<Long> right, OutputStream output) throws IOException {
	// Classic merge step: repeatedly emit the smaller head of the two sorted streams.
	Long leftValue = left.hasNext() ? left.next() : null;
	Long rightValue = right.hasNext() ? right.next() : null;
	while (leftValue != null || rightValue != null) {
		if (rightValue == null || (leftValue != null && leftValue < rightValue)) {
			output.write(toByteArray(leftValue));
			leftValue = left.hasNext() ? left.next() : null;
		} else if (leftValue == null || rightValue < leftValue) {
			output.write(toByteArray(rightValue));
			rightValue = right.hasNext() ? right.next() : null;
		} else {
			// Equal values: write once so the union has no duplicates, then advance both sides.
			output.write(toByteArray(leftValue));
			leftValue = left.hasNext() ? left.next() : null;
			rightValue = right.hasNext() ? right.next() : null;
		}
	}
	output.close();
}

// Serialization helper: newline-delimited ids, so the output can be read back a line at a time.
private byte[] toByteArray(Long id) {
	return (id + "\n").getBytes(StandardCharsets.UTF_8);
}

Intersection Pseudocode

public void intersect(Iterator<Long> left, Iterator<Long> right, OutputStream output) throws IOException {
	// Modified merge step: advance the smaller side until both heads match, then emit the match.
	Long leftValue = left.hasNext() ? left.next() : null;
	Long rightValue = right.hasNext() ? right.next() : null;
	while (leftValue != null && rightValue != null) {
		if (leftValue < rightValue) {
			leftValue = left.hasNext() ? left.next() : null;
		} else if (rightValue < leftValue) {
			rightValue = right.hasNext() ? right.next() : null;
		} else {
			// Common value: it belongs to the intersection.
			output.write(toByteArray(leftValue));
			leftValue = left.hasNext() ? left.next() : null;
			rightValue = right.hasNext() ? right.next() : null;
		}
	}
	output.close();
}

Part II: JDBC Streams

Generally, when you run a query against a database in any programming language, the underlying code will store the entire result set in memory. That’s fine for most use cases, but for giant datasets (some of our result sets are tens of millions of rows) you’re limited by the program’s memory. Luckily, you can set the JDBC “fetch size” to Integer.MIN_VALUE, which tells the underlying driver to return results as they arrive rather than storing them all in memory. We use JDBI to wrap database querying, and it lets us set the fetch size and get back an Iterator. You just have to be careful about how you close connections: a connection should only be closed after you have iterated through all of its results.
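
Here’s a minimal sketch of what that looks like with Jdbi 3, assuming a driver that treats a fetch size of Integer.MIN_VALUE as a streaming hint; the table and column names are only illustrative:

import java.util.Iterator;
import org.jdbi.v3.core.Handle;

// Stream ids out of the database without materializing the whole result set.
public Iterator<Long> streamCustomerIds(Handle handle) {
	return handle.createQuery("SELECT customer_id FROM purchases ORDER BY customer_id")
			.setFetchSize(Integer.MIN_VALUE)	// stream rows as they arrive instead of buffering them
			.mapTo(Long.class)
			.iterator();	// close the handle only after this iterator is fully consumed
}

Note the ORDER BY: it keeps each stream sorted, which the merge-based set operations above rely on.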

Part III: Use S3 as Swap Memory

We now needed a way to stitch the two parts together so we could run set operations on multiple datasets from multiple databases without blowing up memory, while still being fast. S3 is a highly scalable key-value store for large objects. AWS recently released an optimized version of the S3 client, called the CRT client. It’s essentially a low-level native driver that does the heavy lifting and exposes interfaces to Java. The CRT library provides nice interfaces that allow you to read/download or write/upload as a stream in a non-blocking manner. There are some benchmarks on how fast the CRT client is; the interesting example is the single 7.6 GB “checkpoint” file.
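
For context, building the CRT-based client and the transfer manager on top of it looks roughly like this (the tuning value is illustrative, not our production setting):

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;

// A CRT-based async S3 client with a transfer manager built on top of it.
public S3TransferManager newCrtTransferManager() {
	S3AsyncClient s3 = S3AsyncClient.crtBuilder()
			.targetThroughputInGbps(10.0)	// illustrative throughput target
			.build();
	return S3TransferManager.builder().s3Client(s3).build();
}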

Download 

Using the transfer manager, the download request returns a ResponseInputStream. We wrap that response in a BufferedReader so that we can read a line at a time, and then we wrap that in a class we call a “BufferedReaderIterator”, which simply implements the Iterator interface on top of the BufferedReader. One caveat here is that the CRT client will try to download as fast as possible and will buffer in memory, which is bad for our use case. Luckily, the library lets you inject an AsyncResponseTransformer (docs), and we wrote one that limits how much data is buffered in memory and only requests more data when the buffer is empty. I won’t include the exact code we built, but you can follow the code path from the default InputStreamResponseTransformer, which instantiates an AbortableInputStreamSubscriber, which in turn creates an InputStreamSubscriber, which uses an unbounded ByteBufferStoringSubscriber.
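
A minimal sketch of the download side is below. It uses the SDK’s stock blocking-input-stream transformer rather than our bounded one, and the bucket/key names are illustrative:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;

// Stream newline-delimited ids out of an S3 object without buffering the whole object in memory.
public Iterator<Long> downloadIds(S3TransferManager transferManager, String bucket, String key) {
	ResponseInputStream<GetObjectResponse> body = transferManager.download(
			DownloadRequest.builder()
					.getObjectRequest(GetObjectRequest.builder().bucket(bucket).key(key).build())
					.responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
					.build())
			.completionFuture().join().result();
	BufferedReader reader = new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8));
	return reader.lines().map(Long::parseLong).iterator();	// one id per line, already sorted
}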

Upload

The upload path is a little more complicated. The CRT upload API lets you specify an “AsyncRequestBody”, which allows you to provide the body to upload asynchronously. We had to implement our own AsyncRequestBody, based on InputStreamWithExecutorAsyncRequestBody, that:

  1. Doesn't know the body content length at upload time
  2. Is backed by an iterator that we read in batches, uploading each batch as it fills. The CRT library uses the Reactive Streams library, so we had to implement a Publisher to accomplish this batching (a simplified sketch follows this list)
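
Rather than reproduce our Publisher implementation, here is a simpler sketch that gets the same “unknown length, streamed body” behavior using the SDK’s built-in blocking-output-stream request body. The bucket/key names are illustrative, and union() is the merge routine from Part I:

import java.io.OutputStream;
import java.util.Iterator;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingOutputStreamAsyncRequestBody;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

// Stream the merged ids straight into an S3 upload without knowing the content length up front.
public void uploadUnion(S3TransferManager transferManager, Iterator<Long> left, Iterator<Long> right,
		String bucket, String key) throws Exception {
	BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(null);	// null = unknown length
	Upload upload = transferManager.upload(UploadRequest.builder()
			.putObjectRequest(PutObjectRequest.builder().bucket(bucket).key(key).build())
			.requestBody(body)
			.build());
	try (OutputStream out = body.outputStream()) {
		union(left, right, out);	// the merge writes directly into the upload stream
	}
	upload.completionFuture().join();	// wait for the upload to complete
}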

One nice side effect of using S3 as an intermediary store is that the files persist for some time, so we can skip rerunning calculations when we already have the results from a prior run.

Part IV: Tying It All Together

The flow for querying a segment now looks something like this:

  1. In parallel, query all the databases and stream the results to S3
  2. Stream the results from Step 1 out of S3 and perform set operations on the streams with the output being an upload to S3
  3. Repeat step 2 as needed, combining intermediate results
  4. End with a single output in S3, which is the final segment membership (a rough sketch of this orchestration follows the list)
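
Put together, the orchestration looks roughly like the sketch below. The helpers queryToS3 and combineToS3 are hypothetical stand-ins for the JDBC-streaming and S3-streaming pieces above, downloadIds is the download sketch from Part III, and choosing between union and intersect per segment condition is elided:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import software.amazon.awssdk.transfer.s3.S3TransferManager;

// Illustrative orchestration only: queryToS3 and combineToS3 are hypothetical helpers wrapping
// the pieces sketched above; error handling and operator selection are omitted.
public String evaluateSegment(List<String> sqlQueries, String bucket, S3TransferManager tm) {
	// Step 1: run every database query in parallel, streaming each result set to its own S3 object.
	Deque<String> keys = sqlQueries.parallelStream()
			.map(sql -> queryToS3(sql, bucket, tm))
			.collect(Collectors.toCollection(ArrayDeque::new));

	// Steps 2-3: repeatedly combine two intermediate objects into a new one until a single object remains.
	while (keys.size() > 1) {
		Iterator<Long> left = downloadIds(tm, bucket, keys.poll());
		Iterator<Long> right = downloadIds(tm, bucket, keys.poll());
		keys.add(combineToS3(left, right, bucket, tm));	// streams the set operation's output back to S3
	}

	// Step 4: the one remaining key points at the segment's final membership list.
	return keys.poll();
}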

Lessons Learned

We were able to abstract away a lot of the complicated S3 streaming and set operations in a shared library. However, it's still pretty complicated code, so touching it can be dangerous. Some of the complexity comes from the interesting roadblocks we hit:

  1. Sometimes, non-deterministically, requests would get stuck. We resolved this by having multiple S3 clients, but that’s against best practices and caused many other issues we’ve had to deal with. It was very hard to figure out what was going on, but we were finally able to reproduce it with simple code and have a GitHub issue open here.
  2. With multiple clients, we noticed that after a certain time we would get weird errors. It turns out that when you close an S3 client it closes the credentials provider, which means that after the cached credentials expire you cannot create new connections. We had to short-circuit these close requests to the credentials provider.
  3. We had to be very careful not to leak connections and to close all clients and transfer managers, which typically isn’t a problem you need to deal with when one client is shared across all requests.
  4. After running happily for a while, we noticed an off-heap memory leak. This was very hard to debug, but we realized a big contributing factor was the CRT library, which can be a bit greedy with memory for the sake of performance. There are some toggles like initialReadBufferSizeInBytes and maxNativeMemoryLimitInBytes. These configurations were only available in a newer version of the library, so we upgraded. However, since we use multiple clients, those memory limits effectively get multiplied by the number of clients we create. We just have to keep that in mind when calculating our memory allocation.
  5. After upgrading the AWS libraries, we noticed CPU spikes. It seems that with later versions of the CRT library there is some lock contention when using multiple S3 clients. So far this has been acceptable, since CPU is a “compressible” resource and nothing crashes if we use too much.

We have had to thoroughly test our libraries and abstractions. That involved throwing a lot of load at them and using profilers to make sure memory and CPU consumption were as expected. As mentioned above, this is our legacy system; we have a shiny new system that we are working on (stay tuned!).

Interested in tackling big problems for our team and our customers? Join our team!
