概念入门
消费者、消费组
消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中。
消费者还可以将自己的偏移量存储到ZooKeeper,需要设置offset.storage=zookeeper。
推荐使用Kafka存储消费者的偏移量。因为ZooKeeper不适合高并发。
多个从同一个主题消费的消费者可以加入到一个消费组中。消费组中的消费者共享group_id。
configs.put("group.id", "xxx");
group_id一般设置为应用的逻辑名称。比如多个订单处理程序组成一个消费组,可以设置group_id为"order_process"。
消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费。
一个拥有四个分区的主题,包含一个消费者的消费组。此时,消费组中的消费者消费主题中的所有分区。并且没有重复的可能。
如果在消费组中添加一个消费者2,则每个消费者分别从两个分区接收消息。
如果消费组有四个消费者,则每个消费者可以分配到一个分区。
如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
向消费组添加消费者是横向扩展消费能力的主要方式。
必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者。但是不要让消费者的数量超过主题分区的数量。
除了通过增加消费者来横向扩展单个应用的消费能力之外,经常出现多个应用程序从同一个主题消费的情况。
此时,每个应用都可以获取到所有的消息。只要保证每个应用都有自己的消费组,就可以让它们获取到主题所有的消息。
横向扩展消费者和消费组不会对性能造成负面影响。
为每个需要获取一个或多个主题全部消息的应用创建一个消费组,然后向消费组添加消费者来横向扩展消费能力和应用的处理能力,则每个消费者只处理一部分消息。
心跳机制
消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
由于Broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。
Kafka的心跳是Kafka Consumer和Broker之间的健康检查,只有当Broker Coordinator正常时,Consumer才会发送心跳。
Consumer和Rebalance相关的2个配置参数:
参数 | 字段 |
session.timeout.ms | MemberMetadata.sessionTimeoutMs |
max.poll.interval.ms | MemberMetadata.rebalanceTimeoutMs |
Broker端:sessionTimeoutMs
参数。Broker处理心跳的逻辑在GroupCoordinator
类中:如果心跳超期,Broker Coordinator会把消费者从group中移除,并触发rebalance。
Consumer端:sessionTimeoutMs
,rebalanceTimeoutMs
参数。如果客户端发现心跳超期,客户端会标记Coordinator为不可用,并阻塞心跳线程;如果超过了poll消息的间隔超过了rebalanceTimeoutMs
,则Consumer告知Broker主动离开消费组,也会触发rebalance。
消息接收
必要参数配置
参数 | 说明 |
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管是否指定了哪个服务器用作引导。这个列表仅影响用来发现集群所有服务器的初始主机。字符串形式:host1:port1,host2:port2,…由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这里。一般最好两台,以防其中一台宕掉 |
key.deserializer | key的反序列化类,该类需要实现org.apache.kafka.common.serialization.Deserializer 接口 |
value.deserializer | 实现了org.apache.kafka.common.serialization.Deserializer 接口的反序列化器,用于对消息的value进行反序列化 |
client.id | 当从服务器消费消息的时候向服务器发送的id字符串。在ip/port基础上提供应用的逻辑名称,记录在服务端的请求日志中,用于追踪请求的源 |
group.id | 用于唯一标志当前消费者所属的消费组的字符串。如果消费者使用组管理功能如subscribe(topic) 或使用基于Kafka的偏移量管理策略,该项必须设置 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?earliest :自动重置偏移量到最早的偏移量latest :自动重置偏移量为最新的偏移量none :如果消费组原来的(previous)偏移量不存在,则向消费者抛异常anything :向消费者抛异常 |
enable.auto.commit | 如果设置为true,消费者会自动周期性地向服务器提交偏移量 |
订阅
主题和分区
- Topic:Kafka用于分类管理消息的逻辑单元,类似与MySQL的数据库
- Partition:是Kafka下数据存储的基本单元,这个是物理上的概念。同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以是在多台机器上。优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常Partition的数量通常是Broker Server数量的整数倍
- Consumer Group:同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部的消息。在消费组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费
Consumer采用pull模式从Broker中读取数据。
采用pull模式,Consumer可自主控制消费消息的速率, 可以自己控制消费方式(批量消费/逐条消费),还可以选择不同的提交方式从而实现不同的传输语义。
consumer.subscribe("tp_demo_01,tp_demo_02")
反序列化
Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交给用户程序消费处理。
Kafka默认提供了几个反序列化的实现:org.apache.kafka.common.serialization
包下包含了这几个实现:
自定义反序列化
自定义反序列化类,需要实现org.apache.kafka.common.serialization.Deserializer
接口。
自定义实体类:
package com.rubin.kafka.customserializer;
import lombok.Data;
@Data
public class User {
private Integer userId;
private String username;
}
自定义反序列化类:
package com.rubin.kafka.customserializer;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@SneakyThrows
@Override
public User deserialize(String topic, byte[] bytes) {
User user = new User();
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
Integer userId = byteBuffer.getInt();
Integer usernameLength = byteBuffer.getInt();
byte[] usernameByteAr = new byte[usernameLength];
byteBuffer.get(usernameByteAr);
String username = new String(usernameByteAr, 0, usernameLength, "UTF-8");
user.setUserId(userId);
user.setUsername(username);
return user;
}
@Override
public void close() {
}
}
消费者使用自定义反序列化器:
package com.rubin.kafka.customserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
public class MyConsumer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// kafka-host对应于服务器IP,windows的hosts文件中手动配置域名解析
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
// 使用常量代替手写的字符串,配置key的反序列化器
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 配置value的反序列化器
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
// 配置消费组ID
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo2");
// 如果找不到当前消费者的有效偏移量,则自动重置到最开始
// latest表示直接重置到消息偏移量的最后一个
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs);
// 先订阅,再消费
consumer.subscribe(Arrays.asList("tp_user_01"));
// while (true) {
// final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
// }
// 如果主题中没有可以消费的消息,则该方法可以放到while循环中,每过3秒重新拉取一次
// 如果还没有拉取到,过3秒再次拉取,防止while循环太密集的poll调用。
// 批量从主题的分区拉取消息
final ConsumerRecords<String, User> consumerRecords = consumer.poll(3_000);
// 遍历本次从主题的分区拉取的批量消息
consumerRecords.forEach(new Consumer<ConsumerRecord<String, User>>() {
@Override
public void accept(ConsumerRecord<String, User> record) {
System.out.println(record.topic() + "\t"
+ record.partition() + "\t"
+ record.offset() + "\t"
+ record.key() + "\t"
+ record.value());
}
});
consumer.close();
}
}
位移提交
- Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为
提交位移(Committing Offsets)
- Consumer需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets
- 位移提交分为自动提交和手动提交
- 位移提交分为同步提交和异步提交
自动提交
Kafka Consumer后台提交:
- 开启自动提交:
enable.auto.commit=true
- 配置自动提交间隔:Consumer端:
auto.commit.interval.ms
,默认 5s
示例
生产者:
package com.rubin.kafka.offsetcommit.producer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.Random;
@Data
@Slf4j
public class KafkaProducer {
private static org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer;
private Random random = new Random();
private String topic;
private int retry;
/**
* 静态内部类
*
* @author tanjie
*/
private static class LazyHandler {
private static final KafkaProducer instance = new KafkaProducer();
}
/**
* 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
*
* @return
*/
public static final KafkaProducer getInstance() {
return LazyHandler.instance;
}
/**
* kafka生产者进行初始化
*
* @return KafkaProducer
*/
public void init(String topic, int retry) {
this.topic = topic;
this.retry = retry;
if (null == kafkaProducer) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
}
}
/**
* 通过kafkaProducer发送消息
*
* @param message
*/
public void sendKafkaMessage(final String message) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, random.nextInt(3), "", message);
kafkaProducer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata,
Exception exception) {
if (null != exception) {
log.error("kafka发送消息失败:" + exception.getMessage(), exception);
retryKakfaMessage(message);
}
}
});
}
/**
* 当kafka消息发送失败后,重试
*
* @param retryMessage
*/
private void retryKakfaMessage(final String retryMessage) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, random.nextInt(3), "", retryMessage);
for (int i = 1; i <= retry; i++) {
try {
kafkaProducer.send(record);
return;
} catch (Exception e) {
log.error("kafka发送消息失败:" + e.getMessage(), e);
retryKakfaMessage(retryMessage);
}
}
}
/**
* kafka实例销毁
*/
public void close() {
if (null != kafkaProducer) {
kafkaProducer.close();
}
}
}
package com.rubin.kafka.offsetcommit.producer;
public class ProducerHandler implements Runnable {
private String message;
public ProducerHandler(String message) {
this.message = message;
}
@Override
public void run() {
KafkaProducer kafkaProducer = KafkaProducer.getInstance();
kafkaProducer.init("tp_demo_02", 3);
int i = 0;
while (true) {
try {
System.out.println("当前线程:" + Thread.currentThread().getName()
+ "\t获取的kafka实例:" + kafkaProducer);
kafkaProducer.sendKafkaMessage("发送消息: " + message + " " + (++i));
Thread.sleep(100);
} catch (Exception e) {
}
}
}
}
package com.rubin.kafka.offsetcommit.producer;
public class ProducerMain {
public static void main(String[] args){
Thread thread = new Thread(new ProducerHandler("hello rubin"));
thread.start();
}
}
消费者:
package com.rubin.kafka.offsetcommit.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class KafkaConsumerAuto {
/**
* kafka消费者不是线程安全的
*/
private final KafkaConsumer<String, String> consumer;
private ExecutorService executorService;
public KafkaConsumerAuto() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
// 打开自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("auto.commit.interval.ms", "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
// 订阅主题
consumer.subscribe(Collections.singleton("tp_demo_02"));
}
public void execute() throws InterruptedException {
executorService = Executors.newFixedThreadPool(2);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(2_000);
if (null != records) {
executorService.submit(new ConsumerThreadAuto(records, consumer));
}
Thread.sleep(1000);
}
}
public void shutdown() {
try {
if (consumer != null) {
consumer.close();
}
if (executorService != null) {
executorService.shutdown();
}
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("关闭线程池超时。。。");
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
package com.rubin.kafka.offsetcommit.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerThreadAuto implements Runnable {
private ConsumerRecords<String, String> records;
private KafkaConsumer<String, String> consumer;
public ConsumerThreadAuto(ConsumerRecords<String, String> records,
KafkaConsumer<String, String> consumer) {
this.records = records;
this.consumer = consumer;
}
@Override
public void run() {
for(ConsumerRecord<String,String> record : records){
System.out.println("当前线程:" + Thread.currentThread()
+ "\t主题:" + record.topic()
+ "\t偏移量:" + record.offset() + "\t分区:" + record.partition()
+ "\t获取的消息:" + record.value());
}
}
}
package com.rubin.kafka.offsetcommit.consumer;
public class ConsumerAutoMain {
public static void main(String[] args) {
KafkaConsumerAuto kafkaConsumerAuto = new KafkaConsumerAuto();
try {
kafkaConsumerAuto.execute();
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
kafkaConsumerAuto.shutdown();
}
}
}
手动提交
使用KafkaConsumer#commitSync()
会提交KafkaConsumer#poll()
返回的最新offset。该方法为同步操作,等待直到 offset 被成功提交才返回。
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
commitSync
要在处理完所有消息之后调用。手动同步提交可以控制offset提交的时机和频率,手动同步提交会Consumer处于阻塞状态,直到Broker返回结果。此种方式会影响TPS,如果选择拉长提交间隔,但是有以下问题:
- 会导致Consumer的提交频率下降
- Consumer重启后,会有更多的消息被消费
异步提交使用KafkaConsumer#commitAsync()
。如:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(3_000);
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
handle(exception);
}
});
}
commitAsync
出现问题不会自动重试,处理方式如下:
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
消费者位移管理
Kafka中,消费者根据消息的位移顺序消费消息。
消费者的位移由消费者管理,可以存储于ZooKeeper中,也可以存储于Kafka主题__consumer_offsets中。
Kafka提供了消费者API,让消费者可以管理自己的位移。
API如下:KafkaConsumer
API | 说明 |
public void assign(Collection<TopicPartition> partitions) | 给当前消费者手动分配一系列主题分区 手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配 如果给出的主题分区是空的,则等价于调用unsubscribe方法 手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡 手动分区分配assign(Collection)不能和自动分区分配subscribe(Collection, ConsumerRebalanceListener)一起使用 如果启用了自动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进行异步提交 |
public Set<TopicPartition> assignment() | 获取给当前消费者分配的分区集合。如果订阅是通过调用assign方法直接分配主题分区,则返回相同的集合。如果使用了主题订阅,该方法返回当前分配给该消费者的主题分区集合。如果分区订阅还没开始进行分区分配,或者正在重新分配分区,则会返回none |
public Map<String, List<PartitionInfo>> listTopics() | 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用 |
public List<PartitionInfo> partitionsFor(String topic) | 获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调用 |
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) | 对于给定的主题分区,列出它们第一个消息的偏移量 注意,如果指定的分区不存在,该方法可能会永远阻塞 该方法不改变分区的当前消费者偏移量 |
public void seekToEnd(Collection<TopicPartition> partitions) | 将偏移量移动到每个给定分区的最后一个 该方法延迟执行,只有当调用过 poll 方法或position 方法之后才可以使用如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后 如果设置了隔离级别为: isolation.level=read_committed ,则会将分区的消费偏移量移动到最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息 |
public void seek(TopicPartition partition, long offset) | 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息偏移量 若该方法多次调用,则最后一次的覆盖前面的 如果在消费中间随意使用,可能会丢失数据 |
public long position(TopicPartition partition) | 检查指定主题分区的消费偏移量 |
public void seekToBeginning(Collection<TopicPartition> partitions) | 将给定每个分区的消费者偏移量移动到它们的起始偏移量。该方法懒执行,只有当调用过poll 方法或position 方法之后才会执行。如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量 |
准备数据
# 生成消息文件
[root@node1 ~]# for i in `seq 60`; do echo "hello rubin $i" >> nm.txt; done
# 创建主题,三个分区,每个分区一个副本
[root@node1 ~]# kafka-topics.sh --zookeeper 127.0.0.1:2181/myKafka --create --
topic tp_demo_01 --partitions 3 --replication-factor 1
# 将消息生产到主题中
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic
tp_demo_01 < nm.txt
API实战
package com.rubin.kafka.offsetmanager;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class MyOffsetManager {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// group.id很重要
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
// consumer.subscribe(Collections.singleton("tp_demo_01"));
// 如何手动给消费者分配分区?
// 1、需要知道有哪些主题可以访问,和消费
// 获取当前消费者可以访问和消费的主题以及它们的分区信息
// final Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
//
// stringListMap.forEach(new BiConsumer<String, List<PartitionInfo>>() {
// @Override
// public void accept(String topicName, List<PartitionInfo> partitionInfos) {
// System.out.println("主题名称:" + topicName);
// for (PartitionInfo partitionInfo : partitionInfos) {
// System.out.println(partitionInfo);
// }
// }
// });
// final Set<TopicPartition> assignment1 = consumer.assignment();
//
// for (TopicPartition partition : assignment1) {
// System.out.println(partition);
// }
// System.out.println("----------------------------");
// 给当前消费者分配指定的主题分区
consumer.assign(Arrays.asList(
new TopicPartition("tp_demo_01", 0),
new TopicPartition("tp_demo_01", 1),
new TopicPartition("tp_demo_01", 2)
));
// 获取给当前消费者分配的主题分区信息
// final Set<TopicPartition> assignment = consumer.assignment();
//
// for (TopicPartition partition : assignment) {
// System.out.println(partition);
// }
// 查看当前消费者在指定主题的分区上的消费者偏移量
// final long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
//
// System.out.println("当前主题在0号分区上的位移:" + offset0);
// consumer.seekToBeginning(Arrays.asList(
// new TopicPartition("tp_demo_01", 0),
// new TopicPartition("tp_demo_01", 2)
// ));
long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
long offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));
long offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));
System.out.println(offset0);
System.out.println(offset1);
System.out.println(offset2);
// consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 2)));
consumer.seek(new TopicPartition("tp_demo_01", 2), 14);
offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));
offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));
System.out.println(offset0);
System.out.println(offset1);
System.out.println(offset2);
consumer.close();
}
}
再均衡
重平衡可以说是kafka为人诟病最多的一个点了。
重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中的每一个分区。比如一个Topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。
重平衡的触发条件主要有三个:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组
- 主题的分区数发生变更,Kafka目前只支持增加分区,当增加的时候就会触发重平衡
- 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡
消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
由于Broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。
主题增加分区,需要主题分区和消费组进行再均衡。
由于使用正则表达式订阅主题,当增加的主题匹配正则表达式的时候,也要进行再均衡。
为什么说重平衡为人诟病呢?因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生。
避免重平衡
要说完全避免重平衡,是不可能,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以我们需要保证尽力避免消费者故障。
而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。
如果消费者真正挂掉了,就没办法了,但实际中,会有一些情况,kafka错误地认为一个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。
首先要知道哪些情况会出现错误判断挂掉的情况。
在分布式系统中,通常是通过心跳来维持分布式系统的,Kafka也不例外。
在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在Kafka消费者场景中,session.timout.ms
参数就是规定这个超时时间是多少。
还有一个参数,heartbeat.interval.ms
,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。
此外,还有最后一个参数,max.poll.interval.ms
,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过max.poll.interval.ms
这个参数的值。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。
三个参数:
session.timout.ms
:控制心跳超时时间heartbeat.interval.ms
:控制心跳发送频率max.poll.interval.ms
:控制poll的间隔
这里给出一个相对较为合理的配置,如下:
session.timout.ms
:设置为6sheartbeat.interval.ms
:设置2smax.poll.interval.ms
:推荐为消费者处理消息最长耗时再加1分钟
消费者拦截器
消费者在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理。
处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程序进行处理。
消费端定义消息拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor
接口。
- 一个可插拔接口,允许拦截甚至更改消费者接收到的消息。首要的用例在于将第三方组件引入消费者应用程序,用于定制的监控、日志处理等
- 该接口的实现类通过
configre
方法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取KafkaConsumer
生成的clientID。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产生冲突 ConsumerInterceptor
方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来ConsumerInterceptor
回调发生在org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)
方法同一个线程
该接口中有如下方法:
package org.apache.kafka.clients.consumer;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
public interface ConsumerInterceptor<K, V> extends Configurable {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
void close();
}
代码实现:
拦截器:
package com.rubin.kafka.custominterceptor.consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class OneInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// poll方法返回结果之前最后要调用的方法
System.out.println("One -- 开始");
// 消息不做处理,直接返回
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 消费者提交偏移量的时候,经过该方法
System.out.println("One -- 结束");
}
@Override
public void close() {
// 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
}
@Override
public void configure(Map<String, ?> configs) {
// 用于获取消费者的设置参数
configs.forEach((k, v) -> {
System.out.println(k + "\t" + v);
});
}
}
package com.rubin.kafka.custominterceptor.consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class TwoInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// poll方法返回结果之前最后要调用的方法
System.out.println("Two -- 开始");
// 消息不做处理,直接返回
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 消费者提交偏移量的时候,经过该方法
System.out.println("Two -- 结束");
}
@Override
public void close() {
// 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
}
@Override
public void configure(Map<String, ?> configs) {
// 用于获取消费者的设置参数
configs.forEach((k, v) -> {
System.out.println(k + "\t" + v);
});
}
}
package com.rubin.kafka.custominterceptor.consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class ThreeInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// poll方法返回结果之前最后要调用的方法
System.out.println("Three -- 开始");
// 消息不做处理,直接返回
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 消费者提交偏移量的时候,经过该方法
System.out.println("Three -- 结束");
}
@Override
public void close() {
// 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
}
@Override
public void configure(Map<String, ?> configs) {
// 用于获取消费者的设置参数
configs.forEach((k, v) -> {
System.out.println(k + "\t" + v);
});
}
}
消费者:
package com.rubin.kafka.custominterceptor.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mygrp");
// props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "myclient");
// 如果在kafka中找不到当前消费者的偏移量,则设置为最旧的
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 配置拦截器
// One -> Two -> Three,接收消息和发送偏移量确认都是这个顺序
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.rubin.kafka.custominterceptor.consumer.OneInterceptor" +
",com.rubin.kafka.custominterceptor.consumer.TwoInterceptor" +
",com.rubin.kafka.custominterceptor.consumer.ThreeInterceptor"
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 订阅主题
consumer.subscribe(Collections.singleton("tp_inter_01"));
while (true) {
final ConsumerRecords<String, String> records = consumer.poll(3_000);
records.forEach(record -> {
System.out.println(record.topic()
+ "\t" + record.partition()
+ "\t" + record.offset()
+ "\t" + record.key()
+ "\t" + record.value());
});
// consumer.commitAsync();
// consumer.commitSync();
}
// consumer.close();
}
}
消费者参数配置补充
配置项 | 说明 |
bootstrap.servers | 建立到Kafka集群的初始连接用到的host/port列表 客户端会使用这里指定的所有的host/port来建立初始连接 这个配置仅会影响发现集群所有节点的初始连接 形式:host1:port1,host2:port2… 这个配置中不需要包含集群中所有的节点信息 最好不要配置一个,以免配置的这个节点宕机的时候连不上 |
group.id | 用于定义当前消费者所属的消费组的唯一字符串 如果使用了消费组的功能 subscribe(topic) 或使用了基于Kafka的偏移量管理机制,则应该配置group.id |
auto.commit.interval.ms | 如果设置了enable.auto.commit 的值为true ,则该值定义了消费者偏移量向Kafka提交的频率 |
auto.offset.reset | 如果Kafka中没有初始偏移量或当前偏移量在服务器中不存在比如数据被删掉了): earliest:自动重置偏移量到最早的偏移量 latest:自动重置偏移量到最后一个 none:如果没有找到该消费组以前的偏移量没有找到,就抛异常 其他值:向消费者抛异常 |
fetch.min.bytes | 服务器对每个拉取消息的请求返回的数据量最小值 如果数据量达不到这个值,请求等待,以让更多的数据累积,达到这个值之后响应请求 默认设置是1个字节,表示只要有一个字节的数据,就立即响应请求,或者在没有数据的时候请求超时 将该值设置为大一点儿的数字,会让服务器等待稍微长一点儿的时间以累积数据 如此则可以提高服务器的吞吐量,代价是额外的延迟时间 |
fetch.max.wait.ms | 如果服务器端的数据量达不到fetch.min.bytes 的话,服务器端不能立即响应请求该时间用于配置服务器端阻塞请求的最大时长 |
fetch.max.bytes | 服务器给单个拉取请求返回的最大数据量 消费者批量拉取消息,如果第一个非空消息批次的值比该值大,消息批也会返回,以让消费者可以接着进行 即该配置并不是绝对的最大值 Broker可以接收的消息批最大值通过 message.max.bytes (broker配置)或 max.message.bytes (主题配置)来指定需要注意的是,消费者一般会并发拉取请求 |
enable.auto.commit | 如果设置为true,则消费者的偏移量会周期性地在后台提交 |
connections.max.idle.ms | 在这个时间之后关闭空闲的连接 |
check.crcs | 自动计算被消费的消息的CRC32校验值 可以确保在传输过程中或磁盘存储过程中消息没有被破坏 它会增加额外的负载,在追求极致性能的场合禁用 |
exclude.internal.topics | 是否内部主题应该暴露给消费者。如果该条目设置为true,则只能先订阅再拉取 |
isolation.level | 控制如何读取事务消息 如果设置了 read_committed ,消费者的poll() 方法只会返回已经提交的事务消息如果设置了 read_uncommitted (默认值),消费者的poll 方法返回所有的消息,即使是已经取消的事务消息非事务消息以上两种情况都返回 消息总是以偏移量的顺序返回 read_committed 只能返回到达LSO的消息在LSO之后出现的消息只能等待相关的事务提交之后才能看到 结果, read_committed 模式,如果有未提交的事务,消费者不能读取到指到HW的消息read_committed 的seekToEnd 方法返回LSO |
heartbeat.interval.ms | 当使用消费组的时候,该条目指定消费者向消费者协调器发送心跳的时间间隔 心跳是为了确保消费者会话的活跃状态,同时在消费者加入或离开消费组的时候方便进行再平衡 该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的1/3可以将其调整得更小,以控制正常重新平衡的预期时间 |
session.timeout.ms | 当使用Kafka的消费组的时候,消费者周期性地向Broker发送心跳数表明自己的存在 如果经过该超时时间还没有收到消费者的心跳,则Broker将消费者从消费组移除,并启动再平衡 该值必须在Broker配置 group.min.session.timeout.ms 和group.max.session.timeout.ms 之间 |
max.poll.records | 一次调用poll() 方法返回的记录最大数量 |
max.poll.interval.ms | 使用消费组的时候调用poll() 方法的时间间隔该条目指定了消费者调用 poll() 方法的最大时间间隔如果在此时间内消费者没有调用 poll() 方法,则Broker认为消费者失败,触发再平衡,将分区分配给消费组中其他消费者 |
max.partition.fetch.bytes | 对每个分区,服务器返回的最大数量。消费者按批次拉取数据 如果非空分区的第一个记录大于这个值,批处理依然可以返回,以保证消费者可以进行下去 Broker接收批的大小由 message.max.bytes (Broker参数)或max.message.bytes (主题参数)指定fetch.max.bytes 用于限制消费者单次请求的数据量 |
send.buffer.bytes | 用于TCP发送数据时使用的缓冲大小(SO_SNDBUF ),-1表示使用OS默认的缓冲区大小 |
retry.backoff.ms | 在发生失败的时候如果需要重试,则该配置表示客户端等待多长时间再发起重试 该时间的存在避免了密集循环 |
request.timeout.ms | 客户端等待服务端响应的最大时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败 |
reconnect.backoff.ms | 重新连接主机的等待时间。避免了重连的密集循环 该等待时间应用于该客户端到Broker的所有连接 |
reconnect.backoff.max.ms | 重新连接到反复连接失败的Broker时要等待的最长时间(以毫秒为单位) 如果提供此选项,则对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值 在计算退避增量之后,添加20%的随机抖动以避免连接风暴 |
receive.buffer.bytes | TCP连接接收数据的缓存(SO_RCVBUF )-1表示使用操作系统的默认值 |
partition.assignment.strategy | 当使用消费组的时候,分区分配策略的类名 |
metrics.sample.window.ms | 计算指标样本的时间窗口 |
metrics.recording.level | 指标的最高记录级别 |
metrics.num.samples | 用于计算指标而维护的样本数量 |
interceptor.classes | 拦截器类的列表 默认没有拦截器 拦截器是消费者的拦截器,该拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口拦截器可用于对消费者接收到的消息进行拦截处理 |
消费组管理
谁来执行再均衡和消费组管理
Kafka提供了一个角色:Group Coordinator来执行对于消费组的管理。
Group Coordinator——每个消费组分配一个消费组协调器用于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。
如何确定coordinator
两步:
- 确定消费组位移信息写入 __consumers_offsets的哪个分区。具体计算公式:
__consumers_offsets partition# =
Math.abs(groupId.hashCode() %
groupMetadataTopicPartitionCount
)(注意:groupMetadataTopicPartitionCount
由offsets.topic.num.partitions
指定,默认是50个分区) - 该分区Leader所在的Broker就是组协调器
Rebalance Generation
它表示Rebalance之后主题分区到消费组中消费者映射关系的一个版本,主要是用于保护消费组,隔离无效偏移量提交的。如上一个版本的消费者无法提交位移到新版本的消费组中,因为映射关系变了,你消费的或许已经不是原来的那个分区了。每次group进行Rebalance之后,Generation号都会加1,表示消费组和分区的映射关系到了一个新版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进入Generation 2,之后成员4加入,再次触发Rebalance,消费组进入Generation 3。
协议(protocol)
Kafka提供了5个协议来处理与消费组协调相关的问题:
- Heartbeat请求:Consumer需要定期给组协调器发送心跳来表明自己还活着
- LeaveGroup请求:主动告诉组协调器我要离开消费组
- SyncGroup请求:消费组Leader把分配方案告诉组内所有成员
- JoinGroup请求:成员请求加入组
- DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
组协调器在再均衡的时候主要用到了前面4种请求。
再均衡过程
再均衡分为2步:Join和Sync
- Join:加入组。所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,协调i器从中选择一个消费者担任Leader的角色,并把组成员信息以及订阅信息发给Leader
- Sync:Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区。一旦完成分配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进SyncGroup的response中发给各个消费者
注意:在协调器收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。然后是分发分配方案的过程,即SyncGroup请求。消费组的分区分配方案在客户端执行。Kafka交给客户端可以有更好的灵活性。Kafka默认提供三种分配策略:range
和round-robin
和sticky
。可以通过消费者的参数:partition.assignment.strategy
来实现自己分配策略。
消费组状态机
消费组组协调器根据状态机对消费组做不同的处理:
说明:
- Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
- Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
- PreparingRebalance:组准备开启新的rebalance,等待成员加入
- AwaitingSync:正在等待leader consumer将分配方案传给各个成员
- Stable:再均衡完成,可以开始消费
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
文章评论