基础概念

消息队列是一个可以用于存放消息的容器

消息队列满足FIFO原则

参与消息传递的双方被称为生产者和消费者,生产者负责发送消息,消费者负责处理消息

消息队列是一种中间件

中间件是一种为应用软件服务的软件,不同的软件和技术架构可以借助中间件来实现信息与资源的共享

使用消息队列的优点:

  • 异步处理提高系统性能(减少响应所需时间)
  • 消峰/限流
  • 降低系统耦合性

消峰/限流:

  • 将短时间高并发的事务消息存储在消息队列中,之后后端服务再慢慢根据自己的能力去消费这些消息,可以避免直接把后端服务打垮掉

降低系统耦合性:

  • 消息队列可以使用发布-订阅模式工作,生产者发布消息,一个或者多个消息接收者订阅消息。消息的生产者和消息的消费者之间没有直接的耦合。同时,如果希望新增消费者,只需要订阅该消息,对原有的系统和业务没有影响,提高系统的可拓展性。

实现分布式事务:

  • 分布式事务的解决方案之一就是MQ事务。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作

使用消息队列带来的问题:

  • 导致系统可用性降低:消息队列可能挂掉
  • 系统的复杂性提高:消息重复消费问题,处理消息丢失的情况,保证消息传递的顺序性
  • 存在数据一致性问题:异步操作导致的数据一致性

AMQP:

一个提供统一消息服务的应用层标准高级消息队列协议

规定了五种消息模型:

  • direct exchange
  • fanout exchange
  • Topic exchange
  • Headers exchange
  • System exchange
  • 后面四种消息模型本质上都是pub/sub模型,区别于消息路由的机制

分层:

  • Module Layer:最高层,定义客户端调用的命令,客户端可以通过这些命令来实现自己的业务逻辑
  • Session Layer:中间层,负责客户端发送命令给服务器,在获取服务端的应答返回给客户端。提供可靠性同步机制和错误处理
  • Transport Layer:最底层,用于传输二进制数据流。提供帧处理、信道复用、错误检测、数据表示等等

AMQP 模型的三大组件

  • **交换器 (Exchange)**:消息代理服务器中用于把消息路由到队列的组件。
  • **队列 (Queue)**:用来存储消息的数据结构,位于硬盘或内存中。
  • **绑定 (Binding)**:一套规则,告知交换器消息应该将消息投递给哪个队列

RabbitMQ

特点:

  • 可靠性:使用持久化传输确认发布确认来保证消息的可靠性
  • 路由灵活:在消息进入队列之前,通过交换机来路由消息,内置了一些简单的交换机,交换机之间可以绑定在一起,也可以通过插件机制实现自己的交换机
  • 扩展性:多个RabbitMQ节点可以组成一个集群,可以根据实际业务情况,动态的扩展集群中的节点
  • 高可用
  • 多协议支持
  • 多语言支持
  • UI美观易用
  • 插件机制

消息的构成:

  • **消息头(标签Label)**:
    • 属性:
      • Routing-key 路由键
      • Priority (相较于其他消息的优先权)
      • Delivery-mode (指出该消息可能需要持久化存储)
  • 消息体

运行逻辑:生产者将消息交给RabbitMQ后,RabbitMQ会根据消息头,将消息发送给感兴趣的消费者

**Exchange (交换器)**:

  • 位置处于发送者与消息队列之间,行为是根据规则,将消息分配到对应的 Queue 中
  • 四种交换机类型:
    • Fanout:广播,将发到该交换机的消息路由到所有绑定的队列中
    • Direct:将消息路由到 BingingKey 与 RoutingKey 完全匹配的 Queue 中
    • Topic:RoutingKey 是一串由 “.” 分割的字符串 (eg: city.chengdu),BindingKey 类似,但存在两种特殊的字符串,”“ 和 “#”,其中 ““ 用于匹配一个单词,而 “#” 用于匹配非负数个单词
    • Headers:通过消息头属性 header 精确匹配 bindingKey 来路由

运行逻辑:在使用 exchange 之前,需要将 exchange 与特定的消息队列绑定起来,通过 BindingKey 来标识绑定关系(一个绑定本质上是一种路由规则,负责将消息经过 exchange 路由到队列),当消息的 RoutingKey 和 BindingKey 相匹配的时候,消息会被路由到相应的队列中

**Queue (消息队列)**:

  • 用于保存消息和将消息发送给消费者,它既是消息的容器,也是消息的终点,一个消息可以投入多个队列,在消费者未消费之前,会一直存储在队列中
  • 当多个消费者同时订阅了同一个队列的时候,队列中的消息会被平均分配(轮询)给多个消费者处理
  • RabbitMQ 不知道队列层面的广播消费

Broker (消息中间件的服务节点)

  • 可以看作一个消息队列的服务实例(一个服务节点,一台服务器)

死信队列

  • 当一个队列中的消息变为死信后,会被重新发送到另一个交换器 (DLX) 中,绑定 DLX 交换器的队列就是死信队列
  • 导致死信的原因:
    • 消息被拒绝 (Basic.Reject / Basic.Nack 且 requeue = false)
    • 消息 TTL 过期
    • 队列满了,无法添加

延迟队列

  • 拥有延迟消费者获取消息能力的队列,当消息来到延迟队列中,消费者无法立刻拿到消息,而是需要等待特定时间后,才可以拿到这个消息。
  • RabbitMQ 本身没有延迟队列,但是可以通过 TTL (Time To Live) 和 DLX 来模拟延迟队列(设置 DXL 队列 TTL 或者设置消息 TTL)
    • image-20240811183831965

RabbitMQ消息怎么传输?

  • 基于信道 (Channel),生产者和消费者会通过 Channel 来传输数据
  • Channel 基于 TCP,是建立在 TCP 链接上的虚拟链接,RabbitMQ 可以通过一条 TCP 链接创建任意数量 Channel,该 TCP 链接被线程共享,每个信道都有唯一的 ID,对应一个线程使用

如何保证消息的可靠性:

  • 消息丢失的可能:
    • 消息到MQ的过程丢失
      • 事务机制
      • Confirm机制(两者互斥):消息成功发送到 MQ 后,MQ 会回复确认
        • 一个是 publisher-return:当消息无法路由给任何队列的时候,会返回给发送者
          • Publisher-return 主要用于判断消息是否找到了匹配的队列
        • 一个是 publisher-confirm:用于确认消息是否被 RabbitMQ 成功接收和处理
          • Publisher-confirm:用于确认消息被正确的接收和处理
          • 正确的处理后会返回 AcK,其他情况则是返回 Nack 和错误原因
    • MQ自己丢失
      • 持久化
      • 集群
      • 普通模式
      • 镜像模式
    • MQ到消费过程丢失
      • basicAck 机制
      • 死信队列
      • 消息补偿机制
    • image-20240811183846923

消息队列的工作模式:

  • P2P:一个交换机对应一个队列对应一个消费者,也就是 direct
  • Pub/sub:消费者监听自己的队列,一个交换机绑定了多个队列,交换机收到消息会发送到绑定的各个队列中
  • Work Queues
    • 多个消费者,消费一个队列中的消息,使用轮询的方式
  • Routing
    • 消费者监听自己的队列,交换机根据 RoutingKey 来转发消息到指定队列
  • Topic
    • 消费者监听自己的队列,交换机根据带有通配符的 RoutingKey 转发消息到指定队列

如何保证 RabbitMQ 消息的顺序性:

  • 拆分多个 queue,每个 queue 对应一个 consumer
    • image-20240811183854406
  • 一个 queue 对应一个 consumer,该 consumer 在内部用内存队列排队,分发给底层不同的 worker 处理
    • image-20240811183859894

如何保证 RabbitMQ 的高可用:

  • 普通集群(只同步队列,不同步队列中的数据)让多个 Broker 为一个 queue 服务,提高了吞吐量
  • 镜像集群(每个实例都拥有完整的 queue)

如何解决消息队列的延时以及过期失效的问题:

  • 当大量数据堆积在消息队列中且无法及时得到处理,导致 TTL 用光被丢弃或者长期得不到处理的时候,可以记录下被阻塞或者丢弃的消息,在消息队列负载较低的时候重新放入

RabbitMQ高级

消息可靠性:

  • 发送者可靠性

    • 失败重连
      • image-20240811183927970
    • 生产者确认
      • image-20240811183927970
  • MQ的可靠性

    • 数据持久化
    • LazyQueue
      • image-20240811183927970
  • 消费者的可靠性

    • image-20240811184009686
    • 解决消息的幂等性
      • image-20240811184009686
      • 基于业务本身判断
  • 延迟消息

    • image-20240811184041996