顺序消息如何实现?
...大约 3 分钟Rocketmq框架
常见方法:
单一生产者和单一消费者:
- 使用单个生产者发送消息到单个队列,并由单个消费者处理消息。这样可以确保消息按照生产者的发送顺序消费。
- 这种方法简单但容易成为性能瓶颈,无法充分利用并发的优势。
分区与顺序键(Partition Key):
- 在支持分区(Partition) 的消息队列中(如 Kafka、RocketMQ),可以通过 Partition Key 将消息发送到特定的分区。每个分区内部是有序的,这样可以保证相同 Partition Key 的消息按顺序消费。
- 例如,在订单处理系统中,可以使用订单号作为 Partition Key,将同一个订单的所有消息路由到同一个分区,确保该订单的消息顺序。
顺序队列(Ordered Queue):
- 一些消息队列系统(如 RabbitMQ)支持顺序队列,消息在队列中的存储顺序与投递顺序一致。如果使用单个顺序队列,消息将按顺序被消费。
- 可以使用多个顺序队列来提高并发处理能力,并使用特定规则将消息分配到不同的顺序队列中。
还有一个概念叫全部消息和局部消息:
如果要保证消息的全局有序,首先只能由一个生产者往 Topic 发送消息,并且一个 Topic 内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!
不过一般情况下我们都不需要全局有序,即使是同步 MySQL Binlog 也只需要保证单表消息有序即可。

绝大部分的有序需求是部分有序,部分有序我们就可以将 Topic 内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。
例如同一个订单或用户的消息按顺序消费,而不同订单或用户之间的顺序不做保证。
这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

然后,在 RocketMQ 中,可以通过消息队列选择器(MessageQueueSelector
)将消息发送到指定的队列,从而实现顺序消费。
生产者代码
String orderId = "order-123";
for (int i = 1; i <= 5; i++) {
String body = "Order Step " + i + " for " + orderId;
Message message = new Message("OrderTopic", body.getBytes(StandardCharsets.UTF_8));
// 使用 MessageQueueSelector 按照 orderId 选择队列
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
/**
mqs:当前 OrderTopic 中所有的消息队列。
msg:当前要发送的消息。
arg:传入的参数(这里是 orderId),用来选择目标队列。
*/
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 将 orderId 转为 hash 值,以决定消息发往哪个队列
String orderId = (String) arg;
int index = orderId.hashCode() % mqs.size();
return mqs.get(Math.abs(index)); //取绝对值,避免负索引。
}
}, orderId);
}
消费者代码
// 使用并发消费消息监听器,保证单队列内的顺序性
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody());
System.out.printf("Consumed message: %s from queue: %d\n", body, msg.getQueueId());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
提示
这里虽然MessageListenerConcurrently
是并发监听,但是
RocketMQ 会为每个队列分配独立的线程。
不同队列之间的消费是并发的,但同一个队列内仍然按顺序消费。
所以这里的msgs是一次性接收的消息列表(RocketMQ 支持批量消费)