全方位总结中间件之RabbitMQ

快速索引

  1. 消息队列
  2. AMQP & RabbitMQ
  3. 交换机
  4. RabbitMQ可靠性传输
  5. 死信队列
  6. 消息补偿机制
  7. 延迟队列
  8. 消费顺序&消息幂等
  9. 企业级RabbitMQ框架封装

1. 消息队列

关于什么是消息队列,这里不赘述,可以参考这篇文章 消息队列(mq)是什么?

2. AMQP & RabbitMQ

RabbitMQ是采用 Erlang语言实现AMQP协议的消息中间件,AMQP全称是 Advanced Message Queue Protocolg,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开放语言等条件的限制

RabbitMQ是AMQP协议的一个开源实现,其内部模型实际上也是 AMQP的内部模型,如下图所示:

image-20210328092207208

AMQP模型的工作流程如下:消息(Message) 被发布者 (publisher) 发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱,然后交换机将收到的消息根据路由规则分发给绑定的队列(queue),最后AMQP代理会将消息投递给订阅此队列的消费者,或者消费者按照需求从队列中拉取消息

总结下就是,AMQP定义了完整的消息队列协议,包括消息队列模型、消息确认机制等等,而RabbitMQ就是AMQP协议的一个broker实现, 有了RabbitMQ之后,还需要生产者和消费者也按照AMQP协议实现,从而完成整个基于AMQP协议的[producer]-Broker-comsumer消息通信。

3. 交换机

RabbitMQ中一共实现了四种类型的交换机(Exchange),分别是:

  1. Fanout
  2. Direct
  3. Topic
  4. Headers

下面来一一介绍:

1. Fanout类型的交换机

发送到Fanout交换机的消息都会路由到与该交换机绑定的所有队列上,可以用来做广播

不处理路由键,只需要简单的将队列绑定到交换机上

Fanout交换机转发消息是最快的

image-20210328093346658

2. Direct类型的交换机

把消息路由到BindingKey和RoutingKey完全匹配的队列中

image-20210328093404692

3. Topic类型的交换机

上面说到,direct类型的交换器路由规则是完全匹配RoutingKey和BindingKey。topic和direct类似,也是将消息发送到RoutingKey和BindingKey相匹配的队列中,只不过可以模糊匹配。

RoutinKey为一个被“.”号分割的字符串(如com.rabbitmq.client)
BindingKey和RoutingKey也是“.”号分割的字符串
BindKey中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配不多不少一个词,“#”用于匹配多个单词(包含0个,1个)
BindIngKey 能够匹配到的RoutingKey
java.# ====> java.lang,java.util, java.util.concurrent
java.* ====> java.lang,java.util
..uti ====> com.javashitang.util,org.spring.util

image-20210328093437218

4. Headers类型的交换机

headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送消息内容中的headers属性进行匹配。headers类型的交换器性能差,不实用,基本上不会使用。

4. RabbitMQ可靠性传输

1. 生产者到Broker的可靠性传输

1)方案一:事务

事务机制虽然保证了消息投递端的可靠性,但因为每次投递都开启了事务,所以性能较低,一般不推荐使用

为了保证这一步的可靠性,AMQP 协议在建立之初就提供了事务机制。RabbitMQ 客户端中与事务机制相关的方法有三个:channel.txSelectchannel.txCommit 以及 channel.txRollback。channel.txSelect 用于将当前的信道设置成事务模式,channel.txCommit 用于提交事务,而 channel.txRollback 用于事务回滚。在通过channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。

代码逻辑框架如下:

1
2
3
4
5
6
7
8
9
10
11
12
try {
// 开启事务
channel.txSelect();
// 发送消息
channel.basicPublish(exchange, routingKey, props, body);
// 事务提交
channel.txCommit();
} catch(Exception e) {
// 事务回滚
channel.txRollback();
e.printStackTrace();
}

2)方案二:Confirm机制

Confirm 机制就比较好的兼顾了性能以及可靠性

注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。

开启 Confirm 机制后,所有在该信道上面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理。

代码逻辑框架如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 开启confirm机制
channel.confirmSelect();
// 添加回调监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// TODO 消息投递成功
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// TODO 消息投递失败
}
});

confirm的流程如下图所示:

image-20210328094129487

关于mandatory参数

mandatory 参数设为 true 时,如果 Exchange 无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。而 mandatory 参数设置为 false 时,出现上述情形的话,消息直接被丢弃。那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监听器实现。

代码逻辑框架如下:

1
2
3
4
5
6
7
// 添加回调监听器 
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// TODO 消息routingKey未匹配到队列
}
});

关于备份交换机

生产者在发送消息的时候如果不设置 mandatory 参数,那么消息在未被路由的情况下将会丢失,如果设置了 mandatory参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将变得复杂化。如果你不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息。可以通过在声明交换器(调用 channel.exchangeDeclare 方法)的时候添加 alternate-exchange 参数来实现。

image-20210328094532357

2. Broker自身的可靠性

1) 持久化

持久化可以提高 RabbitMQ 的可靠性,以免在 RabbitMQ 意外宕机时数据不会丢失,RabbitMQ 的 Exchange、Queue 以及 Message 都是支持持久化的,Exchange 和 Queue 通过在声明的时候将 durable 参数置为 true 即可实现,而消息的持久化则需要将投递模式(BasicProperties 中的 deliveryMode 属性)设置为2(PERSISTENT)。但需要注意的是,必须同时将 Queue 和 Message 持久化才能保证消息不丢失,仅设置 Queue 持久化,重启之后 Message 会丢失,反之仅设置消息的持久化,重启之后 Queue 消失,既而 Message 也丢失。

2) 集群

上述持久化的操作保证了消息在 RabbitMQ 宕机时不会丢失,但却不能避免单机故障且无法修复(比如磁盘损毁)而引起的消息丢失,并且在故障发生时 RabbitMQ 不可用。这时就需要引入集群,由于 RabbitMQ 是基于 Erlang 编写的,所以其天生支持分布式,而不需要像 Kafka 那样要通过 Zookeeper 来实现,RabbitMQ Cluster 集群共有两种模式。

a) 普通模式

普通模式下,集群中的 RabbitMQ 会同步 Vhost、Exchange、Binding、Queue 的元数据(即其本身的数据,例如名称、属性等)以及 Message 结构,而不会同步 Message 数据,也就是说,如果集群中某台机器 RabbitMQ 宕掉了,则该节点上的 Message 不可用,直至该节点恢复。

b) 镜像模式

镜像队列相当于配置了副本,绝大多数分布式的东西都有多副本的概念来确保 HA(High Availability)。在镜像队列中,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效的保证了高可用性,除非整个集群都挂掉。

3. Broker到消费者的可靠性传输

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 fals e时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了这些消息。

这时大家可能会问,如果 RabbitMQ 在等待回调的过程中,消费者服务挂掉怎么办?
对于 RabbitMQ 而言,队列中的消息分成了两个部分:

  • 一部分是等待投递给消费者的消息;
  • 一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。

如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者。RabbitMQ 判断此消息是否需要重新投递的唯一依据是消费该消息的消费者连接是否已经断开,这种设计允许消费者消费一条消息很久很久。

如果消息消费失败,也可以调用Basic.Reject 或者 Basic.Nack来拒绝当前消息,但需要注意的是,如果只是简单的拒绝那么消息将会丢失,需要将相应的 requeue 参数设置为 true,RabbitMQ 才会将这条消息重新存入队列。而如果 requeue 参数设置为 false 的话,RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。

代码示例:

1
2
3
4
5
6
// 确认消息
channel.basicAck(deliveryTag, multiple);
// 拒绝消息
channel.basicNack(deliveryTag, multiple, requeue);
// 拒绝消息
channel.basicReject(deliveryTag, requeue)

PS:basicNack 和 basicReject 作用基本相同,主要差别在于前者可以拒绝多条,后者只能拒绝单条,另外basicNack 不是 AMQP 0-9-1 标准。

5. 死信队列

死信队列和上面提到的备份交换机有类似之处,同样也是声明一个 Exchange,如果一个消息因为被拒绝过期或是队列已满等情况变成了死信,那么它会被重新发送到这个 Exchange 并路由到死信队列。而判断一个消息是否是死信主要有如下几条:

  1. 消费方拒绝消息时没有将 requeue 设置为 true
  2. 消息在队列中过期了(队列的过期时间可以通过 x-message-ttl 参数控制,或者发送消息时声明,同时存在取小值)
  3. 队列已经满了

使用方式也跟备份交换机很像,只不过这个是在申明队列的时候设置 x-dead-letter-exchange 参数。

image-20210328095221839

6. 消息补偿机制

除了以上的保障措施之外,为了防止生产者发送消息失败或者接收 RabbitMQ confirm 的时候网络断掉等,我们还需要一套完善的消息补偿机制,以下是目前业界主流的两种方案。

1. 消息落库,对消息进行状态标记

image-20210328095315705

2. 延迟投递,做二次确认,回调检查

image-20210328095359650

7.延迟队列

1. RabbitMQ 3.6.x之前,一般采用死信队列+TTL过期时间来实现延迟队列

这里不赘述,在第9节中,具体代码介绍这种方式是如何实现延迟队列的。这里重点说下,这种实现延迟队列方式的缺点:

  1. 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。
  2. 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递。
  3. 可能存在一定的时间误差。

关于这些缺点的体现,在第9节中具体介绍。

2. RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件

插件官方地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安装方式这里不介绍了,可以看官方文档。如果安装成功后,应该看到一个x-delayed-message类型的交换机,如下图所示:

image-20210328100009618

这里需要提一下使用此插件的一个问题,在其项目的README末尾也提到这个局限性。

image-20210328100214347

关键原话翻译出来就是:

该插件是在考虑磁盘节点的情况下创建的。当前不支持RAM节点,并且对它们的支持并不是优先考虑的事情

所以如果要使用此插件实现RabbitMQ的延迟队列,这点需要评估下。

8. 消费顺序&消息幂等

1. 消费顺序

​ 一般来讲,消费者是无序处理消息的。但是对于一些需要保证顺序的异步操作,比如,减库存->发通知,这两件事必须要按顺序进行,但是又都想让他们异步进行,这个时候如果不做特殊处理可能就会变成先发通知后减库存了。这种情况下可以只异步执行发通知,或者将发通知放在减库存异步执行成功时调用,或者减库存和发通知在一个消费者逻辑中同步执行。 具体业务情况需要具体设计处理。 一般不会强制让MQ顺序消费,因为那样会大大的降低MQ的性能。

2. 消息幂等

这里不罗列解决方案了,通常都是通过一个记录一个唯一id的状态来避免重复消费。当然具体到消费者逻辑里面,还需要保证业务上操作的幂等。

9. 企业级RabbitMQ框架封装

1. 封装后的使用方式

首先创建一个MQ处理器,继承自MqProcessor

image-20210328101619637

然后重写init方法

image-20210328101638453

在init方法中添加注册消息消费的代码逻辑

image-20210328101709459

然后就可以这样发送消息了

image-20210328101727329

2. 源码解析

1)关键方法实现分析 : MqProcessor#addReceiver

我们从这个方法的实现分析中最主要想获得是以下几点:

  • 如何创建普通队列的?
  • 如何创建延迟队列的?
  • 如何创建失败队列的?

在调用addReceiver方法后,会创建队列;

image-20210328101849972

我们仔细看这个方法的实现:

image-20210328101917478

先分别创建交换机:

image-20210328101932835

再创建队列:

image-20210328101950844

image-20210328102005822

最后绑定交换机和队列:

image-20210328102107623

然后创建返回监听容器:

image-20210328102127240

然后通过监听容器的回调进行消费逻辑处理。这里面的处理逻辑非常丰富。包含了消息幂等处理、延迟消息处理、消费业务逻辑执行、错误&异常处理、消费重试等。接下来具体分析。

image-20210328102145951

  • 如何保证消息幂等的?

看到这一行代码:

image-20210328102228570

image-20210328102241670

可以看到其中通过查询另一个redisKey来决定是否要继续往下执行。那么这个key是用来干嘛的呢? 没错,就是框架对消息幂等的一个处理。如果这个消息已经消费过了,那么不会重复消费。

我们多跳几行,看下这个key是什么时候设置上的。

image-20210328102310095

image-20210328102320064

可以看到,框架中是通过缓存唯一Id的方式来进行消息幂等处理的。通过traceId+traceLevel+queueName形成一个消息的标示,由此来保证同一个消息如果已经消费成功了,不会重复消费。

  • 何时执行延迟消息?

延迟消息的处理关键逻辑如下:

image-20210328102352412

这里是判断是否需要转发到延迟队列的逻辑。如果消息中包含了hdf-message-d这个头部属性,并且取出的延迟时间相对当前时间大于1秒,则发送到延迟队列,否则直接消费。

image-20210328102419597

转发到延迟队列的逻辑也很简单。 延迟队列本质上也是一个普通队列,只不过没有这个队列没有绑定消费者。当消息设置的TTL到达时,会自动通过队列中设置的死信队列配置,被rabbitmq转发到正常的队列,然后被监听正常队列的消费者消费。

image-20210328102438225

这里为什么正常的队列可以消费到呢,因为在创建队列时,有一行代码正是将正常队列绑定到了delay这个交换机。因此,转发消息到delay,通过正常的routingKey可以匹配到正常的队列中,这样消息就可以被放到有消费者的队列里了。

image-20210328102457831

  • 如何实现消费失败自动重试的?

image-20210328102528495

可以看到当消费者返回false,当出现rpcBizException,或其他异常时,会产生非零的状态码,于是进入后处理逻辑中。

image-20210328102547897

image-20210328102601600

可以看到,processFail的逻辑是:当重试次数少于5此,就重新发送到延迟队列,不过会设置一个更长的TTL时间;如果重试次数大于5时,直接转发到失败队列了。此时,我们需要认为去处理失败队列。

image-20210328102625604

最后,消费完成后,不管是消费失败还是成功,都会ack告诉MQ,这个消息我处理完了。可以从MQ中删除这个消息了。

最后总结下,这样一个企业级MQ实现,其实就是通过完全手动确认的方式,完成了正常的消息队列创建、发消息、消费消息、以及延迟消息的转发、消费失败通过延迟重试的方式补偿以及提供失败队列,供系统维护者手动处理失败队列。

这样一个封装下,基本可以满足业务开发需要了。但是还是存在几个缺点:

  1. 生产者到MQ的消息投递没有保证真正的可靠性,需要业务方自己兜底;

  2. 延迟队列是使用死信队列+TTL方式实现,需要创建大量梯队的延迟队列,无法做到无消耗的随机时间延迟,并且这种延迟实现的方式存在一定的时间误差;

    当然,在业务量一般的情况下,基本满足生产需要了。比较适合的才是最好的,没必要过于追求完美。毕竟,追求完美也是有代价的!