1.消息队列的坑之非幂等

(1)幂等性概念

所谓幂等性就是无论多少次操作和第一次的操作结果一样。如果消息被多次消费,很有可能造成数据的不一致。而如果消息不可避免地被消费多次,如果我们开发人员能通过技术手段保证数据的前后一致性,那也是可以接受的 。

RabbitMQRocketMQKafka 消息队列中间件都有可能出现消息重复消费问题。这种问题并不是 MQ 自己保证的,而是需要开发人员来保证。

这几款消息队列中间都是是全球最牛的分布式消息队列,那肯定考虑到了消息的幂等性。我们以 Kafka 为例,看看 Kafka 是怎么保证消息队列的幂等性。

Kafka 有一个 偏移量 的概念,代表着消息的序号,每条消息写到消息队列都会有一个偏移量,消费者消费了数据之后,每过一段固定的时间,就会把消费过的消息的偏移量提交一下,表示已经消费过了,下次消费就从偏移量后面开始消费。

(2)避坑指南

微信支付结果通知场景

  • 微信官方文档上提到微信支付通知结果可能会推送多次,需要开发者自行保证幂等性。第一次我们可以直接修改订单状态(如支付中 -> 支付成功),第二次就根据订单状态来判断,如果不是支付中,则不进行订单处理逻辑。

插入数据库场景

  • 每次插入数据时,先检查下数据库中是否有这条数据的主键 id,如果有,则进行更新操作。

写 Redis 场景

  • Redis 的 Set 操作天然幂等性,所以不用考虑 Redis 写数据的问题。

其他场景方案

  • 生产者发送每条数据时,增加一个全局唯一 id,类似订单 id。每次消费时,先去 Redis 查下是否有这个 id,如果没有,则进行正常处理消息,且将 id 存到 Redis。如果查到有这个 id,说明之前消费过,则不要进行重复处理这条消息。

  • 不同业务场景,可能会有不同的幂等性方案,大家选择合适的即可,上面的几种方案只是提供常见的解决思路。

2.消息队列的坑之消息丢失

消息丢失会带来什么问题?如果是订单下单、支付结果通知、扣费相关的消息丢失,则可能造成财务损失,如果量很大,就会给甲方带来巨大损失

(1)生产者存放消息的过程中丢失消息

解决方案

  • 事务机制(不推荐,异步方式)

对于 RabbitMQ 来说,生产者发送数据之前开启 RabbitMQ 的事务机制channel.txselect ,如果消息没有进队列,则生产者受到异常报错,并进行回滚 channel.txRollback,然后重试发送消息;如果收到了消息,则可以提交事务 channel.txCommit。但这是一个同步的操作,会影响性能。

  • confirm 机制(推荐,异步方式)

我们可以采用另外一种模式: confirm 模式来解决同步机制的性能问题。每次生产者发送的消息都会分配一个唯一的 id,如果写入到了 RabbitMQ 队列中,则 RabbitMQ 会回传一个 ack 消息,说明这个消息接收成功。如果 RabbitMQ 没能处理这个消息,则回调 nack 接口。说明需要重试发送消息。

也可以自定义超时时间 + 消息 id 来实现超时等待后重试机制。但可能出现的问题是调用 ack 接口时失败了,所以会出现消息被发送两次的问题,这个时候就需要保证消费者消费消息的幂等性。

事务模式confirm 模式的区别:

  • 事务机制是同步的,提交事务后悔被阻塞直到提交事务完成后。

  • confirm 模式异步接收通知,但可能接收不到通知。需要考虑接收不到通知的场景。

(2)消息队列丢失消息

消息队列的消息可以放到内存中,或将内存中的消息转到硬盘(比如数据库)中,一般都是内存和硬盘中都存有消息。如果只是放在内存中,那么当机器重启了,消息就全部丢失了。如果是硬盘中,则可能存在一种极端情况,就是将内存中的数据转换到硬盘的期间中,消息队列出问题了,未能将消息持久化到硬盘。

解决方案

  • 创建 Queue 的时候将其设置为持久化。这个地方没搞懂,欢迎探讨解答。

  • 发送消息的时候将消息的 deliveryMode 设置为 2 。

  • 开启生产者 confirm 模式,可以重试发送消息。

(3)消费者丢失消息

消费者刚拿到数据,还没开始处理消息,结果进程因为异常退出了,消费者没有机会再次拿到消息。

解决方案

  • 关闭 RabbitMQ 的自动 ack,每次生产者将消息写入消息队列后,就自动回传一个 ack 给生产者。

  • 消费者处理完消息再主动 ack,告诉消息队列我处理完了。

问题: 那这种主动 ack 有什么漏洞了?如果 主动 ack 的时候挂了,怎么办?

则可能会被再次消费,这个时候就需要幂等处理了。

问题: 如果这条消息一直被重复消费怎么办?

则需要有加上重试次数的监测,如果超过一定次数则将消息丢失,记录到异常表或发送异常通知给值班人员。

(4)Kafka 消息丢失

场景:Kafka 的某个 broker(节点)宕机了,重新选举 leader (写入的节点)。如果 leader 挂了,follower 还有些数据未同步完,则 follower 成为 leader 后,消息队列会丢失一部分数据。

解决方案

  • 给 topic 设置 replication.factor 参数,值必须大于 1,要求每个 partition 必须有至少 2 个副本。

  • 给 kafka 服务端设置 min.insyc.replicas 必须大于 1,表示一个 leader 至少一个 follower 还跟自己保持联系。

3. 消息队列的坑之消息乱序

用户先下单成功,然后取消订单,如果顺序颠倒,则最后数据库里面会有一条下单成功的订单。

RabbitMQ 场景:

  • 生产者向消息队列按照顺序发送了 2 条消息,消息1:增加数据 A,消息2:删除数据 A。

  • 期望结果:数据 A 被删除。

  • 但是如果有两个消费者,消费顺序是:消息2、消息 1。则最后结果是增加了数据 A。

RabbitMQ 解决方案:

  • 将 Queue 进行拆分,创建多个内存 Queue,消息 1 和 消息 2 进入同一个 Queue。

  • 创建多个消费者,每一个消费者对应一个 Queue。

Kafka 场景:

  • 创建了 topic,有 3 个 partition。

  • 创建一条订单记录,订单 id 作为 key,订单相关的消息都丢到同一个 partition 中,同一个生产者创建的消息,顺序是正确的。

  • 为了快速消费消息,会创建多个消费者去处理消息,而为了提高效率,每个消费者可能会创建多个线程来并行的去拿消息及处理消息,处理消息的顺序可能就乱序了。

Kafka 解决方案:

  • 解决方案和 RabbitMQ 类似,利用多个 内存 Queue,每个线程消费 1个 Queue。

  • 具有相同 key 的消息 进同一个 Queue。

4. 消息队列的坑之消息积压

消息积压:消息队列里面有很多消息来不及消费。

场景 1: 消费端出了问题,比如消费者都挂了,没有消费者来消费了,导致消息在队列里面不断积压。

场景 2: 消费端出了问题,比如消费者消费的速度太慢了,导致消息不断积压。

比如线上正在做订单活动,下单全部走消息队列,如果消息不断积压,订单都没有下单成功 ,那么会造成很大的损失

解决方案:解铃还须系铃人

  • 修复代码层面消费者的问题,确保后续消费速度恢复或尽可能加快消费的速度。

  • 停掉现有的消费者。

  • 临时建立好原先 5 倍的 Queue 数量。

  • 临时建立好原先 5 倍数量的 消费者。

  • 将堆积的消息全部转入临时的 Queue,消费者来消费这些 Queue。

5. 消息队列的坑之消息过期失效

RabbitMQ 可以设置过期时间,如果消息超过一定的时间还没有被消费,则会被 RabbitMQ 给清理掉。消息就丢失了

解决方案:

  • 准备好批量重导的程序

  • 手动将消息闲时批量重导

6. 消息队列的坑之队列写满

当消息队列因消息积压导致的队列快写满,所以不能接收更多的消息了。生产者生产的消息将被丢弃。

解决方案:

  • 判断哪些是无用的消息,RabbitMQ 可以进行 Purge Message 操作。

  • 如果是有用的消息,则需要将消息快速消费,将消息里面的内容转存到数据库。

  • 准备好程序将转存在数据库中的消息再次重导到消息队列。

  • 闲时重导消息到消息队列。

原文链接:https://www.cnblogs.com/jackson0714/p/fenbushi.html