如何处理消息重复的问题呢

tim-qtp...大约 2 分钟Rocketmq框架

只有让消费者的处理逻辑具有幂等性,保证无论同一条消息被消费多少次,结果都是一样的,从而避免因重复消费带来的副作用。

注意

网络通信里存在两种行为可能会导致接口被重复调用

用户的重复提交或恶意攻击

分布式架构下,为了避免网络导致的数据丢失,设计超时重置机制

什么是幂等性

幂等性是指一个操作无论执行多少次,产生的结果都是一样的。例如,HTTP 的 PUT 方法是幂等的,因为无论对同一资源执行多少次 PUT 操作,结果都是相同的。

update t1 set money = 150 where id = 1 and money = 100;

执行多少遍 money 都是 150,这就叫幂等。

如何幂等处理重复消息

需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。

主要是利用唯一标识(ID)去重

在消息中引入全局唯一 ID,例如 UUID、订单号等,利用 redis 等缓存,或者数据库来存储消息 ID,然后消费者在处理消息时可以检查该消息 ID 是否存在代表此消息是否已经处理过。

去重缓存的使用

  • 使用 Redis 的 SETNX 指令处理的消息 ID,以减少数据库的查询压力。
  • 可以为 Redis 中的去重记录设置过期时间,例如 7 天,以便自动清理历史消息,减小存储压力。

去重表的设计

  • 在数据库中创建一张去重表,用来存储已处理消息的 ID 及处理时间。在消费每条消息前,先查询该表。
  • 对于高并发场景,可以结合数据库的唯一索引来避免多次插入同一个消息 ID(insert into update on duplicate key...),确保去重表中的记录唯一。

此外,一些触发数据库修改操作的消息,可以像上述举例的 SQL 一样,做了个前置条件判断,即 money = 100 这样的条件直接修改,这样无论执行多少次都不影响。