Our platform gives marketers the ability to engage with their customers via personalized messages across both email and SMS. Segmentation, the ability to deliver different messages to different groups of people based on a variety of attributes, is key to ensuring that the right person gets the right message. For example, a shoe company can send a coupon to a segment of customers in New York who bought running shoes last year but haven’t made a purchase this year.
In our legacy segmentation system, the different data attributes were stored in different databases. In the example above, customer location was stored in one database, purchase history in another, and so on. Our system had to reach into each database, run queries, and then combine the results with Boolean “and” and “or” operations (set intersection and union). The problem was that the system wasn’t distributed, so each request’s set operations had to happen in memory on a single instance. We ran many instances to handle requests, but that didn’t help when large requests collided: had we not solved this scaling issue, several giant clients whose segment calculations happened to land on the same instance could have exhausted its memory.
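To make the memory problem concrete, here is a minimal sketch of that in-memory pattern. It assumes each query result is loaded into a Java `Set<Long>` of customer IDs; the class and method names are hypothetical. Every input set and the result must fit in one instance’s heap at the same time, and each boxed `Long` in a `HashSet` costs considerably more than the 8 bytes of the raw ID.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the legacy pattern: every query result is fully
// materialized on one instance before any set logic runs.
final class InMemorySegments {

    // "and": keep only IDs present in every set (assumes at least one set).
    // The copy of the first set plus all inputs are in memory simultaneously.
    static Set<Long> intersectAll(List<Set<Long>> sets) {
        Set<Long> result = new HashSet<>(sets.get(0));
        for (Set<Long> s : sets.subList(1, sets.size())) {
            result.retainAll(s);
        }
        return result;
    }

    // "or": every ID that appears in any set.
    static Set<Long> unionAll(List<Set<Long>> sets) {
        Set<Long> result = new HashSet<>();
        for (Set<Long> s : sets) {
            result.addAll(s);
        }
        return result;
    }
}
```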
We knew there were distributed systems built to handle data problems like this, but adopting one would have been a huge lift. As I mentioned, this was our legacy system, and we had a big rebuild in the works. In the meantime, we still needed the legacy system to scale. Memory wasn’t the only concern, either: we also wanted segment calculations to be fast.
We also didn’t want to simply scale vertically by running our services on oversized, expensive instances. AWS might have appreciated our high usage, but we had to keep an eye on our costs.
There’s a classic interview question: how do you find the intersection or union of two very large datasets? There are many answers, but we borrowed from the merge sort algorithm. The key insight is that the problem becomes much easier if both datasets are already sorted, and that is easy for us because the source data comes from databases, which can return rows in sorted order. To find the union of two sorted datasets, you can use the “merge” step of merge sort, which iterates through both datasets at the same time and “zips” them together in sorted order. To find the intersection, you can use a modified “merge” step that advances whichever side holds the smaller value and emits a value only when both sides match. Both operations run in O(n+m) time, where n is the size of the first dataset and m is the size of the second, and because they only look at the current element of each input, they need just constant memory when the inputs are streamed.
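```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Set operations over two ascending, duplicate-free streams of IDs.
// Each method holds only the current value from each side in memory.
public final class SortedSetOps {

    // Output is one ID per line, so the result can itself be streamed back later.
    private static void writeValue(OutputStream output, long value) throws IOException {
        output.write((value + "\n").getBytes(StandardCharsets.UTF_8));
    }

    // java.util.Iterator has no peek(), so we track the current value ourselves.
    private static Long nextOrNull(Iterator<Long> it) {
        return it.hasNext() ? it.next() : null;
    }

    // Union: the "merge" step of merge sort; equal values are written once.
    public static void union(Iterator<Long> left, Iterator<Long> right, OutputStream output) throws IOException {
        Long leftValue = nextOrNull(left);
        Long rightValue = nextOrNull(right);
        while (leftValue != null || rightValue != null) {
            if (rightValue == null || (leftValue != null && leftValue < rightValue)) {
                writeValue(output, leftValue);
                leftValue = nextOrNull(left);
            } else if (leftValue == null || rightValue < leftValue) {
                writeValue(output, rightValue);
                rightValue = nextOrNull(right);
            } else {
                writeValue(output, leftValue);
                leftValue = nextOrNull(left);
                rightValue = nextOrNull(right);
            }
        }
        output.close();
    }

    // Intersection: advance whichever side is smaller; write only values found on both sides.
    public static void intersect(Iterator<Long> left, Iterator<Long> right, OutputStream output) throws IOException {
        Long leftValue = nextOrNull(left);
        Long rightValue = nextOrNull(right);
        while (leftValue != null && rightValue != null) {
            int cmp = leftValue.compareTo(rightValue); // compareTo, not ==, for boxed Longs
            if (cmp < 0) {
                leftValue = nextOrNull(left);
            } else if (cmp > 0) {
                rightValue = nextOrNull(right);
            } else {
                writeValue(output, leftValue);
                leftValue = nextOrNull(left);
                rightValue = nextOrNull(right);
            }
        }
        output.close();
    }
}
```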
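The `union` and `intersect` methods handle two inputs and write straight to an `OutputStream`. When a segment combines more than two attributes, one possible variant, sketched here with a hypothetical `IntersectIterator` class, is to make the intersection itself an `Iterator<Long>`, so operations nest like `intersect(a, intersect(b, c))` while each level still holds at most one pending value. Like the methods above, it assumes ascending, duplicate-free inputs.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical lazy intersection of two ascending, duplicate-free iterators.
// Because it is itself an Iterator, intersections can be nested.
final class IntersectIterator implements Iterator<Long> {
    private final Iterator<Long> left;
    private final Iterator<Long> right;
    private Long pending; // next common value, or null if not yet found

    IntersectIterator(Iterator<Long> left, Iterator<Long> right) {
        this.left = left;
        this.right = right;
    }

    @Override
    public boolean hasNext() {
        if (pending != null) return true;
        if (!left.hasNext() || !right.hasNext()) return false;
        long l = left.next();
        long r = right.next();
        // Advance the smaller side until the values match or one side runs out.
        while (true) {
            if (l == r) {
                pending = l;
                return true;
            } else if (l < r) {
                if (!left.hasNext()) return false;
                l = left.next();
            } else {
                if (!right.hasNext()) return false;
                r = right.next();
            }
        }
    }

    @Override
    public Long next() {
        if (!hasNext()) throw new NoSuchElementException();
        Long value = pending;
        pending = null;
        return value;
    }
}
```

For example, `new IntersectIterator(a, new IntersectIterator(b, c))` yields only the IDs present in all three inputs, pulling from each source one element at a time.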
By default, many database drivers load the entire result set into memory when you run a query. That is a good choice for most use cases, but for giant datasets (some of our result sets are tens of millions of rows) you are limited by the program’s memory. Luckily, some JDBC drivers, notably MySQL Connector/J, let you set the fetch size to Integer.MIN_VALUE, which tells the driver to return rows as they arrive instead of holding them all in memory. We use JDBI to wrap database querying, and it lets you set the fetch size and get back an Iterator over the results. You do have to be careful about when you close connections: close a connection only after you have iterated through all of its results.
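Since JDBI sits on top of JDBC, here is a minimal sketch of the same idea in plain JDBC, which keeps the example dependency-free. It assumes a driver that treats a fetch size of `Integer.MIN_VALUE` on a forward-only, read-only statement as “stream rows” (MySQL Connector/J does; the JDBC specification itself allows drivers to reject negative fetch sizes). The `StreamingIds` class is hypothetical. It closes the result set, statement, and connection only after the last row has been read.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper: iterates the first column of a streamed result set as Longs
// and releases its JDBC resources only once the last row has been consumed.
final class StreamingIds implements Iterator<Long>, AutoCloseable {
    private final ResultSet rs;
    private final AutoCloseable[] resources; // closed in order once iteration ends
    private Boolean rowReady; // null = cursor not yet advanced for the next element
    private boolean closed;

    StreamingIds(ResultSet rs, AutoCloseable... resources) {
        this.rs = rs;
        this.resources = resources;
    }

    // Forward-only + read-only + Integer.MIN_VALUE fetch size is the combination
    // MySQL Connector/J uses to stream rows. On success, the iterator owns conn.
    static StreamingIds open(Connection conn, String sql) throws SQLException {
        Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        try {
            stmt.setFetchSize(Integer.MIN_VALUE);
            // Closing the statement also closes its result set.
            return new StreamingIds(stmt.executeQuery(sql), stmt, conn);
        } catch (SQLException e) {
            stmt.close();
            throw e;
        }
    }

    @Override
    public boolean hasNext() {
        if (closed) return false;
        if (rowReady == null) {
            try {
                rowReady = rs.next();
            } catch (SQLException e) {
                close();
                throw new IllegalStateException(e);
            }
            if (!rowReady) close(); // all rows read: now it is safe to release the connection
        }
        return rowReady;
    }

    @Override
    public Long next() {
        if (!hasNext()) throw new NoSuchElementException();
        try {
            long id = rs.getLong(1);
            rowReady = null; // the next hasNext() call advances the cursor
            return id;
        } catch (SQLException e) {
            close();
            throw new IllegalStateException(e);
        }
    }

    // Idempotent, best-effort close; callers that stop early should call it too.
    @Override
    public void close() {
        if (closed) return;
        closed = true;
        for (AutoCloseable r : resources) {
            try {
                r.close();
            } catch (Exception ignored) {
                // sketch only: a real implementation would log this
            }
        }
    }
}
```

Because the iterator is also `AutoCloseable`, a caller that stops iterating early can wrap it in try-with-resources so the connection is still released.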
Next, we needed a way to stitch these two pieces together so we could run set operations on multiple datasets from multiple databases without blowing up memory, while still being fast. Amazon S3 is a highly scalable key-value store for large objects. AWS recently released an optimized S3 client built on the AWS Common Runtime, called the CRT client. It is essentially a low-level native library that does the heavy lifting, with a thin Java interface on top. The CRT library provides interfaces that let you read/download or write/upload as a stream in a non-blocking manner. There are published benchmarks of how fast the CRT client is; the most interesting one is the “checkpoint” case, a single 7.6 GB file.
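Stitching the pieces together means the output of one step has to become the input of the next without ever being held in memory in full. As a stdlib-only illustration of that idea (it is not the CRT client or its API), here is a hypothetical `StreamBridge` that runs a producer, such as the `union` or `intersect` methods above, on a background thread and hands a consumer such as an uploader a `PipedInputStream`. The pipe’s buffer size caps how much produced-but-unread data sits in memory; piped streams are simple rather than fast, so treat this as a sketch of the shape, not a tuned implementation.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical bridge: a producer writes to an OutputStream on a background thread,
// and the returned InputStream lets a consumer read that output as it is produced.
final class StreamBridge {
    @FunctionalInterface
    interface Producer {
        void writeTo(OutputStream out) throws IOException;
    }

    static InputStream pipe(Producer producer, ExecutorService executor, int bufferBytes) throws IOException {
        PipedInputStream in = new PipedInputStream(bufferBytes);
        PipedOutputStream out = new PipedOutputStream(in);
        Future<?> done = executor.submit(() -> {
            try (OutputStream o = out) {
                producer.writeTo(o); // blocks whenever the pipe buffer is full
            }
            return null;
        });
        return new FilterInputStream(in) {
            @Override
            public int read() throws IOException {
                int b = super.read();
                if (b == -1) checkProducer();
                return b;
            }

            @Override
            public int read(byte[] buf, int off, int len) throws IOException {
                int n = super.read(buf, off, len);
                if (n == -1) checkProducer();
                return n;
            }

            // At end of stream, surface a producer failure instead of
            // letting truncated output look like a complete result.
            private void checkProducer() throws IOException {
                try {
                    done.get();
                } catch (ExecutionException e) {
                    throw new IOException("producer failed", e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("interrupted while waiting for producer", e);
                }
            }
        };
    }
}
```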
With the transfer manager, a download request returns a ResponseInputStream. We wrap that response in a BufferedReader so that we can read a line at a time, and then wrap that in a class we call a “BufferedReaderIterator”, which simply implements the Iterator interface on top of the BufferedReader. One caveat is that the CRT client tries to download as fast as possible and buffers whatever it receives in memory, which is bad for our use case. Luckily, the library lets you inject an AsyncResponseTransformer, and we wrote one that limits how much data is buffered in memory and requests more data only when the buffer is empty. I won’t include the exact code we built, but you can follow the code path from the default InputStreamResponseTransformer, which instantiates an AbortableInputStreamSubscriber, which in turn creates an InputStreamSubscriber backed by an unbounded ByteBufferStoringSubscriber.
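A minimal version of a `BufferedReaderIterator` might look like the following. It is a sketch, not our production class, and it assumes each line of the downloaded object holds one decimal ID in ascending order (the line format is an assumption). It reads ahead exactly one line, so beyond the reader’s own buffer it holds a single line in memory.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch: exposes a BufferedReader with one ID per line as an Iterator<Long>.
// The caller remains responsible for closing the reader.
final class BufferedReaderIterator implements Iterator<Long> {
    private final BufferedReader reader;
    private String nextLine; // read-ahead line, or null once the stream is exhausted

    BufferedReaderIterator(BufferedReader reader) {
        this.reader = reader;
        this.nextLine = readLine();
    }

    private String readLine() {
        try {
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return nextLine != null;
    }

    @Override
    public Long next() {
        if (nextLine == null) throw new NoSuchElementException();
        long value = Long.parseLong(nextLine.trim());
        nextLine = readLine();
        return value;
    }
}
```

In use, the download stream would be wrapped as `new BufferedReaderIterator(new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)))`, and the result can be passed directly to the set operations above.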
The upload path is a little more complicated. The CRT upload API accepts an AsyncRequestBody, which supplies the body to upload asynchronously. We had to implement our own AsyncRequestBody, based on InputStreamWithExecutorAsyncRequestBody, that:
A nice side effect of using S3 as an intermediate store is that the files persist for some time, so we can skip rerunning a calculation when a prior run has already produced the result.
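One way to take advantage of that is to reuse a stored result while it is younger than some maximum age. The sketch below uses an in-memory map as a stand-in for S3 and assumes a hypothetical key derived from the segment definition; `ResultCache`, `getOrCompute`, and the age check are illustrative, not the post’s implementation.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical reuse check: the map stands in for S3 and records where each
// segment's result was written and when. Fresh results are reused.
final class ResultCache {
    private record Entry(String location, Instant writtenAt) {}

    private final Map<String, Entry> store = new ConcurrentHashMap<>();
    private final Duration maxAge;
    private final Clock clock;

    ResultCache(Duration maxAge, Clock clock) {
        this.maxAge = maxAge;
        this.clock = clock;
    }

    // Returns the stored location if it is younger than maxAge; otherwise runs the
    // calculation and records its location. Two concurrent callers may both
    // compute the same key, which only wastes work.
    String getOrCompute(String segmentKey, Supplier<String> calculate) {
        Instant now = clock.instant();
        Entry cached = store.get(segmentKey);
        if (cached != null && cached.writtenAt().plus(maxAge).isAfter(now)) {
            return cached.location();
        }
        String location = calculate.get();
        store.put(segmentKey, new Entry(location, now));
        return location;
    }
}
```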
The flow for querying a segment now looks something like this:
We abstracted much of the complicated S3 streaming and set-operation logic into a shared library. It is still complicated code, though, so changing it can be risky. Some of the complexity comes from interesting roadblocks we hit:
We have had to test our libraries and abstractions thoroughly. That meant throwing a lot of load at them and using profilers to confirm that memory and CPU consumption were as expected. As mentioned above, this is our legacy system. We are working on a brand-new system (stay tuned!).
Interested in tackling big problems for our team and our customers? Join our team!