消息可靠性
消息可靠性指的是我们在支付场景中通过异步方式来实现而不出现单边账的问题。也就是说,我们放弃了分布式锁(因为可能会影响性能)而使用异步消息处理的方式来达到数据的最终一致性。
异常捕获机制
先执行行业务操作,业务操作成功后执行行消息发送,消息发送过程通过try catch 方式捕获异常,在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
另外,可以通过spring.rabbitmq.template.retry.enabled=true
配置开启发送端的重试。
AMQP/RabbitMQ的事务机制
没有捕获到异常并不能代表消息就一定投递成功了。一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用。
发送端确认机制
RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上面面发布的消息都会被指派一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。
RabbitMQ 回传给生产者的确认消息中的deliveryTag
字段包含了确认消息的序号,另外,通过设置channel.basicAck
方法中的multiple
参数,表示到这个序号之前的所有消息是否都已经得到了处理了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理ACK响应。如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理理该nack命令。
我们下面通过三个案例来详细了解一下:
第一个案例就是最简单的开启确认机制并逐条确认消息:
public class PublisherConfirmsProducer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
// 发送消息
channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes());
try {
// 同步的方式等待RabbitMQ的确认消息
channel.waitForConfirmsOrDie(5_000);
System.out.println("发送的消息已经得到确认");
} catch (IOException ex) {
System.out.println("消息被拒收");
} catch (IllegalStateException ex) {
System.out.println("发送消息的通道不是PublisherConfirms通道");
} catch (TimeoutException ex) {
System.out.println("等待消息确认超时");
}
channel.close();
connection.close();
}
}
第二个案例为批量确认消息:
public class PublisherConfirmsProducer2 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
String message = "hello-";
// 批处理的大小
int batchSize = 10;
// 用于对需要等待确认消息的计数
int outstrandingConfirms = 0;
for (int i = 0; i < 103; i++) {
channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
outstrandingConfirms++;
if (outstrandingConfirms == batchSize) {
// 此时已经有一个批次的消息需要同步等待broker的确认消息
// 同步等待
channel.waitForConfirmsOrDie(5_000);
System.out.println("消息已经被确认了");
outstrandingConfirms = 0;
}
}
if (outstrandingConfirms > 0) {
channel.waitForConfirmsOrDie(5_000);
System.out.println("剩余消息已经被确认了");
}
channel.close();
connection.close();
}
}
第三种方式为异步处理MQ服务器发送的确认回执:
public class PublisherConfirmsProducer3 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了");
final ConcurrentNavigableMap<Long, String> headMap
= outstandingConfirms.headMap(deliveryTag, true);
// 清空outstandingConfirms中已经被确认的消息信息
headMap.clear();
} else {
// 移除已经被确认的消息
outstandingConfirms.remove(deliveryTag);
System.out.println("编号为:" + deliveryTag + " 的消息被确认");
}
};
// 设置channel的监听器,处理确认的消息和不确认的消息
channel.addConfirmListener(clearOutstandingConfirms, (deliveryTag, multiple) -> {
if (multiple) {
// 将没有确认的消息记录到一个集合中
// 此处省略实现
System.out.println("消息编号小于等于:" + deliveryTag + " 的消息不确认");
} else {
System.out.println("编号为:" + deliveryTag + " 的消息不确认");
}
});
String message = "hello-";
for (int i = 0; i < 1000; i++) {
// 获取下一条即将发送的消息的消息ID
final long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
System.out.println("编号为:" + nextPublishSeqNo + " 的消息已经发送成功,尚未确认");
outstandingConfirms.put(nextPublishSeqNo, (message + i));
}
// 等待消息被确认
Thread.sleep(10000);
channel.close();
connection.close();
}
}
使用SpringBoot同样可以设置消息确认机制:
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback((correlationData, flag, cause) -> {
if (flag) {
try {
System.out.println("消息确认:" + correlationData.getId() + " " + new String(correlationData.getReturnedMessage().getBody(), "utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
} else {
System.out.println(cause);
}
});
}
持久化存储机制
持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:
- Exchange的持久化。通过定义时设置
durable
参数为ture来保证Exchange相关的元数据不丢失 - Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不丢失
- 消息的持久化。通过将消息的投递模式 (
BasicProperties
中的deliveryMode
属性)设置为2即可实现消息的持久化,保证消息自身不丢失
/**
* 消息持久化
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// durable:true表示是持久化消息队列
channel.queueDeclare("queue.persistent", true, false, false, null);
// 持久化的交换器
channel.exchangeDeclare("ex.persistent", "direct", true, false, null);
channel.queueBind("queue.persistent", "ex.persistent", "key.persistent");
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 表示是持久化消息
.build();
channel.basicPublish("ex.persistent",
"key.persistent",
properties, // 设置消息的属性,此时消息是持久化消息
"hello world".getBytes());
channel.close();
connection.close();
}
}
RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不足时,非持久化的消息也会被刷盘处理),这些处理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:
- 队列索引(
rabbit_queue_index
)。rabbit_queue_index
负责维护Queue中消息的信息,包括消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应的rabbit_queue_index
- 消息存储(
rabbit_msg_store
)。rabbit_msg_store
以键值对的形式存储消息,它被所有队列列共享,在每个节点中有且只有一个
下图中,RABBITMQ_HOME/var/lib/mnesia/rabir@HOSTNAME/msg_stores/vhosts/$VHostId
这个路路径下包含 queues
、msg_store_persistent
、msg_store_transient
这 3 个目录,这是实际存储消息的位置。其中queues
目录中保存着rabbit_queue_index
相关的数据,而msg_store_persistent
保存着持久化消息数据,msg_store_transient
保存着非持久化相关的数据。
另外,RabbitMQ通过配置queue_index_embed_msgs_below
可以根据消息大小决定存储位置,默认queue_index_embed_msgs_below
是4096字节(包含消息体、属性及headers),小于该值的消息存在rabbit_queue_index
中。
Consumer ACK
如何保证消息被消费者成功消费?
前面我们介绍了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。
这也是我们之前一直在说的“最终一致性”、“可恢复性” 的基础。
一般而言,我们有如下处理手段:
- 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险
- 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
- 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回Ack
下面我们看一组示例:
MyProducer.java
public class MyProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@rabbitmq-host:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("queue.ca", false, false, false, null);
channel.exchangeDeclare("ex.ca", "direct", false, false, null);
channel.queueBind("queue.ca", "ex.ca", "key.ca");
for (int i = 0; i < 5; i++) {
channel.basicPublish("ex.ca", "key.ca", null, ("hello-" + i).getBytes());
}
channel.close();
connection.close();
}
}
MyConsumer.java
public class MyConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@rabbitmq-host:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("queue.ca", false, false, false, null);
// 拉消息的模式
// final GetResponse getResponse = channel.basicGet("queue.ca", false);
// channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), true);
// 推消息模式
// autoAck:false表示手动确认消息
channel.basicConsume("queue.ca", false, "myConsumer", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(new String(body));
// 确认消息
// channel.basicAck(envelope.getDeliveryTag(), false);
// 第一个参数是消息的标签,第二个参数表示不确认多个消息还是一个消息
// 第三个参数表示不确认的消息是否需要重新入列,然后重发
// 可以用于拒收多条消息
// channel.basicNack(envelope.getDeliveryTag(), false, true);
// 用于拒收一条消息
// 对于不确认的消息,是否重新入列,然后重发
// channel.basicReject(envelope.getDeliveryTag(), true);
channel.basicReject(envelope.getDeliveryTag(), false);
}
});
// channel.close();
// connection.close();
}
}
SpringBoot的方式:
/**
* NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL模式,需要显式的调用当前channel的basicAck方法
* @param channel
* @param deliveryTag
* @param message
*/
@RabbitListener(queues = "lagou.topic.queue", ackMode = "AUTO")
public void handleMessageTopic(Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload byte[]
message) {
System.out.println("RabbitListener消费消息,消息内容:" + new
String((message)));
try {
// 手动ack,deliveryTag表示消息的唯一标志,multiple表示是否是批量确认
channel.basicAck(deliveryTag, false);
// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新
入列
channel.basicNack(deliveryTag, false, true);
// 手动拒绝消息。第二个参数表示是否重新入列
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
上面是通过在消费端直接配置指定ackMode,在一些比较老的spring项目中一般是通过xml方式去定义、声明和配置的,不管是XML还是注解,相关配置、属性这些其实都是大同小异,触类旁通。然后需要注意的是channel.basicAck这几个手工Ack确认的方法。
SpringBoot项目中支持如下的一些配置:
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
# 是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到ack
确认或者一直到超时)
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试间隔时间(单位毫秒)
spring.rabbitmq.listener.simple.retry.initial-interval=5000
# 重试超过最大次数后是否拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
# ack模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
本小节的内容总结起来就如图所示,本质上就是“请求/应答”确认模式。
消费端限流
在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?
当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧…
下面我将从多个角度介绍QoS与限流,防止上面的悲剧发生。
- RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知
在/etc/rabbitmq/rabbitmq.conf
中配置磁盘可用空间大小:
- RabbitMQ还默认提供了一种基于
credit flow
的流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可能是因为connection
、channel
、queue
的某一个过程处于flow
状态,这些状态都可以从监控平台看到
- RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是QoS机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack模式。执行
channel.basicConsume
方法之前通过channel.basicQoS
方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos的prefetchCount
,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount
个消息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount
个。如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple
为true
,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了
public class MyConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@rabbitmq-host:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("queue.qos", false, false, false, null);
// 使用basic做限流,仅对消息推送模式生效。
// 表示Qos是10个消息,最多有10个消息等待确认
channel.basicQos(10);
// 表示最多10个消息等待确认。如果global设置为true,则表示只要是使用当前的channel的Consumer,该设置都生效
// false表示仅限于当前Consumer
channel.basicQos(10, false);
// 第一个参数表示未确认消息的大小,Rabbit没有实现,不用管。
channel.basicQos(1000, 10, true);
channel.basicConsume("queue.qos", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// some code going on
// 可以批量确认消息,减少每个消息都发送确认带来的网络流量负载。
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
channel.close();
connection.close();
}
}
生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。
再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。
提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:
- 优化应用程序的性能,缩短响应时间(需要时间)
- 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
- 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)
@Bean
public RabbitListenerContainerFactory
rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory发现消息中有content_type有text
就会默认将其
// 转换为String类型的,没有content_type都按byte[]类型
SimpleRabbitListenerContainerFactory factory = new
SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 设置并发线程数
factory.setConcurrentConsumers(10);
// 设置最大并发线程数
factory.setMaxConcurrentConsumers(20);
return factory;
}
消息幂等性处理
追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而导致重复消费…真是应证了那句老话:做架构就是权衡取舍。
RabbitMQ层面有实现“去重机制”来保证“恰好一次”吗?答案是并没有。而且这个在目前主流的消息中间件都没有实现。
当为了在基础的分布式中间件中实现某种相对不太通用的功能,需要牺牲到性能、可靠性、扩展性时,并且会额外增加很多复杂度,最简单的办法就是交给业务自己去处理。事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收集,而对一些金融类的业务则要求比较严苛。
一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。
幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超后会重试,HTTP请求会重复发起(用户手抖多点了几下按钮)。
幂等(Idempotence)是一个数学上的概念,它是这样定义的:
这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。
对于幂等的方法,不用担心重复执行会对系统造成任何改变。
业界对于幂等性的一些常见做法:
- 借助数据库唯一索引,重复插入直接报错,事务回滚。还是举经典的转账的例子,为了保证不重复扣款或者重复加钱,我们这边维护一张“资金变动流水表”,里面至少需要交易单号、变动账户、变动金额等3个字段。我们选择交易单号和变动账户做联合唯一索引(单号是上游生成的可保证唯一性),这样如果同一笔交易发生重复请求时就会直接报索引冲突,事务直接回滚。现实中,数据库唯一索引的方式通常做为兜底保证
- 前置检查机制。这个很容易理解,并且有几种实现办法。还是引用上面转账的例子,当我在执行更改账户余额这个动作之前,我得先检查下资金变动流水表(或者Tair中)中是否已经存在这笔交易相关的记录了,如果已经存在,那么直接返回,否则执行正常的更新余额的动作。为了防止并发问题,我们通常需要借助“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三操作。当然,我们也可以使用乐观锁或CAS机制,乐观锁一般会使用扩展一个版本号字段做判断条件
- 唯一Id机制,比较通用的方式。对于每条消息我们都可以生成唯一Id,消费前判断Tair中是否存在(MsgId做Tair排他锁的key),消费成功后将状态写入Tair中,这样就可以防止重复消费了
对于接口请求类的幂等性保证要相对更复杂,我们通常要求上游请求时传递一个类GUID的请求号(或TOKEN),如果我们发现已经存在了并且上一次请求处理结果是成功状态的(有时候上游的重试请求是正常诉求,我们不能将上一次异常/失败的处理结果返回或者直接提示“请求异常”,如果这样重试就变得没意义了)则不继续往下执行,直接返回“重复请求”的提示和上次的处理结果(上游通常是由于请求超时等未知情况才发起重试的,所以直接返回上次请求的处理结果就好了)。如果请求ID都不存在或者上次处理结果是失败/异常的,那就继续处理流程,并最终记录最终的处理结果。这个请求序号由上游自己生成,上游通用需要根据请求参数、时间间隔等因子来生成请求ID。同样也需要利用这个请求ID做分布式锁的KEY实现排他。
可靠性分析
在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。
在RabbitMQ 中可以使用Firehose
功能来实现消息追踪,Firehose
可以记录每一次发送或者消费消息的记录,方便RabbitMQ的使用者进行调试、排错等。
Firehose
的原理是将生产者投递给RabbitMQ的消息,或者RabbitMQ投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为amq.rabbitmq.trace
,它是一个Topic
类型的交换器。发送到这个交换器上的消息的路由键为publish.{exchangename}
和deliver.{queuename}
。其中exchangename
和queuename
为交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。
开启Firehose
命令:
rabbitmqctl trace_on [-p vhost]
其中[-p vhost]
是可选参数,用来指定虚拟主机vhost。
对应的关闭命令为:
rabbitmqctl trace_off [-p vhost]
Firehose
默认情况下处于关闭状态,并且Firehose
的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose
开启之后多少会影响RabbitMQ 整体服务性能,因为它会引起额外的消息生成、路由和存储。
rabbitmq_tracing
插件相当于Firehose
的GUI 版本,它同样能跟踪RabbitMQ中消息的流入流出情况。rabbitmq_tracing
插件同样会对流入流出的消息进行封装,然后将封装后的消息日志存入相应的trace
文件中。
可以使用如下命令开启插件:
rabbitmq-plugins enable rabbitmq_tracing
使用如下命令禁用插件:
rabbitmq-plugins disable rabbitmq_tracing
Name
表示rabbitmq_tracing
的一个条目的名称,Format
可以选择Text或JSON,连接的用户名写root,密码写123456。
Pattern:发布的消息:publish.<exname>
Pattern:消费的消息:deliver.<queuename>
TTL机制
在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。
TTL,Time to Live 的简称,即过期时间。RabbitMQ可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。
目前有两种方法可以设置消息的TTL:
- 通过Queue属性设置,队列中所有消息都有相同的过期时间
- 对消息自身进行单独设置,每条消息的TTL可以不同
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。
原生API案例:
public class Producer {
public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@rabbitmq-host:5672/%2f");
try (final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel()) {
Map<String, Object> arguments = new HashMap<>();
// 消息队列中消息过期时间,10s,过期后会把消息放到死信队列中
arguments.put("x-message-ttl", 10 * 1000);
// 如果消息队列没有消费者,则60s后消息过期,消息队列也删除
// arguments.put("x-expires", 10 * 1000);
arguments.put("x-expires", 60 * 1000);
channel.queueDeclare("queue.ttl.waiting",
true,
false,
false,
arguments);
channel.exchangeDeclare("ex.ttl.waiting",
"direct",
true,
false,
null);
channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentEncoding("utf-8")
.deliveryMode(2) // 持久化的消息
.build();
channel.basicPublish("ex.ttl.waiting",
"key.ttl.waiting",
null,
("O" + new Random().nextInt(Integer.MAX_VALUE)).getBytes("utf-8"));
System.out.println("消息发送在: " + new SimpleDateFormat("yyyy-MM-ss HH:mm:ss").format(new Date()));
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
此外,还可以通过命令行方式设置全局TTL,执行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
默认规则:
- 如果不设置TTL,则表示此消息不会过期
- 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃
SpringBoot案例:
@Bean
public Queue queueTTLWaiting() {
Map<String, Object> props = new HashMap<>();
// 对于该队列中的消息,设置都等待10s
props.put("x-message-ttl", 10000);
Queue queue = new Queue("q.pay.ttl-waiting", false, false,
false, props);
return queue;
}
@RestController
public class PayController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/pay/queuettl")
public String sendMessage() {
rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttl-waiting", "发送了TTL-WAITING-MESSAGE");
return "queue-ttl-ok";
}
@RequestMapping("/pay/msgttl")
public String sendTTLMessage() throws UnsupportedEncodingException {
MessageProperties properties = new MessageProperties();
properties.setExpiration("5000");
Message message = new Message("发送了WAITING-MESSAGE".getBytes("utf-8"), properties);
rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
return "msg-ttl-ok";
}
}
死信队列
DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。
以下几种情况导致消息变为死信:
- 消息被拒绝(
Basic.Reject/Basic.Nack
),并且设置requeue
参数为false
- 消息过期
- 队列达到最大长度
对于RabbitMQ 来说,DLX是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack
或者Basic.Reject
)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
原生API案例:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@rabbitmq-host:5672/%2f");
try (final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel()) {
// 正常业务的交换器
channel.exchangeDeclare("ex.biz", "direct", true);
// 声明死信交换器 DLX
channel.exchangeDeclare("ex.dlx", "direct", true);
// 声明队列做死信队列
channel.queueDeclare("queue.dlx", true, false, false, null);
// 绑定死信交换器和死信队列
channel.queueBind("queue.dlx", "ex.dlx", "key.dlx");
Map<String, Object> arguments = new HashMap<>();
// 指定消息队列中的消息过期时间
arguments.put("x-message-ttl", 10000);
// 指定过期消息通过死信交换器发送到死信队列,死信交换器的名称,DLX
arguments.put("x-dead-letter-exchange", "ex.dlx");
// 指定死信交换器的路由键
arguments.put("x-dead-letter-routing-key", "key.dlx");
channel.queueDeclare("queue.biz", true, false, false, arguments);
// 绑定业务的交换器和消息队列
channel.queueBind("queue.biz", "ex.biz", "key.biz");
channel.basicPublish("ex.biz", "key.biz", null, "orderid.45789987678".getBytes());
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
SpringBoot案例:
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
Map<String, Object> props = new HashMap<>();
// 消息的生存时间 10s
props.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加
入死信队列)
props.put("x-dead-letter-exchange", "ex.go.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原
队列的routingKey
props.put("x-dead-letter-routing-key", "go.dlx");
Queue queue = new Queue("q.go", true, false, false, props);
return queue;
}
@Bean
public Queue queueDlx() {
Queue queue = new Queue("q.go.dlx", true, false, false);
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("ex.go", true,
false, null);
return exchange;
}
/**
* 死信交换器
* @return
*/
@Bean
public Exchange exchangeDlx() {
DirectExchange exchange = new DirectExchange("ex.go.dlx",
true, false, null);
return exchange;
}
@Bean
public Binding binding() {
return
BindingBuilder.bind(queue()).to(exchange()).with("go").noargs();
}
/**
* 死信交换器绑定死信队列
* @return
*/
@Bean
public Binding bindingDlx() {
return
BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("go.dlx").no
args();
}
}
延迟队列
延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。
这种需求在我们的日常生活中很常见,比如:订单自动取消、自动收货等等。那么怎么实现这种延迟消费的需求呢?聪明的你肯定会说使用上面的TTL机制加死信队列不就可以了吗?这种方式确实是可以,但是是有弊端的。我们试想一下,死信队列里面的消息全是消息过期导致的吗?不全是,有的是消费失败等其他异常情况下导致的,这就导致了我们死信队列里面的消息类型不唯一,所以我们就不能把死信队列当成延迟队列的一种解决方案。
业界通常是使用rabbitmq_delayed_message_exchange
插件实现延迟队列。
插件的原理如图:
- 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
- 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
- 队列(queue)再把消息发送给监听它的消费者(customer)
插件安装
- 下载插件
也可以在附件中下载对应插件压缩包。
- 安装插件
将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/plugins
。
- 启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重启rabbitmq-server
systemctl restart rabbitmq-server
使用案例
原生API案例:
Producer.java
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@rabbitmq-host:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明direct类型的交换器,交换器和消息队列的绑定不需要在这里处理
Map<String, Object> arguments = new HashMap<>();
// 使用x-delayed-type指定交换器的类型
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("ex.delay", "x-delayed-message", false, false, arguments);
channel.queueDeclare("queue.delay",
true,
false,
false,
null);
channel.queueBind("queue.delay", "ex.delay", "key.delay");
Map<String, Object> messageHeaders = new HashMap<>();
// 消息延迟十秒消费
messageHeaders.put("x-delay", 10 * 1000);
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentEncoding("utf-8")
.deliveryMode(2) // 持久化的消息
.headers(messageHeaders)
.build();
channel.basicPublish("ex.delay",
"key.delay",
properties,
("O" + new Random().nextInt(Integer.MAX_VALUE)).getBytes("utf-8"));
System.out.println("消息发送在: " + new SimpleDateFormat("yyyy-MM-ss HH:mm:ss").format(new Date()));
channel.close();
connection.close();
}
}
Consumer.java
public class Consumer {
public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@rabbitmq-host:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("queue.delay",
true,
false,
false,
null);
channel.basicConsume("queue.delay", (consumerTag, message) -> {
System.out.println("订单: " + new String(message.getBody(), "utf-8") + " 于" + new SimpleDateFormat("yyyy-MM-ss HH:mm:ss").format(new Date()) + "过期");
}, (consumerTag) -> {});
}
}
SpringBoot案例
配置类
@Configuration
@EnableRabbit
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue.boot", false, false, false, null);
}
@Bean
public Exchange exchange() {
return new TopicExchange("ex.boot", false, false, null);
}
@Bean
public Binding binding() {
return new Binding("queue.boot",
Binding.DestinationType.QUEUE,
"ex.boot",
"key.boot",
null);
}
@Bean
public Queue delayQueue() {
return new Queue("queue.delay", true, false, false, null);
}
@Bean
public Exchange delayExchange() {
Map<String, Object> arguments = new HashMap<>();
// 使用x-delayed-type指定交换器的类型
arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
// 使用x-delayed-message表示使用delayed exchange插件处理消息
return new CustomExchange("ex.delay", "x-delayed-message", true, false, arguments);
}
@Bean
public Binding delayBinding() {
return new Binding("queue.delay",
Binding.DestinationType.QUEUE,
"ex.delay",
"key.delay",
null);
}
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
return new RabbitAdmin(factory);
}
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
return new RabbitTemplate(factory);
}
@Bean
@Autowired
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
消息生产
@RestController
public class MessageController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/rabbit/{message}")
public String send(@PathVariable String message) throws UnsupportedEncodingException {
final MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setContentEncoding("utf-8")
.setHeader("hello", "world")
.build();
final Message msg = MessageBuilder
.withBody(message.getBytes("utf-8"))
.andProperties(messageProperties)
.build();
rabbitTemplate.send("ex.boot", "key.boot", msg);
return "ok";
}
@RequestMapping("/rabbit-delay/{message}")
public String delaySend(@PathVariable String message) throws UnsupportedEncodingException {
final MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
// 设置消息的过期时间
.setHeader("x-delay", 10 * 1000)
.setContentEncoding("utf-8")
.build();
final Message msg = MessageBuilder
.withBody(message.getBytes("utf-8"))
.andProperties(messageProperties)
.build();
rabbitTemplate.send("ex.delay", "key.delay", msg);
System.out.println("消息发送在: " + new SimpleDateFormat("yyyy-MM-ss HH:mm:ss").format(new Date()));
return "ok";
}
}
消息消费
@Component
public class MyMessageListener {
// @RabbitListener(queues = "queue.boot")
// public void getMyMessage(@Payload String message, @Header(name = "hello") String value, Channel channel) {
// System.out.println(message);
// System.out.println("hello = " + value);
//
// // 确认消息
// channel.basicAck();
// // 拒收消息
// channel.basicReject();
// }
private Integer index = 0;
/**
* NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL模式,需要显式的调用当前channel的basicAck方法
*
* @param channel
* @param message
*/
@RabbitListener(queues = "queue.boot")
public void getMyMessage(Message message, Channel channel) throws IOException {
String value = message.getMessageProperties().getHeader("hello");
System.out.println(message);
System.out.println("hello = " + value);
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (index % 2 == 0) {
// 确认消息
// 手动ack,deliveryTag表示消息的唯一标志,multiple表示是否是批量确认
channel.basicAck(deliveryTag, false);
// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列
// channel.basicNack(deliveryTag, false, true);
} else {
// 拒收消息
// 手动拒绝消息。第二个参数表示是否重新入列
channel.basicReject(deliveryTag, false);
}
index++;
}
@RabbitListener(queues = "queue.delay")
public void getDelayMessage(Message message, Channel channel) throws IOException {
System.out.println("消息: " + new String(message.getBody(), "utf-8") + " 于" + new SimpleDateFormat("yyyy-MM-ss HH:mm:ss").format(new Date()) + "过期");
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
}
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
附件
链接:https://pan.rubinchu.com/share/1462960576488538112
提取码:rboe
文章评论