Distributed message queue
Questions
What
- What’s the format and average size of messages? (Text messages, generally in the range of kilobytes)
- What data delivery semantics do we need to support? (At-least-once; ideally, all semantics including at-most-once and exactly-once)
How
- How are messages consumed? (In the same order they were produced)
- How long is data retained? (Two weeks)
- How many producers and consumers are we going to support? (The more the better)
- How should the system handle throughput and latency? (Support high throughput for log aggregation and low latency for traditional message queue use cases)
Who
- Who can consume the messages? (Messages can be repeatedly consumed by different consumers)
Why
- Why do we need to support message retention and ordered consumption? (These are added features beyond a traditional message queue)
Overview
the architecture supports efficient message handling and processing:
- Producers generate messages and push them to specific topics.
- Consumer groups subscribe to these topics and consume messages.
- Brokers manage partitions, holding subsets of messages for each topic.
- Storage is divided into:
- Data storage for message persistence.
- State storage for managing consumer states.
- Metadata storage for configuration and topic properties.
- Coordination services ensure the system’s stability and organization through service discovery and leader election, facilitated by tools like Apache Zookeeper or etcd.
Benefits of Message Queues
-
Decoupling:
- Eliminates tight coupling between components.
- Allows independent updates of components without affecting others.
-
Improved Scalability:
- Enables independent scaling of producers and consumers based on traffic load.
- Example: Adding more consumers during peak hours to handle increased traffic.
-
Increased Availability:
- Ensures continued interaction with the queue even if one part of the system goes offline.
-
Better Performance:
- Facilitates asynchronous communication.
- Producers add messages to the queue without waiting for responses.
- Consumers process messages when available, avoiding mutual waiting.
Message Queues vs. Event Streaming Platforms
-
Message Queues:
- Examples: RocketMQ, ActiveMQ, RabbitMQ, ZeroMQ.
- Traditional use: Decoupling, scalability, availability, and asynchronous communication.
- Features: May include long message retention and repeated message consumption.
-
Event Streaming Platforms:
- Examples: Apache Kafka, Apache Pulsar.
- Primary use: Event streaming and log processing.
- Features: Append-only log, long message retention, repeated consumption.
-
Convergence of Features:
- RabbitMQ: Added an optional streaming feature for repeated consumption and long message retention using an append-only log.
- Apache Pulsar: Functions both as an event streaming platform and a distributed message queue due to its flexibility and performance.
Designing a Distributed Message Queue
-
Objectives:
- Long data retention.
- Repeated consumption of messages.
- Enhanced features typically available in event streaming platforms.
-
Considerations:
- The design will be more complex due to additional features.
- Aim to blend the benefits of traditional message queues with the advanced capabilities of event streaming platforms.
Adjustments for Traditional Message Queues
-
Retention Requirements:
- Traditional message queues do not require strong retention like event streaming platforms.
- Messages are retained in memory only until they are consumed.
- On-disk overflow capacity is used but is much smaller compared to event streaming platforms.
-
Message Ordering:
- Traditional message queues do not guarantee message ordering.
- Messages can be consumed in a different order than they were produced.
-
Storage and Management:
- Focus on efficient in-memory processing with minimal reliance on disk storage.
- Metadata and state management are simplified, focusing on immediate message delivery rather than long-term persistence.
Messaging Models
Point-to-Point Model
-
Description:
- Commonly used in traditional message queues.
- A message is sent to a queue and consumed by one and only one consumer.
- Multiple consumers can wait to consume messages, but each message is only consumed once.
-
Characteristics:
- Consumption: Once a consumer acknowledges a message, it is removed from the queue.
- Data Retention: Typically, there is no data retention; messages are deleted after consumption.
- Our Design: Includes a persistence layer, retaining messages for two weeks, allowing repeated consumption.
-
Implementation in Our Design:
- Simulated by using consumer groups (to be detailed later).
Publish-Subscribe Model
-
Description:
- Messages are categorized into topics.
- Each topic has a unique name within the message queue service.
- Messages are sent to and read from specific topics.
-
Characteristics:
- Consumption: Messages sent to a topic are received by all consumers subscribed to that topic.
- Example: Message A is consumed by both Consumer 1 and Consumer 2.
-
Implementation in Our Design:
- Supported naturally through the use of topics.
Supporting Both Models in a Distributed Message Queue
-
Point-to-Point Model Simulation:
- Consumer Groups:
- Used to simulate point-to-point behavior.
- Each consumer group can have multiple consumers, but each message is processed by only one consumer within the group.
- Consumer Groups:
-
Publish-Subscribe Model:
- Topics:
- Messages are organized and sent to topics.
- All consumers subscribed to a topic receive the messages.
- Topics:
Example Usage
-
Point-to-Point Model:
- A producer sends a message to a queue.
- Multiple consumers wait to consume messages.
- Each message is consumed by one consumer and then removed.
-
Publish-Subscribe Model:
- A producer sends a message to a topic.
- All consumers subscribed to that topic receive the message.
Topics, partitions, and brokers
Topics and Partitions
-
Topics:
- Messages are categorized and persisted by topics.
-
Partitions:
- To handle large data volumes, topics are divided into partitions (sharding).
- A partition is a subset of messages for a topic.
- Partitions distribute messages evenly across brokers.
- Operate as queues with FIFO (first in, first out) mechanism, preserving message order within a partition.
- Each message in a partition has an offset indicating its position.
Brokers
-
Role:
- Brokers are servers that hold partitions.
- Distribute partitions among themselves to support high scalability.
- By increasing the number of partitions, the topic capacity can be scaled.
-
Distribution:
- Partitions are evenly distributed across the broker cluster.
Message Handling
-
Producer to Partition:
- Messages are sent to partitions within a topic.
- Message Key:
- Optional; determines partitioning.
- Messages with the same key (e.g., user ID) go to the same partition.
- No Key:
- Messages are randomly assigned to partitions.
-
Consumer from Partition:
- Consumers subscribe to topics and pull data from one or more partitions.
- When multiple consumers subscribe to a topic, they form a consumer group.
- Each consumer in a group is responsible for a subset of partitions.
Example Workflow
-
Producer Workflow:
- Sends a message with a key (e.g., user ID) to a topic.
- Message is routed to the appropriate partition based on the key.
- If no key, the message is assigned to a random partition.
-
Consumer Workflow:
- Subscribes to a topic and joins a consumer group.
- Pulls messages from assigned partitions within the topic.
- Processes messages in FIFO order within each partition.
Consumer Groups
In Apache Kafka, consumer groups are a way to parallelize consumption of topics by distributing partitions across a group of consumers. Each consumer group has multiple consumers, and each consumer within a group is assigned a subset of partitions for the topics it subscribes to.
How Consumer Groups Work
-
Partition Assignment: Kafka ensures that each partition in a topic is consumed by exactly one consumer within a consumer group. The assignment is based on a partition assignment strategy (e.g., round-robin, range-based, etc.).
-
Consumer Coordination: Consumers in the same group coordinate to ensure each partition is consumed by exactly one consumer at any time. If a consumer fails or leaves the group, partitions are reassigned to other consumers in the group.
-
Offsets: Each consumer group tracks the offset (position) of the last consumed message for each partition. This offset is committed periodically or after processing batches of messages, ensuring fault tolerance and enabling message replay.
- Load Balancing: Kafka automatically balances partitions across consumers in a group, ensuring efficient and parallel consumption of messages.
- Scalability: Adding more consumers increases parallelism and throughput, as each consumer processes a subset of partitions independently.
Achieving Point-to-Point Model
- Single Partition Constraint: Ensures messages in the same partition are consumed by only one consumer, equivalent to the point-to-point model.
- Scalability:
- Allocate enough partitions in advance.
- To handle high scale, simply add more consumers.
Based on the provided details about the data storage requirements for a message queue with write-heavy and read-heavy access patterns, let’s summarize the recommended option and its rationale:
Write-ahead Log (WAL)
Description:
- Functionality: WAL is an append-only log file where new entries (messages) are sequentially appended to the end.
- Access Pattern: Purely sequential read/write access, which aligns well with write-heavy and read-heavy traffic patterns.
- Implementation: Messages are persisted as WAL log files on disk. Each message is appended to the tail of a partition with a monotonically increasing offset.
- Segmentation: WAL files are divided into segments. Only the active segment accepts new messages, while non-active segments serve read requests. When the active segment reaches a certain size, a new segment is created, and old non-active segments can be truncated based on retention policies.
- Disk Performance: Benefits from the high sequential read/write performance of disk drives, especially in RAID configurations. Modern rotational disks are capable of achieving several hundred MB/sec read and write speeds, which are sufficient for the workload described.
- Cost Efficiency: Rotational disks are cost-effective and provide large storage capacities, making them suitable for storing large volumes of data with high retention requirements.
Advantages:
- Performance: Sequential access patterns optimize disk performance, leveraging modern OS disk caching mechanisms effectively.
- Scalability: Scalable by adding new segments as needed, accommodating increasing message volumes over time.
- Cost-Effective: Rotational disks provide affordability and large storage capacities compared to other storage options like SSDs.
- Reliability: Simple, robust design suitable for handling large-scale data streaming without the complexity of relational or NoSQL databases.
Message Structure in a Message Queue
-
Message Key:
- Determines the partition of the message.
- If not defined, partitioning is random.
- Defined as
hash(key) % numPartitions. - Provides flexibility for custom partitioning algorithms.
- Not equivalent to the partition number.
-
Message Value:
- Payload of the message.
- Can be plaintext or compressed binary data.
-
Other Fields:
- Topic: Name of the topic to which the message belongs.
- Partition: ID of the partition that stores the message.
- Offset: Position of the message within the partition.
- Timestamp: Time when the message was stored.
- Size: Size of the message.
- CRC: Cyclic Redundancy Check to ensure data integrity.
-
Additional Features:
- Optional fields can be added based on specific requirements.
- Example: Tags for message filtering.
Key Points and Considerations
- Performance: Emphasizes minimizing data copying for high throughput.
- Partitioning: Key-based partitioning allows even distribution of messages across partitions.
- Flexibility: Producers can define custom partitioning algorithms.
- Data Integrity: CRC ensures that message data remains intact during transmission and storage.
- Topic-based Organization: Messages are grouped under topics, facilitating logical segregation and management.
- Scalability: Ability to accommodate changes in the number of partitions without affecting message distribution.
Batching in Message Queues
Batching plays a crucial role in optimizing the performance of message queue systems. It impacts both producer and consumer flows, as well as the internals of the message queue itself.
Importance of Batching
-
Network Efficiency:
- Batching allows messages to be grouped into larger payloads, reducing the number of network round trips.
- This amortizes the overhead of network communication, as fewer network calls are made for the same amount of data.
-
Disk Write Efficiency:
- In the message queue’s internal workings, batching results in messages being written to append logs in larger chunks.
- Larger write operations lead to sequential writes on disk, which are generally faster than random writes.
- Operating systems benefit from larger contiguous blocks of disk cache, further enhancing sequential disk access throughput.
-
Throughput vs. Latency Tradeoff:
- The batch size can be tuned based on the specific requirements of the system.
- Lower batch sizes reduce latency but may impact overall throughput.
- Higher batch sizes improve throughput but might increase latency slightly due to larger batches waiting to be processed.
-
Partitioning Considerations:
- Tuning for higher throughput may necessitate more partitions per topic.
- This compensates for potentially slower sequential disk write throughput when larger batches are used.
Producer and Consumer Flows
-
Producer: Batches messages before sending them to the message queue.
- This reduces the number of network calls made by aggregating multiple messages into fewer, larger payloads.
- Helps in efficiently utilizing network bandwidth and reducing latency.
-
Consumer: Can consume messages in batches from the message queue.
- Reduces the overhead of fetching individual messages, especially useful in scenarios with high message volume.
- Improves processing efficiency by handling multiple messages at once.
System Optimization
- Performance Tuning:
- Depending on the deployment scenario (e.g., latency-sensitive vs. throughput-focused), adjust batch sizes accordingly.
- Fine-tune the number of partitions per topic to balance throughput requirements with disk performance.
Producer Flow in a Message Queue System
1. Routing Messages to Brokers
- Routing Layer: Initially, messages from the producer are sent to a routing layer.
- This layer determines the correct broker, specifically the leader replica, for the target partition.
- The routing layer consults a replica distribution plan stored in metadata to identify the leader replica of the target partition.
2. Sending Messages to Leader Replica
- Leader Replica: Once identified, the routing layer forwards the message to the leader replica of the target partition.
- The leader replica is responsible for coordinating data writes and ensuring data consistency across replicas.
- It receives the message and begins the replication process to follower replicas.
3. Replication Process
- Follower Replicas: Follower replicas synchronize data from the leader replica.
- Once a sufficient number of replicas (in-sync replicas) have replicated the data, the leader commits the data to persistent storage (disk).
- This ensures that the message is durable and can be consumed by consumers.
4. Advantages of Integrated Producer Components
- Buffering and Batching:
- Buffer Component: Integrated into the producer, it accumulates messages in memory.
- Batching: Messages are sent out in larger batches to reduce network overhead and improve throughput.
- Producers can optimize batch sizes based on the tradeoff between latency (small batches) and throughput (large batches).
5. Benefits of Integrated Approach
- Reduced Latency: By eliminating the additional network hops introduced by a standalone routing layer, latency is reduced.
- Improved Throughput: Batching and buffering mechanisms allow for more efficient use of network resources and disk writes.
- Flexibility: Producers can dynamically determine the partition to send messages based on custom logic, enhancing flexibility and performance.
6. Considerations and Tradeoffs
- Batch Size Tuning: Choosing the right batch size is crucial and involves a tradeoff between latency and throughput.
- Larger batches increase throughput but may introduce higher latency due to batch accumulation time.
- Smaller batches reduce latency but may impact overall system throughput.
Consumer Flow in a Message Queue System
1. Choosing Consumption Model: Push vs. Pull
Consumers can adopt either a push model or a pull model to receive messages from brokers:
-
Push Model:
- Pros:
- Low latency: Brokers push messages to consumers immediately upon receipt.
- Cons:
- Risk of overwhelming consumers if consumption rate is slower than production rate.
- Limited control over consumption rate, which can be challenging for consumers with varying processing capabilities.
- Pros:
-
Pull Model:
- Pros:
- Consumers control consumption rate, allowing real-time or batch processing as needed.
- Scalability: Can scale out consumers to match production rates or catch up during high load periods.
- Cons:
- Potential for wasted resources if consumers continuously pull when no messages are available.
- Addressed by long polling, where consumers wait for a specified time for new messages.
- Pros:
Given these considerations, most modern message queues opt for the pull model due to its flexibility and scalability benefits.
2. Consumer Group Coordination
- Consumer Group: Consumers can join a consumer group to share message consumption across partitions.
- Coordinator: Each consumer group has a coordinator, typically chosen based on hashing the group name.
- The coordinator assigns partitions to consumers within the group using strategies like round-robin or range-based assignments.
3. Message Consumption Process
- Subscription: A consumer subscribes to a specific topic within the group and connects to the coordinator-assigned broker.
- Partition Assignment: The coordinator assigns partitions to the consumer, ensuring even distribution across group members.
- Fetching Messages:
- The consumer fetches messages from the last consumed offset, managed by state storage.
- This ensures that messages are processed in the correct order and that no messages are missed.
4. Processing and Offset Committing
- Message Processing: Consumers process fetched messages based on application logic.
- Offset Committing:
- After processing, the consumer commits the offset back to the broker.
- The order of processing and offset committing impacts message delivery semantics and ensures reliable message consumption.
Consumer Rebalancing in Distributed Systems
Consumer rebalancing is a critical function in distributed systems to ensure efficient and fair distribution of partitions among consumers. The coordinator plays a central role in managing this process, from maintaining consumer lists to handling failures and generating new partition plans. By effectively managing consumer rebalancing, the system ensures reliability and continuous operation even in the face of consumer changes or failures.
Role of the Coordinator
-
Coordinator Definition:
- A coordinator is a broker responsible for managing consumer rebalancing.
- It communicates with consumers, receives heartbeats, and manages their offsets on the partitions.
-
Finding the Coordinator:
- Each consumer belongs to a group.
- The dedicated coordinator for a group is found by hashing the group name.
- All consumers in the same group connect to the same coordinator.
Coordinator Responsibilities
-
Maintain Joined Consumer List:
- The coordinator keeps a list of consumers currently in the group.
- When this list changes (due to a consumer joining, leaving, or crashing), the coordinator initiates the rebalancing process.
-
Elect Group Leader:
- When the consumer list changes, the coordinator elects a new leader for the consumer group.
- The leader is responsible for generating a new partition dispatch plan.
-
Generate Partition Dispatch Plan:
- The new leader generates a plan for partition allocation among consumers.
- This plan is then reported back to the coordinator.
-
Broadcast the Plan:
- The coordinator broadcasts the new partition dispatch plan to all consumers in the group.
Handling Consumer Issues
-
Network Issues, Crashes, Restarts:
- Consumers may encounter various issues in a distributed system.
- The coordinator detects these issues when it stops receiving heartbeats from a consumer.
-
Triggering Rebalance Process:
- When heartbeats are no longer received, the coordinator triggers the rebalancing process.
- Partitions are re-dispatched to ensure that each partition is handled by an active consumer.
when a new consumer B joins the group
- Initially, only consumer A is in the group. It consumes all the partitions and keeps the heartbeat with the coordinator.
- Consumer B sends a request to join the group.
- The coordinator knows it’s time to rebalance, so it notifies all the consumers in the group in a passive way. When A’s heartbeat is received by the coordinator, it asks A to rejoin the group.
- Once all the consumers have rejoined the group, the coordinator chooses one of them as the leader and informs all the consumers about the election result.
- The leader consumer generates the partition dispatch plan and sends it to the coordinator. Follower consumers ask the coordinator about the partition dispatch plan.
- Consumers start consuming messages from newly assigned partitions.
when an existing consumer A leaves the group
- Consumer A and B are in the same consumer group.
- Consumer A needs to be shut down, so it requests to leave the group.
- The coordinator knows it’s time to rebalance. When B’s heartbeat is received by the coordinator, it asks B to rejoin the group.
when an existing consumer A crashes
-
Consumer A and B keep heartbeats with the coordinator.
-
Consumer A crashes, so there is no heartbeat sent from consumer A to the coordinator. Since the coordinator doesn’t get any heartbeat signal within a specified amount of time from consumer A, it marks the consumer as dead.
-
The coordinator triggers the rebalance process.
State Storage
In a message queue broker, state storage is crucial for managing the mapping between partitions and consumers, as well as tracking the last consumed offsets for consumer groups. This ensures that message consumption can resume accurately after a consumer crash or other interruptions.
Key Responsibilities of State Storage
-
Mapping Between Partitions and Consumers:
- Keeps track of which consumer is responsible for which partition.
-
Last Consumed Offsets:
- Stores the last consumed offsets for each partition for all consumer groups.
- Example: If a consumer in group 1 consumes messages up to offset 6 in a partition and then commits this offset, the state storage records this. If the consumer crashes, a new consumer can resume from offset 6.
Data Access Patterns
-
Frequent Read and Write Operations:
- The state storage experiences frequent updates as consumers read and write their offsets.
-
Low Data Volume:
- Despite frequent operations, the overall volume of data is relatively low.
-
Frequent Updates, Rare Deletions:
- Data is updated frequently but rarely deleted, ensuring a consistent state.
-
Random Access:
- Read and write operations are performed randomly across different partitions and consumers.
-
Data Consistency:
- Maintaining data consistency is crucial to ensure accurate resumption of message consumption.
Storage Solutions
Given the need for data consistency and fast read/write operations, a key-value store (KV store) like Zookeeper is often used. However, Kafka has transitioned its offset storage from Zookeeper to Kafka brokers for enhanced performance and integration.
Example: Consumer State Management
Scenario:
- A consumer in group 1 is consuming messages from a partition and has committed offset 6.
- If this consumer crashes, the state storage ensures that a new consumer in the same group can resume from offset 6.
Data Access:
- The state storage frequently reads the current offsets and updates them as new messages are consumed.
- These operations are performed randomly but require consistency to avoid data corruption.
Choosing a Storage Solution
Considering the requirements, Zookeeper has been a popular choice due to its:
- Strong consistency guarantees.
- Efficient handling of frequent read/write operations.
However, Kafka has improved its system by moving offset storage to Kafka brokers, allowing for:
- Reduced dependency on external systems.
- Improved performance through tight integration with the Kafka ecosystem.
Replication
Replication Process
Partition Replication:
- Each partition has multiple replicas (commonly three) distributed across different broker nodes.
- For each partition, one replica is designated as the leader, and the others are followers.
- Producers send messages to the leader replica, while follower replicas pull new messages from the leader.
- Once messages are synchronized to a sufficient number of replicas, the leader acknowledges the producer.
Replica Distribution Plan:
- The replica distribution plan outlines how replicas are distributed across broker nodes.
- Example Plan:
- Partition 1 of Topic A: Leader on Broker 1, Followers on Brokers 2 and 3.
- Partition 2 of Topic A: Leader on Broker 2, Followers on Brokers 3 and 4.
- Partition 1 of Topic B: Leader on Broker 3, Followers on Brokers 4 and 1.
Coordination Service:
- One broker node is elected as the leader with the help of a coordination service.
- The leader generates the replica distribution plan and persists it in metadata storage.
- All brokers follow the generated plan.
In-Sync Replicas (ISR)
Definition:
- ISRs are replicas that are synchronized with the leader.
- The definition of “in-sync” depends on topic configuration, such as the
replica.lag.max.messagessetting.
Example:
- If
replica.lag.max.messagesis 4, a follower is considered in-sync if it is behind the leader by no more than 3 messages.
Functionality:
- Committed Offset: The last offset where all messages before and at this offset are synchronized across all ISRs.
- ISR Management: If a replica falls behind beyond the configured lag, it is removed from the ISR. Once it catches up, it can rejoin the ISR.
Importance of ISR:
- Reflects the trade-off between performance and durability.
- Ensures message durability by waiting for ISRs to synchronize before acknowledging producers.
Acknowledgment Settings
ACK=all:
- Producer receives an acknowledgment after all ISRs have received the message.
- Provides the strongest message durability but increases latency due to waiting for the slowest ISR.
ACK=1:
- Producer receives an acknowledgment once the leader persists the message.
- Improves latency by not waiting for follower synchronization but risks data loss if the leader fails before followers replicate the message.
- This setting is suitable for low latency systems where occasional data loss is acceptable.
ACK=0:
- Producer sends messages without waiting for any acknowledgment and does not retry.
- Offers the lowest latency but with the highest risk of message loss, suitable for use cases like logging where occasional data loss is acceptable.
Consumer Rebalancing and Reading Strategies
Consumer Rebalancing:
- Determines which consumer handles which partitions, triggered by changes such as consumer joins, leaves, crashes, or partition adjustments.
- The coordinator manages the consumer list and triggers rebalancing, with a leader generating a new partition plan.
Reading from Leader vs. ISR:
- Typically, consumers read from the leader replica to ensure design simplicity and operational efficiency.
- For high traffic topics, scaling is achieved by increasing the number of partitions and consumers.
- In scenarios where consumers are in different data centers from the leader, reading from the closest ISR can improve performance.
Scalability
Producers
Scalability:
- Producers are simpler to scale compared to consumers since they don’t require group coordination.
- Scalability is achieved by adding or removing producer instances.
Consumers
Consumer Groups:
- Consumer groups are isolated from each other, making it easy to add or remove groups.
- Inside a consumer group, the rebalancing mechanism handles changes such as adding, removing, or crashing of consumers.
Rebalancing Mechanism:
- When a consumer joins or leaves, or a partition is adjusted, the group rebalances, ensuring even distribution of load and fault tolerance.
Brokers
Failure Recovery:
- Brokers play a critical role in fault tolerance. The system needs to handle broker failures gracefully.
Fault Tolerance Considerations:
- Minimum ISRs: Specifies how many replicas must receive a message before it’s considered committed.
- Replica Distribution: Ensure replicas are spread across different nodes to avoid single points of failure.
- Data Safety vs. Latency: Balance between distributing replicas across data centers (higher safety, higher latency) and local replication (lower latency, lower safety).
Broker Scalability:
- Redistribute replicas when brokers are added or removed.
- Temporarily allow more replicas than configured to avoid data loss during scaling.
Partitions
Scaling Up Partitions:
- Increasing partitions is straightforward and safe for both producers and consumers.
- Producers are notified of new partitions, and consumers trigger rebalancing.
Data Storage:
- Adding partitions does not require data migration; new messages are distributed across all partitions.
Scaling Down Partitions:
- More complex due to the need to handle existing data and consumer offsets.
Data Delivery Semantics
At-Most Once
Definition:
- Messages are delivered at most once. They may be lost, but are not redelivered.
How It Works:
- Producer: Sends messages asynchronously to a topic without waiting for acknowledgment (
ack=0). If message delivery fails, there is no retry. - Consumer: Fetches the message and commits the offset before processing the data. If the consumer crashes after committing the offset, the message will not be re-consumed.
Use Cases:
- Suitable for scenarios where a small amount of data loss is acceptable, such as monitoring metrics.
Pros and Cons:
- Pros: Simple and fast, low latency.
- Cons: Potential message loss.
At-Least Once
Definition:
- Messages are delivered at least once. They may be redelivered, but are not lost.
How It Works:
- Producer: Sends messages synchronously or asynchronously with a response callback, setting
ack=1orack=allto ensure delivery to the broker. If delivery fails or times out, the producer retries. - Consumer: Fetches the message and commits the offset only after successful processing. If the consumer crashes before committing, the message is re-consumed upon restart, resulting in possible duplicates.
Use Cases:
- Suitable for scenarios where message loss is unacceptable, but duplicates can be managed, such as logging or data processing where deduplication is feasible.
Pros and Cons:
- Pros: Ensures no message loss.
- Cons: Potential for message duplication, requiring additional handling to deduplicate messages.
Exactly Once
Definition:
- Messages are delivered exactly once. No duplicates and no message loss.
How It Works:
- Producer: Utilizes transactional messaging to ensure messages are delivered exactly once to the broker.
- Consumer: Processes the message and commits the offset in a way that ensures the message is neither lost nor duplicated. This typically involves using mechanisms such as idempotent writes and transaction management.
Use Cases:
- Crucial for financial applications (e.g., payments, trading, accounting) where duplication is unacceptable and downstream services do not support idempotency.
Pros and Cons:
- Pros: Guarantees no message loss or duplication.
- Cons: Complex to implement, higher latency, and increased resource usage.
Comparison of Delivery Semantics
| Semantics | Message Loss | Message Duplication | Complexity | Use Cases |
|---|---|---|---|---|
| At-Most Once | Possible | None | Low | Monitoring metrics, logging where some loss is okay |
| At-Least Once | None | Possible | Medium | Logging, data processing with deduplication |
| Exactly Once | None | None | High | Financial transactions, critical data pipelines |
- At-Most Once: Ensures simplicity and low latency but allows for possible message loss. Suitable for non-critical applications where occasional data loss is acceptable.
- At-Least Once: Guarantees no message loss at the cost of potential duplicates. Suitable for applications where data loss is unacceptable, and duplicates can be managed.
- Exactly Once: Provides the highest guarantee with no message loss or duplication, suitable for critical applications but comes with increased complexity and performance overhead.
Message Filtering
Purpose:
- Allow consumer groups to consume only specific subtypes of messages from a topic.
Challenges:
- Creating dedicated topics for every consumer subtype is resource-intensive and tightly couples producers and consumers.
- Fetching and filtering the full set of messages at the consumer side increases unnecessary traffic and affects system performance.
Solution:
- Broker-side Filtering:
- Filter messages on the broker side to reduce unnecessary traffic.
- Avoid data decryption or deserialization in brokers to maintain performance.
- Use metadata for filtering to keep the payload unreadable and secure.
Implementation:
- Message Tags:
- Attach tags to messages.
- Brokers use tags to filter messages efficiently.
- Consumers subscribe to messages based on specified tags.
Delayed Messages
Purpose:
- Delay the delivery of messages to a consumer for a specified period.
Use Case:
- An order should be checked 30 minutes after creation to see if the payment is completed.
High-Level Design:
- Temporary Storage:
- Store delayed messages in special topics until delivery time.
- Timing Function:
- Manage the timing for when messages should be moved from temporary storage to actual topics.
Popular Solutions:
- Dedicated Delay Queues:
- Use predefined delay levels (e.g., 1s, 5s, 10s, etc.).
- Example: RocketMQ supports specific delay levels.
- Hierarchical Time Wheel:
- Efficiently manage delayed tasks with various time intervals.
Scheduled Messages
Purpose:
- Deliver a message to the consumer at a specific scheduled time.
Implementation:
- Similar to delayed messages, but based on a fixed schedule rather than a delay.
High-Level Design:
- Temporary Storage:
- Store scheduled messages until the scheduled delivery time.
- Timing Function:
- Manage the timing to move messages from temporary storage to actual topics.