RabbitMQ学习笔记
一、概述
1、中间件概述
中间件是介于应用系统和系统软件之间的一类软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享、功能共享的目的。
2、消息中间件概述
1、概述
消息中间件是利用可靠的消息传递机制进行系统和系统直接的通讯;以及通过提供消息传递和消息的排队机制,它可以在分布式系统环境下扩展进程间的通讯。常见的消息中间件有ActiveMQ、RabbitMQ、Kafka、RocketMQ等。
2、应用场景
- 跨系统数据传递
- 高并发的流量削峰
- 数据的分发和异步处理
- 大数据分析与传递
- 分布式事务
3、核心组成部分
- 消息的协议
- 消息的持久化机制
- 消息的分发策略
- 消息的高可用,高可靠
- 消息的容错机制
4、其他
常见的持久化方式
ActiveMQ | RabbitMQ | Kafka | RocketMQ | 是否支持 |
---|---|---|---|---|
文件存储 | 支持 | 支持 | 支持 | 支持 |
数据库 | 支持 | / | / | / |
消息分发策略的机制和对比
ActiveMQ | RabbitMQ | Kafka | RocketMQ | 是否支持 |
---|---|---|---|---|
发布订阅 | 支持 | 支持 | 支持 | 支持 |
轮询分发 | 支持 | 支持 | 支持 | / |
公平分发 | / | 支持 | 支持 | / |
重发 | 支持 | 支持 | / | 支持 |
消息拉取 | / | 支持 | 支持 | 支持 |
二、RabbitMQ安装与入门
1、概述
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。
2、RabbitMQ安装
1、环境准备
RabbitMQ是采用Erlang语言开发的,所以系统环境必须提供Erlang环境,第一步就是安装Erlang。
RabbitMQ下载地址:https://www.rabbitmq.com/download.html
Erlang下载地址:https://www.erlang-solutions.com/downloads/
erlang和RabbitMQ版本的按照比较: https://www.rabbitmq.com/which-erlang.html
Linux环境
1 | #命令 |
2、Erlang安装
rpm包下载地址
1 | #下载rpm包 |
3、RabbitMQ安装
1 | #下载rpm包 |
4、RabbitMQ运行
1 | # 启动服务 |
5、MQ的相关端口
注意端口的开放与安全组端口开放
1 | 5672 #RabbitMQ的通讯端口 |
3、RabbitMQWeb管理界面及授权操作
1、RabbitMQ管理界面安装
1 | #默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效 |
最后访问http://ip:15672/
即可
2、授权账号和密码
rabbitmq有一个默认账号和密码是:guest
默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户。
1 | #新增用户 |
相关操作
1 | rabbitmqctl add_user 账号 密码 |
4、Docker安装RabbitMQ
1、Docker安装
1 | #yum 包更新到最新 |
详情查看https://blog.csdn.net/lemon_TT/article/details/117983127
2、docker相关命令
1 | # 启动docker: |
3、RabbitMQ安装运行
1 | #这个镜像带管理界面 |
最后访问http://ip:15672/
即可
5、RabbitMQ角色介绍
1、none
- 不能访问management plugin
2、management:查看自己相关节点信息
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
3、Policymaker
- 包含management所有权限
- 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
4、Monitoring
- 包含management所有权限
- 罗列出所有的virtual hosts,包括不能登录的virtual hosts。
- 查看其他用户的connections和channels信息
- 查看节点级别的数据如clustering和memory使用情况
- 查看所有的virtual hosts的全局统计信息。
5、Administrator
- 最高权限
- 可以创建和删除virtual hosts
- 可以查看,创建和删除users
- 查看创建permisssions
- 关闭所有用户的connections
三、RabbitMQ入门实战
1、概述
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
AMQP生产者流转过程
AMQP消费者流转过程
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
2、Simple简单模式
- 特点:普通消费者生产者模型

1、代码实现
实现步骤
1、jdk1.8
2、构建一个maven工程
3、导入rabbitmq的maven依赖
4、启动rabbitmq-server服务
5、定义生产者
6、定义消费者
7、观察消息的在rabbitmq-server服务中的过程
导入maven依赖
java原生依赖
1 | <dependency> |
spring依赖
1 | <dependency> |
springboot依赖
1 | <dependency> |
上面依赖根据自己的项目环境进行选择即可。
定义生产者
1 | //这里我使用了原生java依赖,导的包都是rabbitmq.client下 |
定义生产者
1 | public class Comsumer { |
2、Web界面操作
3、work工作模式
1、概述
- 特点:分发机制。包括轮询模式和公平分发模式
2、代码实现
1、轮询模式
生产者
1 | public class Producer { |
消费者,这里需要创建两个类,work1和work2,内容一样,只贴出一份
1 | public class Work1 { |
运行结果为两个消费分别消费一个,轮着消费,即使每个线程运行时间不同
2、公平分发模式
生产者不变,消费者改成手动消费,仍然创建两个各,其中每个类运行睡眠时间不一样,结果可以发现对于睡眠时间短的消费消息更多,即性能好的消费更多
1 | package com.shawn.rabbitmq.work.fair; |
4、fanout发布订阅模式
1、概述
- 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。只要订阅了该交换机,那么就可以获取数据
2、代码实现
生产者(注意已经绑定好关系可以不用在代码中编写绑定关系了)
1 | public class Producer { |
消费者
1 | //这里注意消费者会将消息都消费 |
5、direct路由模式
- 特点:有routing-key的匹配模式,根据给定的路由key确定对应的队列
代码实现如fanout模式,将生产者交换机以及路由修改即可,这里路由是精确匹配
6、topic主题模式
-
特点:模糊的routing-key的匹配模式
#表示0级或多级
*表示必须有1级
代码实现如fanout模式,将生产者交换机以及路由修改即可,这里路由是正则匹配方式
7、headers参数模式
- 特点:通过参数进行相应队列的分发
四、SPringBoot整合RabbitMQ入门实战
1、环境准备
创建SPringBoot父子项目,兵在pom.xml
引入RabbitMQ依赖,后面就创建子模块,pom依赖关系配置详见父子项目搭建参考
1 | <dependency> |
目标架构
2、fanout模式
1、创建生产者模块
创建springboot-rabbitmq-fanout-producer模块,并在在application.yml进行配置
1 | # 服务端口 |
定义订单生产者
1 | package com.shawn.springbootrabbitmqfanoutproducer.service; |
定义绑定关系,,相当于之前的创建交换机、绑定队列等
1 | package com.shawn.springbootrabbitmqfanoutproducer.config; |
最后进行测试类,成功后即可发现队列有了消息
1 |
|
2、创建消费者模块
创建springboot-rabbitmq-fanout-consumer模块,并在在application.yml进行配置
1 | # 服务端口 |
分别创建三个消费者类,这里举例其中一个SMS消费类,最后启动即可
1 | // bindings其实就是用来确定队列和交换机绑定关系 |
3、direct模式
新建项目,配置DirectConfiguration中的交换机和队列,并在订单业务中设置路由key,其他操作与fanout模式类似
1 |
|
4、topic模式
在这里可以不定义figuration,直接通过队列绑定交换机的路由关系,其他操作与fanout模式类似(通过注解方式绑定,之前都是通过配置文件方式绑定)
1 | // bindings其实就是用来确定队列和交换机绑定关系 |
5、过期时间ttl和死信队列(★)
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置,若两个同时设置,则以时间最短的为准。
- 通过队列属性设置,队列中所有消息都有相同的过期时间。
- 对消息进行单独设置,每条消息TTL可以不同。
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。要想使用死信队列,只需要在定义队列的时候设置队列参数
x-dead-letter-exchange
指定交换机即可。
首先定义交换机、队列以及绑定关系,在定义队列时设置ttl参数以及dead参数,即创建后该队列会显示ttl以及DLX,代表是会自动删除消息的队列,并且删除的消息会发送到死信队列。这里定义了两种过期类型:队列过期和消息过期,其中队列过期可以将过期消息送到死信队列,而消息过期是完全删除了消息。
1 |
|
1 | //死信队列交换机和队列配置 |
创建服务类
1 |
|
最后创建测试类,启动后过期队列的消息过期会送到死信队列,而过期消息则直接删除
1 |
|
6、消息确认机制的配置
NONE值是禁用发布确认模式,是默认值;
CORRELATED值是发布消息成功到交换器后会触发回调方法;
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
1 | # 服务端口 |
在代码中配置确认机制,生产者无论成功发送与否,都会收到消息
1 | //Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。被PostConstruct修饰的方法会在服务器加我Servlet的时候运行, |
对于消费者来说,需要在配置文件重新配置过,解决消息重试的几种方案:
- 控制重发的次数
- try+catch+手动ack
- try+catch+手动ack+死信队列处理
7、序列化和反序列化
传输bean对象时可能会出现list无法传输,有两种方案
- 始终转换推断类型converter.setAlwaysConvertToInferredType(true);
- 将 spring-amqp 升级到 2.2.13.RELEASE 或以上
1 | /** |
五、RabbitMQ内存磁盘的监控
1、概述
把消息默认放在内存中是为了加快传输和消费的速度,存入磁盘是保证消息数据的持久化。
当RabbitMQ警告时,即内存或者磁盘爆红,所有队列会进入阻塞状态,RabbitMQ无法正常运行。当出现警告的时候,可以通过配置去修改和调整
2、RabbitMQ的内存控制
1、命令的方式
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当RabbitMQ的内存超过40%时,就会产生警告并且阻塞所有生产者的连接。通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会生效。
1 | rabbitmqctl set_vm_memory_high_watermark <fraction> |
2、配置文件方式 rabbitmq.conf
当前配置文件:/etc/rabbitmq/rabbitmq.conf
(若不存在可自行创建)
1 | #默认,但是配置完需要重启 |
3、RabbitMQ的内存换页
在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
默认情况下,内存到达的阈值是50%时就会换页处理。
也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.40.5=0.2时,会进行换页动作
可以通过设置 vm_memory_high_watermark_paging_ratio
来进行调整
1 | vm_memory_high_watermark.relative = 0.4 |
4、RabbitMQ的磁盘预警
当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。
默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。
这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB ,第二检查可能就是1MB,就会出现警告。
通过命令方式修改如下:
1 | rabbitmqctl set_disk_free_limit <disk_limit> |
通过配置文件配置如下:
1 | disk_free_limit.relative = 3.0 |
六、RabbitMQ高级
1、消息队列高可用和高可靠
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
1、 Master-slave主从共享数据模式
生产者讲消费发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦Master挂掉,slave节点继续服务。从而形成高可用,
2、Master- slave主从同步模式
写入消息在Master主节点上,但是主节点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点就行消费,以为消息的拷贝和同步会暂用很大的带宽和网络资源。在后续的rabbtmq中会有使用。
3、多主集群同步部署模式
其写入可以往任意节点去写入。
4、多主集群转发部署模式
如果插入的数据是broker-1中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。它会对描述信息也就是元数据信息就行同步,如果消费者在broker-2中进行消费,发现自己几点没有对应的消息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他会去联系其他的黄牛询问,如果有就返回。
5、Master-slave与Breoker-cluster组合的方案
实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高。
2、集群搭建
1、概述
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
官网参考:https://www.rabbitmq.com/clustering.html
2、环境准备
保证RabbitMQ是可执行的,并把单机版的RabbitMQ服务停止,后台看不到RabbitMQ的进程为止。这里我在一台主机发布多个RabbitMQ,用的是 Master-slave主从共享数据模式。
1 | #查看进程 |
3、单机多实例搭建
分别启动两个节点
1 | sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start & |
rabbit-1操作作为主节点
1 | #停止应用 |
rabbit2操作为从节点
1 | # 停止应用 |
验证集群状态
1 | sudo rabbitmqctl cluster_status -n rabbit-1 |
4、其他
如果采用多机部署方式,需读取其中一个节点的cookie, 并复制到其他节点(节点之间通过cookie确定相互是否可通信)。cookie存放在/var/lib/rabbitmq/.erlang.cookie
。
例如:主机名分别为rabbit-1、rabbit-2
1、逐个启动各节点
2、配置各节点的hosts文件( vim /etc/hosts)或者加入主节点的时候采用ip
ip1:rabbit-1
ip2:rabbit-2
其它步骤雷同单机部署方式,另外对于集群来说,springboot的yml配置需要更改为集群模式连接
3、下单配送分布式高可用实战
4、其他配置详解
1、application.yml详解
1 | rabbitmq: |
对于发送方而言,需要做以下配置:
-
配置CachingConnectionFactory
-
配置Exchange/Queue/Binding
-
配置RabbitAdmin创建上一步的Exchange/Queue/Binding
-
配置RabbitTemplate用于发送消息,RabbitTemplate通过CachingConnectionFactory获取到Connection,然后想指定Exchange发送
对于消费方而言,需要做以下配置:
- 配置CachingConnectionFactory
- 配置Exchange/Queue/Binding
- 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
- 配置RabbitListenerContainerFactory
- 配置@RabbitListener/@RabbitHandler用于接收消息
默认情况下主要的配置
Spring AMQP的主要对象
2、通过java bean方式配置
1 | import org.slf4j.Logger; |