Rubin's Blog


Kafka Advanced Features: Retry Queues

December 2, 2021

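Kafka itself has no retry mechanism for messages whose consumption failed, and it has no dead-letter queue. When Kafka is used as a message queue, message retry therefore has to be implemented by the application.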

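Implementation Approach

Create a new Kafka topic to serve as the retry queue:

  1. Create a topic to act as the retry topic, which receives messages waiting to be retried.
  2. The consumer of the regular topic sends each message that needs a retry to the retry topic.
  3. A consumer reads the pending messages from the retry topic and stores them in a Redis zset, sorted by their next consumption time.
  4. A scheduled task fetches from Redis the messages whose consumption time has arrived and sends them to the corresponding topic.
  5. A message that has been retried too many times is not retried again.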
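
Steps 3 and 4 can be sketched without Redis at all. The class below is a hypothetical in-memory stand-in for the zset (the name DelayedRetryBuffer and its methods are assumptions for illustration, not part of the project): messages are ordered by next consumption time, and a poll removes everything that is due.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the Redis zset used in steps 3 and 4.
class DelayedRetryBuffer {

    // score (next consumption time in ms) -> messages due at that time
    private final TreeMap<Long, List<String>> byDueTime = new TreeMap<>();

    // Step 3: store a message ordered by its next consumption time.
    void add(String message, long nextTimeMillis) {
        byDueTime.computeIfAbsent(nextTimeMillis, t -> new ArrayList<>()).add(message);
    }

    // Step 4: remove and return every message whose time has arrived, earliest first.
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        Map<Long, List<String>> head = byDueTime.headMap(nowMillis, true);
        for (List<String> messages : head.values()) {
            due.addAll(messages);
        }
        head.clear(); // clearing the view removes the entries from the backing map
        return due;
    }

    // Number of messages still waiting.
    int size() {
        int count = 0;
        for (List<String> messages : byDueTime.values()) {
            count += messages.size();
        }
        return count;
    }

    public static void main(String[] args) {
        DelayedRetryBuffer buffer = new DelayedRetryBuffer();
        buffer.add("order-1", 1_000L);
        buffer.add("order-2", 5_000L);
        buffer.add("order-3", 3_000L);
        System.out.println(buffer.pollDue(3_000L));  // [order-1, order-3]
        System.out.println(buffer.pollDue(10_000L)); // [order-2]
    }
}
```

The real implementation keeps this state in Redis so that pending retries survive an application restart; the in-memory version only illustrates the ordering and polling logic.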

Code Implementation

Create a Spring Boot project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.rubin</groupId>
    <artifactId>mq-demo-spring-boot</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Configuration file:

spring.application.name=springboot-kafka
server.port=8080

# Kafka configuration
spring.kafka.bootstrap-servers=kafka-host:9092

# Producer configuration
# Keys and values are Strings in this demo (KafkaTemplate<String, String>)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Maximum size of a producer batch, in bytes (16 KB here)
spring.kafka.producer.batch-size=16384
# Total memory the producer may use to buffer records waiting to be sent (32 MB here)
spring.kafka.producer.buffer-memory=33554432

# Consumer configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer
# If no committed offset is found for this consumer group, start from the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
# Whether offsets are committed automatically or manually; automatic here
spring.kafka.consumer.enable-auto-commit=true
# Interval between automatic offset commits (milliseconds)
spring.kafka.consumer.auto-commit-interval=1000

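# Redis database index
spring.redis.database=0
# Redis host
spring.redis.host=127.0.0.1
# Redis port
spring.redis.port=6379
# Redis password (empty by default)
spring.redis.password=
# Note: the jedis.pool.* settings below only take effect when Jedis is the Redis client;
# spring-boot-starter-data-redis uses Lettuce by default.
# Maximum number of connections in the pool (a negative value means no limit)
spring.redis.jedis.pool.max-active=20
# Maximum time to block waiting for a connection (a negative value means no limit)
spring.redis.jedis.pool.max-wait=-1
# Maximum number of idle connections in the pool
spring.redis.jedis.pool.max-idle=10
# Minimum number of idle connections in the pool
spring.redis.jedis.pool.min-idle=0
# Connection timeout (milliseconds)
spring.redis.timeout=1000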

# Kafka topic name: the business topic
spring.kafka.topics.test=tp_demo_retry_01
# Retry queue: the retry topic
spring.kafka.topics.retry=tp_demo_retry_02

Configure Redis:

package com.rubin.kafka.springboot.retry.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

// @Configuration is the usual annotation for an ordinary configuration class;
// @SpringBootConfiguration is intended for the application's main class.
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // Set the connection factory
        template.setConnectionFactory(factory);
        return template;
    }

}

Controller:

package com.rubin.kafka.springboot.retry.controller;

import com.rubin.kafka.springboot.retry.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class RetryController {

    @Autowired
    private KafkaService kafkaService;

    @Value("${spring.kafka.topics.test}")
    private String topic;

    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {

        ProducerRecord<String, String> record = new ProducerRecord<>(
                topic,
                message
        );

        // Send the message to the business topic
        String result = kafkaService.sendMessage(record);

        return result;
    }

}

Producer service:

package com.rubin.kafka.springboot.retry.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    private Logger log = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {

        SendResult<String, String> result = this.kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        log.info("Message sent: {}", returnResult);

        return returnResult;
    }

}

Business consumer:

package com.rubin.kafka.springboot.retry.listener;

import com.rubin.kafka.springboot.retry.service.RetryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    private RetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, String> record) {
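        try {
            // Business processing; every second message fails to simulate an error
            log.info("Consumed message: {}", record);
            index++;
            if (index % 2 == 0) {
                throw new Exception("Time to resend");
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            // Retry the message: it goes to the retry topic first and from there into Redis
            kafkaRetryService.consumerLater(record);
        }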
    }

}

Retry-related classes:

package com.rubin.kafka.springboot.retry.entity;

import lombok.Data;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

@Data
public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;

    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public ProducerRecord<String, String> parse() {
        // A null partition lets the producer's partitioner choose one
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<>();
        // Carry the retry count in a 4-byte int header
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer.array()));

        return new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
    }

}
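
The retry count travels between topics as a 4-byte header value. The following is a minimal sketch of that round trip using only java.nio; RetryHeaderCodec is a hypothetical helper name introduced here for illustration, not a class from the project.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the original project.
class RetryHeaderCodec {

    // Encode the retry count as a 4-byte big-endian value,
    // the same layout RetryRecord.parse() writes into the header.
    static byte[] encode(int retryTimes) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(retryTimes).array();
    }

    // Decode a header value; mirrors ByteBuffer.wrap(header.value()).getInt().
    static int decode(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getInt();
    }

    public static void main(String[] args) {
        byte[] raw = encode(3);
        System.out.println(raw.length);  // 4
        System.out.println(decode(raw)); // 3
    }
}
```
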

package com.rubin.kafka.springboot.retry.service;

import com.alibaba.fastjson.JSON;
import com.rubin.kafka.springboot.retry.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;

@Service
public class RetryService {
    private static final Logger log = LoggerFactory.getLogger(RetryService.class);

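    /**
     * Delay (in seconds) before the next consumption attempt after a failure.
     * The first retry is delayed by 10 seconds, the second by 30 seconds, the third by 1 minute, and so on.
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};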

    /**
     * Retry topic
     */
    @Value("${spring.kafka.topics.retry}")
    private String retryTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record){
        // Determine which retry attempt this will be
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        // Stop retrying once the retry limit has been reached
        if(nextConsumerTime == null) {
            log.warn("Retry limit reached, giving up on message: {}", record);
            return;
        }

        // Build the retry record
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());

        // Serialize it to a JSON string
        String value = JSON.toJSONString(retryRecord);
        // Send it to the retry topic
        kafkaTemplate.send(retryTopic, null, value);
    }

    /**
     * Returns the zero-based index of the upcoming retry: 0 when the record
     * carries no retryTimes header, otherwise the header value plus one.
     */
    private int getRetryTimes(ConsumerRecord<?, ?> record){
        int retryTimes = -1;
        for(Header header : record.headers()){
            if(RetryRecord.KEY_RETRY_TIMES.equals(header.key())){
                ByteBuffer buffer = ByteBuffer.wrap(header.value());
                retryTimes = buffer.getInt();
            }
        }
        retryTimes++;
        return retryTimes;
    }

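    /**
     * Returns the next consumption time for a message awaiting retry,
     * or null once the retry limit has been reached.
     */
    private Date getNextConsumerTime(int retryTimes){
        // Valid indexes are 0..length-1; anything beyond that means the limit is exhausted.
        // (A "length < retryTimes" check would let retryTimes == length through and
        // throw ArrayIndexOutOfBoundsException on the lookup below.)
        if(retryTimes >= RETRY_INTERVAL_SECONDS.length) {
            return null;
        }

        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }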

}
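
The delay lookup can be exercised in isolation. The sketch below uses the same interval table but returns an OptionalLong instead of a nullable Date; the class name RetrySchedule and the method nextConsumeTime are assumptions made for this example. The point to note is that the retry index must stay strictly below the array length.

```java
import java.util.OptionalLong;

// Hypothetical standalone version of the retry schedule, for illustration only.
class RetrySchedule {

    // Same delays as RETRY_INTERVAL_SECONDS in RetryService, in seconds.
    private static final int[] RETRY_INTERVAL_SECONDS =
            {10, 30, 1 * 60, 2 * 60, 5 * 60, 10 * 60, 30 * 60, 1 * 60 * 60, 2 * 60 * 60};

    // Returns the next consumption time in milliseconds,
    // or an empty result once the schedule is exhausted.
    static OptionalLong nextConsumeTime(int retryTimes, long nowMillis) {
        if (retryTimes < 0 || retryTimes >= RETRY_INTERVAL_SECONDS.length) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(nowMillis + RETRY_INTERVAL_SECONDS[retryTimes] * 1000L);
    }

    public static void main(String[] args) {
        System.out.println(nextConsumeTime(0, 0L)); // OptionalLong[10000]
        System.out.println(nextConsumeTime(8, 0L)); // OptionalLong[7200000]
        System.out.println(nextConsumeTime(9, 0L)); // OptionalLong.empty
    }
}
```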

Retry-queue consumer:

package com.rubin.kafka.springboot.retry.listener;

import com.alibaba.fastjson.JSON;
import com.rubin.kafka.springboot.retry.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private Logger log = LoggerFactory.getLogger(RetryListener.class);

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topics.test}")
    private String bizTopic;

    @KafkaListener(topics = "${spring.kafka.topics.retry}")
//    public void consume(List<ConsumerRecord<String, String>> list) {
//        for(ConsumerRecord<String, String> record : list){
    public void consume(ConsumerRecord<String, String> record) {

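        log.info("Message to retry: {}", record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);

        /**
         * To keep a large backlog of pending retries from exhausting Redis memory, messages
         * can be stored in different media according to their next retry time. For example,
         * messages due more than half an hour from now can be stored in MySQL, with a
         * scheduled job moving those that are about to become due into Redis.
         */

        // Use a Redis zset to sort by next retry time; the payload itself lives in a hash
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());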
    }
//    }

    /**
     * Scheduled task: read the messages whose retry time has arrived from Redis
     * and send them to the corresponding topic.
     */
//    @Scheduled(cron="2 * * * * *")
    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        log.warn("retryFromRedis----begin");
        long currentTime = System.currentTimeMillis();
        // Fetch the due messages, latest retry time first
        Set<ZSetOperations.TypedTuple<Object>> typedTuples =
                redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // Remove the fetched messages from the zset
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        if (typedTuples == null) {
            return;
        }
        for (ZSetOperations.TypedTuple<Object> tuple : typedTuples) {
            String key = tuple.getValue().toString();
            Object stored = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key);
            if (stored == null) {
                // The payload is missing, so there is nothing to resend
                continue;
            }
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
            RetryRecord retryRecord = JSON.parseObject(stored.toString(), RetryRecord.class);
            ProducerRecord<String, String> record = retryRecord.parse();

            // This demo always resends to the business topic;
            // record.topic() holds the topic saved in the retry record.
            ProducerRecord<String, String> recordReal = new ProducerRecord<>(
                    bizTopic,
                    record.partition(),
                    record.timestamp(),
                    record.key(),
                    record.value(),
                    record.headers()
            );

            kafkaTemplate.send(recordReal);
        }
        // TODO: when sending fails, write the failed messages back to Redis
    }
}
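
The scheduled task leaves failure handling as a TODO. One possible shape for it, shown here as a sketch under assumptions rather than the author's method, is to collect the messages whose send failed and hand them back to the caller so they can be written to the delay store again. RequeueOnFailure and the Predicate-based sender are hypothetical stand-ins for the Kafka send call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the "write failed messages back" idea.
class RequeueOnFailure {

    // Tries to send each due message. The sender returns true on success;
    // a false result or a runtime exception counts as a failure.
    // Returns the failed messages so the caller can store them again.
    static List<String> sendAll(List<String> dueMessages, Predicate<String> sender) {
        List<String> failed = new ArrayList<>();
        for (String message : dueMessages) {
            boolean sent;
            try {
                sent = sender.test(message);
            } catch (RuntimeException e) {
                sent = false;
            }
            if (!sent) {
                failed.add(message);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Simulated sender that fails only for message "b"
        List<String> failed = sendAll(Arrays.asList("a", "b", "c"), m -> !m.equals("b"));
        System.out.println(failed); // [b]
    }
}
```

In the real listener the failed entries would be added back to the zset and the hash, ideally with a short delay so a broker outage does not turn into a tight resend loop.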

Application entry class:

package com.rubin.kafka.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaSpringBootApplication.class, args);
    }

}

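That is all for this article. Comments and discussion are welcome.

This work is licensed under the Creative Commons Attribution 4.0 International License.
Tags: Kafka
Last updated: June 9, 2022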

RubinChu
