Attentive delivers the best-performing messaging across SMS and email for 8k+ global brands, helping them reach their customers in a hyper-personalized way. Every day, we dispatch millions of messages and process billions of events for our customers. An operation of this scale requires many backend business functions and teams, covering domains such as Messaging, Campaigns, and Subscriptions.
All these domains generate data and need to share it with each other, often in real time. Messaging needs data from Campaigns, which needs data from Subscriptions, and so on. In this blog post, we discuss our team’s approach to real-time data exchange between services, and the first iteration of the platform we built to do this at scale.
From the very beginning, our team embraced an asynchronous event-driven architecture for data exchange between domains. We build on the AWS cloud, so we opted for Amazon Kinesis for event delivery.
We started with a very simple architecture where all domain data is expressed as an event and produced into a single shared stream: the “raw events” Kinesis stream. This data is considered “raw” because it is generally not usable by consumers without additional “enrichment”. For example, a service may produce an event into the raw stream with an integer subscriber ID, but not the email address or phone number that a consumer needs to perform its function, e.g., sending a text message to the subscriber. Since this is a common problem, an enricher service consumes the raw events Kinesis stream, enriches every event, and produces them to an “enriched events” Kinesis stream. Consumers from all domains then consume from this enriched stream, directly or indirectly.
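To illustrate the idea, here is a purely hypothetical sketch of what the enrichment step does. The type names and the lookup interface are invented for this example and are not our actual implementation; the real enricher service does considerably more.

// Hypothetical types: a raw event carries only the subscriber ID, the enriched event
// adds the contact details downstream consumers need.
record RawEvent(String type, long subscriberId) {}
record EnrichedEvent(String type, long subscriberId, String email, String phoneNumber) {}

interface SubscriberLookup {
  // Resolves contact details for a subscriber ID, e.g. from a cache or database.
  Contact find(long subscriberId);
  record Contact(String email, String phoneNumber) {}
}

class Enricher {
  private final SubscriberLookup lookup;

  Enricher(SubscriberLookup lookup) {
    this.lookup = lookup;
  }

  // Consumes a raw event and produces the enriched form written to the enriched stream.
  EnrichedEvent enrich(RawEvent raw) {
    SubscriberLookup.Contact contact = lookup.find(raw.subscriberId());
    return new EnrichedEvent(raw.type(), raw.subscriberId(), contact.email(), contact.phoneNumber());
  }
}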
The challenge then was: how do we deliver data from this single enriched stream to every service that wants it, now and in the future? Since the events are in a stream, the naive solution would be for every interested consumer to subscribe to the stream and consume the events it needs. To understand why this approach breaks down, it's necessary to understand the constraints Kinesis imposes on us.
Amazon Kinesis Data Streams (Kinesis for short) is a serverless offering from AWS that enables the collection and processing of large amounts of data in real time. Each Kinesis stream consists of one or more shards. A shard is the unit of scaling and determines the throughput of the stream. The number of shards can be increased or decreased depending on the requirements of your application.
In its default consumption mode, called shared throughput mode, the Kinesis data plane enforces the following per-shard quotas:
- Writes: up to 1 MB/sec or 1,000 records/sec per shard.
- Reads: up to 2 MB/sec per shard, shared across all consumers of the stream.
- GetRecords: up to 5 calls/sec per shard, shared across all consumers, with each call returning at most 10 MB or 10,000 records.
Using the default shared throughput mode, all consumers connected to a stream periodically poll each shard for new records and share the provisioned read throughput. It's easy enough to add more shards to get more overall throughput. However, the GetRecords quota is enforced per shard, so adding shards does nothing to increase the number of concurrent consumer groups a stream can support: each consumer group typically polls every shard about once per second, so five polling consumer groups are already enough to saturate the five-calls-per-second quota. We have hundreds of applications at Attentive. Connecting them all to one Kinesis stream would result in constant ProvisionedThroughputExceeded errors from Kinesis, high latencies, and a growing backlog. We found that it’s not viable to connect more than five consumer groups to a Kinesis stream using the standard shared throughput consumption mode, regardless of how many shards are provisioned.
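To make the contention concrete, here is a minimal sketch of the poll loop that every shared-throughput consumer group effectively runs against every shard, shown with the AWS SDK for Java v2 and our stream name purely for illustration; in practice this loop is hidden inside the KCL or another consumer library. Every one of these GetRecords calls counts against the same per-shard quota.

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.*;

public class SharedThroughputPoller {
  public static void main(String[] args) throws InterruptedException {
    KinesisClient kinesis = KinesisClient.create();
    String shardIterator = kinesis.getShardIterator(GetShardIteratorRequest.builder()
            .streamName("attentive-enriched-events-prod")
            .shardId("shardId-000000000000")
            .shardIteratorType(ShardIteratorType.LATEST)
            .build())
        .shardIterator();

    while (shardIterator != null) {
      // Counts against the shared quota of 5 GetRecords calls/sec for this shard.
      GetRecordsResponse response = kinesis.getRecords(
          GetRecordsRequest.builder().shardIterator(shardIterator).limit(10_000).build());
      response.records().forEach(SharedThroughputPoller::process);
      shardIterator = response.nextShardIterator();
      Thread.sleep(1000); // KCL-style pacing: roughly one poll per shard per second.
    }
  }

  static void process(Record record) {
    // Hand the record off to the application.
  }
}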
Because of the above limitations, we had to find (sometimes creative) ways to extract data out of the main enriched Kinesis stream for our ever-increasing number of services. We ended up relying mostly on S3 buckets for downstream fan-out, since S3 offers much higher access quotas than Kinesis.
A typical pipeline would use Firehose or Lambda to transform and batch-write data from the Kinesis stream to an S3 bucket. Downstream services or jobs could then listen for new objects in the bucket.
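A stripped-down sketch of one such fan-out hop, assuming JSON events and a hypothetical archive bucket and key layout: a Lambda function receives a batch of records from the Kinesis stream and writes them to S3 as a single object for downstream jobs to pick up.

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class KinesisToS3Handler implements RequestHandler<KinesisEvent, Void> {
  private final S3Client s3 = S3Client.create();

  @Override
  public Void handleRequest(KinesisEvent event, Context context) {
    // Concatenate the batch into one newline-delimited JSON payload.
    StringBuilder batch = new StringBuilder();
    for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
      batch.append(StandardCharsets.UTF_8.decode(record.getKinesis().getData())).append('\n');
    }
    // Hypothetical bucket name and key layout.
    String key = "enriched-events/" + Instant.now() + "-" + context.getAwsRequestId() + ".jsonl";
    s3.putObject(
        PutObjectRequest.builder().bucket("attentive-enriched-events-archive").key(key).build(),
        RequestBody.fromString(batch.toString()));
    return null;
  }
}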
As Attentive continued to grow, this iteration of the architecture posed several challenges, chief among them the hard fan-out limits of Kinesis and the bespoke, per-consumer S3 pipelines we had to build and operate to work around them.
In early 2021, we decided to make a concerted effort to fix these problems. We wanted to solve not just the challenges arising from Kinesis fan-out quotas, but also improve the overall developer experience around distributed messaging. We set the following success criteria for our next-generation platform:
- Onboarding should be self-service: a team should be able to get a new data flow with a simple configuration, without bespoke pipeline work.
- Any service that needs real-time data from a stream should be able to get it, with no practical limit on the number of consumers.
- Streaming data should be retained and replayable in a standard way.
A new team was created to lead this effort: the Event Platform team. We decided to name the platform Maestro, after the fictional supervillain in the Marvel Universe (a bit of a tradition around here).
Self-service was a fundamental requirement for the new platform. We wanted to expose a very simple interface to developers when interacting with the system. To that end, we introduced the “datastream” abstraction to represent a subscription for a set of events from a source stream. A datastream could be easily represented as YAML. This would make it easy for our developers to express what events they want to extract from the main event stream. The platform would then take care of the details of shipping just the events in the datastream definition to the private destination stream. An example datastream configuration is shown below.
datastreamName: subscription-api-enriched-analytics-events
contactName: eng-subscriptions
desiredState: ON
sources:
  - kinesisStream:
      streamName: attentive-enriched-events-prod
sinks:
  - kinesisStream:
      streamName: subscriptions-enriched-analytics-events-prod
eventTypeFilters:
  - ADD_TO_CART
  - CART_UPDATED
  - CHECKOUT
  - ORDER_CONFIRMED
  - PAGE_VIEW
  - PRODUCT_VIEW
  - PURCHASE
  - REPLENISHMENT
Now that we have an interface and a high-level abstraction for the system, how do we go about solving the fan-out problem, given the practical limit of five consumer groups per stream? Fortunately, Kinesis offers another mode for consuming from streams called “Enhanced Fan-Out” (EFO). From the Kinesis documentation:
This feature enables consumers to receive records from a stream with throughput of up to 2 MB of data per second per shard. This throughput is dedicated, which means that consumers that use enhanced fan-out don't have to contend with other consumers that are receiving data from the stream.
Furthermore, EFO uses a push model. EFO consumers don’t poll for data, so they are not subject to the GetRecords API quota of five calls per second per shard.
In theory, we could now add as many consumers to our main event stream as we needed. But there was one more quota to contend with: a single Kinesis stream can have at most 20 registered EFO consumers. Still, 20 is a big step up from the five-consumer limit we started with, and EFO consumers gave us a much more scalable foundation for distributing events from the main enriched stream.
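For reference, registering an EFO consumer is a single control-plane call: each registration gets its own dedicated 2 MB/sec/shard of read throughput, and each counts toward the 20-consumers-per-stream quota. The stream ARN and consumer name below are illustrative, and in practice the Flink Kinesis connector can perform this registration on a job's behalf.

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;

public class RegisterEfoConsumer {
  public static void main(String[] args) {
    KinesisClient kinesis = KinesisClient.create();
    // One registered consumer per downstream subscription; at most 20 per stream.
    RegisterStreamConsumerResponse response = kinesis.registerStreamConsumer(
        RegisterStreamConsumerRequest.builder()
            .streamARN("arn:aws:kinesis:us-east-1:111122223333:stream/attentive-enriched-events-prod")
            .consumerName("subscription-api-enriched-analytics-events")
            .build());
    System.out.println(response.consumer().consumerARN());
  }
}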
We leveraged Apache Flink, an open-source distributed stream processing framework, as the core engine for Maestro. At a very high level, the system would take a datastream configuration YAML as input, translate it into a Flink job, and execute it on a Flink cluster.
The system consists of these main parts:
- Datastream YAML definitions, written and submitted by application teams.
- A Registration Service that accepts datastream definitions and submits the corresponding job requests to the Flink cluster.
- Flink clusters that run a job for each registered datastream.
The Event Platform team is responsible for managing all the components of the Maestro platform, including the Flink jobs and clusters. Application developers’ only responsibility is to create and submit their datastream YAML configurations.
From this foundation, we can now easily add new consumers to the enriched event stream. Any time a domain team needs a set of events from the main event stream for a new service, they create a new datastream definition and submit it to the Registration Service, which submits a new job request to the Flink cluster. Finally, Flink wires up a new job from the datastream definition, subscribes to the source Kinesis stream using an EFO consumer, filters for the requested event types, and writes them to the destination stream.
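Here is a minimal sketch of what such a generated job could look like, using the Flink Kinesis connector's EFO mode and assuming the events are JSON strings with a top-level "type" field. The class, region, event schema, and wiring are illustrative rather than Maestro's actual code; the stream names come from the example datastream above.

import java.util.Properties;
import java.util.Set;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DatastreamJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Source: the shared enriched stream, read through a dedicated EFO consumer so this
    // job does not compete for the stream's shared read throughput.
    Properties sourceConfig = new Properties();
    sourceConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    sourceConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
        ConsumerConfigConstants.RecordPublisherType.EFO.name());
    sourceConfig.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME,
        "subscription-api-enriched-analytics-events");
    FlinkKinesisConsumer<String> source = new FlinkKinesisConsumer<>(
        "attentive-enriched-events-prod", new SimpleStringSchema(), sourceConfig);

    // Sink: the datastream's private destination stream.
    Properties sinkConfig = new Properties();
    sinkConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    FlinkKinesisProducer<String> sink =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), sinkConfig);
    sink.setDefaultStream("subscriptions-enriched-analytics-events-prod");
    sink.setDefaultPartition("0");

    // Keep only the event types listed in the datastream's eventTypeFilters.
    Set<String> wantedTypes = Set.of("PURCHASE", "CHECKOUT", "PAGE_VIEW");
    ObjectMapper mapper = new ObjectMapper();
    env.addSource(source)
        .filter(json -> wantedTypes.contains(mapper.readTree(json).path("type").asText()))
        .addSink(sink);

    env.execute("maestro-subscription-api-enriched-analytics-events");
  }
}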
But we designed Maestro to do more than just filter and route events between Kinesis streams. Maestro can leverage the full toolbox of Flink input/output connectors as pluggable datastream sources and sinks. One of the features we wanted to support was the ability to replay data in streams, which is useful for incident remediation and load testing. To support this, we leveraged Flink’s Filesystem connector to offload stream data into S3 in Parquet format for long-term retention. Users can then use S3 as the source for a Maestro datastream that replays the archived data.
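The offload leg can be expressed with Flink's streaming file sink writing Parquet via Avro. A minimal sketch, assuming the events are available as Avro GenericRecords and using a hypothetical archive bucket and path:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetOffload {
  // Writes a stream of enriched events to S3 as Parquet files, rolling on each checkpoint
  // (the default rolling policy for bulk formats).
  static void attachS3Sink(DataStream<GenericRecord> events, Schema schema) {
    StreamingFileSink<GenericRecord> s3Sink = StreamingFileSink
        .forBulkFormat(
            new Path("s3://maestro-event-archive/enriched-events/"),
            ParquetAvroWriters.forGenericRecord(schema))
        .build();
    events.addSink(s3Sink);
  }
}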
Using EFO subscriptions for our Flink jobs allowed us to bring new Maestro datastreams online without disturbing legacy pipelines that still used the “shared throughput” mode to consume the main enriched event stream. To get around the quota of 20 EFO consumers per stream, we simply reserved some EFO slots for “replica” streams: each replica stream enables another 20 fan-out consumers.
Maestro proved to be a great success against our original goals. We were able to onboard dozens of teams and services onto the platform in short order. Anyone who wanted real-time data from a stream could get it. Streaming data was retained in S3 and could be reused for replay in a standard manner. But we quickly ran into a new set of challenges.
As the saying goes, there's no such thing as a free lunch, and Kinesis EFO subscriptions are no exception. Once usage of the platform increased, we saw our Kinesis bill quickly rise to unsustainable levels. This was due mostly to two reasons:
- EFO is billed per consumer-shard hour plus per GB of data retrieved, so every new datastream added cost proportional to the shard count of the stream it consumed.
- The “replica” streams we stood up to get around the 20-consumer quota each duplicated the ingestion and storage costs of the main stream.
We built Maestro in Q2 of 2021 and ramped up adoption over the second half of the year. By the end of the year, our Kinesis costs had grown more than 15X year over year and showed no signs of slowing down.
One additional issue we experienced was instability in Flink jobs if they started to build up a backlog in the Kinesis source stream. The Flink-Kinesis connector doesn’t support backpressure when using an EFO consumer. This is due to the very nature of how EFO subscriptions work. In this mode, Kinesis pushes the records to the consumer at a throughput of up to 2 MB/sec/shard, or 2000 records/sec/shard, and the consumer must be able to process the full capacity of the stream at all times.
If the Flink job experienced any slowness that allowed a large backlog to build up, the Flink job could get overwhelmed and enter a crash-loop due to OutOfMemory exceptions. This always required intervention by an on-call engineer to recover, usually by increasing the parallelism of the Flink job as well as the capacity of the downstream sink to allow the backlog to drain. This was a constant source of toil for the Event Platform team.
At this point, we knew we wanted to replace Kinesis as the underlying messaging engine for Maestro. The new engine would need to support our scaling requirements well into the future and be far more cost-effective for our usage patterns than is possible with Kinesis. It would also need to support backpressure-aware streams for smooth integration with our Flink jobs.
Furthermore, since we were going through the trouble of replacing our streaming engine, it was a good opportunity to look for something that addressed other needs as well, such as multi-region support, rate limiting, delayed delivery, deduplication, and support for both queues and streams.
In the next installment of this series, we'll discuss our technology choice and migration process for replacing Kinesis in the second iteration of Maestro.
Want to help us solve interesting problems like these? Check out our open roles!