顺序消息如何实现?

tim-qtp...大约 3 分钟Rocketmq框架

常见方法:

单一生产者和单一消费者

  • 使用单个生产者发送消息到单个队列,并由单个消费者处理消息。这样可以确保消息按照生产者的发送顺序消费。
  • 这种方法简单但容易成为性能瓶颈,无法充分利用并发的优势。

分区与顺序键(Partition Key)

  • 在支持分区(Partition) 的消息队列中(如 Kafka、RocketMQ),可以通过 Partition Key 将消息发送到特定的分区。每个分区内部是有序的,这样可以保证相同 Partition Key 的消息按顺序消费。
  • 例如,在订单处理系统中,可以使用订单号作为 Partition Key,将同一个订单的所有消息路由到同一个分区,确保该订单的消息顺序。

顺序队列(Ordered Queue)

  • 一些消息队列系统(如 RabbitMQ)支持顺序队列,消息在队列中的存储顺序与投递顺序一致。如果使用单个顺序队列,消息将按顺序被消费。
  • 可以使用多个顺序队列来提高并发处理能力,并使用特定规则将消息分配到不同的顺序队列中。

还有一个概念叫全部消息和局部消息:

如果要保证消息的全局有序,首先只能由一个生产者往 Topic 发送消息,并且一个 Topic 内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!

不过一般情况下我们都不需要全局有序,即使是同步 MySQL Binlog 也只需要保证单表消息有序即可。

绝大部分的有序需求是部分有序,部分有序我们就可以将 Topic 内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。

例如同一个订单或用户的消息按顺序消费,而不同订单或用户之间的顺序不做保证。

这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

然后,在 RocketMQ 中,可以通过消息队列选择器MessageQueueSelector)将消息发送到指定的队列,从而实现顺序消费。

生产者代码

String orderId = "order-123";

for (int i = 1; i <= 5; i++) {
    String body = "Order Step " + i + " for " + orderId;
    Message message = new Message("OrderTopic", body.getBytes(StandardCharsets.UTF_8));

    // 使用 MessageQueueSelector 按照 orderId 选择队列
    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
        @Override
        /**
            mqs:当前 OrderTopic 中所有的消息队列。
            msg:当前要发送的消息。
            arg:传入的参数(这里是 orderId),用来选择目标队列。
        */
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            // 将 orderId 转为 hash 值,以决定消息发往哪个队列
            String orderId = (String) arg;
            int index = orderId.hashCode() % mqs.size();
            return mqs.get(Math.abs(index)); //取绝对值,避免负索引。
        }
    }, orderId);
}

消费者代码

// 使用并发消费消息监听器,保证单队列内的顺序性
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        String body = new String(msg.getBody());
        System.out.printf("Consumed message: %s from queue: %d\n", body, msg.getQueueId());
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});

提示

这里虽然MessageListenerConcurrently是并发监听,但是

RocketMQ 会为每个队列分配独立的线程。

不同队列之间的消费是并发的,但同一个队列内仍然按顺序消费。

所以这里的msgs是一次性接收的消息列表(RocketMQ 支持批量消费)