
Maestro: Attentive's Event Platform


Attentive delivers the best-performing messaging across SMS and email for 8k+ global brands, helping them reach their customers in a hyper-personalized way. Every day, we dispatch millions of messages and process billions of events for our customers. An operation of this scale requires a lot of backend business functions and teams. Examples of business functions are:

  • Subscriptions: Allows brands to collect customer emails and phone numbers in a secure and legally compliant manner.
  • Campaigns: Allows brands to compose and schedule messages to subscribers.
  • Segmentation: Allows brands to target subsets of subscribers with messages.
  • Messaging: The core message dispatch functionality for emails and text messages.
  • Integrations: Allows customers to import data into, or export data out of, Attentive.
  • Reporting and Billing: Allows our customers to observe the performance of their campaigns, and for Attentive to collect payment on dispatched messages.

All these domains generate data and need to share it with each other, often in real time. Messaging needs data from Campaigns, which needs data from Subscriptions, and so on. In this blog post, we discuss our team’s approach to real-time data exchange between services, and the first iteration of the platform we built to do this at scale.

Hacking around Kinesis quotas

The fanout problem

From the very beginning, our team embraced an asynchronous event-driven architecture for data exchange between domains. We build on the AWS cloud, so we opted for Amazon Kinesis for event delivery. 

We started with a very simple architecture where all domain data is expressed as an event and produced into a single shared stream: the “raw events” Kinesis stream. This data is considered “raw” because it is generally not usable by consumers without additional “enrichment”. For example, a service may produce an event into the raw stream with an integer subscriber ID, but not the email address or phone number that the consumer needs to perform its function, e.g., send a text message to the subscriber.  Since this is a common problem, an enricher-service consumes the raw events Kinesis stream, enriches all the events, and produces them to an “enriched events” Kinesis stream. Consumers from all domains then consume from this enriched stream directly or indirectly.
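To make the enrichment step concrete, here is a minimal, hypothetical sketch in Java. RawEvent, EnrichedEvent, and ContactLookup are illustrative names rather than our actual types, and the Kinesis plumbing on both sides of the enricher is omitted.

// Hypothetical sketch of the enrichment step: attach the contact details a
// downstream consumer (e.g., Messaging) needs to a raw event that only
// carries a subscriber ID. Type names are illustrative, not Attentive's code.
public final class Enricher {

    // The raw event as produced by a domain service.
    record RawEvent(String eventType, long subscriberId, String payload) {}

    // The enriched event that downstream consumers read.
    record EnrichedEvent(String eventType, long subscriberId,
                         String email, String phone, String payload) {}

    // Stand-in for a lookup against the Subscriptions domain.
    interface ContactLookup {
        Contact find(long subscriberId);
    }
    record Contact(String email, String phone) {}

    private final ContactLookup contacts;

    public Enricher(ContactLookup contacts) {
        this.contacts = contacts;
    }

    public EnrichedEvent enrich(RawEvent raw) {
        Contact contact = contacts.find(raw.subscriberId());
        return new EnrichedEvent(raw.eventType(), raw.subscriberId(),
                contact.email(), contact.phone(), raw.payload());
    }
}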

The challenge then was: how do we deliver data from this single enriched stream to every service that wants it, now and in the future? Since the events are in a stream, the naive solution would be for every interested consumer to subscribe to the stream and consume the events it needs. To understand why this doesn't work at our scale, it helps to understand the constraints Kinesis imposes.

Kinesis 101

Amazon Kinesis Data Streams (Kinesis for short) is a serverless offering from AWS that enables the collection and processing of large amounts of data in real time. Each Kinesis stream consists of one or more shards. A shard is the unit of scaling and determines the throughput of the stream. The number of shards can be increased or decreased depending on the requirements of your application.

The Kinesis data plane has the following quotas in its default mode, known as shared throughput mode:

  • Producers:
    • 1 MB/s or 1000 records/second per shard
  • Consumers:
    • 2 MB/s or 2000 records/second per shard
    • 5 GetRecords API calls per second per shard

Using the default shared throughput mode, all consumers connected to a stream periodically poll for new records and share the provisioned throughput. It's easy enough to add more shards to get more throughput. However, due to the GetRecords API quota, increasing the number of shards doesn't allow you to increase the number of concurrent consumer groups: the five GetRecords calls per second per shard are shared across every consumer group, so with ten consumer groups each one can poll a given shard only about once every two seconds on average, no matter how many shards you provision. We have hundreds of applications at Attentive. Connecting them all to one Kinesis stream would result in constant ProvisionedThroughputExceeded errors from Kinesis, high latencies, and a growing backlog. We found that it's not viable to connect more than five consumer groups to a Kinesis stream using the standard shared throughput consumption mode, regardless of how many shards are provisioned.

V1 architecture

Because of the above limitations, we had to find (sometimes creative) ways to extract data out of the main enriched Kinesis stream for our ever-increasing number of services. We ended up relying mostly on S3 buckets for downstream fan-out, since S3 offers much higher access quotas than Kinesis.

A typical pipeline would use Firehose or Lambda to transform the data in the Kinesis stream and batch-write it to an S3 bucket. Downstream services or jobs could then listen for new objects in the bucket.
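As a rough illustration of this pattern (not our actual pipeline code), the Lambda handler below consumes a batch of records from the Kinesis stream and writes them to S3 as a single newline-delimited object; the bucket name and key scheme are made up.

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.stream.Collectors;

public class KinesisToS3Handler implements RequestHandler<KinesisEvent, Void> {

    private final S3Client s3 = S3Client.create();

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        // Concatenate the batch into a single newline-delimited object.
        String batch = event.getRecords().stream()
                .map(r -> StandardCharsets.UTF_8.decode(r.getKinesis().getData()).toString())
                .collect(Collectors.joining("\n"));

        // One object per invocation; downstream services listen for new objects.
        s3.putObject(PutObjectRequest.builder()
                        .bucket("enriched-events-archive")             // hypothetical bucket
                        .key("batches/" + UUID.randomUUID() + ".json") // hypothetical key scheme
                        .build(),
                RequestBody.fromString(batch));
        return null;
    }
}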

As Attentive continued to grow, this iteration of the architecture posed several challenges:

  • Hard limit on the number of real-time consumers on the main event stream. The five consumer-group limit imposed a cap on the number of latency-sensitive applications we could build if the functionality required enriched events. 
  • Increased infrastructure burden on product teams. Because of the Kinesis fan-out limit, teams that required real-time data had to make the difficult decision of standing up separate data pipelines, with the increased infrastructure maintenance costs shouldered by the team. Attentive likes to move fast, and this would hamper our ability to do so if we didn’t fix it soon.
  • Overprovisioned services. Most services consuming from the main event stream require only a small subset of events. However, due to the lack of filtering, every service must consume every event whether it needs it or not, so consumer services had to be constantly overprovisioned just to keep up with the full event volume.
  • Duplicated solutions for common problems. As product teams started building ad-hoc data pipelines for event data, they each faced common problems such as how to replay events for backfills. Since we lacked common tooling at the time, each team created home-grown solutions that solved their immediate problem, but couldn’t be reused across the organization.

Data fan-out on Kinesis

In early 2021, we decided to make a concerted effort to fix these problems. We wanted to solve not just the challenges arising from Kinesis fan-out quotas, but also improve the overall developer experience around distributed messaging. We set the following success criteria for our next-generation platform:

  • Unlimited real-time fan-out of events from the main enriched stream (or any stream) to any consumers.
  • Consumers should be able to subscribe for only the events they need, without overprovisioning.
  • Developers should be able to self-onboard onto the platform and manage their own stream subscriptions.
  • Replaying events to consumers should be easy and standardized for all streams.

A new team was created to lead this effort: the Event Platform team. We decided to name the platform Maestro, after the fictional supervillain in the Marvel Universe (a bit of a tradition around here).

The Maestro Datastream abstraction

Self-service was a fundamental requirement for the new platform. We wanted to expose a very simple interface for developers interacting with the system. To that end, we introduced the “datastream” abstraction to represent a subscription to a set of events from a source stream. A datastream could be represented as YAML, which would make it easy for our developers to express what events they want to extract from the main event stream. The platform would then take care of the details of shipping only the events matching the datastream definition to the private destination stream. An example datastream configuration is shown below.

datastreamName: subscription-api-enriched-analytics-events
contactName: eng-subscriptions
desiredState: ON
sources:
 - kinesisStream:
      streamName: attentive-enriched-events-prod
sinks:
 - kinesisStream:
     streamName: subscriptions-enriched-analytics-events-prod
eventTypeFilters:
 - ADD_TO_CART
 - CART_UPDATED
 - CHECKOUT
 - ORDER_CONFIRMED
 - PAGE_VIEW
 - PRODUCT_VIEW
 - PURCHASE
 - REPLENISHMENT

Kinesis “Enhanced Fan-Out” (EFO) to the rescue

Now that we have an interface and high-level abstraction of the system, how do we go about solving the fan-out problem given the five consumer-group limit imposed by Kinesis? Fortunately, Kinesis offers another mode for consuming from streams called “Enhanced Fan-Out”. From the Kinesis documentation:

This feature enables consumers to receive records from a stream with throughput of up to 2 MB of data per second per shard. This throughput is dedicated, which means that consumers that use enhanced fan-out don't have to contend with other consumers that are receiving data from the stream.

Furthermore, EFO uses a push model. EFO consumers don’t poll for data, so they are not subject to the GetRecords API quota of five calls per second per shard.

In theory, we could now add as many consumers to our main event stream as we needed. But there was one more quota we had to contend with: a single Kinesis stream can have at most 20 EFO consumers. Still, that's a far cry from the five-consumer limit we started with! EFO consumers gave us a much more scalable foundation for distributing events from the main enriched stream.

Maestro system architecture

We leveraged Apache Flink, an open source distributed stream processing framework, as the core engine for Maestro. At a very high level, the system would take a Datastream configuration YAML as input, translate it into a Flink job, and execute it on a Flink cluster.

The system consists of these main parts:

  • The Datastream Registration Service: responsible for ingesting and storing datastream configurations as well as submitting said configurations to a Flink cluster to execute.
  • A Java library for translating datastream configurations into a Flink job graph, so that each configuration can be executed as a Flink job.
  • A set of Flink clusters: these are responsible for executing jobs submitted onto the cluster by the Registration Service.

The Event Platform team is responsible for managing all the components of the Maestro platform, including the Flink jobs and clusters. Application developers’ only responsibility is to create and submit their datastream YAML configurations.

From this foundation, we can now easily add new consumers onto the enriched event stream. Any time a domain team needs a set of events from the main event stream for a new service, they create a new datastream definition and submit it to the Registration Service, which then submits a new job request to the Flink cluster. Finally, Flink wires up a new job from the datastream definition, subscribes to the source Kinesis stream using an EFO consumer, filters for the requested event types, and writes them to the destination stream.
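To illustrate the end result, here is a simplified sketch of what such a Flink job could look like for the example datastream above, using the open-source Flink Kinesis connector with EFO enabled. The wiring and the extractEventType helper are illustrative, not Maestro's actual translation library.

import java.util.Properties;
import java.util.Set;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class MaestroDatastreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: a dedicated-throughput EFO subscription instead of shared polling.
        Properties sourceProps = new Properties();
        sourceProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
        sourceProps.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
        sourceProps.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME,
                "subscription-api-enriched-analytics-events");

        // Event types taken from the datastream definition's eventTypeFilters.
        Set<String> wantedTypes = Set.of("ADD_TO_CART", "PURCHASE", "PAGE_VIEW");

        // Sink: the subscribing team's private destination stream.
        Properties sinkProps = new Properties();
        sinkProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
        FlinkKinesisProducer<String> sink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), sinkProps);
        sink.setDefaultStream("subscriptions-enriched-analytics-events-prod");
        sink.setDefaultPartition("0");

        env.addSource(new FlinkKinesisConsumer<>(
                        "attentive-enriched-events-prod", new SimpleStringSchema(), sourceProps))
                // Keep only the event types the datastream subscribed to.
                .filter(record -> wantedTypes.contains(extractEventType(record)))
                .addSink(sink);

        env.execute("subscription-api-enriched-analytics-events");
    }

    // Illustrative stand-in for real event deserialization: pull the event type
    // out of the record's JSON envelope.
    private static String extractEventType(String record) {
        int i = record.indexOf("\"type\":\"");
        return i < 0 ? "UNKNOWN" : record.substring(i + 8, record.indexOf('"', i + 8));
    }
}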

But we designed Maestro to do more than just filter and route events in Kinesis streams. Maestro can leverage the full toolbox of Flink input/output connectors as pluggable datastream sources and sinks. One of the features we wanted to support was the ability to replay data in streams, which is useful for incident remediation and load testing. To support this, we leveraged Flink’s Filesystem connector to offload stream data into S3 in Parquet format for long-term retention. Users could then use S3 as the source for a Maestro datastream that replays stream data.
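As a rough sketch of that offload path (not our production setup), the snippet below attaches Flink's streaming file sink with a bulk Parquet writer to a stream of events; the EnrichedEvent POJO and bucket path are hypothetical, and details like bucketing, partitioning, and schema management are omitted.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public final class StreamArchiver {

    // Illustrative event type written to the archive.
    public static class EnrichedEvent {
        public String type;
        public long subscriberId;
        public long timestampMillis;
    }

    // Attach a bulk Parquet file sink so the stream is retained in S3 and can
    // later serve as the source of a replay datastream.
    public static void archive(DataStream<EnrichedEvent> events) {
        StreamingFileSink<EnrichedEvent> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("s3://maestro-event-archive/enriched-events"), // hypothetical bucket
                        ParquetAvroWriters.forReflectRecord(EnrichedEvent.class))
                .build();
        events.addSink(sink);
    }
}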

Using EFO subscriptions for Flink jobs allowed us to bring new Maestro datastreams online without disturbing legacy pipelines that still used the “shared throughput” mode to consume the main enriched event stream. To get around the 20-EFO-consumer quota, we simply reserved some EFO slots for “replica” streams that mirror the main stream; each replica stream then supports another 20 fan-out consumers.

More Kinesis, more problems

Maestro proved to be a great success against our original goals. We were able to onboard dozens of teams and services onto the platform in short order. Anyone who wanted real-time data from a stream could get it. Streaming data was retained in S3 and could be replayed in a standard way. But we quickly ran into a new set of challenges.

Rising Kinesis cost

As the saying goes, there's no such thing as a free lunch. And sure enough, Amazon didn't offer Kinesis and EFO subscriptions as a free lunch either. As Kinesis usage increased, our Kinesis bill quickly rose to unsustainable levels, mostly for two reasons:

  • We made it a lot easier to create new Kinesis streams and route data to them, so we ended up with a lot more streams. In fact, we had to request shard quota increases on our AWS account several times.
  • Every new Kinesis stream had at least one shared throughput consumer and one EFO consumer. The EFO subscription was used to back up the stream’s data to S3 for long-term storage and replay.

We built Maestro in Q2 of 2021 and ramped up adoption in the last half of the year. By the end of the year, our Kinesis costs had grown over 15X for the year and showed no signs of slowing down.

Unstable Flink jobs

One additional issue we experienced was instability in Flink jobs whenever a backlog built up in the Kinesis source stream. The Flink-Kinesis connector doesn’t support backpressure when using an EFO consumer, due to the very nature of how EFO subscriptions work: Kinesis pushes records to the consumer at up to 2 MB/sec/shard, or 2000 records/sec/shard, and the consumer must be able to process the full capacity of the stream at all times.

If the Flink job experienced any slowness that allowed a large backlog to build up, the Flink job could get overwhelmed and enter a crash-loop due to OutOfMemory exceptions. This always required intervention by an on-call engineer to recover, usually by increasing the parallelism of the Flink job as well as the capacity of the downstream sink to allow the backlog to drain. This was a constant source of toil for the Event Platform team. 

Requirements for a new streaming engine

At this point, we knew we wanted to replace Kinesis as the underlying messaging engine for Maestro. The new engine would need to support our scaling requirements well into the future and be much more cost-effective for our usage patterns than is possible with Kinesis. It also needed to support backpressure-aware consumption for smooth integration with our Flink jobs.

Furthermore, since we were already going through the trouble of replacing our streaming engine, it was a good opportunity to look for something that addressed other needs as well: multi-region support, rate limiting, delayed delivery, deduplication, support for both queues and streams, and so on.

In the next installment of this series, we'll discuss our technology choice and migration process for replacing Kinesis in the second iteration of Maestro.

Want to help us solve interesting problems like these? Check out our open roles!

