
Goodbye Distributed Locks: Message Orchestration at Scale with Apache Pulsar


Introduction

In this article, we explore the architectural evolutions that have enabled our AI-powered decision engine to efficiently process high-volume event traffic and help our customers orchestrate personalized, timely marketing messages based on each user’s shopping journey.

To ensure a seamless experience for our customers’ subscribers (users), it is critical to prevent them from receiving multiple messages from different processing units simultaneously. This introduces the challenge of enforcing mutual exclusivity in processing each subscriber’s shopping events while maintaining high throughput at scale.

We will also examine how we safeguard message delivery decisions from split-brain scenarios, discuss concerns related to architectural changes such as distributed locking, and explore the alternative solutions we implemented to simplify the architecture while ensuring scalability and a non-blocking design. Additionally, we will address the challenges encountered during migration and the strategies used to mitigate them.

Terminology

Customer: refers to Attentive customers (brands).

User/Subscriber: refers to our customers' shoppers or end users.

Consumer: refers to a consumer of a message topic, specifically in the context of event-driven architecture.

Background

Attentive is a leading marketing platform dedicated to helping our customers deliver personalized and timely messages to their subscribers. At the heart of this operation is our decision engine, which determines the optimal time for our customers to send marketing messages based on users' recent shopping behavior. Success in SMS and email marketing relies on reaching customers' subscribers (users) through their preferred communication channels, precisely when messages matter most. We power this experience for over 8,000 brands.

Scale

As you can imagine, our traffic closely follows consumer shopping patterns, with peak activity around noon ET and significant surges during holiday periods. The biggest spike, unsurprisingly, occurs during Black Friday / Cyber Monday, commonly referred to in retail as BFCM or Cyber Week. Last year, Attentive brands successfully captured the attention and spending power of consumers, sending 3.9 billion SMS and email messages, a 38% lift year-over-year.

On Black Friday alone, brands set a new record by sending out 620 million text messages. 

Our messaging pipeline and event-driven architecture must prioritize four key factors: reliability, fault tolerance, throughput, and mutual exclusivity.

We use Apache Pulsar as our distributed messaging and streaming platform. User activity and intent are translated into internal events, driving decisions that allow our customers to deliver personalized, timely, and high-performing marketing messages while ensuring the best user experience.

Understanding the Problem

Apache Pulsar’s SHARED subscription mode is a core part of Attentive’s established patterns, enabling us to efficiently scale to millions of events. While this approach has served us well, it presents a challenge inherent to event-driven architectures: events for a given user can be consumed concurrently by multiple consumers. 

Attentive’s new product strategy is moving toward AI-powered personalized messages, where we not only help customers generate personalized copy finely tuned to their brand’s unique tone and style but also optimize send times based on each user's shopping journey. In the next section, we will explore how concurrent consumption violates the mutual exclusivity required to achieve the product goals of our decision-making processing unit, impacting the integrity of our message orchestration.

SHARED Subscription

Pulsar’s SHARED subscription type allows multiple consumers to attach to the same subscription. Messages in the Pulsar topic are delivered in a round-robin distribution across consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it and not acknowledged are rescheduled for delivery to the remaining consumers. [Source: Apache Pulsar Documentation]
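For reference, here is a minimal sketch of what a SHARED consumer can look like with the Pulsar Java client. The topic and subscription names are placeholders for illustration, not our production configuration.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class SharedConsumerSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder broker URL
                .build();

        // Multiple instances of this consumer can attach to the same subscription;
        // the broker round-robins messages across them, so two events for the same
        // user may be processed by different consumers at the same time.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/shopping-events") // hypothetical topic
                .subscriptionName("decision-engine")                  // hypothetical subscription
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        while (true) {
            Message<byte[]> msg = consumer.receive();
            try {
                // process the event ...
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // unacknowledged messages are redelivered, possibly to another consumer
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}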

Split Brain Effect

In an event-driven architecture, events for the same user may be processed concurrently by different consumers of a topic. Since each consumer independently determines whether to send a message—without awareness of other concurrent events—this lack of coordination can lead to inconsistent decisions, commonly referred to as the split-brain effect. In our case, as the following diagram depicts, a user may receive multiple messages triggered by events being processed in parallel by different consumers. This is not an ideal user experience and may increase subscription opt-outs.

Optimal Solution

The optimal solution would be to enforce mutual exclusivity in processing events for a given user. As depicted in the diagram below, the events for a given user are processed by a single processing unit. 

Evolution of the architecture

In a fast-paced environment like Attentive Engineering—where we support live traffic from some of the world’s most popular brands—it is crucial to carefully plan our releases to ensure the best user experience while strategically evolving our architecture. This requires gradually aligning all components toward a more optimal and cohesive solution over time. 

Evolution 1: Mutual Exclusivity via Redlock 

In Pulsar’s SHARED subscription mode, achieving mutual exclusivity requires some locking mechanism. Our first attempt at achieving mutual exclusivity used Redlock, the distributed lock algorithm built on Redis.
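The exact client library is an implementation detail, but as an illustration, the pattern looked roughly like the following sketch, which uses Redisson as one common Java option for Redis-backed distributed locks. The Redis address, key scheme, and timeouts are assumptions for the example, not our production values.

import java.util.concurrent.TimeUnit;

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedisLockedConsumerSketch {

    private final RedissonClient redisson;

    public RedisLockedConsumerSketch() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379"); // placeholder address
        this.redisson = Redisson.create(config);
    }

    // Wrap per-user event processing in a distributed lock keyed by the subscriber ID,
    // so only one consumer at a time acts on a given user's events.
    public void processEvent(String userId, Runnable handler) throws InterruptedException {
        RLock lock = redisson.getLock("user-lock:" + userId); // hypothetical key scheme

        // Wait up to 1 second for the lock; hold it for at most 10 seconds before it expires.
        boolean acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);
        if (!acquired) {
            // Another consumer holds the lock for this user; the event is redelivered later.
            return;
        }
        try {
            handler.run();
        } finally {
            lock.unlock();
        }
    }
}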

Impact / Result

This was helpful during the MVP phase of the project. Utilizing distributed locks provided the mutual exclusivity guarantees we needed, but as the scale grew, we started observing performance degradation. Let’s dive into those issues.

Tradeoffs

  • Increased lock contention: During high-traffic shopping events, such as when users are browsing extensive product catalogs, each user intent triggers an event that might influence processing and message delivery decisions. With a surge in concurrent events, we experience increased lock contention, impacting system efficiency.
  • Consumer stalling: Imagine a system with four consumers where events for a key are distributed evenly across all of them. Only one consumer successfully acquires the lock and proceeds with the action, while the others remain stalled, waiting for their turn. As shown in the diagram above, the lock acquisition timeout expires before they can acquire the lock, leading to a new batch of events being consumed and the entire cycle repeating. This inefficiency not only delays processing but also creates unnecessary contention, impacting overall throughput. In the diagram, two of the consumers have not consumed any events yet.

General concerns regarding Redis as a distributed lock

Redis-based distributed locks, such as Redlock, are not always the best fit when strong consistency guarantees are required — particularly because they do not provide features like fencing tokens. Fencing tokens are critical in systems where delays (e.g., GC pauses or network delays) can occur, as they help maintain proper ordering and prevent stale processes from executing after a lock has expired.
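To make the fencing-token idea concrete, here is a minimal sketch of the pattern described in Martin Kleppmann's post. The interfaces are hypothetical and are not part of our system or of Redlock: the lock service hands out a monotonically increasing token, and the downstream store rejects writes that carry a token older than the newest one it has seen.

// Hypothetical lock service that returns a monotonically increasing fencing token.
interface FencedLockService {
    long acquire(String key);
}

// Hypothetical store that rejects writes carrying a stale token for the same key.
interface MessageDecisionStore {
    void recordDecision(String key, long token, String decision) throws StaleTokenException;
}

class StaleTokenException extends Exception {
}

class FencedProcessor {
    private final FencedLockService locks;
    private final MessageDecisionStore store;

    FencedProcessor(FencedLockService locks, MessageDecisionStore store) {
        this.locks = locks;
        this.store = store;
    }

    void process(String userId) {
        long token = locks.acquire(userId);
        // A GC pause or network delay could happen here, after the lock has already expired
        // and a newer holder has acquired it with a higher token.
        try {
            // The store compares tokens, so a stale holder's write is rejected even though
            // it still believes it owns the lock.
            store.recordDecision(userId, token, "SEND_MESSAGE");
        } catch (StaleTokenException e) {
            // A newer lock holder has already acted for this user; drop this decision.
        }
    }
}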

In applications where mutual exclusivity is essential to business correctness, it's crucial to account for these edge cases and adopt mechanisms that provide stronger guarantees. Martin Kleppmann’s blog post offers an excellent deep dive into what we should expect from a distributed lock and how Redis measures up in this regard. It's also worth reading Salvatore Sanfilippo’s (creator of Redis and Redlock) response, which provides thoughtful counterpoints and additional context to the discussion.

Evolution 2: Switching to FAILOVER Subscription type 

We reached a common consensus across the organization that it was time to evolve the architecture ahead of peak traffic during BFCM (Black Friday/Cyber Monday).

To address the bottlenecks in the existing architecture, we planned to switch to Pulsar's FAILOVER subscription mode, which guarantees ordering for a given key and ensures mutual exclusivity.

Pulsar’s FAILOVER Subscription

FAILOVER is a subscription type in which multiple consumers can be attached to the same subscription. A master consumer is picked for each partition of a topic to receive messages. When the master consumer disconnects, all of the messages (unacknowledged and subsequent) are delivered to the next consumer in line. [Source: Apache Pulsar documentation] This is similar to Kafka consumer groups, where each partition of a topic is assigned to a single consumer.
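On the consumer side, the change from the SHARED sketch above is a single builder setting. Here is a minimal sketch with the Pulsar Java client, again with placeholder topic and subscription names:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class FailoverConsumerSketch {
    public static Consumer<byte[]> subscribe(PulsarClient client) throws Exception {
        // One consumer becomes the master for each partition; the others stay idle until
        // the master disconnects, at which point the broker fails over to the next consumer
        // in line and redelivers the unacknowledged messages.
        return client.newConsumer()
                .topic("persistent://public/default/shopping-events") // hypothetical topic
                .subscriptionName("decision-engine")                  // hypothetical subscription
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();
    }
}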

The diagram below illustrates how, for a given key—in our case, the user—events are always consumed by only one consumer at a time. Each consumer is assigned to a single partition, ensuring that all events for a specific user within that partition are routed to the same consumer. For example, events for user 1 will always be processed by consumer 1.

Impact / Result

Switching to the FAILOVER subscription mode eliminated the multiple-consumer stalling issue caused by the SHARED subscription, which occurred due to consumers waiting to acquire a lock. However, it came with trade-offs that made us hesitant about its long-term viability. In the next section, we’ll explore how we addressed these challenges.

Tradeoffs

  • Head-of-line blocking problem: When a slow or blocked task sits at the front of a queue, it prevents subsequent tasks from being processed, effectively blocking the keys associated with a given consumer. This leads to inefficiencies, increased latency, and reduced throughput in systems that depend on ordered processing.
  • Scalability concern: In FAILOVER mode, autoscaling is limited by the constraint that the number of consumers cannot exceed the number of partitions of a topic. If the number of consumers exceeds the number of available partitions, the extra consumers will remain idle.
  • Temporarily increasing the scale: One approach to handling peak load during BFCM is temporarily increasing infrastructure capacity. However, with the FAILOVER subscription mode, we are limited by the number of partitions, and increasing partitions is an irreversible operation. As a result, the only viable strategy is to scale each consumer vertically, which is not ideal.
  • Retry letter topic support: Pulsar's built-in retry mechanism for failed events improves fault tolerance and minimizes user impact during transient failures. However, in FAILOVER subscription mode, retries via the retry letter topic are not supported, as they would compromise ordering guarantees. To address this, we implemented a custom retry solution to maintain resilience and ensure reliable event processing (see the sketch after this list).
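For context, the sketch below shows roughly what Pulsar's built-in retry mechanism looks like on a subscription type that supports it (SHARED or KEY_SHARED), using the Java client; topic and subscription names are placeholders, and the retry count and delay are arbitrary. Because this path is unavailable in FAILOVER mode, our custom retry solution (not shown here) had to provide the equivalent behavior.

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class RetryLetterTopicSketch {
    public static void consumeWithRetry(PulsarClient client) throws Exception {
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/shopping-events") // hypothetical topic
                .subscriptionName("decision-engine")                  // hypothetical subscription
                .subscriptionType(SubscriptionType.Shared)            // retry requires Shared or Key_Shared
                .enableRetry(true)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(3) // after 3 retries the message lands in the dead letter topic
                        .build())
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        try {
            // process the event ...
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Publish the message to the retry letter topic and redeliver it after a delay.
            consumer.reconsumeLater(msg, 30, TimeUnit.SECONDS);
        }
    }
}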

Evolution 3: Switching to KEY_SHARED Subscription type

In the next evolution of the architecture, we utilized another subscription mode provided by Apache Pulsar to help mitigate the issues we were having. 

Key Shared Subscription

Pulsar’s Key_Shared subscription allows multiple consumers to process messages concurrently, but it differs from the traditional Shared subscription in how it distributes those messages. Instead of allowing any consumer to receive any message, Key_Shared ensures that all messages with the same key are consistently delivered to the same consumer. This guarantees message ordering per key while still enabling parallel processing across different keys. [Source: Apache Pulsar Documentation]
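On the consumer side, KEY_SHARED is again a one-line change in the builder. A minimal sketch with the Pulsar Java client, with placeholder names as before:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class KeySharedConsumerSketch {
    public static Consumer<byte[]> subscribe(PulsarClient client) throws Exception {
        // All messages carrying the same key (the user ID, in our case) hash to the same
        // consumer, preserving per-user ordering and mutual exclusivity while different
        // users are still processed in parallel.
        return client.newConsumer()
                .topic("persistent://public/default/shopping-events") // hypothetical topic
                .subscriptionName("decision-engine")                  // hypothetical subscription
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscribe();
    }
}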

Previously, we had not switched to KEY_SHARED due to concerns about its impact on system performance and complexity. Our Streaming team had advised against it, as it requires careful producer-side batching to preserve per-key ordering.
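To illustrate that producer-side requirement, here is a minimal sketch of key-based batching with the Pulsar Java client; the topic name is a placeholder. Every message must carry the user's key, and batches are grouped per key, which is where the smaller batch sizes discussed in the tradeoffs below come from.

import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class KeyBasedBatchingProducerSketch {
    public static void publish(PulsarClient client, String userId, byte[] payload) throws Exception {
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/shopping-events") // hypothetical topic
                .enableBatching(true)
                // Group batched messages by key so a batch never mixes users.
                .batcherBuilder(BatcherBuilder.KEY_BASED)
                .create();

        // Every message must carry the routing key (the user ID) for KEY_SHARED to work as intended.
        producer.newMessage()
                .key(userId)
                .value(payload)
                .send();
    }
}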

However, for BFCM preparation and our specific use case, the Streaming team collaborated with us to ship the product, accepting the calculated risk involved.

At that time, our goal was to switch to another subscription mode that would meet our business requirement for mutual exclusivity while mitigating the tradeoffs from the previous iteration of the architecture.

KEY_SHARED gave us the ability to scale temporarily while still honoring the mutual exclusivity property. It also helped reduce the head-of-line blocking problem at the partition level, but it did not completely eradicate it. The diagram above illustrates how a range of users is allocated to a given consumer.

Impact / Result

The KEY_SHARED subscription mode reduced the head-of-line blocking problem but did not eliminate it entirely. It proved invaluable during BFCM, enabling temporary scaling up and cost-efficient scaling down, and the event was a resounding success.

Tradeoff

  • Risky coupling between producer and consumer: With producer-side batching, the key of the first event determines the key for the entire batch, so it must always be correct. Otherwise, it can result in contract violations and a loss of mutual exclusivity.
  • Reduction in throughput: Batching by key on the producer side significantly reduces throughput. It impacts performance by increasing CPU load due to smaller batch sizes.
  • Head-of-line problem: If a given consumer stalls in Pulsar's Key_Shared subscription, it creates a head-of-line (HoL) blocking issue, stalling consumption for the range of keys associated with that consumer. This is still an improvement over the FAILOVER subscription mode, where an entire partition would be blocked.
  • Limited key balancing strategy: It is paramount that key distribution across consumers follows an algorithm that minimizes the likelihood of a hot consumer. One effective approach is auto-split consistent hashing, which ensures an even spread of keys (see the configuration sketch after this list). However, this may not be available in earlier versions of Pulsar. Other algorithms, such as auto-split hash range, can still result in hot consumers in certain scenarios due to uneven event distribution.
  • Mutual exclusivity violation possible: During a pod crash or shutdown, there is a risk of duplicate event consumption due to abrupt client disconnection. This risk has to be mitigated by tuning deployment configurations for graceful shutdown, as well as ensuring idempotent consumers.
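As an example of the key balancing point above, newer Pulsar versions expose broker-level settings that switch auto-split key distribution to consistent hashing. Option names and defaults vary across Pulsar versions, so treat this broker.conf snippet as illustrative rather than a drop-in configuration:

# broker.conf (illustrative)
# Use consistent hashing, instead of splitting the hash range, when distributing keys
# across Key_Shared consumers, reducing the likelihood of a hot consumer.
subscriptionKeySharedUseConsistentHashing=true
# More replica points spread keys more evenly at the cost of a larger hash ring.
subscriptionKeySharedConsistentHashingReplicaPoints=100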

Observability

Observability is important to our operations—it empowers us to pinpoint bottlenecks in each evolution, implement effective mitigation strategies, and accurately gauge their impact on the user experience.

Mutual Exclusivity Counter

It was important for us to verify that the mutual exclusivity property was being maintained by the system. This helps in understanding edge cases and performance degradation within our event-driven architecture, which guarantees at-least-once delivery.

If the counter described below is greater than 0, it implies that mutual exclusivity is not honored for that particular key. This value is published to our observability tool, and the team is alerted. We use this monitor to detect outages or degraded performance. This counter solution is far from perfect, but it gave us a good confidence signal during the different evolutions of the architecture.

Below is pseudocode explaining the mutual exclusivity counter and its usage:

FUNCTION incrementMutualExclusivityCounter(key):
    // Retrieve the atomic counter for this key from the persistent store
    counter = getAtomicCounter(key)

    IF counter.getValue() > 0:
        // Another operation is already in progress for this key
        publishToObservabilityApp(key, counter.getValue())

    counter.increment()


FUNCTION decrementMutualExclusivityCounter(key):
    counter = getAtomicCounter(key)
    counter.decrement()


FUNCTION executeWithMutualExclusivityCounter(key):
    // Track operation start
    incrementMutualExclusivityCounter(key)

    TRY:
        // Perform the critical operation
        result = performCriticalTask(key)
    FINALLY:
        // Always decrement to maintain correct state
        decrementMutualExclusivityCounter(key)

    RETURN result

Concerns with this counter

  • Not perfect: The counter is essentially a distributed counter operating without mutual exclusivity itself, which can lead to race conditions. Its primary purpose is to detect system-wide outages, and for that it served us well.
  • Only indicates existence of the problem: It does not count the exact number of mutual exclusivity violations; rather, it only signals that the issue exists. Its main intent is to help start the triage lifecycle.
  • Flaky during deployment: We observed spikes in this counter during deployments, making it essential to layer deployment metrics on top of it. This allows us to make informed decisions collaboratively. Having an idempotent consumer also helped mitigate this issue (see the sketch after this list).
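As an illustration of the idempotent-consumer point above, here is a minimal sketch of deduplicating by event ID before acting on an event. The ProcessedEventStore interface and the TTL are hypothetical; any store with an atomic set-if-absent operation and expiry could back it.

import java.util.concurrent.TimeUnit;

// Hypothetical deduplication store; not part of the system described in this post.
interface ProcessedEventStore {
    // Atomically records the event ID; returns false if it was already recorded.
    boolean markIfFirstSeen(String eventId, long ttl, TimeUnit unit);
}

class IdempotentEventHandler {
    private final ProcessedEventStore store;

    IdempotentEventHandler(ProcessedEventStore store) {
        this.store = store;
    }

    void handle(String eventId, Runnable sendDecision) {
        // During deployments or pod crashes the same event can be redelivered to a new
        // consumer; skipping already-seen IDs keeps duplicate messages from reaching users.
        if (!store.markIfFirstSeen(eventId, 24, TimeUnit.HOURS)) {
            return; // duplicate delivery, already handled
        }
        sendDecision.run();
    }
}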

Distributed Tracing

At Attentive, we are big fans of distributed tracing. Having this tool in our arsenal has been invaluable in identifying deviations in behavior throughout the evolution of our architecture. Distributed tracing lets us trace any anomalous user experience back to the point of consumption, and it also helped us spot-check behavior during these architectural evolutions.

Lessons learned

Distributed systems are hard

Distributed locks are inherently complex and come with numerous edge cases. Addressing these challenges requires a nuanced approach. We hope this post reminds you that every architectural evolution involves trade-offs, and there are no one-size-fits-all solutions to completely resolve the underlying issues. However, we can mitigate the most critical problems that impact the product.

Simplicity helps at scale

  • Re-using existing infrastructure: By deprecating Redis and leveraging Pulsar, we eliminated the need to scale and maintain an additional technology, reducing complexity and potential points of failure.
  • Decoupling Mutual Exclusivity from Business Logic: The application’s business logic is now agnostic to mutual exclusivity, delegating that responsibility to the infrastructure. This simplification removed the need for explicit checks in the application code, making it cleaner and more maintainable.

Want to tackle important challenges like this one? Our team is hiring! Reach out about our open roles today.
