
Low-Level Design: Building a Thread-Safe In-Memory Pub-Sub Queue

Design a high-throughput, concurrent pub-sub queue in Java using locks and thread pools.

Abstract Algorithms


Helping engineers master software engineering topics.

TLDR: Building an in-memory message queue is a classic LLD problem for testing multithreaded design. By combining the observer pattern with Java's ReentrantLock, Condition variables, and AtomicInteger offsets, we implement a thread-safe broker that decouples producers from consumers.


📖 Design Challenge: Thread Contention in Event-Driven Systems

Imagine you are building an in-memory logging and analytics processing engine for a microservice. Different parts of the application (e.g., HTTP request loggers, billing events, security auditors) generate messages that must be processed asynchronously by multiple worker threads.

A naive observer pattern implementation couples the publisher and subscriber. In a synchronous implementation, when a publisher generates an event, it loops through a list of registered observers and calls their notification methods sequentially. If one observer experiences latency—such as waiting for a database write lock or a slow downstream HTTP call—the publisher's execution thread blocks.

In a high-throughput production environment, this synchronous blocking can cascade into an outage. Suppose your web server handles incoming requests with a thread pool (Tomcat defaults to 200 worker threads) and each request publishes an event that blocks. Under load, the pool can be exhausted quickly. New requests then queue up, socket timeouts accumulate, and the service becomes unresponsive.

To solve this, we must build a decoupled publisher-subscriber queue. The publisher writes messages to in-memory partitions and returns immediately. Consumer threads run asynchronously, polling partitions at their own speed using read offsets, protecting system throughput. The queue behaves as a shock absorber, smoothing out load spikes and preventing slow downstreams from affecting core write throughput.
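The difference between the two designs can be shown in a few lines. The sketch below is illustrative only. SyncPublisher and QueuedPublisher are hypothetical names, and a plain LinkedBlockingQueue stands in for the partitioned broker built later. In the synchronous version, publish() runs each observer on the caller's thread. In the queued version, publish() only enqueues the event, and a dedicated worker thread delivers it.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Naive observer (hypothetical): every observer runs on the publisher's thread,
// so one slow observer stalls the caller of publish().
class SyncPublisher {
    private final List<Consumer<String>> observers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> observer) { observers.add(observer); }

    void publish(String event) {
        for (Consumer<String> observer : observers) {
            observer.accept(event); // the publisher blocks here if the observer is slow
        }
    }
}

// Decoupled sketch (hypothetical): publish() only enqueues, and a worker thread delivers.
class QueuedPublisher {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final Thread worker;

    QueuedPublisher(Consumer<String> observer) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    observer.accept(buffer.take()); // waits for the next event
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop on shutdown
            }
        }, "event-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Returns immediately, even if the observer is still busy with earlier events
    void publish(String event) { buffer.add(event); }

    void shutdown() { worker.interrupt(); }
}
```

Because the buffer here is unbounded, a slow observer only delays delivery, not the publisher. The memory cost of that choice is discussed in the trade-offs section.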


🔍 Scope: System Actors and Core Requirements

Before implementing the class model, we establish the requirements for our in-memory queue and define how actors interact.

System Actors

  • Producer: Represents the client thread generating messages. It publishes events to specific topics.
  • Consumer: An independent execution thread that continuously polls messages from specific topics and partitions.
  • Consumer Group: A logical grouping of consumer instances that share the work of reading a topic. The topic's partitions are divided among the group's members, so each message is delivered to only one member of the group.

Core Functional Requirements

  • Topic & Partition Management: A Topic represents a logical stream of messages. To support parallel consumption, each topic is divided into one or more partitions.
  • Thread-Safe Publish: Multiple producer threads can append messages to partitions concurrently without corrupting memory or losing writes.
  • Asynchronous Consumption: Consumers run on separate, dedicated thread loops. They poll messages from assigned partitions using local read offsets, ensuring they do not miss messages or read duplicates.
  • Consumer Group Partition Rebalancing: In a full broker, partitions must be reassigned to rebalance the load whenever consumer instances join or leave a group. To keep this design small, we use static partition selection instead. Each consumer names the partition ID it reads from, and automatic rebalancing is out of scope.
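For readers who want to see what rebalancing would involve, here is a minimal sketch of a range-free, round-robin partition assignment. PartitionAssigner and assign() are hypothetical and are not part of this article's static design. The idea is to sort the members so that every caller computes the same result, then deal the partitions out in turn. Rerunning assign() whenever a member joins or leaves produces the new ownership map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical rebalancer: maps each group member to the partitions it owns.
final class PartitionAssigner {
    private PartitionAssigner() {}

    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // deterministic order, so every caller agrees on the result

        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : sorted) {
            assignment.put(member, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return assignment; // no members, so nothing is assigned
        }
        // Deal partitions round-robin: partition p goes to member p mod N
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = sorted.get(partition % sorted.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

With members c1 and c2 and five partitions, c1 owns 0, 2, 4 and c2 owns 1, 3. If c2 leaves, calling assign() again with only c1 hands all five partitions to c1.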

📊 Architectural Blueprint: Class Diagram

The class diagram below outlines the structural composition of our in-memory queue:

classDiagram
    QueueBroker "1" *-- "*" Topic : manages
    Topic "1" *-- "*" TopicPartition : contains
    TopicPartition "1" *-- "*" Message : holds
    ConsumerGroup "1" *-- "*" ConsumerInstance : coordinates
    ConsumerInstance --> QueueBroker : polls from
    class TopicPartition {
        -List~Message~ messages
        -ReentrantLock lock
        -Condition hasMessages
        +append(Message msg) void
        +read(int offset) Message
    }
    class ConsumerInstance {
        -String id
        -ConsumerGroup group
        +poll() void
    }

This class diagram details the relationships between the core components of our concurrent queue broker. The QueueBroker manages multiple Topic objects, and each Topic contains one or more TopicPartition structures. Each partition encapsulates a list of Message entities. That list is guarded by a ReentrantLock and a Condition variable, which together coordinate producers and consumers safely. A ConsumerGroup coordinates its ConsumerInstance threads. In the implementation below, ConsumerInstance is a role rather than a class, and the group holds one shared read offset per partition.
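The diagram's ConsumerInstance can be sketched as a dedicated thread running a poll loop. This is a minimal sketch under stated assumptions. MessageSource is a hypothetical stand-in for a blocking call such as ConsumerGroup.consume(partitionId), and the payload type is simplified to String. Shutdown works by interrupting the thread. That interrupt breaks it out of a blocking wait, which is the property the ReentrantLock discussion below relies on.

```java
import java.util.function.Consumer;

// Hypothetical source of messages: blocks until one is available
@FunctionalInterface
interface MessageSource {
    String next() throws InterruptedException;
}

// Sketch of a ConsumerInstance: one dedicated thread that polls and handles messages
class ConsumerInstance {
    private final String id;
    private final Thread thread;

    ConsumerInstance(String id, MessageSource source, Consumer<String> handler) {
        this.id = id;
        this.thread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    handler.accept(source.next()); // blocks inside next() while nothing is available
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: fall through so the thread ends
            }
        }, "consumer-" + id);
    }

    String getId() { return id; }

    void start() { thread.start(); }

    // Interrupts the blocking wait, then waits for the thread to finish
    void stop() throws InterruptedException {
        thread.interrupt();
        thread.join();
    }
}
```

Any blocking call can serve as the source. For example, `new ConsumerInstance("c1", queue::take, handler)` works with a BlockingQueue<String>.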


⚙️ Core Mechanics: Mapping the OOP Pillars

Our design leverages the four pillars of Object-Oriented Programming to ensure clean separation of concerns:

  • Abstraction: The QueueBroker abstracts partition management, offsets, and thread coordination. Producers simply call publish(topic, message) without managing lock states. The details of how messages are stored, indexed, and synchronized are completely hidden from the publisher.
  • Encapsulation: The message list and read offsets are encapsulated inside TopicPartition and ConsumerGroup respectively. They cannot be manipulated directly, ensuring invariants (like monotonically increasing offsets) are maintained.
  • Inheritance: A base MessageQueue interface that QueueBroker implements would allow alternative broker behaviors (e.g., priority queues) to be swapped in. The implementation below omits this interface for brevity.
  • Polymorphism: Routing strategies (e.g., Round-Robin, Key-Hashing) implement a RoutingStrategy interface, allowing dynamic partition selection at runtime.
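The inheritance and polymorphism points can be illustrated with a small, self-contained sketch. MessageQueue, AbstractMessageQueue, FifoMessageQueue, and PriorityMessageQueue are all hypothetical names, and payloads are simplified to String. Client code depends only on the MessageQueue interface. Each subclass decides which per-topic queue to use, so the two implementations return the same input in a different order.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical base contract; the QueueBroker in this article does not implement it as written
interface MessageQueue {
    void publish(String topic, String payload);
    String poll(String topic); // returns null when the topic has nothing to read
}

// Shared plumbing inherited by both variants: one thread-safe queue per topic
abstract class AbstractMessageQueue implements MessageQueue {
    private final Map<String, Queue<String>> queues = new ConcurrentHashMap<>();

    // Subclasses choose the ordering by choosing the queue type
    protected abstract Queue<String> newQueue();

    @Override
    public void publish(String topic, String payload) {
        queues.computeIfAbsent(topic, t -> newQueue()).offer(payload);
    }

    @Override
    public String poll(String topic) {
        Queue<String> queue = queues.get(topic);
        return queue == null ? null : queue.poll();
    }
}

// Arrival (FIFO) order
class FifoMessageQueue extends AbstractMessageQueue {
    @Override
    protected Queue<String> newQueue() { return new ConcurrentLinkedQueue<>(); }
}

// Natural String order, standing in for a real priority field
class PriorityMessageQueue extends AbstractMessageQueue {
    @Override
    protected Queue<String> newQueue() { return new PriorityBlockingQueue<>(); }
}
```

If the same two payloads, "b" then "a", are published to both variants, FifoMessageQueue returns "b" first and PriorityMessageQueue returns "a" first. The calling code does not change between them.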

🧠 Deep Dive: Thread Synchronization and Consumer Offsets

Let's look at the concurrency mechanics required to handle high-throughput message ingestion.

The Internals of Lock-Free Offsets and Blocking Queues

To guarantee thread safety without creating bottlenecks, we keep lock scopes narrow. A single lock on the whole QueueBroker would serialize every write, so producers writing to unrelated topics would block each other. A lock per Topic would still serialize writes to different partitions of the same topic.

To solve this, we place a ReentrantLock inside each individual TopicPartition. This allows producers to write to Partition 0 and Partition 1 concurrently without lock contention.

For consumer offset tracking, we avoid locks entirely. Instead of guarding each read offset with a lock, ConsumerGroup stores the offset for each partition in a Java AtomicInteger. AtomicInteger relies on CPU-level Compare-And-Swap (CAS) instructions. A CAS checks whether the offset still holds the expected value and, if so, writes the new value as a single atomic operation. If another thread changed the offset first, the CAS fails, and the caller re-reads the offset and tries again. No thread is ever suspended waiting for a lock. Waiting for new messages to arrive still goes through the partition's lock and condition variable; only the offset update is lock-free.
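The CAS retry loop described above looks like this in isolation. OffsetClaimer and tryClaim() are hypothetical names used for illustration. Several threads compete to claim offsets below an `available` limit. Each compareAndSet succeeds for exactly one thread per offset, and a thread that loses simply re-reads the counter and retries. A plain getAndIncrement() could not enforce the upper limit without overshooting it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: hands out read offsets to competing consumers without a lock
class OffsetClaimer {
    private final AtomicInteger next = new AtomicInteger(0);

    // Returns a claimed offset, or -1 if every offset below 'available' is already taken
    int tryClaim(int available) {
        while (true) {
            int current = next.get();
            if (current >= available) {
                return -1; // nothing left to claim right now
            }
            // Succeeds only if no other thread moved 'next' since we read it
            if (next.compareAndSet(current, current + 1)) {
                return current;
            }
            // CAS failed: another thread won the race, so loop and re-read
        }
    }

    int claimedSoFar() { return next.get(); }
}
```

Even with several threads racing, every offset from 0 to available - 1 is returned exactly once.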

Why ReentrantLock Instead of Synchronized Blocks

Java's synchronized block is easy to write, but it lacks several features a blocking queue benefits from. We use ReentrantLock for three primary reasons:

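  • Interruptible Locks: A thread waiting in lockInterruptibly() (or in Condition.await()) responds to Thread.interrupt() by throwing InterruptedException. This lets blocked consumers be stopped cleanly during system shutdown. By contrast, a thread waiting to enter a synchronized block cannot be interrupted.
  • Condition Variables: We associate a Condition (named hasMessages) with each partition's lock. When the requested offset does not exist yet, a consumer thread invokes hasMessages.await(), which releases the lock and puts the thread in a waiting state. When a producer appends a message, it calls hasMessages.signalAll(), waking the waiting consumers so they can recheck the partition. A ReentrantLock can also have several independent conditions, whereas a synchronized monitor has a single wait set.
  • Fair Lock Option: Constructing a ReentrantLock with fair = true favors the longest-waiting thread when the lock is released. This reduces starvation at some cost in throughput.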
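The three features can be combined in one small sketch. TimedPartition and its poll(offset, timeout, unit) method are hypothetical; the article's TopicPartition waits without a timeout. The lock is created fair, the read acquires it with lockInterruptibly(), and the wait uses Condition.awaitNanos() so a consumer gives up after a bounded time instead of blocking forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of TopicPartition showing fairness, interruptibility, and timed waits
class TimedPartition {
    private final List<String> messages = new ArrayList<>();
    // 'true' requests a fair lock: the longest-waiting thread is favored on release
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition hasMessages = lock.newCondition();

    void append(String message) {
        lock.lock();
        try {
            messages.add(message);
            hasMessages.signalAll(); // wake every waiter; each rechecks its own offset
        } finally {
            lock.unlock();
        }
    }

    // Waits at most 'timeout' for the offset to exist and returns null on timeout.
    // Throws InterruptedException if the thread is interrupted while waiting.
    String poll(int offset, long timeout, TimeUnit unit) throws InterruptedException {
        long remaining = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (offset >= messages.size()) {
                if (remaining <= 0L) {
                    return null; // timed out
                }
                // Releases the lock while waiting; returns the estimated time left
                remaining = hasMessages.awaitNanos(remaining);
            }
            return messages.get(offset);
        } finally {
            lock.unlock();
        }
    }
}
```

The while loop around awaitNanos() also protects against spurious wakeups, for the same reason the article's read() loops around await().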

📊 Visualizing Concurrency: The Producer-Consumer Flow

The flowchart below visualizes the thread interaction between producers appending messages and consumers waiting for new data.

flowchart TD
    Producer[Producer Thread] -->|lock| Partition[Topic Partition]
    Partition -->|append message| MsgList[Message List]
    MsgList -->|signal hasMessages| Consumer[Consumer Thread]
    Consumer -->|await on hasMessages| Idle[Waiting State]
    MsgList -->|read message| Consumer

This flowchart illustrates how producers and consumers synchronize. A consumer thread that tries to read an offset that does not exist yet calls await() on the partition's condition variable. This releases the lock and puts the thread in a waiting state. When a producer thread locks the partition and appends a message, it invokes signalAll(). The waiting consumer threads then wake up, re-acquire the lock one at a time, and read the message once their requested offset is available.


🧭 Decision and Extension: Custom Partition Routing

In production systems, routing messages to partitions depends on the use case. By introducing a routing abstraction, we can dynamically change routing behavior without modifying the broker class. We compare standard strategies below:

| Strategy | Key Benefit | Failure Mode |
| --- | --- | --- |
| Round-Robin | Perfect load balancing across partitions | Breaks order guarantees for related messages |
| Key-Hashing | Guarantees FIFO order for messages with the same key | Can cause hot partitions if key distribution is skewed |
| Sticky-Partition | Reduces context switching overhead in batch writers | Slightly higher latency for individual messages |

Using a polymorphic RoutingStrategy allows the client to select the correct balancing behavior dynamically based on system demands.
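Sticky partitioning is the only strategy in the table without an implementation later in the article, so here is a simplified sketch. StickyRoutingStrategy and its batchSize parameter are hypothetical. The sketch switches partitions after a fixed number of messages, which is a simplification; a production partitioner might instead switch when a batch fills. The block repeats the article's RoutingStrategy interface so it compiles on its own, and it ignores the message key.

```java
import java.util.concurrent.atomic.AtomicLong;

// Same contract as the article's RoutingStrategy, repeated so this block compiles alone
interface RoutingStrategy {
    int selectPartition(String key, int numPartitions);
}

// Hypothetical sticky strategy: routes 'batchSize' consecutive messages to one partition,
// then moves on to the next partition
class StickyRoutingStrategy implements RoutingStrategy {
    private final int batchSize;
    private final AtomicLong sent = new AtomicLong();

    StickyRoutingStrategy(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    @Override
    public int selectPartition(String key, int numPartitions) {
        long n = sent.getAndIncrement(); // unique sequence number per call, thread-safe
        long run = n / batchSize;        // which sticky run this message belongs to
        return (int) Math.floorMod(run, (long) numPartitions);
    }
}
```

With batchSize = 2 and three partitions, successive calls return 0, 0, 1, 1, 2, 2, 0, 0.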


⚖️ Trade-offs and Failure Modes: In-Memory Storage Limits

Operating an event queue entirely in-memory introduces critical trade-offs:

  • JVM Memory Pressure (Out of Memory): Messages are stored in Java lists and never removed, so partitions grow without bound. A slow consumer group makes it harder still to decide when old messages can safely be discarded. In production, you must enforce a maximum partition capacity. When the buffer fills, producers must either block or discard older messages.
  • Loss of Durability: Because the broker holds data in RAM, restarting the JVM clears all messages and resets consumer offsets.
  • Garbage Collection Overhead: Storing millions of small message objects on the heap increases GC work and can lengthen major (old-generation) GC pauses. To mitigate this, implement message compaction or age-based eviction policies that clean up old, already-consumed events.
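The "discard older messages" option can be sketched as a fixed-size ring buffer that keeps offsets logical. Evicting a message does not renumber the ones after it, so existing consumer offsets stay valid. BoundedPartition, earliestOffset(), and the IllegalStateException on evicted offsets are hypothetical choices made for this sketch. Offsets are long here rather than the article's int. The other option, blocking producers, would need a second Condition plus knowledge of the slowest group's offset, and is not shown.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded partition with a "discard oldest" retention policy
class BoundedPartition {
    private final String[] ring;
    private long baseOffset = 0; // logical offset of the oldest retained message
    private long nextOffset = 0; // logical offset the next append will receive
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition hasMessages = lock.newCondition();

    BoundedPartition(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.ring = new String[capacity];
    }

    void append(String message) {
        lock.lock();
        try {
            ring[(int) (nextOffset % ring.length)] = message; // overwrites the oldest slot when full
            nextOffset++;
            if (nextOffset - baseOffset > ring.length) {
                baseOffset = nextOffset - ring.length; // the oldest message was evicted
            }
            hasMessages.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long earliestOffset() {
        lock.lock();
        try {
            return baseOffset;
        } finally {
            lock.unlock();
        }
    }

    // Blocks until 'offset' exists; fails if it has already been evicted
    String read(long offset) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (true) {
                if (offset < baseOffset) {
                    throw new IllegalStateException(
                        "offset " + offset + " was evicted; earliest retained offset is " + baseOffset);
                }
                if (offset < nextOffset) {
                    return ring[(int) (offset % ring.length)];
                }
                hasMessages.await(); // recheck eviction too, since appends may evict while we wait
            }
        } finally {
            lock.unlock();
        }
    }
}
```

With capacity 3 and five appends (m0 to m4), the earliest retained offset is 2. A consumer still pointing at offset 1 gets an explicit error instead of silently reading the wrong message.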

🧪 Implementation: Full Java Domain Model

Here is the complete thread-safe, in-memory domain model.

1. Polymorphic Routing Strategy

import java.util.concurrent.atomic.AtomicInteger;

public interface RoutingStrategy {
    int selectPartition(String key, int numPartitions);
}

public class RoundRobinRoutingStrategy implements RoutingStrategy {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int selectPartition(String key, int numPartitions) {
        // floorMod stays non-negative after the counter overflows past Integer.MAX_VALUE;
        // Math.abs(Integer.MIN_VALUE) would still be negative
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }
}

public class KeyHashRoutingStrategy implements RoutingStrategy {
    @Override
    public int selectPartition(String key, int numPartitions) {
        if (key == null) {
            return 0; // unkeyed messages all land on partition 0
        }
        // Same key, same partition: this preserves per-key ordering.
        // floorMod also handles hashCode() == Integer.MIN_VALUE safely.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

2. Message and Core Domain Classes

import java.util.UUID;

public class Message {
    private final String id;
    private final String payload;
    private final String key;
    private final long timestamp;

    public Message(String payload, String key) {
        this.id = UUID.randomUUID().toString();
        this.payload = payload;
        this.key = key;
        this.timestamp = System.currentTimeMillis();
    }

    public String getId() { return id; }
    public String getPayload() { return payload; }
    public String getKey() { return key; }
    public long getTimestamp() { return timestamp; }
}

3. Thread-Safe Topic Partition

import java.util.*;
import java.util.concurrent.locks.*;

public class TopicPartition {
    private final int partitionId;
    private final List<Message> messages = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition hasMessages = lock.newCondition();

    public TopicPartition(int partitionId) {
        this.partitionId = partitionId;
    }

    public void append(Message message) {
        lock.lock();
        try {
            messages.add(message);
            hasMessages.signalAll(); // Wake up consumers waiting for messages
        } finally {
            lock.unlock();
        }
    }

    public Message read(int offset) throws InterruptedException {
        lock.lockInterruptibly(); // an interrupt can cancel a consumer still waiting for the lock
        try {
            // Block until the requested offset becomes available
            while (offset >= messages.size()) {
                hasMessages.await();
            }
            return messages.get(offset);
        } finally {
            lock.unlock();
        }
    }

    public int getPartitionId() { return partitionId; }
    public int size() {
        lock.lock();
        try {
            return messages.size();
        } finally {
            lock.unlock();
        }
    }
}

4. Topic and Broker Context

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class Topic {
    private final String name;
    private final List<TopicPartition> partitions;

    public Topic(String name, int partitionCount) {
        this.name = name;
        List<TopicPartition> temp = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            temp.add(new TopicPartition(i));
        }
        this.partitions = Collections.unmodifiableList(temp);
    }

    public String getName() { return name; }
    public List<TopicPartition> getPartitions() { return partitions; }

    public TopicPartition getPartition(int id) {
        return partitions.get(id);
    }
}

public class QueueBroker {
    private static final QueueBroker instance = new QueueBroker();
    private final Map<String, Topic> topics = new ConcurrentHashMap<>();
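    // volatile: a strategy set by one thread (e.g., a REST call) becomes visible to publishing threads
    private volatile RoutingStrategy routingStrategy = new KeyHashRoutingStrategy();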

    private QueueBroker() {}

    public static QueueBroker getInstance() { return instance; }

    public void setRoutingStrategy(RoutingStrategy strategy) {
        this.routingStrategy = strategy;
    }

    public void createTopic(String name, int partitionCount) {
        // computeIfAbsent builds the Topic (and its partitions) only when the name is new
        topics.computeIfAbsent(name, n -> new Topic(n, partitionCount));
    }

    public Topic getTopic(String name) {
        return topics.get(name);
    }

    public void publish(String topicName, Message message) {
        Topic topic = topics.get(topicName);
        if (topic == null) {
            throw new IllegalArgumentException("Topic does not exist: " + topicName);
        }

        int numPartitions = topic.getPartitions().size();
        int partitionId = routingStrategy.selectPartition(message.getKey(), numPartitions);

        topic.getPartition(partitionId).append(message);
    }
}

5. Consumer Group Coordination

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerGroup {
    private final String groupId;
    private final String topicName;
    private final Map<Integer, AtomicInteger> partitionOffsets = new ConcurrentHashMap<>();

    public ConsumerGroup(String groupId, String topicName) {
        this.groupId = groupId;
        this.topicName = topicName;
    }

    public Message consume(int partitionId) throws InterruptedException {
        QueueBroker broker = QueueBroker.getInstance();
        Topic topic = broker.getTopic(topicName);
        if (topic == null) return null;

        TopicPartition partition = topic.getPartition(partitionId);

        // One shared offset per partition for the whole group; computeIfAbsent creates it atomically
        AtomicInteger offsetTracker =
            partitionOffsets.computeIfAbsent(partitionId, id -> new AtomicInteger(0));

        while (true) {
            int targetOffset = offsetTracker.get();
            Message msg = partition.read(targetOffset); // blocks until the offset exists

            // Claim the offset; if another group member advanced it first, retry at the new offset
            if (offsetTracker.compareAndSet(targetOffset, targetOffset + 1)) {
                return msg;
            }
        }
    }

    public String getGroupId() { return groupId; }
}

🌍 Real-World Endpoint: Spring REST API

To verify our in-memory queue concurrency under load, we construct a Spring @RestController to expose message publication and group polling endpoints.

import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
@RequestMapping("/api/queue")
public class QueueController {

    private final QueueBroker broker = QueueBroker.getInstance();
    private final Map<String, ConsumerGroup> activeGroups = new ConcurrentHashMap<>();

    @PostMapping("/topic")
    public String createTopic(@RequestParam String name, @RequestParam int partitions) {
        broker.createTopic(name, partitions);
        return "Topic " + name + " created with " + partitions + " partitions.";
    }

    @PostMapping("/routing")
    public String setRouting(@RequestParam String strategy) {
        if ("roundrobin".equalsIgnoreCase(strategy)) {
            broker.setRoutingStrategy(new RoundRobinRoutingStrategy());
        } else {
            broker.setRoutingStrategy(new KeyHashRoutingStrategy());
        }
        return "Routing strategy updated to " + strategy;
    }

    @PostMapping("/publish")
    public String publishMessage(@RequestParam String topic, 
                                 @RequestParam String payload, 
                                 @RequestParam(required = false) String key) {
        broker.publish(topic, new Message(payload, key));
        return "Message sent.";
    }

    @GetMapping("/consume")
    public String consumeMessage(@RequestParam String topic, 
                                 @RequestParam String group, 
                                 @RequestParam int partitionId) throws InterruptedException {
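        String groupKey = group + ":" + topic;
        // computeIfAbsent creates at most one ConsumerGroup per (group, topic) pair
        ConsumerGroup consumerGroup =
            activeGroups.computeIfAbsent(groupKey, k -> new ConsumerGroup(group, topic));

        // Blocks this request thread until a message is available on the partition
        Message message = consumerGroup.consume(partitionId);

        // consume() returns null only when the topic does not exist
        return message != null ? message.getPayload() : "Empty";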
    }
}

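This controller lets concurrent HTTP request threads share the QueueBroker singleton, which exercises the per-partition locking under load. Note that /consume blocks its request thread until a message arrives. A client polling an idle partition therefore ties up a server worker thread, so a timed poll is safer for HTTP endpoints.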


📚 Lessons Learned: Concurrency LLD Best Practices

Keep these best practices in mind when designing multithreaded queues:

  1. Avoid Global Locks: Place locks inside individual partitions rather than at the topic level to isolate lock contention.
  2. Release Locks in Finally Blocks: Always release locks in a finally block. Otherwise, an exception thrown inside a critical section can leave the lock held forever and block every other thread that needs it.
  3. Use CAS for Offset Tracking: Leverage AtomicInteger to manage reader offsets without the overhead of suspending OS threads.
  4. Implement Queue Bounds: Always limit queue buffer size in production to avoid unchecked memory growth and OOM crashes.
  5. Manage Garbage Collection: Monitor the size of long-running collections to prevent Java heap memory exhaustion due to accumulated message history.

📌 Summary: The Pub-Sub LLD Cheatsheet

  • Partition Isolation: Place locks inside individual partitions to support concurrent parallel writes.
  • Reentrant Locks: Use Java's ReentrantLock and Condition to block and wake up consumer threads safely.
  • Atomic Offsets: Manage consumer offsets using lock-free AtomicInteger Compare-And-Swap (CAS) instructions.
  • Memory Bounds: Implement partition size limits to prevent unchecked heap expansion and JVM crashes.
  • Key Routing: Hash message keys to select partition IDs, ensuring order guarantees for related events.
