生产者
Tags的使用
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")
。
Keys的使用
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);
日志的打印
消息发送成功或者失败要打印消息日志,务必要打印SendResult
和key
字段。send
消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult
里定义。
以下对每个状态进行说明:
- SEND_OK:消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即
SYNC_MASTER
或SYNC_FLUSH
- FLUSH_DISK_TIMEOUT:消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度。如果Broker服务器设置了刷盘方式为同步刷盘,即
FlushDiskType=SYNC_FLUSH
(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时 - FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即
SYNC_MASTER
(默认是异步Master即ASYNC_MASTER
),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时 - SLAVE_NOT_AVAILABLE:消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即
SYNC_MASTER
(默认是异步Master服务器即ASYNC_MASTER
),但没有配置Slave Broker服务器,则将返回该状态——无Slave服务器可用
消息发送失败处理方式
Producer的send
方法本身支持内部重试,重试逻辑如下:
- 至多重试2次(同步发送为2次,异步发送为0次)
- 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过
sendMsgTimeout
设置的值,默认10s - 如果本身向broker发送消息产生超时异常,就不会再重试
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send
同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
上述db重试方式为什么没有集成到MQ客户端内部做,而是要求应用自己去完成,主要基于以下几点考虑:
- MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络
- 如果MQ客户端内部集成一个KV存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受MQ运维人员控制,可能经常会发生 kill -9 这样暴力方式关闭,造成数据没有及时落盘而丢失
- Producer所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。综上,建议重试过程交由应用来控制
选择oneway形式发送
通常消息的发送是这样一个过程:
- 客户端发送请求到服务器
- 服务器处理请求
- 服务器向客户端返回应答
所以,一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,oneway形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级。
消费者
消费过程幂等
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。
可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。
在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(生产者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
消费速度慢的处理方式
提高消费并行度
绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量。
通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。
所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:
- 同一个ConsumerGroup下,通过增加Consumer实例数量来提高并行度(需要注意的是超过订阅队列数的Consumer实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式
- 提高单个Consumer的消费并行线程,通过修改参数
consumeThreadMin
、consumeThreadMax
实现 - 丢弃部分不重要的消息
批量方式消费
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量。
例如订单扣款类应用,一次处理一个订单耗时1s,一次处理10个订单可能也只耗时2s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的consumeMessageBatchMaxSize
返个参数,默认是1,即一次只消费一条消息,例如设置为N,那么每次消费的消息数小于等于N。
跳过非重要消息
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。
例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset =
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// TODO 消息堆积情况的特殊处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
优化每条消息消费过程
举例如下,某条消息的消费过程如下:
- 根据消息从DB查询【数据 1】
- 根据消息从DB查询【数据 2】
- 复杂的业务计算
- 向DB插入【数据 3】
- 向DB插入【数据 4】
这条消息的消费过程中有4次与DB的交互,如果按照每次5ms 计算,那么总共耗时20ms,假设业务计算耗时5ms,那么总过耗时25ms,所以如果能把4次DB交互优化为2次,那么总耗时就可以优化到15ms,即总体性能提高了40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。
消费打印日志
如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。
其他消费建议
关于消费者和订阅
第一件需要注意的事情是,不同的消费组可以独立的消费一些topic,并且每个消费组都有自己的消费偏移量。
确保同一组内的每个消费者订阅信息保持一致。
关于有序消息
消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。
这里不建议抛出异常,你可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
作为替代。
关于并发消费
顾名思义,消费者将并发消费这些消息,建议你使用它来获得良好性能,我们不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
作为替代。
关于消费状态Consume Status
对于并发的消费监听器,你可以返回RECONSUME_LATER
来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT
告诉消费者等待片刻。
关于Blocking
不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程。
关于线程数设置
消费者使用ThreadPoolExecutor在内部对消息进行消费,所以你可以通过设置setConsumeThreadMin
或 setConsumeThreadMax
来改变它。
关于消费位点
当建立一个新的消费组时,需要决定是否需要消费已经存在于Broker中的历史消息。
CONSUME_FROM_LAST_OFFSET
将会忽略历史消息,并消费之后生成的任何消息。
CONSUME_FROM_FIRST_OFFSET
将会消费每个存在于Broker中的信息。
也可以使用CONSUME_FROM_TIMESTAMP
来消费在指定时间戳后产生的消息。
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_15_01");
consumer.setNamesrvAddr("node1:9886");
consumer.subscribe("tp_demo_15", "*");
// 以下三个选一个使用,如果是根据时间戳进行消费,则需要设置时间戳
// 从第一个消息开始消费,从头开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 从最后一个消息开始消费,不消费历史消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 从指定的时间戳开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// 指定时间戳的值
/**
* Backtracking consumption time with second precision. Time format is
* 20131223171201<br>
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
* Default backtracking consumption time Half an hour ago.
*/
consumer.setConsumeTimestamp("");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
// TODO 处理消息的业务逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
Broker
Broker角色
Broker角色分为ASYNC_MASTER
(异步主机)、SYNC_MASTER
(同步主机)以及SLAVE
(从机)。如果对消息的可靠性要求比较严格,可以采用SYNC_MASTER
加SLAVE
的部署方式。如果对消息可靠性要求不高,可以采用ASYNC_MASTER
加SLAVE
的部署方式。如果只是测试方便,则可以选择仅ASYNC_MASTER
或仅SYNC_MASTER
的部署方式。
FlushDiskType
SYNC_FLUSH
(同步刷新)相比于ASYNC_FLUSH
(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。
Broker配置
参数名 | 默认值 | 说明 |
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer地址 |
brokerIP1 | 网卡的InetAddress | 当前broker监听的IP |
brokerIP2 | 跟brokerIP1一样 | 存在主从broker时,如果在broker主节点上配置了brokerIP2属性,broker 从节点会连接主节点配置的brokerIP2进行同步 |
brokerName | null | broker的名称 |
brokerClusterName | DefaultCluster | 本broker所属的Cluser名称 |
brokerId | 0 | broker id: 0 表示master, 其他的正整数表示slave |
storePathCommitLog | 存储commit log的路径 | |
storePathConsumerQueue | 存储consume queue的路径 | |
mappedFileSizeCommitLog | commit log的映射文件大小 | |
deleteWhen | 在每天的什么时间删除已经超过文件保留时间的commit log | |
fileReservedTime | 以小时计算的文件保留时间 | |
brokerRole | SYNC_MASTER /ASYNC_MASTER /SLAVE | |
flushDiskType | SYNC_FLUSH /ASYNC_FLUSH SYNC_FLUSH 模式下的broker保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的broker则利用刷盘一组消息的模式,可以取得更好的性能 |
NameServer
RocketMQ的架构图:
NameServer的设计:
- NameServer互相独立,彼此没有通信关系,单台NameServer挂掉,不影响其他NameServer
- NameServer不去连接别的机器,不主动推消息
- 单个Broker(Master、Slave)与所有NameServer进行定时注册,以便告知NameServer自己还活着
Broker每隔30秒向所有NameServer发送心跳,心跳包含了自身的topic配置信息。NameServer每隔10秒,扫描所有还存活的broker连接,如果某个连接的最后更新时间与当前时间差值超过2分钟,则断开此连接,NameServer也会断开此broker下所有与slave的连接。同时更新topic与队列的对应关系,但不通知生产者和消费者。Broker slave 同步或者异步从Broker master 上拷贝数据。
- Consumer随机与一个NameServer建立长连接,如果该NameServer断开,则从NameServer列表中查找下一个进行连接
Consumer主要从NameServer中根据Topic查询Broker的地址,查到就会缓存到客户端,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。
如果Broker宕机,则NameServer会将其剔除,而Consumer端的定时任务MQClientInstance.this.updateTopicRouteInfoFromNameServer每30秒执行一次,将Topic对应的Broker地址拉取下来,此地址只有Slave地址了,此时Consumer从Slave上消费。
消费者与Master和Slave都建有连接,在不同场景有不同的消费规则。
- Producer随机与一个NameServer建立长连接,每隔30秒(此处时间可配置)从NameServer获取Topic的最新队列情况,如果某个Broker Master宕机,Producer最多30秒才能感知,在这个期间,发往该broker master的消息失败。Producer向提供Topic服务的Master建立长连接,且定时向Master发送心跳
生产者与所有的master连接,但不能向slave写入。
客户端是先从NameServer寻址的,得到可用Broker的IP和端口信息,然后据此信息连接broker。
综上所述,NameServer在RocketMQ中的作用:
- NameServer 用来保存活跃的broker列表,包括Master和Slave
- NameServer 用来保存所有topic和该topic所有队列的列表
- NameServer 用来保存所有broker的Filter列表
- NameServer 为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息
RocketMQ为什么不使用ZooKeeper而自己开发NameServer?
在服务发现领域,ZooKeeper根本就不能算是最佳的选择。
1、注册中心是CP还是AP系统?
在分布式系统中,即使是对等部署的服务,因为请求到达的时间,硬件的状态,操作系统的调度,虚拟机的GC等,任何一个时间点,这些对等部署的节点状态也不可能完全一致,而流量不一致的情况下,只要注册中心在A承诺的时间内(例如1s内)将数据收敛到一致状态(即满足最终一致),流量将很快趋于统计学意义上的一致,所以注册中心以最终一致的模型设计在生产实践中完全可以接受。
2、分区容忍及可用性需求分析实践中,注册中心不能因为自身的任何原因破坏服务之间本身的可连通性,这是注册中心设计应该遵循的铁律!
在CAP的权衡中,注册中心的可用性比数据强一致性更宝贵,所以整体设计更应该偏向AP,而非CP,数据不一致在可接受范围,而P下舍弃A却完全违反了注册中心不能因为自身的任何原因破坏服务本身的可连通性的原则。
3、服务规模、容量、服务联通性
当数据中心服务规模超过一定数量,作为注册中心的ZooKeeper性能堪忧。
在服务发现和健康监测场景下,随着服务规模的增大,无论是应用频繁发布时的服务注册带来的写请求,还是刷毫秒级的服务健康状态带来的写请求,还是恨不能整个数据中心的机器或者容器皆与注册中心有长连接带来的连接压力上,ZooKeeper很快就会力不从心,而ZooKeeper的写并不是可扩展的,不可以通过加节点解决水平扩展性问题。
4、注册中心需要持久存储和事务日志么? 需要,也不需要。
在服务发现场景中,其最核心的数据——实时的健康的服务的地址列表,真的需要数据持久化么?不需要
在服务发现中,服务调用发起方更关注的是其要调用的服务的实时的地址列表和实时健康状态,每次发起调用时,并不关心要调用的服务的历史服务地址列表、过去的健康状态。
但是一个完整的生产可用的注册中心,除了服务的实时地址列表以及实时的健康状态之外,还会存储一些服务的元数据信息,例如服务的版本,分组,所在的数据中心,权重,鉴权策略信息,服务标签等元数据,这些数据需要持久化存储,并且注册中心应该提供对这些元数据的检索的能力。
5、服务健康检查
使用ZooKeeper作为服务注册中心时,服务的健康检测绑定在了ZooKeeper对于Session的健康监测上,或者说绑定在TCP长链接活性探测上了。
ZK与服务提供者机器之间的TCP长链接活性探测正常的时候,该服务就是健康的么?答案当然是否定的!注册中心应该提供更丰富的健康监测方案,服务的健康与否的逻辑应该开放给服务提供方自己定义,而不是一刀切搞成了TCP活性检测!
健康检测的一大基本设计原则就是尽可能真实的反馈服务本身的真实健康状态,否则一个不敢被服务调用者相信的健康状态判定结果还不如没有健康检测。
6、注册中心的容灾考虑
如果注册中心(Registry)本身完全宕机了,服务调用链路应该受到影响么?
不应该受到影响。
服务调用(请求响应流)链路应该是弱依赖注册中心,必须仅在服务发布,机器上下线,服务扩缩容等必要时才依赖注册中心。
这需要注册中心仔细的设计自己提供的客户端,客户端中应该有针对注册中心服务完全不可用时做容灾的手段,例如设计客户端缓存数据机制就是行之有效的手段。另外,注册中心的健康检查机制也要仔细设计以便在这种情况不会出现诸如推空等情况的出现。
ZooKeeper的原生客户端并没有这种能力,所以利用ZooKeeper实现注册中心的时候我们一定要问自己,如果把ZooKeeper所有节点全干掉,你生产上的所有服务调用链路能不受任何影响么?
阿里巴巴是不是完全没有使用 ZooKeeper?并不是。
熟悉阿里巴巴技术体系的都知道,其实阿里巴巴维护了目前国内最大规模的ZooKeeper集群,整体规模有近千台的ZooKeeper服务节点。
在粗粒度分布式锁,分布式选主,主备高可用切换等不需要高TPS支持的场景下有不可替代的作用,而这些需求往往多集中在大数据、离线任务等相关的业务领域,因为大数据领域,讲究分割数据集,并且大部分时间分任务多进程/线程并行处理这些数据集,但是总是有一些点上需要将这些任务和进程统一协调,这时候就是ZooKeeper发挥巨大作用的用武之地。
但是在交易场景交易链路上,在主业务数据存取,大规模服务发现、大规模健康监测等方面有天然的短板,应该竭力避免在这些场景下引入ZooKeeper,在阿里巴巴的生产实践中,应用对ZooKeeper申请使用的时候要进行严格的场景、容量、SLA需求的评估。
对于ZooKeeper,大数据使用,服务发现不用。
客户端配置
相对于RocketMQ的Broker集群,生产者和消费者都是客户端。
DefaultMQProducer
、TransactionMQProducer
、DefaultMQPushConsumer
、DefaultMQPullConsumer
都继承于ClientConfig
类,ClientConfig
为客户端的公共配置类。
客户端的配置都是get、set
形式,每个参数都可以用spring来配置,也可以在代码中配置。
例如namesrvAddr
这个参数可以这样配置,producer.setNamesrvAddr("192.168.0.1:9876")
,其他参数同理。
客户端寻址方式
RocketMQ可以令客户端找到NameServer, 然后通过NameServer再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。
- 代码中指定NameServer地址,多个namesrv地址之间用分号分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
- Java启动参数中指定NameServer地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
- 环境变量指定NameServer地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
- HTTP静态服务器寻址(默认)
该静态地址,客户端第一次会10s后调用,然后每个2分钟调用一次。
客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容如下:
192.168.0.1:9876;192.168.0.2:9876
源码:
org.apache.rocketmq.common.MixAll.java中:
推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且NameServer集群可以热升级。因为只需要修改域名解析,客户端不需要重启。
客户端的公共配置
参数名 | 默认值 | 说明 |
namesrvAddr | NameServer地址列表,多个NameServer地址用分号隔开 | |
clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
pollNameServerInteval | 30000 | 轮询NameServer间隔时间,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
Producer配置
参数名 | 默认值 | 说明 |
producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 |
createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由 |
defaultTopicQueueNums | 4 | 在发送消息,自动创建服务器不存在的topic时,默认创建的队列数 |
sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
retryTimesWhenSendFailed | 2 | 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用 |
maxMessageSize | 4MB | 客户端限制的消息大小,超过报错,同时服务端也会限制,所以需要跟服务端配合使用 |
transactionCheckListener | 事务消息回查监听器,如果发送事务消息,必须设置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池最小线程数 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池最大线程数 |
checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
RPCHook | null | 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可 以在第一个接口中做一些安全控制或者其他操作 |
PushConsumer配置
参数名 | 默认值 | 说明 |
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
messageModel | CLUSTERING | 消费模型支持集群消费和广播消费两种 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从上次消费的位置开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费 |
consumeTimestamp | 半个小时前 | 只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP 时才起作用 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
subscription | 订阅关系 | |
messageListener | 消息监听器 | |
offsetStore | 消费进度存储 | |
consumeThreadMin | 10 | 消费线程池最小线程数 |
consumeThreadMax | 20 | 消费线程池最大线程数 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
pullInterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
PullConsumer配置
参数名 | 默认值 | 说明 |
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属 于一个应用,订阅同样的消息,且消 费逻辑一致,则应该将它们归为同一组 |
brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在 Broker挂起最长时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求在 Broker挂起超过指定时间,客户端认为超时,单位毫秒 |
consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
messageModel | BROADCASTING | 消息支持两种模式:集群消费和广播消费 |
messageQueueListener | 监听队列变化 | |
offsetStore | 消费进度存储 | |
registerTopics | 注册的topic集合 | |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
Message数据结构
字段名 | 默认值 | 说明 |
Topic | null | 必填,消息所属topic的名称 |
Body | null | 必填,消息体 |
Tags | null | 选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag |
Keys | null | 选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等 |
Flag | 0 | 选填,完全由应用来设置,RocketMQ不做干预 |
DelayTimeLevel | 0 | 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 |
WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答 |
系统配置
JVM选项
设置Xms和Xmx一样大,防止JVM重新调整堆空间大小影响性能。
-server -Xms8g -Xmx8g -Xmn4g
设置DirectByteBuffer内存大小。当DirectByteBuffer占用达到这个值,就会触发Full GC。
-XX:MaxDirectMemorySize=15g
如果不太关心RocketMQ的启动时间,可以设置pre-touch,这样在JVM启动的时候就会分配完整的页空间。
-XX:+AlwaysPreTouch
禁用偏向锁可能减少JVM的停顿,因为偏向锁在线程需要获取锁之前会判断当前线程是否拥有锁,如果拥有,就不用再去获取锁了。
在并发小的时候使用偏向锁有利于提升JVM效率,在高并发场合禁用掉。
-XX:-UseBiasedLocking
推荐使用JDK1.8的G1垃圾回收器:
当在G1的GC日志中看到to-space overflow
或者to-space exhausted
的时候,表示G1没有足够的内存使用的(可能是 survivor
区不够了,可能是老年代不够了,也可能是两者都不够了),这时候表示Java堆占用大小已经达到了最大值。比如: 924.897: [GC pause (G1 Evacuation Pause)(mixed) (to-space exhausted), 0.1957310 secs] 924.897: [GC pause (G1 Evacuation Pause) (mixed) (to-space overflow), 0.1957310 secs] 。
为了解决这个问题,请尝试做以下调整:
- 增加预留内存:增大参
-XX:G1ReservePercent
的值(相应的增加堆内存)来增加预留内存 - 更早的开始标记周期:减小
-XX:InitiatingHeapOccupancyPercent
参数的值,以更早的开始标记周期 - 增加并发收集线程数:增大
-XX:ConcGCThreads
参数值,以增加并行标记线程数
对G1而言,大小超过region大小50%的对象将被认为是大对象,这种大对象将直接被分配到老年代的humongous regions中,humongous regions是连续的region集合,StartsHumongous表记集合从那里开始,ContinuesHumongous标记连续集合。
在分配大对象之前,将会检查标记阈值,如果有必要的话,还会启动并发周期。
死亡的大对象会在标记周期的清理阶段和发生Full GC的时候被清理。
为了减少复制开销,任何转移阶段都不包含大对象的复制。在Full GC时,G1在原地压缩大对象。
因为每个独立的humongous regions只包含一个大对象,因此从大对象的结尾到它占用的最后一个region的结尾的那部分空间时没有被使用的,对于那些大小略大于region整数倍的对象,这些没有被使用的内存将导致内存碎片化。
如果你看到因为大对象的分配导致不断的启动并发收集,并且这种分配使得老年代碎片化不断加剧,那么请增加-XX:G1HeapRegionSize
参数的值,这样的话,大对象将不再被G1认为是大对象,它会走普通对象的分配流程。
# G1回收器将堆空间划分为1024个region,此选项指定堆空间region的大小
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
上述设置可能有点儿激进,但是对于生产环境,性能很好。
-XX:MaxGCPauseMillis
不要设置的太小,否则JVM会使用小的年轻代空间以达到此设置的值,同时引起很频繁的minor GC。
推荐使用GC log文件:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m
如果写GC文件增加了Broker的延迟,可以考虑将GC log文件写到内存文件系统:
-Xloggc:/dev/shm/mq_gc_%p.log123
Linux内核参数
os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。下面的参数需要注意,更多细节请参考/proc/sys/vm/*的文档。
# 获取内核参数值
sysctl vm.extra_free_kbytes
# 设置内核参数值
sudo sysctl -w vm.overcommit_memory=1
- vm.extra_free_kbytes:告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟(与具体内核版本相关)
- vm.min_free_kbytes:如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁
- vm.max_map_count:限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值
- vm.swappiness:定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟
- File descriptor limits:RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350
echo '* hard nofile 655350' >> /etc/security/limits.conf
- Disk scheduler:RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟
动态扩缩容
动态增减Namesrv机器
NameServer是RocketMQ集群的协调者,集群的各个组件是通过NameServer获取各种属性和地址信息的。
主要功能包括两部分:
- 各个Broker定期上报自己的状态信息到NameServer
- 各个客户端,包括Producer、Consumer,以及命令行工具,通过NameServer获取最新的状态信息
所以,在启动Broker、生产者和消费者之前,必须告诉它们NameServer的地址,为了提高可靠性,建议启动多个NameServer。NameServer占用资源不多,可以和Broker部署在同一台机器。有多个NameServer后,减少某个NameServer不会对其他组件产生影响。
动态增减Broker机器
由于业务增长,需要对集群进行扩容的时候,可以动态增加Broker角色的机器。只增加Broker不会对原有的Topic产生影响,原来创建好的Topic中数据的读写依然在原来的那些Broker上进行。
集群扩容后,一是可以把新建的Topic指定到新的Broker机器上,均衡利用资源;另一种方式是通过updateTopic命令更改现有的Topic配置,在新加的Broker上创建新的队列。比如TestTopic是现有的一个Topic,因为数据量增大需要扩容,新增的一个Broker机器地址是192.168.0.1:10911,这个时候执行下面的命令:sh ./bin/mqadmin updateTopic -b 192.168.0.1:10911 -t TestTopic -n 192.168.0.100:9876 -w 8 -r 8
,结果是在新增的Broker机器上,为TestTopic新创建了8个读写队列。
mqadmin updateTopic -b <arg> | -c <arg> [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r
<arg>] [-s <arg>] -t <arg> [-u <arg>] [-w <arg>]
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07
[root@node1 ~]# mqadmin updateTopic -b node2:10911 -t tp_demo_07 -n
'node1:9876;node2:9876' -w 8 -r 8
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07
如果因为业务变动或者置换机器需要减少Broker,此时该如何操作呢?减少Broker要看是否有持续运行的Producer,当一个Topic只有一个Master Broker,停掉这个Broker后,消息的发送肯定会受到影响,需要在停止这个Broker前,停止发送消息。
当某个Topic有多个Master Broker,停了其中一个,这时候是否会丢失消息呢?答案和Producer使用的发送消息的方式有关,如果使用同步方式send(msg)发送,在DefaultMQProducer
内部有个自动重试逻辑,其中一个Broker停了,会自动向另一个Broker发消息,不会发生丢消息现象。如果使用异步方式发送send(msg,callback),或者用sendOneWay
方式,会丢失切换过程中的消息。因为在异步和sendOneWay
这两种发送方式下,Producer.setRetryTimesWhenSendFailed
设置不起作用,发送失败不会重试。DefaultMQProducer
默认每30秒到NameServer请求最新的路由消息,Producer如果获取不到已停止的Broker下的队列信息,后续就自动不再向这些队列发送消息。
如果Producer程序能够暂停,在有一个Master和一个Slave的情况下也可以顺利切换。可以关闭Producer后关闭Master Broker,这个时候所有的读取都会被定向到Slave机器,消费消息不受影响。把Master Broker机器置换完后,基于原来的数据启动这个Master Broker,然后再启动Producer程序正常发送消息。
用Linux的kill pid
命令就可以正确地关闭Broker,BrokerController
下有个shutdown
函数,这个函数被加到了ShutdownHook
里,当用Linux的kill命令时(不能用kill-9),shutdown
函数会先被执行。也可以通过RocketMQ提供的工具(mqshutdown broker
)来关闭Broker,它们的原理是一样的。
各种故障对消息的影响
我们期望消息队列集群一直可靠稳定地运行,但有时候故障是难免的,本节我们列出可能的故障情况,看看如何处理:
- Broker正常关闭,启动
- Broker异常Crash,然后启动
- OS Crash,重启
- 机器断电,但能马上恢复供电
- 磁盘损坏
- CPU、主板、内存等关键设备损坏
假设现有的RocketMQ集群,每个Topic都配有多Master角色的Broker供写入,并且每个Master都至少有一个Slave机器(用两台物理机就可以实现上述配置),我们来看看在上述情况下消息的可靠性情况。
第1种情况属于可控的软件问题,内存中的数据不会丢失。如果重启过程中有持续运行的Consumer,Master机器出故障后,Consumer会自动重连到对应的Slave机器,不会有消息丢失和偏差。当Master角色的机器重启以后,Consumer又会重新连接到Master机器(注意在启动Master机器的时候,如果Consumer正在从Slave消费消息,不要停止Consumer。假如此时先停止Consumer后再启动Master机器,然后再启动Consumer,这个时候Consumer就会去读Master机器上已经滞后的offset值,造成消息大量重复)。
如果第1种情况出现时有持续运行的Producer,一台Master出故障后,Producer只能向Topic下其他的Master机器发送消息,如果Producer采用同步发送方式,不会有消息丢失。
第2、3、4种情况属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,如果Master、Slave都配置成SYNC_FLUSH,可以达到和第1种情况相同的效果。
第5、6种情况属于硬件故障,发生第5、6种情况的故障,原有机器的磁盘数据可能会丢失。如果Master和Slave机器间配置成同步复制方式,某一台机器发生5或6的故障,也可以达到消息不丢失的效果。如果Master和Slave机器间是异步复制,两次Sync间的消息会丢失。
总的来说,当设置成:
- 多Master,每个Master带有Slave
- 主从之间设置成SYNC_MASTER
- Producer用同步方式写
- 刷盘策略设置成SYNC_FLUSH
就可以消除单点依赖,即使某台机器出现极端故障也不会丢消息。
以上就是本文的全部内容了。欢迎小伙伴们积极留言交流~~~
文章评论