type
status
date
slug
summary
tags
category
icon
password

ActiveMQ安装(Docker环境下)

首先在docker搜索ActiveMQ,发现webcenter/activemq 这个版本使用是最多的
notion image
然后选择版本,下载activeMQ
notion image
查看镜像,配置好activemq容器的后台端口(61616)和前台端口(8161)的映射关系,并启动它
notion image
浏览器访问,如下图,便安装完成
notion image

JMS实现消息的生产者和消费者

队列的方式

生产者:
执行后的结果:
notion image
消费者(receive()方法获取信息):
执行结果:
notion image
消费者(监听器MessageListener方式获取信息)
消费者获取消息的方式
receive() 方法:该方法是同步阻塞的,如果调用者(队列或者订阅方)没有接受到消息,就会一直等待
除此之外它有一个重载方法 receive(long timeout) ,参数表示的是 等待的超时时间(毫秒),在等待时间内没有接受到消息就自动停止
setMessageListener(MessageListener listener) 方法:该方法是异步非阻塞的,而方法参数类型是MessageListener接口,这个接口里只定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数监听器。当消息到达之后,会自动调用onMessage方法获取信息
消费者的三种情况
  • 先启动生产者,生产消息,然后再启动一个消费者,是否可以正常消费信息 ?
    • 答案:可以的
  • 还是先启动生产者,生产消息,再分别启动两个消费者,后一个启动的消费者可以消费到消息吗 ?
    • 答案:否,后一个启动的消费者是消费不到的消息,消息都被前一个启动的消息者消费完了
  • 先启动两个消费者,再启动生产者生产6条消息,结果如何 ?
    • 答案:消息被平分,两个消费者一人一半,分别消费3条消息

发布订阅方式

生产者:
发布订阅的方式和队列方式, 从生产者的代码可以发现,两者是一样的,唯一不同的就是,创建目的地的方法不一样,队列是createQueue(String queueName),而发布订阅是createTopic(String topicName)。同样的,消费者的代码和队列也是一样的
消费者
不过要注意的是,发布订阅的模式,要先启用订阅方,也就是消费者方,再启动发布方(生产者)。否则,就类似微信公众号一样,没有人订阅,生产方发布消息是没有人接受的到,会被丢弃。这种则是废消息。

队列和发布订阅两者对比

比 较 项 目
Topic (发布订阅)模式
Queue (队列)模式
工 作 模 式
“订阅-发布”模式,如果当前没有订阅 者,消息将会被丢弃,如果有多个订阅 者,那么这些订阅者都会收到消息。
负载均衡 模式,如果当前没有消费者,消息也不会丢弃,如果有多个消费者,那么一条消息 也只会发送给其中一个消费者,并且要求消费者ack信息
有 无 状 态
无状态
Queue 数据默认会在 mq 服务器上以文件形式 保存,比如 Active MQ 一般保存在 $AMQ_HOME\data\kr-store\data下面,也可 以配置成DB存储
传 递 完 整 性
如果没有订阅者,消息会被丢弃
消息不会丢弃
处 理 效 率
由于消息要按照订阅者的数量进行复 制,所以处理性能会随着订阅者的增加 而明显降低,并且还要结合不同消息协 议自身的性能差异
由于一条消息只发送给一个消费者,所以就算 消费者再多,性能也不会有明显降低。当然不 同消息协议的具体性能也是有差异的。

JMS

JMS是Java EE的一个面向消息中间件的一套规范,具体需要靠厂商的产品实现
notion image

消息的三种部分

消息头

  • 在生产者方,消息的API看出可以针对每一条消息设置不同的消息头属性
notion image
  • 如果不想为每一条消息单独设置消息头属性,可以通过Send()方法的重载,批处理消息的消息头属性
notion image
  • 从批处理消息头属性和单独设置消息头属性两者结合来看,主要介绍五个消息头属性:
JMSDestination: 消息发送的目的地,主要是指 Queue 和 Topic
JMSDeliveryMode: 持久模式和非持久模式,可设置的参数为DeliveryMode.NON_PERSISTENT(非持久)DeliveryMode.PERSISTENT(持久)
一条持久性的消息:应该被传送 “一次仅仅一次”,这就意味着如果JMS 提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递消息。 一条非持久的消息:多会传送一次,这意味着服务器出现故障,该消息将永久丢失。
JMSPriority : 消息优先级,从0 ~ 9 十个级别,0 ~ 4 是普通消息,5 ~ 9是加急消息。
JMS 不要求 MQ 严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达,默认是4 级。
JMSExpiration: 可以设置消息在一定时间后过期,默认是永不过期。消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间
值。 如果timeToLive值等于零,则 JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
JMSMessageID : 唯一识别每个消息的标识,由MQ产生。

消息体

封装具体的消息数据,发送和接收的消息体类型必须一致对应!
五种消息体格式:
TextMessage: 普通字符串消息,包含一个String 。
MapMessage: 一个Map类型的消息,key为String类型,而值为Java的基本类型。
BytesMessage : 二进制数组消息,包含一个byte[] 。
StreamMessage : Java数据流消息,用标准流操作来顺序的填充和读取。
ObjectMessage :对象消息,包含一个可序列化的 Java 对象。

消息属性

如果需要除消息头字段以外的值,那么可以使用消息属性!
识别、去重、重点标注等操作非常有用的方法!
消息属性是以属性名和属性值对的形式指定的。可以将属性是为消息头的扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。
以Property结尾:
notion image

消息的可靠性

持久性 队列

队列是否设置为持久,可以通过setDeliveryMode(int deliveryMode)方法设置,默认为持久

持久性 发布订阅

发布方(生产者),顺序与之前有一点不同,需要设定先持久化,在启动连接
测试代码:
订阅方(消费者)测试代码:
先启动订阅方,再启动发布方,到ActiveMQ前台的Subscribers查看
启动前情况:
notion image
启动订阅方后情况:
notion image
notion image
启动生产方 后:
notion image
订阅方停止后,会被列 离线的一栏去:
notion image
如果这时,发布者再次发布消息,订阅方重启启动是否能接收到消息
notion image
再次启动订阅方(不停止)后
notion image
notion image
小结:
第一次,一定要先启动订阅方,之后启动发布方,发布消息。这时不论订阅方在不在线,都可以接受到消息,不在线,重新启动后会接受没有收到过的消息接受下来

事务

事务偏向于生产者、签收偏向于消费者
事务只有两个值,true和false
生产者一方
当值为false时,消息会被自动发送到目的地
当值为true时,需要在调用send()方法后,调用commit()方法,手动提交
消费者一方
当值为false时,消息只会被消费一次
当值为true时,消息会被重复消费,且在目的地中,消息不会出队的,即未被消费
测试:

签收机制

签收针对的是消费者一方,分为以下四种
当事务没有开启时 : 即非事务,消息的确认情况 则以签收机制为主
当事务开始时 : 即事务情况下,消息的确认以事务为主,即是否commit的为主,不用管签收机制的情况。
注:常用的签收模式 主要以自动签收(AUTO_ACKNOWLEDGE)和手动签收(CLIENT_ACKNOWLEDGE)为主

SpringBoot结合ACtiveMQ

队列模式

1、导入maven依赖
2、编写application.properties配置文件
3、编写配置类
4、编写队列的消息生产者Produce
5、编写controller类,并访问接口
查看ActiveMQ的前台,消息已经发送到队列了
notion image
6、编写队列的消费者
删除之前入队的信息,然后重新启动 启动类,再次访问接口,并查看ActiveMQ的前台和打印台
notion image
notion image

订阅发布模式

1、导入maven依赖
2、编写application.properties配置文件
3、编写配置类
4、编写主题的消息生产者(发布方)
5、编写controller类
6、编写主题的消费者(订阅方)
7、启动主启动类,访问接口,让发布方生产消息,并查看ActiveMQ的前台和打印台
notion image
notion image
定时的订阅发布模式
让发布方每过5秒,发送一次消息
添加 @EnableScheduling 注解,发现定时任务让它后台执行
notion image
修改主题的消费者(订阅方),添加LocalDateTime.now()方法,方便在控制台查看
启动程序,查看控制台打印内容,以及(同一时间点内)查看ActiveMQ的前台
notion image
notion image

ActiveMQ传输协议

在连接activemq时,默认是用TCP连接,那Activemq是否还支持其它协议吗?
查看activemq的conf文件夹下的activemq.xml
notion image
可以看出,除了支持默认的TCP协议,还支持 amqp ,stomp,mqtt,ws等协议,而且协议对应的端口号也不一样,这就是用TCP连接时,端口号是61616的原因了。
从activemq前台也可以查看出来
notion image
如果想配置其它协议,可以查看官网的文档
配置NIO协议
notion image
从官网的文档得知,配置NIO协议,只需要修改activemq.xml文件
notion image
重启activemq(重启容器),查看activemq前台
notion image
修改代码的连接方式,并测试
查看前台
notion image

ActiveMQ持久化

持久化不同于之前的持久性,持久化是以防服务器本身挂了,重启服务器也可以恢复消息
ActiveMQ目前默认的持久化方式是KahaDB,此外还有JDBC, LevelDB等,不过它们大体上的逻辑是一样,消息中心接受到消息。会把消息备份存储起来。

KahaDB(默认)

KahaDB:这个5.4版本之后出现的默认的持久化方式, KahaDB的持久化机制是基于日志文件,索引和缓存。
activemq.xml配置文件中可以得知
notion image
它是把数据存储在activemq的data目录下的kahadb文件夹中
在这个文件夹中会有四个文件来完成消息持久化
  • db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息
  • db.redo 用来进行消息恢复
  • db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增文 件,文件名按照数字进行编号,如 db-1.log、db-2.log、.....
  • lock 文件 锁,写入当前获得kahadb读写权限的broker ,用于在集群环境下的竞争处理
KahaDB配置官网链接: http://activemq.apache.org/kahadb

LevelDB

这种文件系统是从ActiveMQ 5.8 之后引进的,它和 KahaDB 非常相似,也是基于文件的本地数据库存 储形式,但是它提供比 KahaDB更快的持久性。
但它不使用自定义 B-Tree 实现来索引预写日志,而是使用基于LevelDB的索引。
可是为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的
LeavelDB 比LeavelDB 更性能更优越,但需要基于 ZooKeeper 所以这些官方还没有定论,仍旧使用 KahaDB。
默认配置:
LevelDB配置官网链接: http://activemq.apache.org/leveldb-store

JDBC

看名字就知道和数据库有关,所以JDB的持久化就是把消息备份到数据库中
由于用docker比较麻烦,所以直接改为windows本地测试
首先下载activemq的windows版本,http://activemq.apache.org/components/classic/download/
然后下载mysql的驱动包,
把mysql驱动包丢到activemq的lib下
notion image
打开activemq的conf目录下的activemq.xml文件,配置内容:
参数解释:
dataSource: 表示的是引用的数据源名称
createTablesOnStartup: 表示启动的时候是否去创建表,一般第一次为true,之后改为false
接着由于指定了数据源,所以需要在activemq.xml配置一个数据源,不过要注意位置
然后创建好数据库,数据库名字与数据源链接的要一致
测试:
Queue
启动后,分别查看数据库和前台:
notion image
notion image
启动消费者查看数据库和前台:
notion image
notion image
Queue 小结:
在点对点类型中,当DeliveryMode设置为NON_PERSISTENCE 时,消息被保存在内存中; 当DeliveryMode设置为PERSISTENCE 时,消息被保存在Broker的相应的文件或者数据库中。而且点对点类型中消息一旦被Consumer消费就从Broker中删除
Topic
先启动订阅方然后再启动发布方,查看数据库和前台:
notion image
notion image
notion image
notion image
Topic 小结:
一定要先启动消费订阅然后再启动生产,消息会被保留在 activemq_msgs表中消费完毕也不会删除,而订阅关系在 acks 表 中。
消息中间件之RabbitMQJava多线程