如何保证MQ队列中的消息不被重复消费

如何保证MQ队列中的消息不被重复消费

doMore 1,245 2020-01-19

最近接手一个项目,其主要功能是从第三方拿数据,然后自己处理、展示。出于成本原因,产品希望多个人在查询同一条数据的时候,不会重复的请求第三方(因为每次请求都要花钱),并且从第三方更新这条数据后,十分钟之内不允许再去请求。(本人认为可以当做十分钟内不允许重复消费队列中的消息,当然具体的还是需要具体去区别对待)

1. 如何确定是同一条数据

通过拼接请求参数以及请求的接口生成唯一id,并利用redis的setNX特性(如果key值存在,不添加该值,保留原数据)。

2. 架构预想

重复消费.jpg

3. 具体实现

1)由前端将所要查询的数据填好通过MQ传输到与渠道方交互的项目中。

注意:防止消息重复消费通常不是 MQ 自己保证的,是由我们开发来保证的。故此在与渠道方交互的项目中做处理操作。

2)通过工具方法生成唯一ID,在利用redisTemplate将id写入。

注意:这里我规定了两种状态更新中(updating)、更新完成(updated)也会拼接在唯一ID中当做标识。先将更新中的状态插入、当方法执行完成之后再将更新完成状态插入,设置过期时间为十分钟。

3)每次消费者接收到 MQ 传过来的消息,先看redis中是不是存在更新完成id,如果存在直接查询本地库,如果不存在,则插入更新中id,更新中id插入不成功,在两分钟内轮循插入(相当于获取执行权限,即锁)获取到权限之后再次判断是不是存在更新完成id,如果存在还是请求本地库。

4. 题外话

很多人在使用redisTemplate过程中并没有发现有 setNX 的方法。不是他没有,而是需要先绑定类型。

// 这里使用String 类型作为展示

redisTemplate.opsForValue().setIfAbsent(key, value, timeout, timeUnit);

// absent :缺少的;不存在的
// 根据字面意思能看出是如果不存在就插入
// 查看源码发现 SetOption.ifAbsent()
/**
* {@code NX}
*
* @return
*/
// SET_IF_ABSENT
// 官方文档说这就是redis的setNX方法,所以放心使用。其他类型也有对应的方法
execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent()), true)