消费发送
生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等。 默认使用的是DefaultMQProducer
类,发送消息要经过五个步骤:
- 设置
Producer
的GroupName
- 设置
InstanceName
,当一个JVM
需要启动多个Producer
的时候,通过设置不同的InstanceName
来区分,不设置的话系统使用默认名称DEFAULT
- 设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次
- 设置
NameServer
地址 - 组装消息并发送
消息发生返回状态(SendResult
#SendStatus
)有如下四种:
- FLUSH_DISK_TIMEOUT
- FLUSH_SLAVE_TIMEOUT
- SLAVE_NOT_AVAILABLE
- SEND_OK
不同状态在不同的刷盘策略和同步策略的配置下含义是不同的:
- FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成
SYNC_FLUSH
才会报这个错误) - FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成
SYNC_MASTER
方式,没有在设定时间内完成主从同步 - SLAVE_NOT_AVAILABLE:这个状态产生的场景和
FLUSH_SLAVE_TIMEOUT
类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER
,但是没有找到被配置成Slave的Broker - SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK
写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑。
提升写入的性能
发送一条消息出去要经过三步:
- 客户端发送请求到服务器
- 服务器处理该请求
- 服务器向客户端返回应答
一次消息的发送耗时是上述三个步骤的总和。
在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用, 可以采用Oneway
方式发送。Oneway
方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。用这种方式发送消息的耗时可以缩短到微秒级。另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能。
目前在阿里内部经过调优的服务器上,写入性能达到90万+的TPS,我们可以参考这个数据进行系统优化。
在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。
消息消费
简单总结消费的几个要点:
- 消息消费方式(Pull和Push)
- 消息消费的模式(广播模式和集群模式)
- 流量控制(可以结合sentinel来实现)
- 并发线程数设置
- 消息的过滤(Tag、Key) TagA||TagB||TagC * null
当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力:
- 提高消费并行度
在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度。
通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。
此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin
和consumeThreadMax
)。
- 以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的consumeMessageBatchMaxSize
这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
- 检测延时情况,跳过非重要消息
Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。
消息存储
存储介质
- 关系型数据库DB
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。
- 文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
性能对比
文件系统 > 关系型数据库DB
消息的存储和发送
- 消息存储
目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
RocketMQ的消息用顺序写,保证了消息存储的速度。
- 存储结构
RocketMQ消息的存储是由ConsumeQueue
和CommitLog
配合完成 的,消息真正的物理存储文件是CommitLog
,ConsumeQueue
是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic
下的每个Message Queue
都有一个对应的ConsumeQueue
文件。
消息存储架构图中主要有下面三个跟消息存储相关的文件构成:
1、CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824。当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件
2、ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能。RocketMQ是基于主题Topic的订阅模式,消息消费是针对主题进行。如果要遍历CommitLog文件根据Topic检索消息是非常低效。Consumer即可根据ConsumeQueue
来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引:保存了指定Topic下的队列消息在CommitLog
中的起始物理偏移量offset;消息大小size;消息Tag的HashCode值。ConsumeQueue文件可以看成是基于Topic的CommitLog索引文件,故ConsumeQueue文件夹的组织方式为:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
ConsumeQueue文件采取定长设计,每个条目共20个字节,分别为:
- 8字节的commitlog物理偏移量
- 4字节的消息长度
- 8字节tag hashcode
单个文件由30W个条目组成,可以像数组一样随机访问每一个条目。每个ConsumeQueue
文件大小约5.72M。
3、IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: $HOME/store/index/${fileName},文件名fileName是以创建时的时间戳命名的。固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap
结构,故RocketMQ的索引文件其底层实现为hash索引。
过滤消息
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。
RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
主要支持如下2种的过滤方式:
- Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。Consumer端会将这个订阅请求构建成一个
SubscriptionData
,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter
,然后传给Store。Store从ConsumeQueue读取到一条记录后,会用它记录的消息Tag Hash值去做过滤。在服务端只是根据hashcode进行判断,无法精确对Tag原始字符串进行过滤,在消息消费端拉取到消息后,还需要对消息的原始Tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费 - SQL92的过滤方式:仅对push的消费者起作用。Tag方式虽然效率高,但是支持的过滤逻辑比较简单。SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样。真正的
SQL expression
的构建和执行由rocketmq-filter
模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter
避免了每次都去执行。SQL92的表达式上下文为消息的属性。
首先需要开启支持SQL92的特性,然后重启broker:
conf/broker.conf
mqbroker -n localhost:9876 -c /opt/rocket/conf/broker.conf
RocketMQ仅定义了几种基本的语法,用户可以扩展:
- 数字比较: >, >=, <, <=, BETWEEN, =
- 字符串比较: =, <>, IN,IS NULL或者IS NOT NULL
- 逻辑比较: AND, OR, NOT
- 常量比较: 数字如:123, 3.1415; 字符串如:'abc',必须是单引号引起来;NULL;特殊常量 布尔型如:TRUE or FALSE
除了以上两种内置的过滤方式之外,还支持用户自定义Java函数来对消息进行过滤,这种方式称为:Filter Server方式。
要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServer-Nums=3
这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。
这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。
零拷贝原理
PageCache
- 由内存中的物理page组成,其内容对应磁盘上的block
- page cache的大小是动态变化的
- backing store: cache缓存的存储设备
- 一个page通常包含多个block, 而block不一定是连续的
读Cache
- 当内核发起一个读请求时, 先会检查请求的数据是否缓存到了page cache中。如果有,那么直接从内存中读取,不需要访问磁盘, 此即 cache hit(缓存命中);如果没有, 就必须从磁盘中读取数据, 然后内核将读取的数据再缓存到cache中,如此后续的读请求就可以命中缓存了
- page可以只缓存一个文件的部分内容, 而不需要把整个文件都缓存进来
写Cache
- 当内核发起一个写请求时, 也是直接往cache中写入,后备存储中的内容不会直接更新
- 内核会将被写入的page标记为dirty,并将其加入到dirty list中
- 内核会周期性地将dirty list中的page写回到磁盘上,从而使磁盘上的数据和内存中缓存的数据一致
cache回收
- page cache的另一个重要工作是释放page,从而释放内存空间
- cache回收的任务是选择合适的page释放,如果page是dirty的, 需要将page写回到磁盘中再释放
cache和buffer的区别
Cache
缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而Cache保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了CPU等待的时间,提高了系统的性能。
Cache并不是缓存文件的,而是缓存块的(块是I/O读写最小的单元)。Cache一般会用在I/O请求上,如果多个进程要访问某个文件,可以把此文件读入Cache中,这样下一个进程获取CPU控制权并访问此文件直接从Cache读取,提高系统性能。
Buffer
缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据。通过Buffer可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时,存储慢的数据先把数据存放到Buffer,达到一定程度存储快的设备再读取Buffer的数据,在此期间存储快的设备CPU可以干其他的事情。
Buffer一般是用在写入磁盘的,例如:某个进程要求多个字段被读入,当所有要求的字段被读入之前已经读入的字段会先放到Buffer中。
HeapByteBuffer和DirectByteBuffer
HeapByteBuffer,是在JVM堆上面一个Buffer,底层的本质是一个数组,用类封装维护了很多的索引(limit/position/capacity等)。
DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是JVM里,DirectByteBuffer里维护了一个引用address指向数据,进而操作数据。
HeapByteBuffer优点:内容维护在JVM里,把内容写进Buffer里速度快,更容易回收。
DirectByteBuffer优点:跟外设(IO设备)打交道时会快很多,因为外设读取JVM堆里的数据时,不是直接读取的,而是把JVM里的数据读到一个内存块里,再在这个块里读取的,如果使用DirectByteBuffer,则可以省去这一步,实现ZERO COPY(零拷贝)。
外设之所以要把JVM堆里的数据copy出来再操作,不是因为操作系统不能直接操作JVM内存,而是因为JVM在进行GC(垃圾回收)时,会对数据进行移动,一旦出现这种问题,外设就会出现数据错乱的情况。
注意:所有的通过allocate
方法创建的Buffer都是HeapByteBuffer。
堆外内存实现零拷贝
- 前者分配在JVM堆上(
ByteBuffer.allocate()
),后者分配在操作系统物理内存上(ByteBuffer.allocateDirect()
,JVM使用C库中的malloc()
方法分配堆外内存) - DirectByteBuffer可以减少JVM GC压力,当然,堆中依然保存对象引用,FULL GC发生时也会回收直接内存,也可以通过
system.gc()
主动通知JVM回收,或者通过Cleaner.clean()
主动清理。Cleaner.create()
方法需要传入一个DirectByteBuffer
对象和一个Deallocator
(一个堆外内存回收线程)。GC发生时发现堆中的DirectByteBuffer
对象没有强引用了,则调用Deallocator
的run()方法回收直接内存,并释放堆中DirectByteBuffer
的对象引用 - 底层I/O操作需要连续的内存(JVM堆内存容易发生GC和对象移动),所以在执行write操作时需要将
HeapByteBuffer
数据拷贝到一个临时的(操作系统用户态)内存空间中,会多一次额外拷贝。而DirectByteBuffer
则可以省去这个拷贝动作,这是Java层面的 “零拷贝” 技术,在Netty中广泛使用 MappedByteBuffer
底层使用了操作系统的mmap机制,FileChannel#map()
方法就会返回MappedByteBuffer
。DirectByteBuffer
虽然实现了MappedByteBuffer
,不过DirectByteBuffer
默认并没有直接使用mmap机制
缓冲IO和直接IO
缓存IO
缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。
读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回;否则从磁盘中读取,然后缓存在操作系统的缓存中。
写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令。
缓存I/O的优点:
- 在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全
- 可以减少读盘的次数,从而提高性能
缓存I/O的缺点:
- 在缓存 I/O 机制中,DMA方式可以将数据直接从磁盘读到页缓存中,或者将数据从页缓存直接写回到磁盘上,而不能直接在应用程序地址空间和磁盘之间进行数据传输。数据在传输过程中就需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作,这些数据拷贝操作所带来的CPU以及内存开销是非常大的
直接IO
直接IO就是应用程序直接访问磁盘数据,而不经过内核缓冲区,这样做的目的是减少一次从内核缓冲区到用户程序缓存的数据复制。比如说数据库管理系统这类应用,它们更倾向于选择它们自己的缓存机制,因为数据库管理系统往往比操作系统更了解数据库中存放的数据,数据库管理系统可以提供一种更加有效的缓存机制来提高数据库中数据的存取性能。
直接IO的缺点:如果访问的数据不在应用程序缓存中,那么每次数据都会直接从磁盘加载,这种直接加载会非常缓慢。通常直接IO与异步IO结合使用,会得到比较好的性能。
下图分析了写场景下的DirectIO和BufferIO:
内存映射文件(mmap)
在LINUX中我们可以使用mmap用来在进程虚拟内存地址空间中分配地址空间,创建和物理内存的映射关系。
映射关系可以分为两种:
- 文件映射:磁盘文件映射进程的虚拟地址空间,使用文件内容初始化物理内存
- 匿名映射:初始化全为0的内存空间
而对于映射关系是否共享又分为:
- 私有映射(MAP_PRIVATE):多进程间数据共享,修改不反应到磁盘实际文件,是一个copy-on-write(写时复制)的映射方式
- 共享映射(MAP_SHARED):多进程间数据共享,修改反应到磁盘实际文件中
因此总结起来有4种组合:
- 私有文件映射:多个进程使用同样的物理内存页进行初始化,但是各个进程对内存文件的修改不会共享,也不会反应到物理文件中
- 私有匿名映射:mmap会创建一个新的映射,各个进程不共享,这种使用主要用于分配内存(malloc分配大内存会调用mmap)。 例如开辟新进程时,会为每个进程分配虚拟的地址空间,这些虚拟地址映射的物理内存空间各个进程间读的时候共享,写的时候会copy-on-write
- 共享文件映射:多个进程通过虚拟内存技术共享同样的物理内存空间,对内存文件的修改会反应到实际物理文件中,它也是进程间通信(IPC)的一种机制
- 共享匿名映射:这种机制在进行fork的时候不会采用写时复制,父子进程完全共享同样的物理内存页,这也就实现了父子进程通信(IPC)
mmap只是在虚拟内存分配了地址空间,只有在第一次访问虚拟内存的时候才分配物理内存。
在mmap之后,并没有在将文件内容加载到物理页上,只在虚拟内存中分配了地址空间。当进程在访问这段地址时,通过查找页表,发现虚拟内存对应的页没有在物理内存中缓存,则产生"缺页",由内核的缺页异常处理程序处理,将文件对应内容,以页为单位(4096)加载到物理内存,注意是只加载缺页,但也会受操作系统一些调度策略影响,加载的比所需的多。
直接内存读取并发送文件的过程
mmap读取并发送文件的过程
sendfile零拷贝读取并发送文件的过程
零拷贝(zero copy)小结
- 虽然叫零拷贝,实际上sendfile有2次数据拷贝的。第1次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA(The Scatter-Gather Direct Memory Access)技术,就无需从PageCache拷贝至 Socket 缓冲区
- 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算
- Linux中的API为
sendfile
、mmap
,Java中的API为FileChanel.transferTo()
、FileChannel.map()
等 - Netty、Kafka(sendfile)、RocketMQ(mmap)、Nginx等高性能中间件中,都有大量利用操作系统零拷贝特性
同步复制和异步复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
同步复制
同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态。
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失。
配置
同步复制和异步复制是通过Broker配置文件里的brokerRole
参数进行设置的,这个参数可以被设置成ASYNC_MASTER
、 SYNC_MASTER
、SLAVE
三个值中的一个。
Broker的配置文件是/opt/rocket/conf/broker.conf
。
参数名 | 默认值 | 说明 |
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的InetAddress | 当前broker监听的IP |
brokerIP2 | 跟brokerIP1一样 | 存在主从broker时,如果在broker主节点上配置了 brokerIP2属性,broker从节点会连接主节点配置的 brokerIP2进行同步 |
brokerName | null | broker的名称 |
brokerClusterName | DefaultCluster | 本broker所属的Cluser名称 |
brokerId | 0 | broker id:0 表示 master, 其他的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储commit log的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储consume queue的路径 |
mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log的映射文件大小 |
deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
fileReserverdTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER 或者ASYNC_MASTER 或者SLAVE SYNC_MASTER 表示当前broker是一个同步复制的MasterASYNC_MASTER 表示当前broker是一个异步复制的MasterSLAVE 表示当前borker是一个Slave |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH /ASYNC_FLUSH SYNC_FLUSH 模式下的broker保证在收到确认生产者之前将消息刷盘ASYNC_FLUSH 模式下的broker则利用刷盘一组消息的模式,可以取得更好的性能 |
总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH
方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH
的刷盘方式,主从之间配置成SYNC_MASTER
的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
高可用机制
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:
- 在Broker的配置文件中,参数
brokerId
的值为0表明这个Broker是Master,大于0表明这个Broker是Slave。brokerRole
参数也说明这个Broker是Master还是Slave(SYNC_MASTER
/ASYNC_MASTER
/SALVE
) - Master角色的Broker支持读和写,Slave角色的Broker仅支持读。Consumer可以连接Master角色的Broker,也可以连接Slave角色的Broker来读取消息
消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读。
有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。
这就达到了消费端的高可用性。
消息发送高可用
如何达到发送端的高可用性呢?
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样既可以在性能方面具有扩展性,也可以降低主节点故障对整体上带来的影响,而且当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息的。
RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master:
- 手动停止Slave角色的Broker
- 更改配置文件
- 用新的配置文件启动Broker
这种早期方式在大多数场景下都可以很好的工作,但也面临一些问题。
比如,在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息,对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。
在这种复制模式下,严格顺序和高可用只能选择一个。
RocketMQ 在 2018 年底迎来了一次重大的更新,引入 Dledger,增加了一种全新的复制方式。
RocketMQ 引入 Dledger,使用新的复制方式,可以很好地解决这个问题。Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。
当然,Dledger的复制方式也不是完美的,依然存在一些不足:
- 比如,选举过程中不能提供服务
- 最少需要 3 个节点才能保证数据一致性,3节点时,只能保证1个节点宕机时可用,如果2个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较低
- 由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制的方式快
刷盘机制
RocketMQ 的所有消息都是持久化的,先写入系统PageCache,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
同步刷盘
同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下:
- 写入PageCache后,线程等待,通知刷盘线程刷盘
- 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程
- 前端等待线程向用户返回成功
异步刷盘
在有RAID卡,SAS 15000 转磁盘测试顺序写文件,速度可以达到300M每秒左右,而线上的网卡一般都为千兆网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完内存就向用户返回,由后台线程刷盘呢?
由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度
万一由于此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取落后情况, 会不会导致系统内存溢出,答案是否定的,原因如下:
- 写入消息到PageCache时,如果内存不足,则尝试丢弃干净的PAGE,腾出内存供新消息使用,策略是LRU方式
- 如果干净页不足,此时写入PageCache会被阻塞,系统尝试刷盘部分数据,大约每次尝试32个PAGE , 来找出更多干净 PAGE
负载均衡
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
Producer的负载均衡
如图所示,5个队列可以部署在一台机器上,也可以分别部署在5台不同的机器上,发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。
# 创建主题
[root@node1 ~]# mqadmin updateTopic -n localhost:9876 -t tp_demo_02 -w 6 -b
localhost:10911
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = new Message();
message.setTopic("tp_demo_02");
message.setBody("hello rubin".getBytes());
// 指定MQ
SendResult result = producer.send(message,
new MessageQueue("tp_demo_06", "node1", 5),
1_000
);
System.out.println(result.getSendStatus());
producer.shutdown();
Consumer的负载均衡
如图所示,如果有5个队列,2 个Consumer,那么第一个Consumer消费3个队列,第二Consumer消费2个队列。 这样即可达到平均消费的目的,可以水平扩展Consumer 来提高消费能力。但是Consumer 数量要小于等于队列数量,如果Consumer超过队列数量,那么多余的Consumer 将不能消费消息 。
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)底层都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。
如果未拉取到消息,则延迟一下又继续拉取。
在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列中去获取消息。
因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer。知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。
在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。
Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
DefaultMQPullConsumer
有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener
函数,一个是MQPullConsumerScheduleService
(使用这个Class类似使用DefaultMQPushConsumer
,但是它把Pull消息的主动性留给了使用者)。
public class MyConsumer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("consumer_pull_grp_01");
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> messageQueues =
consumer.fetchSubscribeMessageQueues("tp_demo_01");
for (MessageQueue messageQueue : messageQueues) {
// 指定从哪个MQ拉取数据
PullResult result = consumer.pull(messageQueue, "*", 0L, 10);
List<MessageExt> msgFoundList = result.getMsgFoundList();
for (MessageExt messageExt : msgFoundList) {
System.out.println(messageExt);
}
}
consumer.shutdown();
}
}
DefaultMQPushConsumer
的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer
启动后,会马上会触发一个doRebalance
动作,而且在同一个ConsumerGroup
里加入新的DefaultMQPushConsumer
时,各个Consumer都会被触发doRebalance
动作。
负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中。
如下图所示,具体的负载均衡算法有几种,默认用的是AllocateMessageQueueAveragely
。
我们可以设置负载均衡的算法:
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_push_grp_01");
consumer.setNamesrvAddr("node1:9876");
// 设置负载均衡算法
consumer.setAllocateMessageQueueStrategy(new
AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// todo 处理接收到的消息
return null;
}
});
consumer.start();
以AllocateMessageQueueAveragely
策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。
可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。
消息重试
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_04_01");
consumer.setNamesrvAddr("node1:9876");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
// 消息订阅
consumer.subscribe("tp_demo_04", "*");
// 并发消费
// consumer.setMessageListener(new MessageListenerConcurrently() {
// @Override
// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
// return null;
// }
// });
// 顺序消费
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getMsgId() + "\t" + msg.getQueueId() +
"\t" + new String(msg.getBody()));
}
return null;
}
});
consumer.start();
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效。广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
重试次数
消息队列RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的Message ID不会改变。
配置方式
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
(推荐) - 返回
Null
- 抛出异常
public class MyConcurrentlyMessageListener implements
MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//处理消息
doConsumeMessage(msgs);
//方式1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,消息将重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,此后这条消息将不会再重试。
public class MyConcurrentlyMessageListener implements
MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
doConsumeMessage(msgs);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//消息处理正常,直接返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
自定义消息最大重试次数
消息队列 RocketMQ 允许Consumer启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
- 最大重试次数小于等于16次,则重试时间间隔同上表描述
- 最大重试次数大于16次,超过16次的重试时间间隔均为每次2小时
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_04_01");
// 设置重新消费的次数
// 共16个级别,大于16的一律按照2小时重试
consumer.setMaxReconsumeTimes(20);
注意:
- 消息最大重试次数的设置对相同Group ID下的所有Consumer实例有效
- 如果只对相同Group ID下两个Consumer实例中的其中一个设置了
MaxReconsumeTimes
,那么该配置对两个Consumer实例均生效 - 配置采用覆盖的方式生效,即最后启动的Consumer实例会覆盖之前的启动实例的配置
获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
public class MyConcurrentlyMessageListener implements
MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getReconsumeTimes());
}
doConsumeMessage(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
死信队列
RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:
- %RETRY%消费组名称(重试Topic)
- %DLQ%消费组名称(死信Topic)
死信队列也可以被订阅和消费,并且也会过期。
可视化工具:rocketmq-console下载地址:https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip。也可以在附件中下载。
使用jdk8:
# 编译打包
mvn clean package -DskipTests
# 运行工具
java -jar target/rocketmq-console-ng-1.0.0.jar
页面设置NameSrv地址即可。如果不生效,就直接修改项目的application.properties中的namesrv地址选项的值。
死信特性
死信消息具有以下特性:
- 不会再被消费者正常消费
- 有效期与正常消息相同,均为3天,3天后会被自动删除。因此,请在死信消息产生后的3天内及时处理
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例
- 如果一个Group ID未产生死信消息,消息队列RocketMQ不会为其创建相应的死信队列
- 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic
查看死信信息
在控制台查询出现死信队列的主题信息
在消息界面根据主题查询死信消息
选择重新发送消息。一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列RocketMQ控制台重新发送该消息,让消费者重新消费一次。
延迟消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel
,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel
。注意,messageDelayLevel
是broker的属性,不属于某个topic。发消息时,设置delayLevel
等级即可:msg.setDelayLevel(level)
。level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel
存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
查看SCHEDULE_TOPIC_XXXX主题信息:
生产者:
public class MyProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_06_01");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
for (int i = 0; i < 20; i++) {
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message = new Message("tp_demo_06", ("hello rubin - " +
i).getBytes());
// 设置延迟级别,0表示不延迟,大于18的总是延迟2h
message.setDelayTimeLevel(i);
producer.send(message);
}
producer.shutdown();
}
}
消费者:
public class MyConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_06_01");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_06", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(System.currentTimeMillis() / 1000);
for (MessageExt msg : msgs) {
System.out.println(
msg.getTopic() + "\t"
+ msg.getQueueId() + "\t"
+ msg.getMsgId() + "\t"
+ msg.getDelayTimeLevel() + "\t"
+ new String(msg.getBody())
);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这3个消息必须按顺序处理才行。
顺序消息分为全局顺序消息和部分顺序消息:
- 全局顺序消息指某个Topic下的所有消息都要保证顺序
- 部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID的三个消息能按顺序消费即可
在多数的业务场景中实际上只需要局部有序就可以了。
RocketMQ在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer也可能启动多个线程并行处理,所以消息被哪个Consumer消费,被消费的顺序和写入的顺序是否一致是不确定的。
要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。
原理如上图所示:
要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。消费端通过使用MessageListenerOrderly
类来解决单Message Queue的消息被并发处理的问题。
Consumer使用MessageListenerOrderly
的时候,下面四个Consumer的设置依旧可以使用:
setConsumeThreadMin
setConsumeThreadMax
setPullBatchSize
setConsumeMessageBatchMaxSize
前两个参数设置Consumer的线程数。PullBatchSize
指的是一次从Broker的一个Message Queue获取消息的最大数量,默认值是32。ConsumeMessageBatchMaxSize
指的是这个Consumer的Executor(也就是调用MessageListener处理的地方)一次传入的消息数(Listmsgs这个链表的最大长度),默认值是1。
上述四个参数可以使用,说明MessageListenerOrderly
并不是简单地禁止并发处理。在MessageListenerOrderly
的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一个Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理。
部分有序消息的生产和消费:
# 创建主题,8写8读
[root@node1 ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 8 -t
tp_demo_07 -w 8
# 删除主题的操作:
[root@node1 ~]# mqadmin deleteTopic -c DefaultCluster -n localhost:9876 -t tp_demo_07
# 主题描述
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07
public class OrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_07_01");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
List<MessageQueue> queues =
producer.fetchPublishMessageQueues("tp_demo_07");
System.err.println(queues.size());
MessageQueue queue = null;
for (int i = 0; i < 100; i++) {
queue = queues.get(i % 8);
message = new Message("tp_demo_07", ("hello rubin - order
create" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello rubin - order
payed" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello rubin - order ship"
+ i).getBytes());
producer.send(message, queue);
}
producer.shutdown();
}
}
public class OrderConsumer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("consumer_grp_07_01");
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> messageQueues =
consumer.fetchSubscribeMessageQueues("tp_demo_07");
System.err.println(messageQueues.size());
for (MessageQueue messageQueue : messageQueues) {
long nextBeginOffset = 0;
System.out.println("===============================");
do {
PullResult pullResult = consumer.pull(messageQueue, "*",
nextBeginOffset, 1);
if (pullResult == null || pullResult.getMsgFoundList() ==
null) break;
nextBeginOffset = pullResult.getNextBeginOffset();
List<MessageExt> msgFoundList =
pullResult.getMsgFoundList();
System.out.println(messageQueue.getQueueId() + "\t" +
msgFoundList.size());
for (MessageExt messageExt : msgFoundList) {
System.out.println(
messageExt.getTopic() + "\t" +
messageExt.getQueueId() + "\t" +
messageExt.getMsgId() + "\t" +
new String(messageExt.getBody())
);
}
} while (true);
}
consumer.shutdown();
}
}
全局有序消息的生产和消费:
# 创建主题,8写8读
[root@node1 ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 1 -t
tp_demo_07_01 -w 1
# 删除主题的操作:
[root@node1 ~]# mqadmin deleteTopic -c DefaultCluster -n localhost:9876 -t tp_demo_07_01
# 主题描述
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07_01
public class GlobalOrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_07_02");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
for (int i = 0; i < 100; i++) {
message = new Message("tp_demo_07_01", ("hello rubin" +
i).getBytes());
producer.send(message);
}
producer.shutdown();
}
}
public class GlobalOrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_07_03");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_07_01", "*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
事务消息
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer
处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下:
- 发送方向RocketMQ发送“待确认”消息
- RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成
- 发送方开始执行本地事件逻辑
- 发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息
- 如果出现异常情况,步骤4提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求
- 发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态
- RocketMQ收到回查请求后,按照步骤4的逻辑处理
上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4却需要更改第一阶段消息的状态,这样会造成磁盘Catch的脏页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。
客户端有三个类来支持用户实现事务消息,第一个类是LocalTransactionExecuter
,用来实例化步骤3的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE
或者LocalTransactionState.COMMIT_MESSAGE
状态。第二个类是TransactionMQProducer
,它的用法和DefaultMQProducer
类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer
多设置本地事务处理函数和回查状态函数。第三个类是TransactionCheckListener
,实现步骤5中MQ服务器的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE
或者LocalTransactionState.COMMIT_MESSAGE
。
RocketMQ事务消息流程概要
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
事务消息发送及提交:
- 发送消息(half消息)
- 服务端响应消息写入结果
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
补偿流程:
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
- Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
RocketMQ事务消息设计
事务消息在一阶段对用户不可见
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。然后二阶段会显示执行提交或者回滚half消息(逻辑删除)。当然,为了防止二阶段操作失败,RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通过ConsumeQueue
这个二级索引来读取消息实体内容,其流程如下:
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。
Commit和Rollback操作以及Op消息的引入
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
Op消息的存储和对应关系
RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—TransactionalMessageUtil.buildOpTopic();
这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
Half消息的索引构建
在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
如何处理二阶段失败的消息
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
值得注意的是,RocketMQ并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态, RocketMQ 默认回滚该消息。
示例
public class TxProducer {
public static void main(String[] args) throws MQClientException {
TransactionListener listener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message
msg, Object arg) {
// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
System.out.println("执行本地事务,参数为:" + arg);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// return LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt
msg) {
// 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产
者回查生产者本地事务的状态
// 该方法用于获取本地事务执行的状态。
System.out.println("检查本地事务的状态:" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
};
TransactionMQProducer producer = new
TransactionMQProducer("tx_producer_grp_08");
producer.setTransactionListener(listener);
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
message = new Message("tp_demo_08", "hello rubin - tx".getBytes());
producer.sendMessageInTransaction(message, "
{\"name\":\"zhangsan\"}");
}
}
public class TxConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("txconsumer_grp_08_01");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_08", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
消息查询
RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。
按照MessageId查询消息
MsgId 总共16字节,包含消息存储主机地址(ip/port),消息Commit Log offset。从MsgId中解析出Broker的地址和Commit Log的偏移地址,然后按照存储格式所在位置将消息buffer解析成一个完整的消息。
在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和CommitLog的偏移地址后封装成一个RPC请求后,通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。Broker使用QueryMessageProcessor
,使用请求中的CommitLog offset和size去CommitLog中找到真正的记录并解析成一个完整的消息返回。
按照MessageKey查询消息
按照MessageKey查询消息,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。索引文件的具体结构如下:
- 根据查询的key的hashcode%slotNum得到具体的槽的位置(slotNum是一个索引文件里面包含的最大槽的数目, 例如图中所示 slotNum=5000000)
- 根据slotValue(slot 位置对应的值查找到索引项列表的最后一项(倒序排列,slotValue总是指向最新的一个索引项)
- 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
- Hash 冲突:第一种,key的hash值不同但模数相同,此时查询的时候会再比较一次key的hash值(每个索引项保存了key 的hash值),过滤掉hash值不相等的项;第二种,hash值相等但key不等, 出于性能的考虑冲突的检测放到客户端处理(key的原始值是存储在消息文件中的,避免对数据文件的解析),客户端比较一次消息体的key是否相同
- 存储:为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中), 整个索引文件是定长的,结构也是固定的
示例
package com.rubin.rocket.view;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_16_02");
producer.setNamesrvAddr("rocketmq-host:9876");
producer.start();
Message message = new Message("tp_demo_17", "hello rubin".getBytes());
message.setKeys("rubin_key");
SendResult sendResult = producer.send(message);
System.out.println(sendResult.getSendStatus());
System.out.println(sendResult.getMsgId());
// System.out.println(sendResult.getQueueOffset());
System.out.println(sendResult.getOffsetMsgId());
producer.shutdown();
}
}
package com.rubin.rocket.view;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.Arrays;
import java.util.Date;
public class Consumer {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consuper_grp_17_01");
consumer.setNamesrvAddr("rocketmq-host:9876");
consumer.start();
final MessageExt messageExt = consumer.viewMessage("tp_demo_17", "AC32031E6B8018B4AAC27DE2E3E90000");
System.out.println(messageExt);
final MessageExt messageExt1 = consumer.viewMessage("2F5C971300002A9F000000000111808D");
System.out.println(messageExt1);
final QueryResult queryResult = consumer.queryMessage("tp_demo_17", "rubin_key", 10, new Date().getTime() - 10 * 60 * 1000, new Date().getTime());
System.out.println(Arrays.toString(queryResult.getMessageList().toArray()));
consumer.shutdown();
}
}
消息优先级
有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法。
第一种
多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的Topic,其他类型消息在另外一个Topic,应用程序创建两个 Consumer,分别订阅不同的Topic,这样就可以了。
第二种
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从100家快递门店过来的请求,把这些请求通过 Producer 写入RocketMQ;订单处理程序通过Consumer 从队列里读取消息并处理,每天最多处理1万单 。 如果这100个快递门店中某几个门店订单量大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他的99家门店可能被迫等待门店一的2万单处理完,也就是两天后订单才能被处理,显然很不公平 。
这时可以创建一个Topic, 设置Topic的MessageQueue数量超过100个,Producer根据订单的门店号,把每个门店的订单写入一个MessageQueue
。 DefaultMQPushConsumer
默认是采用循环的方式逐个读取一个Topic的所有MessageQueue
,这样如果某家门店订单量大增,这家门店对应的MessageQueue消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。
DefaultMQPushConsumer
默认的pullBatchSize是32,也就是每次从某个MessageQueue
读取消息的时候,最多可以读32个 。 在上面的场景中,为了更加公平,可以把pullBatchSize
设置成1。
第三种
强制优先级。
TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第一优先级,要确保只要有TypeA消息,必须优先处理,TypeB处于第二优先级;,TypeC 处于第三优先级 。
对这种要求,或者逻辑更复杂的要求,就要用户自己编码实现优先级控制,如果上述的三类消息在一个Topic里,可以使用PullConsumer,自主控制MessageQueue的遍历,以及消息的读取。如果上述三类消息在三个Topic下,需要启动三个Consumer, 实现逻辑控制三个Consumer的消费 。
底层网络通信 - Netty高性能之道
RocketMQ底层通信的实现是在Remoting模块里,因为借助了Netty而没有重复造轮子,RocketMQ的通信部分没有很多的代码,就是用Netty实现了一个自定义协议的客户端/服务器程序。
- 自定义ByteBuf可以从底层解决ByteBuffer的一些问题,并且通过“内存池”的设计来提升性能
- Reactor主从多线程模型
- 充分利用了零拷贝,CAS/volatite高效并发编程特性
- 无锁串行化设计
- 管道责任链的编程模型
- 高性能序列化框架的支持
- 灵活配置TCP协议参数
RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
- Broker启动后需要完成一次将自己注册至NameServer的操作,随后每隔30s时间定时向NameServer上报Topic路由信息
- 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息
- 消息生产者Producer根据2中获取的路由信息选择一个队列(MessageQueue)进行消息发送。Broker作为消息的接收者接收消息并落盘存储
- 消息消费者Consumer根据2中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费
从上面1~3中可以看出在消息生产者,Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。
rocketmq-remoting模块是RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。
RocketMQ中惯用的套路:
请求报文和响应都使用RemotingCommand,然后在Processor处理器中根据RequestCode请求码来匹配对应的处理方法。
处理器通常继承至NettyRequestProcessor
,使用前需要先注册才行,注册方式remotingServer.registerDefaultProcessor
。
网络通信核心的东西无非是:
- 线程模型
- 私有协议定义
- 编解码器
- 序列化/反序列化
- ...
既然是基于Netty的网络通信,当然少不了一堆自定义实现的Handler,例如继承至:SimpleChannelInboundHandler
、ChannelDuplexHandler
。
Remoting通信类结构
协议设计与编解码
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand
这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。
Header字段 | 类型 | Request说明 | Response说明 |
code | int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误 |
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言 |
version | int | 请求方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回 |
flag | int | 区分是普通RPC还是onewayRPC的标志 | 区分是普通RPC还是onewayRPC的标志 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap<String, String> | 请求自定义扩展信息 | 响应自定义扩展信息 |
可见传输内容主要可以分为以下4部分:
- 消息长度:总长度,四个字节存储,占用一个int类型
- 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度
- 消息头数据:经过序列化后的消息头数据
- 消息主体数据:消息主体的二进制字节数据内容
消息的通信方式和流程
在RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。这里,主要介绍RocketMQ的异步通信流程。
Reactor主从多线程模型
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。
上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor多线程模型。
一个Reactor主线程(eventLoopGroupBoss
,即为上面的1)负责监听TCP网络连接请求,建立好连接,创建SocketChannel
,并注册到selector
上。
RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置,然后监听真正的网络数据。
拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector
,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup
(即为上面的“M1”,源码中默认设置为8)去做。
处理业务操作放在业务线程池中执行,根据RomotingCommand
的业务请求码code去processorTable
这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor
,以发送消息为例,即为上面的 “M2”)。
从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
线程数 | 线程名 | 线程具体说明 |
1 | NettyBoss_%d | Reactor主线程 |
N | NettyServerEPOLLSelector%d%d | Reactor线程池 |
M1 | NettyServerCodecThread_%d | Worker线程池 |
M2 | RemotingExecutorThread_%d |
限流
RocketMQ消费端中我们可以:
- 设置最大消费线程数
- 每次拉取消息条数等
同时:
PushConsumer
会判断获取但还未处理的消息个数、消息总大小、Offset的跨度- 任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的
在Apache RocketMQ中,当消费者去消费消息的时候,无论是通过pull的方式还是push的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果:
上图中红色的部分代表超出消息处理能力的部分。我们可以看到消息突刺往往都是瞬时的、不规律的,其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理,这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息。
Sentinel介绍
Sentinel是阿里中间件团队开源的,面向分布式服务架构的轻量级流量控制产品,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。
Sentinel原理
Sentinel 专门为这种场景提供了匀速器的特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。
比如在RocketMQ的场景下配置了匀速模式下请求QPS为 5,则会每200ms处理一条消息,多余的处理任务将排队。同时设置了超时时间为5s,预计排队时长超过5s的处理任务将会直接被拒绝。示意图如下图所示:
RocketMQ用户可以根据不同的group和不同的topic分别设置限流规则,限流控制模式设置为匀速器模式(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
),比如:
private void initFlowControlRule() {
FlowRule rule = new FlowRule();
rule.setResource(KEY); // 对应的key为groupName:topicName
rule.setCount(5);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
// 匀速器模式下,设置了QPS为5,则请求每200ms允许通过1个
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
// 如果更多的请求到达,这些请求会被置于虚拟的等待队列中。等待队列有一个max timeout,
如果请求预计的等待时间超过这个时间会直接被block
// 在这里,timeout 为 5s
rule.setMaxQueueingTimeMs(5 * 1000);
FlowRuleManager.loadRules(Collections.singletonList(rule));
}
示例
首先要添加依赖项:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.7.2</version>
</dependency>
public class MyProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_13_01");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
for (int i = 0; i < 1000; i++) {
message = new Message("tp_demo_13", ("hello rubin - " + i).getBytes());
producer.send(message);
}
producer.shutdown();
}
}
public class MyConsumer {
// 消费组名称
private static final String GROUP_NAME = "consumer_grp_13_01";
// 主题名称
private static final String TOPIC_NAME = "tp_demo_13";
// consumer_grp_13_01:tp_demo_13
private static final String KEY = String.format("%s:%s", GROUP_NAME, TOPIC_NAME);
// 使用map存放主题每个MQ的偏移量
private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
// 具有固定大小的线程池
private static final ExecutorService pool = Executors.newFixedThreadPool(32);
private static final AtomicLong SUCCESS_COUNT = new AtomicLong(0);
private static final AtomicLong FAIL_COUNT = new AtomicLong(0);
public static void main(String[] args) throws MQClientException {
// 初始化哨兵的流控
initFlowControlRule();
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(GROUP_NAME);
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(TOPIC_NAME);
for (MessageQueue mq : mqs) {
System.out.printf("Consuming messages from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
if (pullResult.getMsgFoundList() != null) {
for (MessageExt msg : pullResult.getMsgFoundList()) {
doSomething(msg);
}
}
long nextOffset = pullResult.getNextBeginOffset();
// 将每个mq对应的偏移量记录在本地HashMap中
putMessageQueueOffset(mq, nextOffset);
consumer.updateConsumeOffset(mq, nextOffset);
switch (pullResult.getPullStatus()) {
case NO_NEW_MSG:
break SINGLE_MQ;
case FOUND:
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
/**
* 对每个收到的消息使用一个线程提交任务
* @param message
*/
private static void doSomething(MessageExt message) {
pool.submit(() -> {
Entry entry = null;
try {
// 应用流控规则
ContextUtil.enter(KEY);
entry = SphU.entry(KEY, EntryType.OUT);
// 在这里处理业务逻辑,此处只是打印
System.out.printf("[%d][%s][Success: %d] Receive New Messages: %s %n", System.currentTimeMillis(),
Thread.currentThread().getName(), SUCCESS_COUNT.addAndGet(1), new String(message.getBody()));
} catch (BlockException ex) {
// Blocked.
System.out.println("Blocked: " + FAIL_COUNT.addAndGet(1));
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
});
}
private static void initFlowControlRule() {
FlowRule rule = new FlowRule();
// 消费组名称:主题名称 字符串
rule.setResource(KEY);
// 根据QPS进行流控
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 1表示QPS为1,请求间隔1000ms。
// 如果是5,则表示每秒5个消息,请求间隔200ms
rule.setCount(1);
rule.setLimitApp("default");
// 调用使用固定间隔。如果qps为1,则请求之间间隔为1s
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
// 如果请求太多,就将这些请求放到等待队列中
// 该队列有超时时间。如果等待队列中请求超时,则丢弃
// 此处设置超时时间为5s
rule.setMaxQueueingTimeMs(5 * 1000);
// 使用流控管理器加载流控规则
FlowRuleManager.loadRules(Collections.singletonList(rule));
}
// 获取指定MQ的偏移量
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSET_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0;
}
// 在本地HashMap中记录偏移量
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSET_TABLE.put(mq, offset);
}
}
以上就是本文的全部内容,欢迎小伙伴们积极留言交流~~~
附件
链接:https://pan.rubinchu.com/share/1462969005840007168
提取码:zcfy
文章评论