引言
什么是MQ
MQ(Message Quene) 翻译为 消息队列 ,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间的解耦。别名为消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成
MQ有哪些
当今市面上很多主流的消息中间件,如老牌ActiveMQ、RabbitMQ,炙手可热的kafka,阿里自主开发RocketMQ等
不同MQ的特点
1、ActiveMQ
Apache出品,最流行的,能力强劲的开源消息总线。他是完全支持JMS规范的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎(性能瓶颈比较大,吞吐量不太高)
2、Kafka
Kafka是分布式发布-订阅消息系统,主要特点基于pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。(效率最高,但是不保证数据丢失以及重复)
3、RocketMQ
阿里开源消息中间件,纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,RocketMQ思路起源于kafka,但是并不是kafka的一个Copy,它对消息的可靠传输及事务性做优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推算、日志流式处理、binlog分布等场景(收费,开源的功能被阉割)
4、RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现,AMQP的主要特征是面向消息、队列、路由(包括点对点的发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次
RabbitMQ比kafka可靠,kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
RabbitMQ详解
RabbitMQ介绍
是一个在AMQP(Advanced Message Queuing Protocol)基础上实现,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。支持多种客户端如: Python 、Ruby 、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP、XMPP、SMTP、STOMP,也正是如此,使得它变的非常重量级,更适合于企业级的开发。它同时实现了一个Broker架构,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load Balance)或者数据持久化都有很好的支持。
RabbitMQ的特点
可靠性、灵活的路由、扩展性、高可用性、多种协议、多语言客户端、管理界面、插件机制
AMQP介绍
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品以及不同开发语言等条件的限制。
什么是消息队列
MQ全程Message Queue,消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用链接来链接他们。
消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接受和发送应用程序同时执行的要求
在项目中,将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量
RabbitMQ应用场景
对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统。那么这些模块之间如何通信?这个传统的IPC有很大区别,传统的IPC很多都是在单一系统上的,慕课耦合性很大,不适合扩展(Scalability),如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:
1、信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?
2、如何降低发送者和接收者的耦合度?
3、如何让priority高的接收者先接到数据?
4、如何做到load balance?有效均衡接收者的负载?
5、如何有效的将数据发送到相关的接收者?也就是说将接受者subscribe不同的数据,如何做有效的filter。
6、如何做到可扩展,甚至将这个通信模块发到cluster上?
7、如何保证接收者接收到了完整,正确的数据?
AMQP协议解决了以上的问题,而RabbitMQ实现了AMQP
RabbitMQ概念介绍
Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或者多个队列
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
Vhost:虚拟主机,一个broker里可以开设多个vhost,作用不同用户的权限分离
Producer:消息生产者,就是投递消息的程序
Consumer:消息消费者,就是接受消息的程序
Channel:消息通道,在客户端的每个连接里,可以建立多个channel,每个channel代表一个会话任务。
RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息
RabbitMQ使用流程
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到响应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。消息队列的使用过程大概如下:
1、客户端连接到消息队列服务器,打开一个channel。
2、客户端声明一个exchange,并设置相关属性
3、客户端声明一个queue,并设置相关属性
4、客户端使用routing key,在exchange和queue之间建立好绑定关系
5、客户端投递消息到exchange
Exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。Exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如:绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为abc的才会投递到队列
RabbitMQ单机安装
安装arlang
官网https://www.erlang-solutions.com/resources/download.html
注意需要下載rabbitmq版本所支持对应的版本
1
2
3
4
5
6
7
8
9
|
[root@master ~]# cd /usr/local/src/
[root@master src]# mkdir /usr/local/src/rabbitmq
[root@master src]# cd !$
[root@master rabbitmq]# wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_23.2.1-1~centos~7_amd64.rpm
[root@master rabbitmq]# yum -y install esl-erlang_23.2.1-1~centos~7_amd64.rpm
|
安装rabbitMQ
官网:https://www.rabbitmq.com/install-rpm.html#install-erlang
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
[root@master rabbitmq]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@master rabbitmq]# yum -y install rabbitmq-server-3.8.9-1.el7.noarch.rpm
常用命令:
启动
[root@master rabbitmq]# systemctl start rabbitmq-server
查看状态
[root@master rabbitmq]# systemctl status rabbitmq-server
停止
[root@master rabbitmq]# systemctl stop rabbitmq-server
开机自启动
[root@master rabbitmq]# systemctl enable rabbitmq-server
设置配置文件参考https://www.rabbitmq.com/configure.html
|
相关操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
开启web界面管理工具
[root@master rabbitmq]# rabbitmq-plugins enable rabbitmq_management
关闭web界面管理工具
[root@master rabbitmq]# rabbitmq-plugins disbale rabbitmq_management
用户管理:默认用户guest,如果不配置的话只能在本地登录
新增一个用户:
[root@master rabbitmq]# rabbitmqctl add_user admin 123456
删除用户
rabbitmqctl delete_user admin
修改用户的密码
rabbitmqctl change_password admin 120110
查看当前用户列表
Rabbitmqctl list_user
用户角色,分为5类,超级管理员、监控者、策略制定者、普通管理员以及其他
超级管理员(administrator)
可登录管理控制台(启用management plugin的情况下),可查看所有信息,并且可以对用户,策略进行操作
监控者(monitoring)
可登录管理控制台(启动management plugin的情况下),可以查看rabbitmq节点的相关信息(进程数、内存使用情况、磁盘使用情况)
策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
与administrator的对比,administrator能看到这些内容
普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
了解了这些后,就可以根据需要给不同的用户设置不同的角色,以便按需管理。
如:给admin设置超级管理员角色权限
[root@master rabbitmq]# rabbitmqctl set_user_tags admin administrator
用户权限
用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。
例如: 将queue绑定到某exchange上,需要具有queue的可写权限,以及exchange的可读权限;向exchange发送消息需要具有exchange的可写权限;从queue里取数据需要具有queue的可读权限。详细请参考官方文档中"How permissions work"部分。
相关命令为:
(1) 设置用户权限
rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
(2) 查看(指定hostpath)所有用户的权限信息
rabbitmqctl list_permissions [-p VHostPath]
(3) 查看指定用户的权限信息
rabbitmqctl list_user_permissions User
(4) 清除用户的权限信息
rabbitmqctl clear_permissions [-p VHostPath] User
查看所有的队列
[root@master rabbitmq]# rabbitmqctl list_queues
清除所有的队列
[root@master rabbitmq]# rabbitmqctl reset
关闭应用
[root@master rabbitmq]# rabbitmqctl stop_app
启动应用
[root@master rabbitmq]# rabbitmqctl start_app
|
rabbitMQ集群部署以及配置
RabbitMQ是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证)来实现的,所以部署Rabbitmq分布式集群时需要先按照Erlang,并把其中一个服务的Cookie复制到另外的节点。
RabbitMQ集群中,各个RabbitMQ为对等节点,即每个节点均提供给客户端连接,进行消息的接收和发送。节点分为内存节点和磁盘节点,一般的。均应建立为磁盘节点,为了防止机器重启后的消息丢失;
RabbitMQ的cluster集群模式一般分为2种,普通模式和镜像模式。消息队列通过RabbitMQ HA镜像队列进行消息队列实体复制。
普通模式下,以两个节点(Rabbit01、Rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点Rabbit01(或者Rabbit02),Rabbit01和Rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连接rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈
镜像模式下,将需要消费的队列变为了镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样临时读取。缺点就是,集群内部的同步通讯会占用大量的网络宽度
集群安装
主机名 |
Ip |
Rabbitmq1 |
192.168.116.20 |
Rabbitmq2 |
192.168.116.21 |
Rabbitmq3 |
192.168.116.22 |
节点数目
因为有些功能(如:仲裁队列、MQTT中的客户端跟踪)需要集群成员即节点之间达成共识,所以集群中的节点数最好使用奇数
节点发现
rabbitmq 集群有很多节点发现方式
1、启动rabbitmq之后,使用命令行工具动态加入节点到集群
2、配置文件发现,在配置文件中直接写死有哪些节点,例如
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config # 配置节点发现方式为配置文件发现(静态),还有其他的发现方式(动态的) 或者直接在命令行加节点
cluster_formation.classic_config.nodes.1 = test_rabbit1@rabbit1
cluster_formation.classic_config.nodes.2 = test_rabbit2@rabbit2
cluster_formation.classic_config.nodes.3 = test_rabbit3@rabbit3
3、其他:如DNS动态发现等,可以查看官方文档
基础环境准备(所有机器)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
1 修改hosts,保证3台主机都能互通(都需要修改)以及关闭selinux和防火墙
[root@rabbitmq2 ~]# cat /etc/hosts
192.168.116.20 rabbitmq1
192.168.116.21 rabbitmq2
192.168.116.22 rabbitmq3
[root@rabbitmq1 rabbit]# systemctl disable --now firewalld
sed -ri 's/(^SELINUX=).*/\1disabled/' /etc/selinux/config && setenforce 0
2 安装依赖环境erlange
[root@rabbitmq1 ~]# mkdir rabbit
[root@rabbitmq1 ~]# cd !$
[root@rabbitmq1 rabbit]# wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_23.2.1-1~centos~7_amd64.rpm
[root@rabbitmq1 rabbit]# yum -y install esl-erlang_23.2.1-1~centos~7_amd64.rpm
3 安装rabbitmq
[root@rabbitmq1 rabbit]# wget https://bintray.com/rabbitmq/rpm/download_file?file_path=rabbitmq-server%2Fv3.8.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq1 rabbit]# yum -y install rabbitmq-server-3.8.9-1.el7.noarch.rpm
4 分别启动rabbitmq以及rabbitmq_manage
systemctl enable --now rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
因为默认guest不能远程登陆,所以分别创建登陆用户,并授予admin权限
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
|
搭建rabbitmq的一般模式集群
安装完rabbitmq之后,在机器中会有如下一个文件。路径在$HOME中或者在/var/lib/rabbitmq中,文件名称为.erlang.cookie,他是一个隐藏文件,name这文件存储的内容是什么?有什么作用?
rabbitMQ的集群是依赖erlang集群,而erlang集群是通过这个cookie进行通信认证的,因此我们做集群的第一个就是干cookie
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
1 统一erlang.cookie文件中的cookie值
必须要使集群中所有rabbitmq的.erlang.cookie文件中的cookie值一致,并且权限为owner只读
[root@rabbitmq1 ~]# scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq/
[root@rabbitmq1 ~]# scp /var/lib/rabbitmq/.erlang.cookie rabbitmq3:/var/lib/rabbitmq/
[root@rabbitmq1 ~]# ll /var/lib/rabbitmq/ -a
-r-------- 1 rabbitmq rabbitmq 20 Dec 30 00:00 .erlang.cookie
所有机器重启rabbitmq
systemctl restart rabbitmq-server
2、查看集群状态
[root@rabbitmq3 ~]# rabbitmqctl status #查看单个rabbitmq状态
[root@master ~]# rabbitmqctl cluster_status
此时的集群状态还只有1台机器
3、将节点加入集群,在rabbitmq2/rabbitmq3上执行
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbitmq1 #注意rabbit@master 为第一个机器,可以用命令rabbitmqctl status查看
rabbitmqctl start_app
再次进行查看可以看到其他节点的信息,web端也可以看到其他节点的信息,如下:
|
镜像模式
镜像模式需要依赖policy模块
需要设置exchanges或者queue的数据需要复制同步,如何复制同步
1
2
3
4
5
6
7
8
9
|
[root@rabbitmq1 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
参数意思:
- ha-all 集群策略名称
- ^ : 匹配符。只有一个^代表匹配所有,^xie 为匹配名称为xie的exchanges或者queue
- ha-mode 为匹配类型,分为3种模式
- all 所有queue
- exctly 部分(需要配置ha-params参数,此参数为int类型,比如3,众多集群中的随机3台机器)
- node 指定(需要配置ha-params参数,此参数为数组类型,比如[“rabbit@mini1”,”rabbit@mini2”]这样来指定为F与G这2台机器)
|
在web管理界面也是可以配置的
配置内存|磁盘空间阈值
支持的单位符号
## k, kiB: kibibytes (2^10 - 1,024 bytes)
## M, MiB: mebibytes (2^20 - 1,048,576 bytes)
## G, GiB: gibibytes (2^30 - 1,073,741,824 bytes)
## kB: kilobytes (10^3 - 1,000 bytes)
## MB: megabytes (10^6 - 1,000,000 bytes)
## GB: gigabytes (10^9 - 1,000,000,000 bytes)
内存警告
RabbitMQ服务器在启动和执行 rabbitmqctl set_vm_memory_high_watermark 的时候会检测机器上的RAM大小,默认情况下,RabbitMQ使用内存超过40%的时候,会发出内存警告,阻塞所有发布消息的连接,一旦警告解除(例如:服务器paging消息到硬盘或者分发消息到消费者并且确认)服务会恢复正常。
默认的内存阀值是40%,注意,这并不会阻止RabbitMQ Server使用不到40%,仅仅意味着到达这个点的时候,发布者会被阻塞block,最坏的情况下,Erlang虚拟机会引起双倍的内存使用(RAM的80%),强烈建议开启操作系统的SWAP和Page files.
内存达到阀值后,发布者会被阻塞,但是消费者不会被阻塞,消费者继续消费消息,当内存降低到阀值以下后,发布者继续开始发布消息。
配置内存阈值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
vm_memory_high_watermark.relative = 0.4
另外我们可以设置节点使用的RAM的限制(以字节为单位)
vm_memory_high_watermark.absolute = 1073741824
另外你可以使用内存单位设置阀值,如果定义了relative,那么absolute将会被忽略
vm_memory_high_watermark.absolute = 2GB
rabbitmq ctl工具修改阀值,服务重启后设置失效,如果要永久生效要修改配置文件
rabbitmqctl set_vm_memory_high_watermark 0.4
配置paging阀值
队列中的消息到达阀值上限之前,它会尝试将页面消息page到磁盘上以释放内存;
默认情况下:在broker达到阀值的50%时(默认内存阀值是0.4)会发生这种情况,可以通过修改如下配置进行修改:
vm_memory_high_watermark_paging_ratio = 0.5
禁用所有的发布者
设置阀值(threshold)为0,会立即出发内存警告,阻塞所有的发布连接(如果你希望禁用全局发布);
rabbitmqctl set_vm_memory_high_watermark 0
经典配置,可以直接配置在advanced.config配置文件中,永久有效
[{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75},{vm_memory_high_watermark, 0.4}]}].
|
磁盘警告
当可用的磁盘空间下降到配置值(最低阀值默认为48MIB,版本3.8.0)之下,会触发所有的警告,所有的生产者会被阻塞,目标是避免填充整个磁盘,这个会导致写操作的失败,导致RabbitMQ中断,为了减小填充磁盘的风险,所有进来的消息都会阻塞;但是消费者不会被阻塞,直到磁盘的可用空间升高到最低阀值之上,生产者就会继续开始推送消息。
配置磁盘阈值
1
2
3
4
5
6
7
8
9
10
|
低于1.0的值很危险,应谨慎使用,值为1.0代表16GIB
disk_free_limit.relative = 2.0
磁盘可用空间absolute设置,单位:字节
disk_free_limit.absolute = 1000000000
加上单位
disk_free_limit.absolute = 1GB
使用CTL工具修改阀值,服务重启后设置失效,如果要永久生效要修改配置文件
rabbitmqctl set_disk_free_limit 42949672000
经典配置,mem_relative值为1.0代表16GIB,可以直接配置在advanced.config配置文件中,永久有效
[{rabbit, [{disk_free_limit, {mem_relative, 1.0}}]}].
|