此篇文章内容较多,包括 RabbitMQ 相关的一些简单理论介绍,和消息生产者 Provider 推送实例,消息消费者 Consumer 实例,Direct、Topic、Fanout 的使用,消息回调、手动确认等。
关于 RabbitMQ 的安装,就不介绍了。在安装完 RabbitMQ 后,输入 http://ip:15672/ ,可以看到一个简单的后台管理界面。
在这个界面里面我们可以做些什么?
可以手动创建虚拟 host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。以上这些管理界面的操作在这篇暂时不做扩展描述,我想着重介绍后面实例里会使用到的。首先先介绍一个简单的一个消息从推送到接收的流程,提供一个简单的图:
左边的黄色圈圈就是我们的消息推送服务,将消息推送到中间方框里面也就是 RabbitMQ 的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。
常用的交换机有以下三种,因为消费者是从队列获取信息的,队列又绑定在交换机上(一般),所以对应的消息推送/接收模式也会有以下几种:
Direct Exchange
直连交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程是,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 Routing Key 。然后当一个消息携带着路由值为 X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值 X 去寻找绑定值也是 X 的队列。
Fanout Exchange
扇型交换机,这个交换机没有路由键的概念,就算绑了路由键它也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。简单地介绍下规则:
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子:
队列 Q1 的绑定键为 *.TT.*
队列 Q2 的绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列 Q1 将会收到;
如果一条消息携带的路由键为 TT.AA.BB,那么队列 Q2 将会收到。
主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 “#“ (井号)的时候,这个队列将会无视消息的路由键,接收所有的消息,此时主题交换机就拥有扇形交换机的行为。当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有直连交换机的行为。所以主题交换机既可以实现扇形交换机的功能,也可以实现直连交换机的功能。
除了这三种常用的交换机以外,还有 Header Exchange(头交换机),Default Exchange(默认交换机),Dead Letter Exchange(死信交换机),这几种此篇暂不讲述。
代码实例
好了,一些简单的介绍到这里为止,接下来我们来一起编码。这个小例子需要创建 2 个 SpringBoot 项目,一个是 rabbitmq-provider (生产者),一个是 rabbitmq-consumer (消费者),用这两个项目来实现常用的三种交换机。
直连交换机 rabbitmq-provider
首先创建 rabbitmq-provider,pom.xml 里用到的依赖:
1 | <dependency> |
然后是 application.yml 的配置(虚拟 host 配置项不是必须的,如果在 RabbitMQ 服务器上创建了自己的虚拟 host,就配置;如果没创建,就不用配置):
1 | server: |
接着我们先使用下 Direct Exchange(直连交换机),创建 DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):
1 | import org.springframework.amqp.core.Binding; |
然后写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:
1 | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
把 rabbitmq-provider 项目运行,调用下接口:
因为我们目前还没有消费者 rabbitmq-consumer,消息没有被消费的,所以我们可以去 RabbitMQ 的管理页面看看,是否推送成功:
再看看队列(界面上的各个英文项代表什么意思,可以自己查查,对理解还是有帮助的):
很好,消息已经推送到 RabbitMQ 服务器上面了。
直连交换机 rabbitmq-consumer
接下来,创建 rabbitmq-consumer 项目,pom.xml 里的依赖:
1 | <dependency> |
然后是 application.yml 的配置:
1 | server: |
然后一样,创建 DirectRabbitConfig.java(消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。配置上了的话,产生的变化是让消费者也拥有了生产者的身份,也能推送该消息):
1 | import org.springframework.amqp.core.Binding; |
然后是创建消息接收监听类,DirectReceiver.java:
1 |
|
然后将 rabbitmq-consumer 项目运行起来,可以看到把之前推送的那条消息消费下来了:
然后可以再继续调用 rabbitmq-provider 项目的推送消息接口,可以看到消费者即时消费消息:
那么直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?
可以看到是用轮询的方式对消息进行消费,而且不存在重复消费。
主题交换机 rabbitmq-provider
接着,我们使用 Topic Exchange 主题交换机。在 rabbitmq-provider 项目里面创建 TopicRabbitConfig.java:
1 | import org.springframework.amqp.core.Binding; |
然后添加 2 个接口,用于推送消息到主题交换机:
1 |
|
主题交换机 rabbitmq-consumer
生产者那边完事后,先不急着运行,在 rabbitmq-consumer 项目上,创建 TopicManReceiver.java:
1 | import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
再创建一个 TopicTotalReceiver.java:
1 | import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
同样,加主题交换机的相关配置,TopicRabbitConfig.java(消费者一定要加这个配置吗? 不需要的其实,理由在前面已经说过了):
1 | import org.springframework.amqp.core.Binding; |
然后把 rabbitmq-provider,rabbitmq-consumer 两个项目都跑起来,先调用 /sendTopicMessage1 接口:
然后看消费者 rabbitmq-consumer 的控制台输出情况:
TopicManReceiver 监听队列1,绑定键为:topic.man
TopicTotalReceiver 监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.man
所以可以看到两个监听消费者 receiver 都成功消费到了消息,因为这两个 recevier 监听的队列的绑定键都能与这条消息携带的路由键匹配上:
接下来调用接口 /sendTopicMessage2:
然后看消费者 rabbitmq-consumer 的控制台输出情况:
TopicManReceiver 监听队列1,绑定键为:topic.man
TopicTotalReceiver 监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.woman
所以可以看到两个监听消费者只有 TopicTotalReceiver 成功消费到了消息:
扇型交换机 rabbitmq-provider
接下来是使用 Fanout Exchang 扇型交换机。同样地,先在 rabbitmq-provider 项目上创建 FanoutRabbitConfig.java:
1 | import org.springframework.amqp.core.Binding; |
然后写一个接口用于推送消息:
1 |
|
扇型交换机 rabbitmq-consumer
接着在 rabbitmq-consumer 项目里加上消息消费类,FanoutReceiverA.java:
1 | import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
FanoutReceiverB.java:
1 | import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
FanoutReceiverC.java:
1 | import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
然后加上扇型交换机的配置类,FanoutRabbitConfig.java(消费者真的要加这个配置吗? 不需要的其实,理由在前面已经说过了):
1 | import org.springframework.amqp.core.Binding; |
最后将 rabbitmq-provider 和 rabbitmq-consumer 项目都跑起来,调用下接口 /sendFanoutMessage:
然后看看 rabbitmq-consumer 项目的控制台情况:
可以看到只要发送到 fanoutExchange 这个扇型交换机的消息, 三个队列都监听到了。
如何保证消息的可靠性
消息的可靠性需要两个方面来保证:
(1)生产者 -> 队列 (可靠投递)
(2)队列 -> 消费者 (可靠消费)
可靠投递
从生产者到队列需要经过两步:
(1.1)从 生产者 到 交换机
(1.2)从 交换机 到 队列
只要保证这两步的可靠传递就可以保证消息的可靠投递了。
生产者到交换机的保证机制
生产者到交换机的可靠保证依靠的是:confirmCallback 机制。
confirmCallback 翻译过来也就是确认回调,默认是不开启的。当开启这个模式之后,如果生产者给交换机发送一个消息,交换机收到这个消息之后,就会给生产者发送一个确认收到的信息。一般在配置文件中开启 confirmCallback:
1 | spring: |
这里用 Junit4 测试方法发送一个消息进行测试(如果在测试时发现无法触发回调函数,那么也许是因为 RabbitMQ 的版本过低,导致配置项不生效):
1 |
|
交换机到队列的保证机制
采用 returnCallback 机制,默认不开启。
returnCallback 翻译过来就是返回回调,但是这种回调机制与 confirmCallback 机制并不一样。confirmCallback 不管消息是否成功到达交换机都会被调用,而 returnCallback 只有在交换机到达队列失败的时候才会被触发。当这个回调函数被调用的时候,说明交换机的消息没有顺利的到达队列。
在配置文件中开启 returnCallback 机制:
1 | spring: |
测试代码:
1 |
|
在实际开发中,returnCallback 和 confirmCallback 通常会被一起写在配置类里:
1 | import org.springframework.amqp.core.Message; |
可靠消费
消息安全的到达消息队列之后,当它被消费者消费的时候,可能也会发生不可靠的传递。一般有两种解决方案,自动确认和手动确认。
自动确认
这是默认的消息确认方式,但不推荐。它的问题在于,只要 RabbitMQ 成功发出消息(即成功将消息写入 TCP Socket 中),它就认为本次投递已被正确处理,而不管消费端是不是真的成功处理了消息。所以在这种情况下,如果消费端的逻辑抛出异常,也就是消费端没有成功处理这条消息,那么就相当于这条消息丢失了。
手动确认
推荐使用手动确认模式。消费者收到消息后,进行相应的业务处理,然后手动调用 basicAck / basicNack / basicReject 等方法,RabbitMQ 根据被调用的方法来判断消息处理是否成功。
basicAck 用于肯定确认
basicNack 用于否定确认
在代码中的写法为 channel.basicNack(deliveryTag, false, true);第一个参数是当前消息的唯一 ID;第二个参数是指是否针对多条消息,如果是 true,则代表对当前通道中消息的 TagID 小于当前这条消息的,都拒绝确认;第三个参数是指是否重新入列,也就是指是否让消息重新回到队列里去,与 basicReject 一样,如果选择重新入列,会有消息积压的问题。
basicReject 用于否定确认
它在代码中的写法为: channel.basicReject(deliveryTag, true); 表示拒绝消费当前消息。与 basicNack 相比,它有一个限制:一次只能拒绝单条消息。如果第二个参数传 true,它会将数据重新丢回队列里,以后还会再次消费到这条消息。如果传 false,它就会直接把这条消息丢掉。传 true 的时候要格外谨慎,因为一般都是代码出现异常的时候才用 basicReject,如果使用不当,会导致一些消息一直处在被拒绝消费 -> 重新入列 -> 被拒绝消费 -> 重新入列的循环中,导致消息积压。
以上 3 个方法都表示消息已经被正确投递,但是只有 basicAck 表示消息已经被正确处理。
使用手动确认模式前需在消费者项目里配置:
1 | spring: |
测试代码:
1 |
|
总结
总的来说,消息的可靠性要依赖这些机制:
1 | 1 持久化: |
但即使做到这些,还是可能会有其他情况,例如:消息的重复消费等。可以看这一篇:设计层面的消息可靠性保证:消息补偿、消息幂等性保障
最后提一句,用 MQ 很重要的一点就是为了解耦,为了达到这个目的,我们不能把完全不同的业务耦合到一个消费者里去,要学会分成多个消费者去解决问题。
参考文章:
Springboot 整合 RabbitMQ,用心看完这一篇就够了
RabbitMQ 消息传递的可靠性保证:ConfirmCallback、returnCallback、手动确认
(完)