type
status
date
slug
summary
tags
category
icon
password
💡
RabbitMQ使用方式

Docker 安装RabbitMQ

这里注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面。
查询镜像
得到下列结果
notion image
下载镜像
出现如下结果
notion image
运行镜像
访问管理界面
地址为 服务器ip:15672 用户名和密码都是guest
notion image

Java方式实现RabbitMQ

1、helloword

生产者
消费者

提取公共部分,创建工具类

RabbitMQUtils

2、Work queues

生产者
消费者1
消费者2

3、fanout

生产者
消费者一
消费者二

4、direct

生产者
消费者一
消费者二

5、topic

  • 符号可以代替一个单词。
  • #符号可以替代零个或多个单词。
生产者
消费者一
消费者二

Springboot实现方式

消费者方绑定队列和交换机的方式

环境

编写application.yml文件

1、helloword

生产者
消费者

2、Work queues

生产者
消费者

3、fanout

生产者
消费者

4、direct

生产者
消费者

5、topic

生产者
消费者

生产者方绑定队列和交换机的方式

1、fanout

生产者方
在pom.xml文件导入Maven依赖
配置application.yml文件
编写配置类
编写生产者的接口,发送消息
启动项目,调用接口
notion image
检查rabbitMq管理页,消息已经发送成功
notion image
消费者方
在pom.xml文件导入Maven依赖
配置application.yml文件
编写监听类
启动项目,查看控制台
notion image

2、direct

生产者方
pom.xml文件
application.yml文件
编写配置类
编写生产者的接口,发送消息
启动项目,调用接口
notion image
查看rabbitmq控制台,由于发送消息方法是往路由key值为info队列发送,所以只有路由key为info的directQueue1队列可以接收到,而路由key值为error的directQueue2队列接受不到消息
notion image
消费者方
pom.xml文件
application.yml文件
编写监听类
启动项目,查看控制台,由于只有directQueue1里有消息,所以消费者只能看到这个队列消息
notion image

3、topic

生产者
编写pom.xml文件,导入Maven依赖
配置文件application.yml
编写配置类
编写接口
启动项目,调用接口
notion image
查看rabbitmq控制台,由于发送消息的方法 路由key值为user.save,所以只有路由key为user.#的topicQueue1的队列和路由key值为user.save的topicQueue2队列可以收到消息,而路由key值为user的topicQueue3队列则收不到消息
notion image
消费者方
pom.xml文件
application.yml文件
编写监听类,获取消息
启动项目,在控制台查看打印信息,由于只有队列topicQueue1,topicQueue2里有消息,所以消费者只能看到这两个队列消息
notion image

RabbitMQ的高级特性

生产者方消息确认机制

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
  • confirm 确认模式
  • return 退回模式
RabbitMQ大体流程路径:
notion image
  • 消息从消费者(Producer) 到 交换机(exchange) 则会调用一个回调方法setConfirmCallback()
    • 方法参数: correlationData 相关信息、ack 是否发送成功、cause 失败原因
  • 消息从交换机(exchange)到 队列(queue) 则会调用一个回调方法setReturnCallback()
    • 方法参数: message消息内容、 replyCode回应码、replyText回应消息、exchange交换机、routingKey路由key值
编写配置文件application.yml
编写配置类
编写接口
启动测试,查看控制台信息
notion image
根据RabbitMQ大体流程路径,从生产者角度,做如下测试:
1、如果发送的交换机没有进行配置,即不存在
启动测试,调用接口,查看控制台
2、如果交换机配置了,但是没有绑定队列
启动测试,调用接口,查看控制台

消费者方消息确认机制

这里是把消息的自动确认机制,改为手动确认消息,即把 AcknowledgeMode.NONE 改为 AcknowledgeMode.MANUAL
配置文件application.yml
编写开启手动确认,以及自定义监听类
如果执行业务出错,则可以拒绝消息,对此方法有channel.basicReject(deliveryTag, true)channel.basicNack(deliveryTag, false, true) 两种方式处理
channel.basicReject(deliveryTag, true): 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,
就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。使用拒绝后重新入列这
个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入
列-消费-入列这样循环,会导致消息积压。
channel.basicNack(deliveryTag, false, true): 第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

消费端限流

为防止大量消息全部给到消费端系统,可以对消费端做一个限流操作
开启方式的前提一定是消费端改为手动确认信息,否则就不成功

TTL:消息过期时间

1、队列内消息统一过期

notion image

2、消息单独过期

单独为某条消息设置过期属性,可以通过发布消息的convertAndSend()重载方法中,MessagePostProcessor消息发布处理器来设置过期属性
notion image
notion image
通过源码可以发现,MessagePostProcessor是一个函数式接口,而接口的方法有一个Message类型的参数,以此便可以设置消息属性了
notion image
不过,需要注意的是,当队列中有多条消息,有过期时间的消息不会在达到时间后从队列中自动消失,而是只有消息在队列顶端时,才会判断其否过期了
而如果同时设置队列过期时间和消息过期时间,则以时间短的为准

死信队列

Rabbitmq的死信队列,其实指的死信交换机(DLX),但虽然名称是这么一个叫法,但是死信交换机也是一个正常的交换机,和一般的交换机是没有区别的
而进入死信交换机的消息,大致有三种情况:

1、消息TTL过期

生产方的配置类
编写接口
启动测试,调用接口发送消息,不消费消息,3秒后查看rabbitmq管理页面,消息已进入死信队列
notion image

2、队列达到最大长度

生产方的配置类
编写接口
启动测试,调用接口发送消息,不消费消息,查看rabbitmq管理页面,由于设置队列最大长度为5,所以有5条消息进入死信队列
notion image

3、消息被拒绝

编写生产方配置类
编写接口
编写 开启手动确认,以及自定义监听类
执行业务出错,消息被拒绝(basic.reject/ basic.nack)并且requeue=false
启动测试,调用接口发送消息,查看rabbitmq管理页面,由于消费者方处理业务失败产生错误,拒绝签收消息,且不重回队列,则消息进入死信队列
notion image

延迟队列

RabbitMQ很遗憾不支持原生的延迟队列,但是我们 可以根据TTL过期时间和死信队列组合实现延迟队列 功能
生产方的配置类
编写接口
编写消费者方
启动测试,调用接口发送消息,查看消费者方控制台,5秒后会打印一条消息
notion image

延迟队列(插件方式)

注:前提是通过docker 安装RabbitMQ-Management
下载插件
复制到docker容器MQ的插件目录下
注:docker cp 文件目录 docker容器id:容器内的路径
进入到容器中
开启插件
查看插件是否安装
notion image

消费者方绑定方式实现延迟队列

创建接收消息类,使用@RabbitListener注解监听消息并同时绑定queue队列、exchange交换机、routing key
发送消息
然后启动项目,可以看到rabbitMQ控制台显示出了定义的exchange,注意type显示为x-delayed-message
notion image
:注意插件版的死信队列Features字段不会显示DLX
调用接口,定义10秒的延迟
notion image
 

RabbitMQ消息序列化

配置类

消费者代码

连接多个RabbitMQ源

注:以下为springboot方式

自定义配置文件内容

创建对应的Properties属性类

记得加上@EnableConfigurationProperties({CustomRabbitProperties.class})
主要内容如下:

Rabbitmq配置类

主要内容如下:

发送消息

根据配置内容,@Qualifier 指定发送对象

接受消息

根据配置内容,指定监听器工厂 containerFactory = "primaryListenerFactory"
@RabbitHandler:如果@RabbitListener注解在类上,可通过@RabbitHandler在不同方法上,接受不同对象类型消息
除此之外,需要在声明队列,交换机,绑定的地方指定对应的AmqpAdmin, 否则每个virtual-host上都创建一边
Docker 容器消息中间件之ActiveMQ