1. 如何保证消息不丢失

Broker:消息队列的核心组件,负责接收、存储和分发消息;

  • 生产者的消息确认:生产者发送消息,需要通过消息确认机制来确保消息成功到达队列中;
    • 开启confirm模式,在生产者端维护消息的状态,超过一定时间未接收到消息回调,进行消息的重发;
  • 存储消息:Broker收到消息后,将消息持久化到磁盘中,避免宕机或重启导致消息的丢失;
    • 配合生产者的确认机制,当消息持久化到磁盘后,再通知生产者ACK;
  • 消费者的消息确认:消费者处理完消息,需要向MQ发送ACK确认,如果未确认,MQ需要重新投递该消息
    • 手动ACK;
  • 死信队列(兜底):对于多次消费失败的消息,发送到死信队列中,进行处理 => 提醒和记录日志

2. 如何处理重复消息

让消费者的处理逻辑具有幂等性,无论消息被消费多少次,结果都是一样的。

产生原因:消息消费的时候的网络延迟,或者投递的时候的网络延迟;

  • 利用唯一标识ID去重:引入全局唯一ID,例如订单ID,利用Redis进行缓存或者数据库存储,在处理消息时,检查消息是否被处理过;
  • 去重表:利用唯一索引来避免多次插入同一消息的ID (Insert into update on duplicate key ...)

3. 消息的有序性

保证消息的全局有序消费

  • 单一生产者和单一消费者模式:顺序消费消息,但是性能容易出现瓶颈,且无法利用并发优势;

  • 分区与顺序键:在支持分区的消息队列中,可以通过Partition Key将消息发送到特征的分区。分区内部有序。例如:将同一个订单的所有消息路由到同一个分区,确保该订单消息的顺序消费。

  • **顺序队列:**消息的消费顺序与投递顺序一致。

有序消息之全局有序

单生产者+单消费者

有序消息之部分有序

多组Queue+单消费者

组间消息可以无序;组内消息有序

这样将需要保证有序消息的信息,发送到同一组中,既保证了部分有序性,又提高了并发效率。

通过OrderId的哈希与组数量,将消息路由到不同队列中,相同的OrderId路由到同一队列中。

4. 如何处理消息堆积问题

生产者的生成数据远大于消费者

提高消费者消费速度

  • 增加消费者线程数量:提高消费速度;
  • 增加消费实例:分布式系统中,水平扩展一下消费实例;(多消费者)
  • 优化消费者逻辑:减少IO操作,使用批量处理;
    • 批量消费:一次消费多条消息,减少网络开销;
    • 避免阻塞操作和不必要的计算。

降低生产者生产速度

  • 对生产速度进行限流(令牌桶限流算法)或延迟处理,先消费高优先级的消息。

扩容消息队列

  • 💡临时扩展多个消费者队列:将挤压的消息分发到不同的队列中进行消费。消费完成后再关闭临时队列;
    • 增加消息队列容量,配置更大的内存和磁盘空间

5. 消息队列设计成推消息还是拉消息

Push推模式:MQ主动推送消息给消费者,适合实时性高、需要及时处理的常见。

  • 优点:实时性好
  • 缺点:难以控制消费速度,容易导致消费者过载(高并发下)。

Pull拉模式:消费者主动从MQ拉取消息,适合消费者能力有限,需要自身调整速率的场景

  • 优点:消费者根据自身处理能力控制拉取频率,避免过载,更适合批量处理,一次给我能接受的批量数据。

  • 缺点:实时性差,消息延迟。

6. RabbitMQ中消息什么时候会进入死信交换机

  • 消息被拒绝:消费者明确拒绝消息,不要求重新投递时;

  • 消息过期:TTL(生存时间)到期;

  • **队列长度达到最大长度:**存不下了,将最早放入的消息转到死信交换机中,因为最新的可能正在执行,淘汰最老的消息;

死信交换机用处:

  • 监控分析异常消息
  • 订单的超时处理:处理时检查订单状态,没变化就是没支付,取消返回库存。
  • 高负载场景下的队列限流和空流:

7.AMQP协议

Advanced Message Queuing Protocol

定义消息格式,传输方式,处理机制。面向消息、异步传输的协议

  • 连接(Connection):客户端&MQ
  • 信道(Channel):
  • 交换机(exchange)
  • 队列(queue)
  • 绑定(Binding):交换机与队列之间的关系,通过路由键实现消息路由。

消息路由模型

  • Direct Exchange:直发,精准匹配对应队列
  • Fanout Exchange:广播
  • Topic Exchaneg:广播到订阅了主题的绑定的队列中(支持模糊匹配)

8. RabbitMQ怎么实现延迟队列

TTL+死信队列

image-20250415183028470

通过RabbitMQ延迟队列消息插件实现