如何处理消息重复的问题呢
...大约 2 分钟Rocketmq框架
只有让消费者的处理逻辑具有幂等性,保证无论同一条消息被消费多少次,结果都是一样的,从而避免因重复消费带来的副作用。
注意
网络通信里存在两种行为可能会导致接口被重复调用
用户的重复提交或恶意攻击
分布式架构下,为了避免网络导致的数据丢失,设计超时重置机制
什么是幂等性
幂等性是指一个操作无论执行多少次,产生的结果都是一样的。例如,HTTP 的 PUT 方法是幂等的,因为无论对同一资源执行多少次 PUT 操作,结果都是相同的。
update t1 set money = 150 where id = 1 and money = 100;
执行多少遍 money 都是 150,这就叫幂等。
如何幂等处理重复消息
需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。
主要是利用唯一标识(ID)去重。
在消息中引入全局唯一 ID,例如 UUID、订单号等,利用 redis 等缓存,或者数据库来存储消息 ID,然后消费者在处理消息时可以检查该消息 ID 是否存在代表此消息是否已经处理过。
去重缓存的使用:
- 使用 Redis 的
SETNX
指令处理的消息 ID,以减少数据库的查询压力。 - 可以为 Redis 中的去重记录设置过期时间,例如 7 天,以便自动清理历史消息,减小存储压力。
去重表的设计:
- 在数据库中创建一张去重表,用来存储已处理消息的 ID 及处理时间。在消费每条消息前,先查询该表。
- 对于高并发场景,可以结合数据库的唯一索引来避免多次插入同一个消息 ID(
insert into update on duplicate key...
),确保去重表中的记录唯一。
此外,一些触发数据库修改操作的消息,可以像上述举例的 SQL 一样,做了个前置条件判断,即 money = 100 这样的条件直接修改,这样无论执行多少次都不影响。