Rubin's Blog

  • 首页
  • 关于作者
  • 隐私政策
享受恬静与美好~~~
分享生活的点点滴滴~~~
  1. 首页
  2. RabbitMQ
  3. 正文

RabbitMQ之架构与实战

2021年 11月 19日 838点热度 0人点赞 0条评论

JMS规范和AMQP协议

JMS经典模式详解

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。它类似于JDBC(Java Database Connectivity)。

JMS消息

消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。报文头包括消息头字段和消息头属性。字段是JMS协议规定的字段,属性可以由用户按需添加。

JMS报文头全部字段:

字段名称含义
JMSDestinationJMSDestination字段包含了消息要发送到的目的地
JMSDeliveryModeJMSDeliveryMode字段包含了消息在发送的时候指定的投递模式
JMSMessageID该字段包含了服务器发送的每个消息的唯一标识
JMSTimestamp该字段包含了消息封装完成要发往服务器的时间。不是真正向服务器发送的时间,因为真正的发送时间,可能会由于事务或客户端消息排队而延后
JMSCorrelationID客户端使用该字段的值与另一条消息关联。一个典型的场景是使用该字段将响应消息与请求消息关联。JMSCorrelationID可以包含如下值:
- 服务器规定的消息ID
- 应用指定的字符串
- 服务器原生的byte[]值
JMSReplyTo该字段包含了在客户端发送消息的时候指定的Destination。即对该消息的响应应该发送到该字段指定的Destination。设置了该字段值的消息一般期望收到一个响应
JMSRedelivered如果这个字段是true,则告知消费者应用这条消息已经发送过了,消费端应用应该小心别重复处理了
JMSType消息发送的时候用于标识该消息的类型。具体有哪些类型,由JMS实现厂商决定
JMSExpiration发送消息时,其到期时间将计算为send方法上指定的生存时间值与当前GMT值之和。 从send方法返回时,消息的JMSExpiration标头字段包含此值。 收到消息后,其JMSExpiration标头字段包含相同的值
JMSPriorityJMS定义了一个十级优先级值,最低优先级为0,最高优先级为9。 此外,客户端应将优先级0-4视为正常优先级,将优先级5-9视为快速优先级。JMS不需要服务器严格执行消息的优先级排序, 但是,它应该尽力在普通消息之前传递加急消息

消息主体则携带着应用程序的数据或有效负载。

根据有效负载的类型来划分,可以将消息分为几种类型:

  1. 简单文本(TextMessage)
  2. 可序列化的对象(ObjectMessage)
  3. 属性集合(MapMessage)
  4. 字节流(BytesMessage)
  5. 原始值流(StreamMessage)
  6. 无有效负载的消息(Message)

体系架构

JMS由以下元素组成

  1. JMS供应商产品:JMS接口的一个实现。该产品可以是Java的JMS实现,也可以是非Java的面向消息中间件的适配器
  2. JMS Client:生产或消费基于消息的Java的应用程序或对象
  3. JMS Producer:创建并发送消息的JMS客户
  4. JMS Consumer:接收消息的JMS客户
  5. JMS Message:包括可以在JMS客户之间传递的数据的对象
  6. JMS Queue:缓存消息的容器。消息的接受顺序并不一定要与消息的发送顺序相同。消息被消费后将从队列中移除
  7. JMS Topic:Pub/Sub模式

对象模型

ConnectionFactory接口(连接工厂)

用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

Connection接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

Destination接口(目标)

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

Session接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。

MessageConsumer接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。

MessageProducer接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

Message接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须):包含用于识别和为消息寻找路由的操作设置
  • 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)
  • 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)

模式

Java消息服务应用程序结构支持两种模式:

  1. 点对点也叫队列模式
  2. 发布/订阅模式

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列,概括为:

  • 一条消息只有一个消费者获得
  • 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行状态
  • 每一个成功处理的消息要么自动确认,要么由接收者手动确认

发布/订阅模式:

  • 支持向一个特定的主题发布消息
  • 0或多个订阅者可能对接收特定消息主题的消息感兴趣
  • 发布者和订阅者彼此不知道对方
  • 多个消费者可以获得消息

在发布者和订阅者之间存在时间依赖性:

  • 发布者需要建立一个主题,以便客户能够订阅
  • 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息
  • 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发

传递方式

JMS有两种传递消息的方式:

标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个JMS服务下线,持久性消息不会丢失,等该服务恢复时再传递。默认的消息传递方式是非持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。

供应商

开源软件:

  1. Apache ActiveMQ
  2. RabbitMQ
  3. RocketMQ
  4. JBoss 社区所研发的 HornetQ
  5. Joram
  6. Coridan的MantaRay
  7. The OpenJMS Group的OpenJMS

专有的供应商包括:

  1. BEA的BEA WebLogic Server JMS
  2. TIBCO Software的EMS
  3. GigaSpaces Technologies的GigaSpaces
  4. Softwired 2006的iBus
  5. IONA Technologies的IONA JMS
  6. SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)
  7. webMethods的JMS+-
  8. my-channels的Nirvana
  9. Sonic Software的SonicMQ
  10. SwiftMQ的SwiftMQ
  11. IBM的WebSphere MQ

JMS在应用集群中的问题

生产中应用基本上都是以集群部署的。在Queue模式下,消息的消费没有什么问题,因为不同节点的相同应用会抢占式地消费消息,这样还能分摊负载。

如果使用Topic广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操作,这样就重复消费了。。。

产中似乎需要结合这两种模式:即不同节点的相同应用间存在竞争,会部分消费(P2P),而不同的应用都需要消费到全量的消息(Topic)模式。这样就可以避免重复消费。

JMS规范文档(jms-1_1-fr-spec.pdf)下载地址:https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/。

JMS是JEE平台的标准消息传递API。它可以在商业和开源实现中使用。每个实现都包括一个JMS服务器,一个JMS客户端库,以及用于管理消息传递系统的其他特定于实现的组件。 JMS提供程序可以是消息传递服务的独立实现,也可以是非JMS消息传递系统的桥梁。

JMS客户端API是标准化的,因此JMS应用程序可在供应商的实现之间移植。但是:

  1. 底层消息传递实现未指定,因此JMS实现之间没有互操作性。除非存在桥接技术,否则想要共享消息传递的Java应用程序必须全部使用相同的JMS实现
  2. 如果没有供应商特定的JMS客户端库来启用互操作性,则非Java应用程序将无法访问JMS
  3. AMQP 0-9-1是一种消息传递协议,而不是像JMS这样的API。任何实现该协议的客户端都可以访问支持AMQP 0-9-1的代理
  4. 协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的AMQP 0-9-1客户端都可以参与消息传递系统,而无需桥接不兼容的服务器实现

AMQP协议剖析

AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0。

AMQP中的概念

  • Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息
  • Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个queue中接收消息
  • Server:一个具体的MQ服务实例,也称为Broker
  • Virtual host:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue
  • Exchange:交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中
  • Routing key:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通常需要和具体的Exchange类型、Binding的Routing key结合起来使用
  • Bindings:指定了Exchange和Queue之间的绑定关系。Exchange根据消息的Routing key和Binding配置(绑定关系、Binding、Routing key等)来决定把消息分派到哪些具体的queue中。这依赖于Exchange类型
  • Message Queue:实际存储消息的容器,并把消息传递给最终的Consumer

AMQP传输层架构

简要概述

AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。

我们假定有一个可靠的面向流的网络传输层(TCP/IP或等价的协议)。

在一个单一的socket连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序列传输。

我们使用小的数据类型来构造数据帧,如bit,integer,string以及字段表。数据帧的字段做了轻微的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。

线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是AMQP)。我们假定AMQP会扩展,改进以及随时间的其他变化,并要求wire-level格式支持这些变化。

数据类型

AMQP使用的数据类型如下:

  • Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐
  • Bits(统一为8个字节):用于表示开/关值
  • Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节
  • Long strings:用于保存二进制数据块
  • Field tables:包含键值对,字段值一般为字符串,整数等

协议协商

AMQP客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一个很有用的技术手段,因为它可以让我们断言假设和前置条件。

在AMQP中,我们需要协商协议的一些特殊方面:

  1. 真实的协议和版本。服务器可能在同一个端口支持多个协议
  2. 双方的加密参数和认证方式。这是功能层的一部分
  3. 数据帧最大大小,通道数量以及其他操作限制

对限制条件的认同可能会导致双方重新分配key的缓存,避免死锁。每个发来的数据帧要么遵守认同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了“要么一切工作正常,要么完全不工作”的RabbitMQ哲学。

协商双方认同限制到一个小的值,如下:

  1. 服务端必须告诉客户端它加上了什么限制
  2. 客户端响应服务器,或许会要求对客户端的连接降低限制

数据帧界定

TCP/IP是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:

  1. 每个连接发送单一数据帧。简单但是慢
  2. 在流中添加帧的边界。简单,但是解析很慢
  3. 计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP的选择

AMQP客户端实现JMS客户端

RabbitMQ的JMS客户端用RabbitMQ Java客户端实现,既与JMS API兼容,也与AMQP 0-9-1协议兼容。

RabbitMQ JMS客户端不支持某些JMS 1.1功能:

  • JMS客户端不支持服务器会话
  • XA事务支持接口未实现
  • RabbitMQ JMS主题选择器插件支持主题选择器。队列选择器尚未实现
  • 支持RabbitMQ连接的SSL和套接字选项,但仅使用RabbitMQ客户端提供的(默认)SSL连接协议
  • RabbitMQ不支持JMS NoLocal订阅功能,该功能禁止消费者接收通过消费者自己的连接发布的消息。可以调用包含NoLocal参数的方法,但该方法将被忽略

RabbitMQ使用amqp协议,JMS规范仅对于Java的使用作出的规定,跟其他语言无关,协议是语言无关的,只要语言实现了该协议,就可以做客户端。如此,则不同语言之间互操作性得以保证。

AMQP协议文档下载地址:https://www.amqp.org/sites/amqp.org/files/amqp0-9-1.zip。

RabbitMQ介绍、概念、基本架构

RabbitMQ介绍

RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。

其特点是:

  1. 高可靠性、易扩展、高可用、功能丰富等
  2. 支持大多数(甚至冷门)的编程语言客户端
  3. RabbitMQ遵循AMQP协议,自身采用Erlang(一种由爱立信开发的通用面向并发编程的语言)编写
  4. RabbitMQ也支持MQTT等其他协议

RabbitMQ具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:https://www.rabbitmq.com/community-plugins.html。

RabbitMQ整体逻辑架构

RabbitMQ Exchange类型

RabbitMQ常用的交换器类型有:Fanout、Direct 、Topic 、Headers四种。

Fanout

会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,如图:

Direct

Direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中,如下图:

Topic

Topic类型的交换器在Direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,这里的匹配规则稍微不同,它约定:BindingKey和RoutingKey一样都是由"."分隔的字符串;BindingKey中可以存在两种特殊字符“*”和“#”,用于模糊匹配,其中"*"用于匹配一个单词,"#"用于匹配多个单词(可以是0个)。

Headers

Headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的Headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,RabbitMQ会获取到该消息的Headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果匹配,消息就会路由到该队列。Headers类型的交换器性能很差,不实用。

RabbitMQ数据存储

存储机制

RabbitMQ消息有两种类型:

  1. 持久化消息和非持久化消息
  2. 这两种消息都会被写入磁盘

持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。

非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。

RabbitMQ存储层包含两个部分:队列索引和消息存储。

队列索引:rabbit_queue_index

索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。每个队列都有相对应的索引。

索引使用顺序的段文件来存储,后缀为.idx,文件名从0开始累加,每个段文件中包含固定的segment_entry_count 条记录,默认值是16384。每个index从磁盘中读取消息的时候,至少要在内存中维护一个段文件,所以设置 queue_index_embed_msgs_below值得时候要格外谨慎,一点点增大也可能会引起内存爆炸式增长。

消息存储:rabbit_msg_store

消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个。存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。

store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新的消息写入。文件名从0开始进行累加。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。

消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式是较小的消息存在index中,而较大的消息存在store中。这个消息大小的界定可以通过queue_index_embed_msgs_below来配置,默认值为4096B。当一个消息小于设定的大小阈值时,就可以存储在index中,这样性能上可以得到优化。一个完整的消息大小小于这个值,就放到索引中,否则放到持久化消息文件中。

rabbitmq.conf中的配置信息:

## Size in bytes below which to embed messages in the queue index.
## Related doc guide: https://rabbitmq.com/persistence-conf.html
##
# queue_index_embed_msgs_below = 4096
## You can also set this size in memory units
##
# queue_index_embed_msgs_below = 4kb

读取消息时,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由store进行处理。

删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值garbage_fraction(默认值0.5)时,才会触发垃圾回收,将这两个文件合并,执行合并的两个文件一定是逻辑上相邻的两个文件。合并逻辑:

  1. 锁定这两个文件
  2. 先整理前面的文件的有效数据,再整理后面的文件的有效数据
  3. 将后面文件的有效数据写入到前面的文件中
  4. 更新消息在ETS表中的记录
  5. 删除后面文件

队列结构

通常队列由rabbit_amqqueue_process和backing_queue这两部分组成,rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。

如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。

rabbit_variable_queue.erl 源码中定义了RabbitMQ队列的4种状态:

  • alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU
  • beta:消息索引存内存,消息内容存磁盘
  • gama:消息索引内存和磁盘都有,消息内容存磁盘
  • delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作

消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发生变化。持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种。gama状态只有持久化消息才会有的状态。

在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。

对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。

消费者获取消息也会引起消息的状态转换。

当消费者获取消息时:

  1. 首先会从Q4中获取消息,如果获取成功则返回
  2. 如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息
  3. 如果Q3不为空,则取出Q3中的消息;进而再判断此时Q3和Delta中的长度,如果都为空,则可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4 中获取消息
  4. 如果Q3为空,Delta不为空,则将Delta的消息转移至Q3中,下次可以直接从Q3中获取消息。在将消息从Delta转移到Q3的过程中,是按照索引分段读取的,首先读取某一段,然后判断读取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中己无消息,则直接将Q2和刚读取到的消息一并放入到Q3中,如果不相等,仅将此次读取到的消息转移到Q3

这里就有个疑问:为什么Q3为空则可以认定整个队列为空?

  1. 试想一下,如果Q3为空,Delta不为空,那么在Q3取出最后一条消息的时候,Delta 上的消息就会被转移到Q3这样与 Q3 为空矛盾
  2. 如果Delta 为空且Q2不为空,则在Q3取出最后一条消息时会将Q2的消息并入到Q3中,这样也与Q3为空矛盾
  3. 在Q3取出最后一条消息之后,如果Q2、Delta、Q3都为空,且Q1不为空时,则Q1的消息会被转移到Q4,这与Q3为空矛盾

其实这一番论述也解释了另一个问题:为什么Q3和Delta都为空时,则可以认为 Q2、Delta、Q3、Q4全部为空?

通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。

对于持久化消息,它一定会进入gamma状态,在开启publisher confirm机制时,只有到了gamma状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。

为什么消息的堆积导致性能下降

在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。

应对这一问题一般有3种措施:

  1. 增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度
  2. 采用multiple ack,降低处理ack带来的开销
  3. 流量控制

安装和配置RabbitMQ

安装环境:

  1. 虚拟机软件:VMWare 15.1.0
  2. 操作系统:CentOS Linux release 7.7.1908
  3. Erlang:erlang-23.0.2-1.el7.x86_64
  4. RabbitMQ:rabbitmq-server-3.8.4-1.el7.noarch

虚拟机之外的软件可以在附件下载也可以去对应官网下载。

RabbitMQ的安装需要首先安装Erlang,因为它是基于Erlang的VM运行的。

RabbitMQ需要的依赖:socat和logrotate,logrotate操作系统中已经存在了,只需要安装socat就可以了。

RabbitMQ与Erlang的兼容关系详见:https://www.rabbitmq.com/which-erlang.html。

安装依赖:

yum install socat -y 

安装Erlang

erlang-23.0.2-1.el7.x86_64.rpm下载地址:https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm。

首先将erlang-23.0.2-1.el7.x86_64.rpm上传至服务器,然后执行下述命令:

rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm 

安装RabbitMQ

rabbitmq-server-3.8.4-1.el7.noarch.rpm下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm。

首先将rabbitmq-server-3.8.4-1.el7.noarch.rpm上传至服务器,然后执行下述命令:

rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm 

启用RabbitMQ的管理插件

rabbitmq-plugins enable rabbitmq_management 

开启RabbitMQ

systemctl start rabbitmq-server 
或者
rabbitmq-server 
或者后台启动
rabbitmq-server -detached

添加用户

rabbitmqctl add_user root 123456 

给用户添加权限

给root用户在虚拟主机"/"上的配置、写、读的权限

rabbitmqctl set_permissions root -p / ".*" ".*" ".*" 

给用户设置标签

rabbitmqctl set_user_tags root administrator 

用户的标签和权限:

TagCapabilities
(None)没有访问management插件的权限
management可以使用消息协议做任何操作的权限,加上:
- 可以使用AMQP协议登录的虚拟主机的权限
- 查看它们能登录的所有虚拟主机中所有队列、交换器和绑定的权限
- 查看和关闭它们自己的通道和连接的权限
- 查看它们能访问的虚拟主机中的全局统计信息,包括其他用户的活动
policymaker所有management标签可以做的,加上:
- 在它们能通过AMQP协议登录的虚拟主机上,查看、创建和删除策略以及虚
- 拟主机参数的权限
monitoring所有management能做的,加上:
- 列出所有的虚拟主机,包括列出不能使用消息协议访问的虚拟主机的权限
- 查看其他用户连接和通道的权限
- 查看节点级别的数据如内存使用和集群的权限
- 查看真正的全局所有虚拟主机统计数据的权限
administrator所有policymaker和monitoring能做的,加上:
- 创建删除虚拟主机的权限
- 查看、创建和删除用户的权限
- 查看、创建和删除权限的权限
- 关闭其他用户连接的权限

打开浏览器,访问http://<安装了CentOS的VMWare虚拟机IP地址>:15672

使用刚才创建的用户登录

RabbitMQ常用操作命令

# 前台启动Erlang VM和RabbitMQ
rabbitmq-server
# 后台启动
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM
rabbitmqctl stop
# 查看所有队列
rabbitmqctl list_queues
# 查看所有虚拟主机
rabbitmqctl list_vhosts
# 在Erlang VM运行的情况下启动RabbitMQ应用
rabbitmqctl start_app
rabbitmqctl stop_app
# 查看节点状态
rabbitmqctl status
# 查看所有可用的插件
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用户
rabbitmqctl add_user username password
# 列出所有用户:
rabbitmqctl list_users
# 删除用户:
rabbitmqctl delete_user username
# 清除用户权限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限:
rabbitmqctl list_user_permissions username
# 修改密码:
rabbitmqctl change_password username newpassword
# 设置用户权限:
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机:
rabbitmqctl add_vhost vhostpath
# 列出所以虚拟主机:
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限:
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机:
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset

RabbitMQ工作流程详解

生产者发送消息的流程

  1. 生产者连接RabbitMQ,建立TCP连接(Connection),开启信道(Channel)
  2. 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
  3. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
  4. 生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
  5. 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
  6. 相应的交换器根据接收到的 routingKey 查找相匹配的队列
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  9. 关闭信道
  10. 关闭连接

消费者接收消息的过程

  1. 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel)
  2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
  3. 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息
  4. 消费者确认( ack) 接收到的消息
  5. RabbitMQ 从队列中删除相应己经被确认的消息
  6. 关闭信道
  7. 关闭连接

案例

Hello World一对一的简单模式。生产者直接发送消息给RabbitMQ,另一端消费。未定义和指定Exchange的情况下,使用的是AMQP default这个内置的Exchange。

依赖坐标

<dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.9.0</version>
</dependency>

HelloProducer.java

public class HelloProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
//        建立的连接是否要求自动恢复
//        factory.setAutomaticRecoveryEnabled(false);

//        ExecutorService executor = Executors.

//        factory.setSharedExecutor(executor);

        // 设置主机名 hostname
        factory.setHost("node1");
        // 设置虚拟主机名称  /在url中的转义字符 %2f
        factory.setVirtualHost("/");
        // 用户名
        factory.setUsername("root");
        // 密码
        factory.setPassword("123456");
        // amqp的端口号
        factory.setPort(5672);

        // 建立TCP连接
        Connection connection = factory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();

        // 声明消息队列   消息队列名称
        // 是否是持久化的
        // 是否是排他的
        // 是否是自动删除的
        // 消息队列的属性信息。使用默认值;
        channel.queueDeclare("queue.biz", false, false, true, null);

        // 声明交换器
        // 交换器的名称
        // 交换器的类型
        // 交换器是否是持久化的
        // 交换器是否是自动删除的
        // 交换器的属性map集合
        channel.exchangeDeclare("ex.biz", BuiltinExchangeType.DIRECT, false, false, null);
        // 将交换器和消息队列绑定,并指定路由键
        channel.queueBind("queue.biz", "ex.biz", "hello.world");
        // 发送消息
        // 交换器的名字
        // 该消息的路由键
        // 该消息的属性BasicProperties对象
        // 消息的字节数组
        channel.basicPublish("ex.biz", "hello.world", null, "hello world 2".getBytes());

        // 关闭通道
        channel.close();
        // 关闭连接
        connection.close();
    }
}

HelloConsume.java

public class HelloConsume {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 确保MQ中有该队列,如果没有则创建
        channel.queueDeclare("queue.biz", false, false, true, null);


        DeliverCallback callback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

            }

        };
        // 监听消息,一旦有消息推送过来,就调用第一个lambda表达式
        channel.basicConsume("queue.biz", (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        }, (consumerTag) -> {});

//        channel.close();
//        connection.close();
    }
}

主动拉取案例:HelloGetConsumer.java

public class HelloGetConsumer {
    public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 指定协议: amqp://
        // 指定用户名  root
        // 指定密码   123456
        // 指定host   node1
        // 指定端口号  5672
        // 指定虚拟主机  %2f
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        System.out.println(connection.getClass());

        final Channel channel = connection.createChannel();

        // 拉消息模式
        // 指定从哪个消费者消费消息
        // 指定是否自动确认消息  true表示自动确认
        final GetResponse getResponse = channel.basicGet("queue.biz", true);
        // 获取消息体  hello world 1
        final byte[] body = getResponse.getBody();
        System.out.println(new String(body));

//        final AMQP.BasicProperties props = getResponse.getProps();

        channel.close();
        connection.close();

    }
}

Connection和Channel关系

生产者和消费者,需要与RabbitMQ Broker建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。

为什么不直接使用TCP连接,而是使用信道?

RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。

当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个Connection ,分摊信道。具体的调优看业务需要。

信道在AMQP中是一个很重要的概念,大多数操作都是在信道这个层面进行的。

channel.exchangeDeclare
channel.queueDeclare
channel.basicPublish
channel.basicConsume
// ...

RabbitMQ 相关的API与AMQP紧密相连,比如channel.basicPublish对应AMQP的Basic.Publish命令。

RabbitMQ工作模式详解

官网地址:https://www.rabbitmq.com/getstarted.html。

Work Queue

生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。

Producer.java

public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 声明消息队列
        channel.queueDeclare("queue.wq", true, false, false, null);
        //声明一个交换器
        channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
        // 将交换器绑定到消息队列,同时指定绑定键(binding-key)
        channel.queueBind("queue.wq", "ex.wq", "key.wq");
        for (int i = 0; i < 15; i++) {
            channel.basicPublish("ex.wq", "key.wq", null, ("工作队列:" + i).getBytes("utf-8"));
        }

        // 关闭通道
        channel.close();
        // 关闭连接
        connection.close();
    }
}

Consumer.java

public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
//        保险起见,先声明一下,如果RabbitMQ的虚拟主机中有该队列,当然好,如果没有,则创建
//        此处的队列应该和生产者声明的队列属性等一致
        channel.queueDeclare("queue.wq", true, false, false, null);

        channel.basicConsume("queue.wq", new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println(new String(message.getBody(), "utf-8"));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("consumerTag  :  " + consumerTag);
            }
        });

//        channel.close();
//        connection.close();
    }
}

我们测试时先启动几个消费者客户端,再执行生产者观察现象。

发布订阅模式

使用Fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。

Producer.java

public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 声明fanout类型的交换器
        channel.exchangeDeclare("ex.myfan", "fanout", true, false, null);

        for (int i = 0; i < 20; i++) {
            channel.basicPublish("ex.myfan",
                    "",  // fanout类型的交换器不需要指定路由键
                    null,
                    ("hello world fan:" + i).getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}

接下来我们定义三个消费者:

OneConsumer.java

public class OneConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

//        声明临时队列,队列的名字由RabbitMQ自动生成
        final String queueName = channel.queueDeclare().getQueue();
        System.out.println("生成的临时队列的名字为:" + queueName);

        channel.exchangeDeclare("ex.myfan",
                BuiltinExchangeType.FANOUT,
                true,
                false,
                null);

        // fanout类型的交换器绑定不需要routingkey
        channel.queueBind(queueName, "ex.myfan", "");

        channel.basicConsume(queueName, (consumerTag, message) -> {
            System.out.println("One   " + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

TwoConsumer.java

public class TwoConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        final String queueName = channel.queueDeclare().getQueue();
        System.out.println("生成的临时队列的名字为:" + queueName);

        channel.exchangeDeclare("ex.myfan",
                BuiltinExchangeType.FANOUT,
                true,
                false,
                null);

        // fanout类型的交换器绑定不需要routingkey
        channel.queueBind(queueName, "ex.myfan", "");

        channel.basicConsume(queueName, (consumerTag, message) -> {
            System.out.println("Two   " + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

ThreeConsumer.java

public class ThreeConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        final String queueName = channel.queueDeclare().getQueue();
        System.out.println("生成的临时队列的名字为:" + queueName);

        channel.exchangeDeclare("ex.myfan",
                BuiltinExchangeType.FANOUT,
                true,
                false,
                null);

        // fanout类型的交换器绑定不需要routingkey
        channel.queueBind(queueName, "ex.myfan", "");

        channel.basicConsume(queueName, (consumerTag, message) -> {
            System.out.println("Three   " + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

路由模式

分布式系统中有很多应用,这些应用需要运维平台的监控,其中一个重要的信息就是服务器的日志记录。我们需要将不同日志级别的日志记录交给不同的应用处理。如何解决?

我们需要使用Direct类型的交换器。 Direct交换器的路由算法很简单:只要消息的routingKey和队列的bindingKey对应,消息就可以推送给该队列。

Producer.java

public class Producer {

    private final static String[] LOG_LEVEL = {
            "ERROR",
            "FATAL",
            "WARN"
    };

    private static Random random = new Random();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明direct类型的交换器,交换器和消息队列的绑定不需要在这里处理
        channel.exchangeDeclare("ex.routing", "direct", false, false, null);

        for (int i = 0; i < 100; i++) {
            String level = LOG_LEVEL[random.nextInt(100) % LOG_LEVEL.length];
            channel.basicPublish("ex.routing", level, null, ("这是【" + level + "】的消息").getBytes());
        }

    }
}

上述代码我们可以看到,我们根据日志级别作为routingKey来向各个绑定的队列发送对应日志级别的日志。接下来,我们定义各个日志级别的消费者即可:

ErrorConsumer.java

public class ErrorConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare("ex.routing", "direct", false, false, null);
        // 此处也可以声明为临时消息队列
        channel.queueDeclare("queue.error", false, false, false, null);

        channel.queueBind("queue.error", "ex.routing", "ERROR");

        channel.basicConsume("queue.error", ((consumerTag, message) -> {
            System.out.println("ErrorConsumer收到的消息:" + new String(message.getBody(), "utf-8"));
        }), consumerTag -> { });

    }
}

FatalConsumer.java

public class FatalConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare("ex.routing", "direct", false, false, null);
        // 此处也可以声明为临时消息队列
        channel.queueDeclare("queue.fatal", false, false, false, null);

        channel.queueBind("queue.fatal", "ex.routing", "FATAL");

        channel.basicConsume("queue.fatal", ((consumerTag, message) -> {
            System.out.println("FatalConsumer收到的消息:" + new String(message.getBody(), "utf-8"));
        }), consumerTag -> { });

    }
}

WarnConsumer.java

public class WarnConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare("ex.routing", "direct", false, false, null);
        // 此处也可以声明为临时消息队列
        channel.queueDeclare("queue.warn", false, false, false, null);

        channel.queueBind("queue.warn", "ex.routing", "WARN");

        channel.basicConsume("queue.warn", ((consumerTag, message) -> {
            System.out.println("WarnConsumer收到的消息:" + new String(message.getBody(), "utf-8"));
        }), consumerTag -> { });

    }
}

主题模式

使用Topic类型的交换器,队列绑定到交换器、 bindingKey时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey模糊匹配,比较灵活。

上个模式中,我们通过 direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

这里有一个限制,加入现在我不仅想根据日志级别划分日志消息,还想根据日志来源划分日志,怎么做?

比如,我想监听cron服务发送的 error 消息,又想监听从kern服务发送的所有消息。

此时可以使用RabbitMQ的主题模式( Topic )。

要想Topic类型的交换器,routingKey就不能随便写了,它必须得是点分单词。单词可以随便写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该点分单词字符串最长255字节。

bindingKey也必须是这种形式。 Topic类型的交换器背后原理跟Direct类型的类似:只要队列的bindingKey的值与消息的routingKey匹配,队列就可以收到该消息。有两个不同:

  • * 匹配一个单词
  • # 匹配0到多个单词

接下来我们看一个案例:

我们延续上述的日志案例,在日志级别的基础上增加地区和项目的级别限制,我们的日志绑定key规则为:地区.项目.日志级别。

Producer.java

public class Producer {

    private static final String[] LOG_LEVEL = {"info", "error", "warn"};
    private static final String[] LOG_AREA = {"beijing", "shanghai", "shenzhen"};
    private static final String[] LOG_BIZ = {"edu-online", "biz-online", "emp-online"};

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare("ex.topic", "topic", true, false, null);

        String area, level, biz;

        String routingKey, message;
        for (int i = 0; i < 100; i++) {

            area = LOG_AREA[RANDOM.nextInt(LOG_AREA.length)];
            level = LOG_LEVEL[RANDOM.nextInt(LOG_LEVEL.length)];
            biz = LOG_BIZ[RANDOM.nextInt(LOG_BIZ.length)];

            // routingKey中包含了三个维度
            routingKey = area + "." + biz + "." + level;
            message = "LOG: [" + level + "] :这是 [" + area + "] 地区 [" + biz + "] 服务器发来的消息,MSG_SEQ = " + i;

            channel.basicPublish("ex.topic", routingKey, null, message.getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}

接下来我们编写几个消费者,分别是:

只关心北京地区日志的消费者:BeijingConsumer.java

public class BeijingConsumer {
    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 临时队列,返回值是服务器为该队列生成的名称
        final String queue = channel.queueDeclare().getQueue();
        channel.exchangeDeclare("ex.topic", "topic", true, false, null);
//       beijing.biz-online.error
//        只要routingKey是以beijing开头的,后面不管几个点分单词,都可以接收
        channel.queueBind(queue, "ex.topic", "beijing.#");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println(new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

只关心错误日志的消费者:ErrorConsumer.java

public class ErrorConsumer {
    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 临时队列,返回值是服务器为该队列生成的名称
        final String queue = channel.queueDeclare().getQueue();
        channel.exchangeDeclare("ex.topic", "topic", true, false, null);
//       beijing.biz-online.error
//        不管前面有几个点分单词,只要最后一个点分单词是error就可以路由消息
        channel.queueBind(queue, "ex.topic", "#.error");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println(new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

只关心上海来的错误日志的消费者:ShanghaiErrorConsumer.java

public class ShanghaiErrorConsumer {
    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 临时队列,返回值是服务器为该队列生成的名称
        final String queue = channel.queueDeclare().getQueue();
        channel.exchangeDeclare("ex.topic", "topic", true, false, null);
//       beijing.biz-online.error
        channel.queueBind(queue, "ex.topic", "shanghai.*.error");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println(new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

只关心深圳emp-online项目的日志的消费者:ShenZhenEmponlineConsumer.java

public class ShenZhenEmponlineConsumer {
    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 临时队列,返回值是服务器为该队列生成的名称
        final String queue = channel.queueDeclare().getQueue();
        channel.exchangeDeclare("ex.topic", "topic", true, false, null);
//       beijing.biz-online.error
        channel.queueBind(queue, "ex.topic", "shenzhen.emp-online.*");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println(new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

Spring整合RabbitMQ

spring-amqp是对AMQP的一些概念的一些抽象,spring-rabbit是对RabbitMQ操作的封装实现。

主要有几个核心类RabbitAdmin、RabbitTemplate、SimpleMessageListenerContainer等。

RabbitAdmin类完成对Exchange,Queue,Binding的操作,在容器中管理了RabbitAdmin类的时候,可以对Exchange,Queue,Binding进行自动声明。

RabbitTemplate类是发送和接收消息的工具类。

SimpleMessageListenerContainer是消费消息的容器。

基于配置文件的整合

创建maven项目,配置pom.xml,添加rabbit的spring依赖:

<dependencies>
     <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit</artifactId>
          <version>2.2.7.RELEASE</version>
     </dependency>
</dependencies>

编写配置文件spring-rabbit.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--创建连接工厂-->
    <rabbit:connection-factory id="connectionFactory"
                               host="rabbitmq-host" virtual-host="/"
                               username="root" password="123456"
                               port="5672" />

    <!--用于自动向RabbitMQ声明队列、交换器、绑定等操作的工具类-->
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />
    <!--用于简化操作的模板类-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />

    <!--声明一个消息队列-->
    <rabbit:queue id="q1" name="queue.q1" durable="false" exclusive="false" auto-delete="false" />

    <!--声明交换器-->
    <rabbit:direct-exchange name="ex.direct" durable="false" auto-delete="false" id="directExchange">
        <!--        <rabbit:exchange-arguments>-->
        <!--            <entry key="" value=""/>-->
        <!--        </rabbit:exchange-arguments>-->
        <rabbit:bindings>
            <!--key表示绑定键-->
            <!--queue表示将交换器绑定到哪个消息队列,使用bean的id,不要使用队列的名字-->
            <!--exchange表示将交换器绑定到哪个交换器-->
            <!--            <rabbit:binding queue="" key="" exchange=""></rabbit:binding>-->
            <rabbit:binding queue="q1" key="routing.q1" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:listener-container connection-factory="connectionFactory">
        <!--        <rabbit:listener ref="" method="" queues="q1" />-->
        <rabbit:listener ref="messageListener" queues="q1" />
    </rabbit:listener-container>

    <bean id="messageListener" class="com.rubin.rabbit.spring.MyMessageListener"/>

</beans>

创建生产者:ProducerApp.java

public class ProducerApp {
    public static void main(String[] args) throws UnsupportedEncodingException {
        AbstractApplicationContext context
                = new ClassPathXmlApplicationContext("spring-rabbit.xml");

        RabbitTemplate template = context.getBean(RabbitTemplate.class);

        Message msg;

        final MessagePropertiesBuilder builder = MessagePropertiesBuilder.newInstance();
        builder.setContentEncoding("gbk");
        builder.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

//        msg = MessageBuilder.withBody("你好,世界".getBytes("gbk"))
//                .andProperties(builder.build())
//                .build();
//
//        template.send("ex.direct", "routing.q1", msg);

        for (int i = 0; i < 1000; i++) {
            msg = MessageBuilder.withBody(("你好,世界" + i).getBytes("gbk"))
                    .andProperties(builder.build())
                    .build();

            template.send("ex.direct", "routing.q1", msg);
        }

        context.close();
    }
}

创建消费者:ConsumerApp.java

public class ConsumerApp {

    public static void main(String[] args) throws UnsupportedEncodingException {
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbit.xml");

        final RabbitTemplate template = context.getBean(RabbitTemplate.class);

        final Message message = template.receive("queue.q1");

        // 拉消息模式
        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
//        System.out.println(new String(message.getBody(), "utf-8"));

        context.close();
    }

}

创建监听器消费者来自动拉取消息消费:MyMessageListener.java

public class MyMessageListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
    }

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("spring-rabbit.xml");
    }

}

基于注解的整合

创建maven项目,配置pom.xml,添加rabbit的spring依赖:

<dependencies>
  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.7.RELEASE</version>
  </dependency>
</dependencies

添加配置类:RabbitConfig.java

@Configuration
@ComponentScan(basePackages = "com.rubin.rabbit.spring.annotation")
@EnableRabbit
public class RabbitConfig {

    // 连接工厂
    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory = new CachingConnectionFactory(URI.create("amqp://root:123456@rabbitmq-host:5672/%2f"));
        return factory;
    }

    // RabbitTemplate
    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);

        return rabbitTemplate;
    }

    // RabbitAdmin
    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);
        return rabbitAdmin;
    }

    // Queue
    @Bean
    public Queue queue() {
        final Queue queue = QueueBuilder.nonDurable("queue.anno").build();
        return queue;
    }

    // Exchange
    @Bean
    public Exchange exchange() {
        final FanoutExchange fanoutExchange = new FanoutExchange("ex.anno.fanout", false, false, null);
        return fanoutExchange;
    }

    // Binding
    @Bean
    @Autowired
    public Binding binding(Queue queue, Exchange exchange) {
        // 创建一个绑定,不指定绑定的参数
        final Binding binding = BindingBuilder.bind(queue).to(exchange).with("key.anno").noargs();
        return binding;
    }

    @Bean("rabbitListenerContainerFactory")
    @Autowired
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
//        factory.setConcurrentConsumers(10);
//        factory.setMaxConcurrentConsumers(15);
        // 按照批次消费消息
//        factory.setBatchSize(10);

        return factory;
    }

}

创建消息生产者:ProducerApp.java

public class ProducerApp {

    public static void main(String[] args) throws UnsupportedEncodingException {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);

        final RabbitTemplate template = context.getBean(RabbitTemplate.class);

        final MessageProperties messageProperties = MessagePropertiesBuilder
                .newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding("gbk")
                .setHeader("myKey", "myValue")
                .build();

//        final Message message = MessageBuilder
//                .withBody("你好,世界".getBytes("gbk"))
//                .andProperties(messageProperties)
//                .build();
//        template.send("ex.anno.fanout", "key.anno", message);

        for (int i = 0; i < 1000; i++) {
            final Message message = MessageBuilder
                    .withBody(("你好,世界" + i).getBytes("gbk"))
                    .andProperties(messageProperties)
                    .build();
            template.send("ex.anno.fanout", "key.anno", message);
        }

        context.close();
    }

}

消息消费者:ConsumerApp.java

public class ConsumerApp {
    public static void main(String[] args) throws UnsupportedEncodingException {

        // 从指定类加载配置信息
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
        // 获取RabbitTemplate对象
        final RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        // 接收消息
        final Message message = rabbitTemplate.receive("queue.anno");
        // 打印消息
        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));

        // 关闭spring的上下文
        context.close();

    }
}

消息监听器:MyMessageListener.java


@Component
public class MyMessageListener {

    /**
     * com.rabbitmq.client.Channel channel对象
     * org.springframework.amqp.core.Message message对象 可以直接操作原生的AMQP消息
     * org.springframework.messaging.Message to use the messaging abstraction counterpart
     *
     * @Payload 注解方法参数,该参数的值就是消息体
     * @Header 注解方法参数,访问指定的消息头字段的值
     * @Headers 该注解的方法参数获取该消息的消息头的所有字段,参数类型对应于map集合。
     * MessageHeaders 参数类型,访问所有消息头字段
     * MessageHeaderAccessor or AmqpMessageHeaderAccessor 访问所有消息头字段
     */
//    @RabbitListener(queues = "queue.anno")
//    public void whenMessageCome(Message message) throws UnsupportedEncodingException {
//        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
//    }
    @RabbitListener(queues = "queue.anno")
    public void whenMessageCome(@Payload String messageStr) {
        System.out.println(messageStr);
    }

    public static void main(String[] args) throws IOException {
        final AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(RabbitConfig.class);
        annotationConfigApplicationContext.start();
        System.in.read();
    }

}

SpringBoot整合RabbitMQ

添加starter依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加连接配置信息

spring.application.name=springboot_rabbit
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

主入口类

@SpringBootApplication
public class SpringbootRabbitmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqApplication.class, args);
    }

}

配置类

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue() {
        return new Queue("queue.boot", false, false, false, null);
    }

    @Bean
    public Exchange exchange() {
        return new TopicExchange("ex.boot", false, false, null);
    }


    @Bean
    public Binding binding() {
        return new Binding("queue.boot",
                Binding.DestinationType.QUEUE,
                "ex.boot",
                "key.boot",
                null);
    }

}

使用RestController发送消息

@RestController
public class MessageController {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/rabbit/{message}")
    public String receive(@PathVariable String message) throws UnsupportedEncodingException {

        final MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding("utf-8")
                .setHeader("hello", "world")
                .build();

        final Message msg = MessageBuilder
                .withBody(message.getBytes("utf-8"))
                .andProperties(messageProperties)
                .build();

        rabbitTemplate.send("ex.boot", "key.boot", msg);

        return "ok";
    }

}

创建消息消费者:

@Component
public class MyMessageListener {

//    @RabbitListener(queues = "queue.boot")
//    public void getMyMessage(@Payload String message, @Header(name = "hello") String value, Channel channel) {
//        System.out.println(message);
//        System.out.println("hello = " + value);
//
//        // 确认消息
//        channel.basicAck();
//        // 拒收消息
//        channel.basicReject();
//    }

    private Integer index = 0;

    @RabbitListener(queues = "queue.boot")
    public void getMyMessage(Message message, Channel channel) throws IOException {
        String value = message.getMessageProperties().getHeader("hello");

        System.out.println(message);
        System.out.println("hello = " + value);

        final long deliveryTag = message.getMessageProperties().getDeliveryTag();

        if (index % 2 == 0) {
            // 确认消息
            channel.basicAck(deliveryTag, false);
        } else {
            // 拒收消息
            channel.basicReject(deliveryTag, false);
        }
        index++;
    }

}

以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~

附件

链接:https://pan.rubinchu.com/share/1462960576488538112

提取码:rboe

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: RabbitMQ
最后更新:2022年 6月 9日

RubinChu

一个快乐的小逗比~~~

打赏 点赞
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复
文章目录
  • JMS规范和AMQP协议
    • JMS经典模式详解
      • JMS消息
      • 体系架构
      • 对象模型
      • 模式
      • 传递方式
      • 供应商
    • JMS在应用集群中的问题
  • AMQP协议剖析
    • AMQP中的概念
    • AMQP传输层架构
      • 简要概述
      • 数据类型
      • 协议协商
      • 数据帧界定
    • AMQP客户端实现JMS客户端
  • RabbitMQ介绍、概念、基本架构
    • RabbitMQ介绍
    • RabbitMQ整体逻辑架构
    • RabbitMQ Exchange类型
      • Fanout
      • Direct
      • Topic
      • Headers
    • RabbitMQ数据存储
      • 存储机制
      • 队列索引:rabbit_queue_index
      • 消息存储:rabbit_msg_store
      • 队列结构
      • 为什么消息的堆积导致性能下降
  • 安装和配置RabbitMQ
  • RabbitMQ常用操作命令
  • RabbitMQ工作流程详解
    • 生产者发送消息的流程
    • 消费者接收消息的过程
    • 案例
    • Connection和Channel关系
  • RabbitMQ工作模式详解
    • Work Queue
    • 发布订阅模式
    • 路由模式
    • 主题模式
  • Spring整合RabbitMQ
    • 基于配置文件的整合
    • 基于注解的整合
  • SpringBoot整合RabbitMQ
  • 附件
最新 热点 随机
最新 热点 随机
问题记录之Chrome设置屏蔽Https禁止调用Http行为 问题记录之Mac设置软链接 问题记录之JDK8连接MySQL数据库失败 面试系列之自我介绍 面试总结 算法思维
Dubbo之应用案例 SpringMVC应用 SpringCloud Alibaba之微服务开发 java并发编程之线程池 Spring之手写IoC框架 SpringCloud Alibaba之Sentinel

COPYRIGHT © 2021 rubinchu.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

京ICP备19039146号-1