绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
SpringBoot2.x系列教程63--SpringBoot整合消息队列之RabbitMQ详解
2020-04-21 09:52:31

SpringBoot2.x系列教程63--SpringBoot整合消息队列之RabbitMQ详解

作者:一一哥

一. RabbitMQ 简介

1. RabbitMQ 背景

RabbitMQ起源于金融系统,主要用于分布式系统的内部各子系统之间的数据存储转发,这是系统解耦方面的一种运用.

2. RabbitMQ 概述

RabbitMQ是一种基于erlang语言开发的流行的开源消息中间件,或者说是一个消息队列系统.它是对AMQP协议的实现,支持多种客户端,可以对来自客户端的异步消息进行存储转发,在易用性、扩展性、高可用性等方面表现不俗.

趣味定义: 兔子行动非常迅速而且繁殖起来也非常疯狂,用Rabbit来命名这个分布式软件,呼应了RabbitMQ的主要任务是处理海量的信息.

3. RabbitMQ 的优点

  • 基于 ErLang 语言开发具有高可用高并发的优点,适合集群服务器;
  • 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
  • 有消息确认机制和持久化机制,可靠性高;
  • 开源.

4. 为什么选择RabbitMQ

现在市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,Kafka,RocketMQ等,那问题来了为什么要选择RabbitMQ?

  • 除了Qpid,RabbitMQ是一个实现了AMQP标准的消息服务器;
  • 可靠性:RabbitMQ支持持久化,保证了消息的稳定性;
  • 高并发: RabbitMQ使用了Erlang作为开发语言,Erlang是为电话交换机开发的语言,天生自带高并发和高可用的光环;
  • 集群部署简单:正是因为Erlang使得RabbitMQ集群部署变的超级简单;
  • 社区活跃度高:根据网上资料来看,RabbitMQ也是.

5. RabbitMQ 应用场景

  • 解耦: 在单体应用通常可以使用内存队列,如Java的BlockingQueue来进行不同模块间的信息传递.而将单体应用拆分为分布式系统之后,可以通过RabbitMQ这种进程间队列来在各子系统之间进行消息传递,从而达到解耦的作用;
  • 流量削峰: RabbitMQ还可以被用在高并发系统当中的流量削峰,即将请求流量数据临时存放到RabbitMQ当中,从而避免大量的请求流量直接达到后台服务,把后台服务冲垮.通过使用RabbitMQ来存放这些请求流量,后台服务从RabbitMQ中消费数据,从而达到流量削峰的目的.
  • 消息通讯: 除了系统解耦和流量削峰外,RabbitMQ也常用于消息通讯,即可以用于实现IM聊天系统.

6. RabbitMQ 基本工作流程

消息的生产者把要发送的消息放入到消息队列中,消息的接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息.RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,RabbitMQ是分布式系统的标准配置.

7. RabbitMQ的核心思想

RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列.

实际上,生产者通常甚至不知道消息是否会被传递到任何队列.而且生产者只能向交换机发送消息.

交换是一件非常简单的事情.一方面,它接收来自生产者的消息;另一方面将它们推送到队列.交换机必须确切知道如何处理它收到的消息---它应该附加到特定队列吗?它应该附加到许多队列吗?或者它应该被丢弃吗?

二. RabbitMQ 核心概念

1. 生产者和消费者

  • Producer: 消息的生产者,用于发布消息;
  • Consumer: 消息的消费者,用于从队列中获取消息.消费者只需关注队列即可,不需要关注交换机和路由键.消费者可以通过basicConsume(订阅模式可以从队列中一直持续的自动的接收消息)或者basicGet(先订阅消息,然后获取单条消息,再然后取消订阅,也就是说basicGet一次只能获取一条消息,如果还想再获取下一条还要再次调用basicGet)来从队列中获取消息.

2. RabbitMQ broker 服务器

RabbitMQ broker: 官方定义"RabbitMQ isn’t a food truck, it’s a delivery service",指明RabbitMQ是一种传输服务.

3. ExChange 交换机

Exchange: 生产者会将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去. ExchangeType决定了Exchange路由消息的行为,在RabbitMQ中,ExchangeType有direct、Fanout、Topic和Header 4种.
Exchange 类似于数据通信网络中的交换机,提供消息路由策略.

在RabbitMQ 中,Producer 不是通过信道直接将消息发送给 Queue,而是先发送给 ExChange. 一个 ExChange 可以和多个 Queue 进行绑定,Producer 在传递消息的时候,会传递一个 ROUTING_KEY, ExChange 会根据这个 ROUTING_KEY 按照特定的路由算法,将消息路由给指定的 Message Queue.与 Queue 一样, ExChange 也可设置为持久化,临时或者自动删除.

4. Binding 绑定

所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来,所以Binding不是一个概念,而是一种操作.RabbitMQ中通过绑定,以路由键作为桥梁将Exchange与Queue关联起来(Exchange—>Routing Key—>Queue),这样RabbitMQ就知道如何正确地将消息路由到指定的队列了,通过queueBind()方法将Exchange、Routing Key、Queue绑定起来.ExChange 和 Queue 的绑定可以是多对多的关系.

5. Binding Key 绑定键

Binding Key: 它表示的是Exchange与Message Queue是通过binding key进行绑定联系的,这个关系是固定的.初始化的时候,我们就会建立该队列.

6. Routing Key 路由键

Routing Key: 它是一个String值,用于定义路由规则.生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则.在队列绑定的时候需要指定路由键,在生产者发布消息的时候需要指定路由键,当消息的路由键和队列绑定的路由键匹配时,消息就会发送到该队列.

7. Message Queue 消息队列

用于存储消息的容器,可以看成一个有序的数组,生产者生产的消息会发送到交换机中,终交换机将消息存储到某个或某些队列中.队列可被消费者订阅,消费者从订阅的队列中获取消息.

  • Message Queue: 消息队列,我们发送给RabbitMQ的消息后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取.
  • 消息队列提供了 FIFO 的处理机制,具有缓存消息的能力.在RabbitMQ 中,队列消息可以设置为持久化,临时或者自动删除.
  • 设置为持久化的队列,Queue 中的消息会在 Server 本地硬盘存储一份,防止系统 Crash,数据丢失;
  • 设置为临时的队列,Queue 中的数据在系统重启之后就会丢失;
  • 设置为自动删除的队列,当没有用户连接到 Server,队列中的数据会被自动删除.

8. Virtual Host 虚拟主机

每一个RabbitMQ服务器都能创建多个虚拟消息服务器,我们称之为虚拟主机.每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的交换机、队列、绑定等,拥有自己的权限机制.vhost相对于RabbitMQ就像虚拟机之于物理机一样.他们通过在各个实例间提供逻辑上的分离,允许不同的应用程序安全保密的运行数据,这很有用,它既能将同一个Rabbit的众多客户区分开来,又可以避免队列和交换器的命名冲突.RabbitMQ提供了开箱即用的默认的虚拟主机“/”,如果不需要多个vhost可以直接使用这个默认的vhost,通过使用缺省的guest用户名和guest密码来访问默认的vhost.

vhost之间是相互独立的,这避免了各种命名的冲突,就像App中的沙盒的概念一样,每个沙盒是相互独立的,且只能访问自己的沙盒,以保证非法访问别的沙盒带来的安全隐患.

三. ExChange 的 4 种类型

1. 直接交换器(direct,默认)

直接交换器direct(默认):工作方式类似于单播,Binding_Key和Routing_Key相同才能收到消息,ExChange会将消息发送给 Binding_Key和ROUTING_KEY相匹配的Queue.

有一个需要注意的地方:如果找不到指定的exchange,就会报错.但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心.

2. 广播式交换器(fanout)

广播式交换器(fanout):不管消息的 ROUTING_KEY 是什么,ExChange 都会将消息转发给所有绑定的 Queue(无视 key,所有的 queue都能收到消息).

Fanout 扇出,顾名思义,就是像风扇吹面粉一样,吹得到处都是.如果使用fanout类型的exchange,那么routing key就不起作用了.因为凡是绑定到这个exchange的queue,都会收到消息.

3.主题交换器(topic)

主题交换器(topic):工作方式类似于组播,采用模糊匹配,ExChange 会将消息转发给与 ROUTING_KEY 匹配模式相同的所有队列(Binding Key和Routing Key都是被点"."分个开的多个"单词").比如,ROUTING_KEY 为 user.stock 的 Message 会转发给绑定匹配模式为* .stock,user.stock, * . *#.user.stock.# 的队列(* 表是匹配一个任意单词,# 表示匹配 0 个或多个单词).

  • direct是将消息放到exchange绑定的一个queue里(一对一);
  • fanout是将消息放到exchange绑定的所有queue里(一对所有);
  • topic类型的exchange可以实现(一对部分)把消息放到exchange绑定的一部分queue里,或者多个routing key可以路由到一个queue里.

topic应用场景:

打印不同级别的错误日志.
例如,我们的系统出错后会根据不同的错误级别生成error_levelX.log日志,我们在后台首先要把所有的error保存在一个总的queue(绑定了一个*.error的路由键)里,然后再按level分别存放在不同的queue.

routing key绑定如下图:

4. headers交换机

headers:消息体的 header 匹配,无视 key.headers类型不是基于消息的路由键来进行匹配的,而是基于消息的headers属性的键值对来进行匹配的.首先交换器和队列之间基于一个键值对来建立起绑定映射关系,当交换器接收到消息时,分析该消息的headers属性的键值对是否与这个建立交换器和队列绑定关系的键值对完全匹配,是则投递到该队列.由于这种方式性能较低,故基本不会使用.

四. RabbitMQ 消息传递流程

  • 1️⃣.客户端连接到消息队列服务器,打开一个 Channel;
  • 2️⃣.客户端声明一个 ExChange,并设置相关属性;
  • 3️⃣.客户端声明一个 Queue,并设置相关属性;
  • 4️⃣.客户端使用 Routing Key,在 ExChange 和 Queue 之间建立好绑定关系;
  • 5️⃣.客户端投递消息到 ExChange;
  • 6️⃣.ExChange 接收到消息后,就根据消息的 key 和已经设置的 binding,进行消息路由,将消息投递到一个或多个队列里.

五. RabbitMQ 核心设计

RabbitMQ是基于AMQP协议的一个消息队列中间件,主要用于分布式系统当中不同系统之间的消息传递,所以在核心设计层面也是围绕AMQP协议来展开的.如下为RabbitMQ的核心架构示意图:

1. 虚拟主机vhost与权限

1.1 虚拟主机

虚拟主机vhost也被称为多租户,主要用于实现不同业务系统之间的消息队列的隔离.也就是说只部署一个RabbitMQ服务端,但是可以设置多个虚拟主机给多个不同的业务系统使用,这些虚拟主机对应的消息队列内部的数据是相互隔离的.所以多个虚拟主机也类似于同一栋公寓楼里面的多个租户,每个租户都在自己家里生活,而不会去其他租户家里过日子.

虚拟主机的概念相当于Java应用程序的命名空间namespace,不同虚拟主机内部可以包含相同名字的队列.

RabbitMQ服务器包含一个默认的虚拟主机,即“/”.如果需要创建其他的虚拟主机,可以在RabbitMQ控制台执行如下命令:
比如通过rabbitmqctl add_vhost命令添加一个新的“test_host”虚拟主机.

#创建新的虚拟主机
rabbitmqctl add_vhost test_host

#查看已有的虚拟主机
rabbitmqctl list_vhosts

1.2 用户与权限

一个RabbitMQ服务端可以包含多个虚拟主机,而这多个虚拟主机通常是对应多个不同的业务.所以为了保证不同业务不相互影响,则RabbitMQ中定义了用户和权限的概念.

在RabbitMQ中,权限控制是以虚拟主机vhost为单位的,即当创建一个用户时,该用户需要被授予对一个或者多个虚拟主机进行操作的权限,而操作的对象主要包括交换器,队列和绑定关系等,如添加,删除交换器、队列等操作.

创建用户和设置权限的相关命令主要在rabbitmqctl定义,RabbitMQ默认包含一个guest用户,密码也是guest,该用户的角色为管理员:

#列出已存在的用户
rabbitmqctl list_users

#列出某个虚拟机的权限
rabbitmqctl list_permissions -p /

#列出某个虚拟机的权限
rabbitmqctl list_permissions -p test_host

2. 连接Connection与信道Channel

  • 在高并发系统设计当中,需要尽量减少服务器的连接数,因为每个连接都需要占用服务器的一个文件句柄,而服务器的文件句柄数量是有限的,具体可以通过ulimit命令查看.
  • 所以为了减少连接的数量,AMQP协议抽象了信道Channel的概念,一个客户端与RabbitMQ服务器之间只建立一个TCP连接,但是客户端可以创建多个Channel,这多个Channel公用这个TCP连接来进行与服务端之间的数据传输.即Channel是建立在这个TCP连接之上的虚拟连接,就相当于每个Channel都是一个独立的TCP连接一样.为了保证数据的安全性,RabbitMQ的设计为每个不同Channel实例都分配一个的ID.故这个真实的TCP连接发送和接收到数据时,可以根据这个的ID来确定这个数据属于哪个Channel.
  • 使用Channel的场景通常是为在客户端中的每个线程使用一个独立的Channel实例来进行数据传输,这样就实现了不同线程之间的隔离.不过由于所有线程都共用一个TCP连接进行数据传输,如果传输的数据量小则问题不大,如果需要进行大数据量传输,则该TCP连接的带宽就会成为性能瓶颈,所以此时需要考虑使用多个TCP连接.

3. RabbitMQ服务器Broker

在AMQP协议中,消息队列服务器称为Broker.在Broker中接收生产者的产生的消息,然后将该消息放入到对应的消息队列中,后再将消息分发给这个消息队列对应的消费者.所以Broker内部通常包含数据交换器Exchanger,队列Queue两大组件和需要实现这两大组件之间的绑定.

3.1 交换器Exchanger

在RabbitMQ的设计当中,交换器主要用于分析生产者传递过来的消息,根据消息的路由信息,即路由键route key,和自身维护的和队列Queue的绑定信息来将将消息放到对应的队列中,或者如果没有匹配的队列,则丢弃该消息或者扔回给生产者。

3.2 交换器类型

在RabbitMQ的交换器设计当中,交换器主要包含四种类型,分别为fanout,direct,topic和headers.

3.3 队列Queue与绑定Binding

  • 在RabbitMQ的设计当中,队列Queue是进行数据存放的地方,即交换器Exchanger其实只是一个映射关系而已,不会实际占用RabbitMQ服务器的资源.而队列Queue由于在消费者消费消息之前,需要临时存放生产者传递过来的消息,故需要占用服务器的内存和磁盘资源.
  • 默认情况下,RabbitMQ的数据是存放在内存中的,当消费者消费了队列的消息并发回了ACK确认时,RabbitMQ服务器才会将内存中的数据,即队列Queue中的数据,标记为删除,并在之后某个时刻进行实际删除.
  • 不过RabbitMQ也会使用磁盘来存放消息:
    种场景是内存不够用时,RabbitMQ服务器会将内存中的数据临时换出到磁盘中存放,之后当内存充足或者消费者需要消费时,再换回内存;
    第二种场景是队列Queue和生产者发送过来的消息都是持久化类型的.其中队列Queue持久化需要在创建该队列时指定,而消息的持久化为通过设置消息的deliveryMode属性为2来提示RabbitMQ服务器持久化这条消息到磁盘.
  • 如果RabbitMQ服务器采用集群部署,但是没有开启镜像队列,则消息也是只存放在一个队列中的,这种情况下集群的目的主要是在不同的机器节点部署不同的队列Queue,从而来解决单机性能瓶颈,而不是解决数据的高可靠性.如果开启了镜像队列,则是基于Master-Slave的模式,将队列的数据复制到集群其他节点的队列中存放,从而实现数据高可用和高可靠.

4. 生产者

生产者主要负责投递消息到RabbitMQ服务器broker.首先建立一个与broker的TCP连接,然后创建一个或者多个虚拟连接的Channel通道,在Channel中指定需要投递的交换器,消息的路由键和消息内容,后调用publish方法发布到这个交换器.

4.1 路由键Route key

生产者需要指定消息的路由键route key,路由键通常与broker的交换器和队列之间的绑定键binding key对应,然后结合交换器的类型,路由键和绑定键来决定投递给哪个队列.如果没有可以投递的队列,则丢失消息或者返回消息给生产者.

4.2 消息确认机制

消息确认机制主要用于保证生产者投递的消息成功到达RabbitMQ服务器.具体为成功到达RabbitMQ服务器的交换器,如果此交换器没有匹配的队列,则也会丢失该消息.

如果要保证数据成功到达队列,则可以结合Java API的mandatory参数,即如果没有匹配的队列可投递,则返回该消息给生产者,有生产者设置回调来处理,或者转发给备份队列来处理.

5. 消费者

消费者用于消费队列中的消息,与生产者类似,消费者也是作为RabbitMQ服务器的一个客户端.即首先建立一个TCP连接,然后建立channel作为消费者,从而实现不同channel对应不同队列消费者.

在数据消费层面,RabbitMQ服务器会将同一个队列数据以轮询的负载均衡方式分发给消费这个队列的多个消费者,每个消息默认只会给到其中一个消费者.

5.1 推模式和拉模式

消费者消费队列中的数据可以基于推、拉两种模式.其中推模式为当RabbitMQ服务器中的队列有数据时,主动推送给消费者的channel;而拉模式则是消费者channel主动发起获取数据的请求,每发起一次则获取一次数据,不发起则不会获取数据.如果在一个while死循环中轮询,则相当于推模式,不过这种方式很耗费资源,通常使用推模式代替.

5.2 消息确认ACK与队列的消息删除

在RabbitMQ的设计当中,RabbitMQ服务器是不会主动删除队列中的消息的,而是需要等到消费这条消息的消费者发送ACK确认时才会将队列的这条消息删除.

注意:

RabbitMQ服务器在等待消费者的ACK确认过程中,是没有超时的概念的.

如果该消费者的连接还存在且没有回传ACK,则这条消息一直保留在该队列中.如果该消费者连接断了且没有回传ACK,则RabbitMQ服务器将该消息发送给另外一个消费者.

消费者确认可以使用自动确认和手动确认.其中自动确认会存在消费者还没处理就崩溃的情况,此时出现数据丢失,是“至多一次”的场景;如果手动确认,存在处理完还没提交ACK,则消费者崩溃,此时RabbitMQ会重复投递给其他消费者,故是“至少一次”的场景,存在消费重复.

所以RabbitMQ在数据重复性和数据丢失方面,提供的是“至少一次”和“至多一次”的保证,不提供“恰好一次”的保证,即会存在重复消息和丢失消息.

5.3 消息拒绝与重入队

当消费者接收到RabbitMQ服务器发送过来的消息时,可以选择拒绝这条消息.消费者拒绝的时候,可以告诉RabbitMQ服务器是否将该消息重新入队,如果是,则RabbitMQ服务器会将该消息重新投递给其他消费者,否则丢弃这条消息。

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Spring Boot
创建时间:2020-06-22 17:22:00
SpringBoot是由Pivotal团队在2013年开始研发、2014年4月发布个版本的全新开源的轻量级框架。它基于Spring4.0设计,不仅继承了Spring框架原有的特性,而且还通过简化配置来进一步简化了Spring应用的整个搭建和开发过程。另外SpringBoot通过集成大量的框架使得依赖包的版本冲突,以及引用的不稳定性等问题得到了很好的解决。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • duanhao
    栈主

小栈成员

查看更多
  • ?
  • zander
  • 凉茶cooltea
戳我,来吐槽~