消息发送
数据生产流程解析
- Producer创建时,会创建一个Sender线程并设置为守护线程
- 生产消息时,内部其实是异步流程。生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
- 批次发送的条件为:缓冲区数据大小达到
batch.size
或者linger.ms
达到上限,哪个先达到就算哪个 - 批次发送后,发往指定分区,然后落盘到Broker。如果生产者配置了
retrires
参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试 - 落盘到Broker成功,返回生产元数据给生产者
- 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回
必要参数配置
broker配置
参数配置详情如下:
属性 | 说明 | 重要性 |
bootstrap.servers | 生产者客户端与Broker集群建立初始连接需要的Broker地址列表,由该初始连接发现Kafka集群中其他的所有Broker。该地址列表不需要写全部的Kafka集群中Broker的地址,但也不要写一个,以防该节点宕机的时候不可用。形式为:host1:port1,host2:port2,… | high |
key.serializer | 实现了接口org.apache.kafka.common.serialization.Serializer 的key序列化类 | high |
value.serializer | 实现了接口org.apache.kafka.common.serialization.Serializer 的value序列化类 | high |
acks | 该选项控制着已发送消息的持久性 acks=0 :生产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries设置也不起作用,因为客户端不关心消息是否发送失败。客户端收到的消息偏移量永远是-1 acks=1 :leader将记录写到它本地日志,就响应客户端确认消息,而不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息 acks=all :leader等待所有同步的副本确认该消息。保证了只要有一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等价于 acks=-1 。默认值为1,字符串。可选值:[all, -1, 0, 1] | high |
compression.type | 生产者生成数据的压缩格式。默认是none (没有压缩)。允许的值:none ,gzip ,snappy 和lz4 。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越好。字符串类型的值。默认是none | high |
retries | 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并无二至。允许重试但是不设置max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了重试,第二个成功了,则第一个消息批在第二个消息批后。int类型的值,默认:0,可选值:[0,…,2147483647] | high |
序列化器
由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。
序列化器的作用就是用于序列化要发送的消息的。
Kafka使用org.apache.kafka.common.serialization.Serializer
接口用于定义序列化器,将泛型指定类型的数据转换为字节数组。
package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}
byte[] serialize(String var1, T var2);
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}
default void close() {
}
}
系统提供了该接口的子接口以及实现类:
org.apache.kafka.common.serialization.ByteArraySerializer
package org.apache.kafka.common.serialization;
public class ByteArraySerializer implements Serializer<byte[]> {
public ByteArraySerializer() {
}
public byte[] serialize(String topic, byte[] data) {
return data;
}
}
org.apache.kafka.common.serialization.ByteBufferSerializer
package org.apache.kafka.common.serialization;
import java.nio.ByteBuffer;
public class ByteBufferSerializer implements Serializer<ByteBuffer> {
public ByteBufferSerializer() {
}
public byte[] serialize(String topic, ByteBuffer data) {
if (data == null) {
return null;
} else {
data.rewind();
byte[] arr;
if (data.hasArray()) {
arr = data.array();
if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
return arr;
}
}
arr = new byte[data.remaining()];
data.get(arr, 0, arr.length);
data.rewind();
return arr;
}
}
}
org.apache.kafka.common.serialization.BytesSerializer
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.utils.Bytes;
public class BytesSerializer implements Serializer<Bytes> {
public BytesSerializer() {
}
public byte[] serialize(String topic, Bytes data) {
return data == null ? null : data.get();
}
}
org.apache.kafka.common.serialization.DoubleSerializer
package org.apache.kafka.common.serialization;
public class DoubleSerializer implements Serializer<Double> {
public DoubleSerializer() {
}
public byte[] serialize(String topic, Double data) {
if (data == null) {
return null;
} else {
long bits = Double.doubleToLongBits(data);
return new byte[]{(byte)((int)(bits >>> 56)), (byte)((int)(bits >>> 48)), (byte)((int)(bits >>> 40)), (byte)((int)(bits >>> 32)), (byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
}
}
}
org.apache.kafka.common.serialization.FloatSerializer
package org.apache.kafka.common.serialization;
public class FloatSerializer implements Serializer<Float> {
public FloatSerializer() {
}
public byte[] serialize(String topic, Float data) {
if (data == null) {
return null;
} else {
long bits = (long)Float.floatToRawIntBits(data);
return new byte[]{(byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
}
}
}
org.apache.kafka.common.serialization.IntegerSerializer
package org.apache.kafka.common.serialization;
public class IntegerSerializer implements Serializer<Integer> {
public IntegerSerializer() {
}
public byte[] serialize(String topic, Integer data) {
return data == null ? null : new byte[]{(byte)(data >>> 24), (byte)(data >>> 16), (byte)(data >>> 8), data.byteValue()};
}
}
org.apache.kafka.common.serialization.StringSerializer
package org.apache.kafka.common.serialization;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
public StringSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("serializer.encoding");
}
if (encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}
}
public byte[] serialize(String topic, String data) {
try {
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
}
}
}
org.apache.kafka.common.serialization.LongSerializer
package org.apache.kafka.common.serialization;
public class LongSerializer implements Serializer<Long> {
public LongSerializer() {
}
public byte[] serialize(String topic, Long data) {
return data == null ? null : new byte[]{(byte)((int)(data >>> 56)), (byte)((int)(data >>> 48)), (byte)((int)(data >>> 40)), (byte)((int)(data >>> 32)), (byte)((int)(data >>> 24)), (byte)((int)(data >>> 16)), (byte)((int)(data >>> 8)), data.byteValue()};
}
}
org.apache.kafka.common.serialization.ShortSerializer
package org.apache.kafka.common.serialization;
public class ShortSerializer implements Serializer<Short> {
public ShortSerializer() {
}
public byte[] serialize(String topic, Short data) {
return data == null ? null : new byte[]{(byte)(data >>> 8), data.byteValue()};
}
}
自定义序列化器
数据的序列化一般生产中使用avro。
自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其中的serialize
方法。
案例:
实体类:
package com.rubin.kafka.customserializer;
import lombok.Data;
@Data
public class User {
private Integer userId;
private String username;
}
序列化类:
package com.rubin.kafka.customserializer;
import org.apache.commons.lang3.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
// 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的
}
@Override
public byte[] serialize(String topic, User data) {
try {
if (data == null) {
return null;
} else {
final Integer userId = data.getUserId();
final String username = data.getUsername();
if (userId != null) {
if (username != null) {
final byte[] bytes = username.getBytes("UTF-8");
int length = bytes.length;
// 第一个4个字节用于存储userId的值
// 第二个4个字节用于存储username字节数组的长度int值
// 第三个长度,用于存放username序列化之后的字节数组
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
// 设置userId
buffer.putInt(userId);
// 设置username字节数组长度
buffer.putInt(length);
// 设置username字节数组
buffer.put(bytes);
// 以字节数组形式返回user对象的值
return buffer.array();
}
}
}
} catch (Exception e) {
throw new SerializationException("数据序列化失败");
}
return null;
}
@Override
public void close() {
// do nothing
// 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。
}
}
生产者:
package com.rubin.kafka.customserializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置自定义的序列化器
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);
User user = new User();
// user.setUserId(1001);
// user.setUsername("张三");
// user.setUsername("李四");
// user.setUsername("王五");
user.setUserId(400);
user.setUsername("赵四");
ProducerRecord<String, User> record = new ProducerRecord<String, User>(
"tp_user_01", // topic
user.getUsername(), // key
user // value
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息发送异常");
} else {
System.out.println("主题:" + metadata.topic() + "\t"
+ "分区:" + metadata.partition() + "\t"
+ "生产者偏移量:" + metadata.offset());
}
}
});
// 关闭生产者
producer.close();
}
}
分区器
默认(DefaultPartitioner
)分区计算:
- 如果record提供了分区号,则使用record提供的分区号
- 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模
- 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号:会首先在可用的分区中分配分区号;如果没有可用的分区,则在该主题所有分区中分配分区号
如果要自定义分区器,则需要:
- 首先开发
Partitioner
接口的实现类 - 在
KafkaProducer
中进行设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")
位于org.apache.kafka.clients.producer
中的分区器接口:
package org.apache.kafka.clients.producer;
import java.io.Closeable;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
public interface Partitioner extends Configurable, Closeable {
/**
* 为指定的消息记录计算分区值
*
* @param topic 主题名称
* @param key 根据该key的值进行分区计算,如果没有则为null。
* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
null
* @param value 根据value值进行分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
null
* @param cluster 当前集群的元数据
*/
int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
void close();
}
包org.apache.kafka.clients.producer.internals
中分区器的默认实现:
package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
/**
* 默认的分区策略:
*
* 如果在记录中指定了分区,则使用指定的分区
* 如果没有指定分区,但是有key的值,则使用key值的散列值计算分区
* 如果没有指定分区也没有key的值,则使用轮询的方式选择一个分区
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public DefaultPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {
}
}
自定义分区器:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义分区器
*/
public class MyPartitioner implements Partitioner {
/**
* 指定的消息记录计算分区值
*
* @param topic 主题名称
* @param key 根据该key的值进行分区计算,如果没有则为null
* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为null
* @param value 根据value值进行分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为null
* @param cluster 当前集群的元数据
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 此处可以计算分区的数字。
// 我们直接指定为0
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
创建生产者使用自定义分区器:
package com.rubin.kafka.custompartitioner;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 指定自定义的分区器
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// 此处不要设置partition的值
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"tp_part_02",
"mykey",
"myvalue"
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息发送失败");
} else {
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
}
}
});
// 关闭生产者
producer.close();
}
}
拦截器
Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。
对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定义的方法包括:
onSend(ProducerRecord)
:该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算onAcknowledgement(RecordMetadata, Exception)
:该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率close
:关闭Interceptor,主要用于执行一些资源清理工作
自定义拦截器:
- 实现
ProducerInterceptor
接口 - 在KafkaProducer的设置中设置自定义的拦截器
案例:
自定义拦截器:
package com.rubin.kafka.custominterceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorOne implements ProducerInterceptor<Integer, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("拦截器1 -- go");
// 消息发送的时候,经过拦截器,调用该方法
// 要发送的消息内容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 拦截器拦下来之后根据原来消息创建的新的消息
// 此处对原消息没有做任何改动
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 传递新的消息
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("拦截器1 -- back");
// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
// 会影响kafka生产者的性能。
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
}
package com.rubin.kafka.custominterceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import java.util.Map;
public class InterceptorTwo implements ProducerInterceptor<Integer, String> {
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("拦截器2 -- go");
// 消息发送的时候,经过拦截器,调用该方法
// 要发送的消息内容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 拦截器拦下来之后根据原来消息创建的新的消息
// 此处对原消息没有做任何改动
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 传递新的消息
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("拦截器2 -- back");
// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
// 会影响kafka生产者的性能。
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
}
package com.rubin.kafka.custominterceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import java.util.Map;
public class InterceptorThree implements ProducerInterceptor<Integer, String> {
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("拦截器3 -- go");
// 消息发送的时候,经过拦截器,调用该方法
// 要发送的消息内容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 拦截器拦下来之后根据原来消息创建的新的消息
// 此处对原消息没有做任何改动
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 传递新的消息
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("拦截器3 -- back");
// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
// 会影响kafka生产者的性能。
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
}
创建生产者使用自定义过滤器:
package com.rubin.kafka.custominterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应
// 此时可以保证发送消息即使在重试的情况下也是有序的。
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// configs.put("max.in.flight.requests.per.connection", 1);
// interceptor.classes
// 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.rubin.kafka.custominterceptor.InterceptorOne," +
"com.rubin.kafka.custominterceptor.InterceptorTwo," +
"com.rubin.kafka.custominterceptor.InterceptorThree");
configs.put("classContent", "this is rubin's kafka demo");
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"tp_inter_01",
0,
1001,
"this is rubin's 1001 message"
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.offset());
}
}
});
// 关闭生产者
producer.close();
}
}
原理剖析
由上图可以看出:KafkaProducer有两个基本线程:
- 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器
RecoderAccumulator
中。消息收集器RecoderAccumulator
为每个分区都维护了一个Deque
类型的双端队列。ProducerBatch
可以理解为是ProducerRecord
的集合,批量发送有利于提升吞吐量,降低网络影响。由于生产者客户端使用java.io.ByteBuffer
在发送消息之前进行消息保存,并维护了一个BufferPool
实现ByteBuffer
的复用,该缓存池只针对特定大小(batch.size
指定)的ByteBuffer
进行管理,对于消息过大的缓存,不能做到重复利用。每次追加一条ProducerRecord
消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch
,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch
,判断该消息大小是否超过客户端参数配置batch.size
的值,不超过,则以batch.size
建立新的ProducerBatch
,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的ProducerBatch
。缺点就是该内存不能被复用了 - Sender线程:该线程从消息收集器获取缓存的消息,将其处理为
<Node, List<ProducerBatch>>
的形式,Node
表示集群的Broker节点。进一步将<Node, List<ProducerBatch>>
转化为<Node, Request>
形式,此时才可以向服务端发送数据。在发送之前,Sender线程将消息以Map<NodeId, Deque<Request>>
的形式保存到InFlightRequests
中进行缓存,可以通过其获取leastLoadedNode
,即当前Node
中负载压力最小的一个,以实现消息的尽快发出
生产者参数配置补充
参数设置方式:
补充参数:
参数名称 | 描述 |
retry.backoff.ms | 在向一个指定的主题分区重发消息的时候,重试之间的等待时间。比如3次重试,每次重试之后等待该时间长度,再接着重试。在一些失败的场景,避免了密集循环的重新发送请求。long型值,默认100。可选值:[0,…] |
retries | retries重试次数 当消息发送出现错误的时候,系统会重发消息。 跟客户端收到错误时重发一样。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送成功了 |
request.timeout.ms | 客户端等待请求响应的最大时长。如果服务端响应超时,则会重发请求,除非 达到重试次数。该设置应该比 replica.lag.time.max.ms (a broker configuration)要大,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,…] |
interceptor.classes | 在生产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以 通过拦截器对消息进行处理。 要求拦截器类必须实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。默认没有拦截器。 Map configs中通过List集合配置多个拦截器类名 |
acks | 默认值:all。 acks=0: 生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1 acks=1: 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失 acks=all: 首领分区会等待所有的ISR副本分区确认记录 该处理保证了只要有一个ISR副本分区存活,消息就不会丢失 这是Kafka最强的可靠性保证,等效于 acks=-1 |
batch.size | 当多个消息发送到同一个分区的时候,生产者尝试将多个记录作为一个批来处 理。批处理提高了客户端和服务器的处理效率。该配置项以字节为单位控制默认批的大小 所有的批小于等于该值 发送给Broker的请求将包含多个批次,每个分区一个,并包含可发送的数据 如果该值设置的比较小,会限制吞吐量(设置为0会完全禁用批处理)。如果 设置的很大,又有一点浪费内存,因为Kafka会永远分配这么大的内存来参与 到消息的批整合中 |
client.id | 生产者发送请求的时候传递给broker的id字符串。用于在broker的请求日志中追踪什么应用发送了什么消息。一般该id是跟业务有关的字符串 |
compression.type | 生产者发送的所有数据的压缩方式。默认是none ,也就是不压缩。支持的值:none 、gzip 、snappy 和lz4 。压缩是对于整个批来讲的,所以批处理的效率也会影响到压缩的比例 |
send.buffer.bytes | TCP发送数据的时候使用的缓冲区(SO_SNDBUF )大小。如果设置为0,则使用操作系统默认的 |
buffer.memory | 生产者可以用来缓存等待发送到服务器的记录的总内存字节。如果记录的发送 速度超过了将记录发送到服务器的速度,则生产者将阻塞 max.block.ms 的时 间,此后它将引发异常。此设置应大致对应于生产者将使用的总内存,但并非 生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了 压缩)以及维护运行中的请求。long型数据。默认值:33554432,可选值: [0,…] |
connections.max.idle.ms | 当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000 |
linger.ms | 生产者在发送请求传输间隔会对需要发送的消息进行累积,然后作为一个批次 发送。一般情况是消息的发送的速度比消息累积的速度慢。有时客户端需要减 少请求的次数,即使是在发送负载不大的情况下。该配置设置了一个延迟,生 产者不会立即将消息发送到Broker,而是等待这么一段时间以累积消息,然 后将这段时间之内的消息作为一个批次发送。该设置是批处理的另一个上限: 一旦批消息达到了 batch.size 指定的值,消息批会立即发送,如果积累的消息字节数达不到 batch.size 的值,可以设置该毫秒值,等待这么长时间之后,也会发送消息批。该属性默认值是0(没有延迟)。如果设置 linger.ms=5 ,则在一个请求发送之前先等待5ms。long型值,默认:0,可 选值:[0,…] |
max.block.ms | 控制KafkaProducer.send() 和KafkaProducer.partitionsFor() 阻塞的时长。当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入。long型值,默认:60000,可选值:[0,…] |
max.request.size | 单个请求的最大字节数。该设置会限制单个请求中消息批的消息个数,以免单 个请求发送太多的数据。服务器有自己的限制批大小的设置,与该配置可能不 一样。int类型值,默认1048576,可选值:[0,…] |
partitioner.class | 实现了接口org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值为:org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes | TCP接收缓存(SO_RCVBUF ),如果设置为-1,则使用操作系统默认的值。int类型值,默认32768,可选值:[-1,…] |
security.protocol | 跟Broker通信的协议:PLAINTEXT , SSL , SASL_PLAINTEXT , SASL_SS 。String 类型值,默认:PLAINTEXT |
max.in.flight.requests.per.connection | 单个连接上未确认请求的最大数量。达到这个数量,客户端阻塞。如果该值大 于1,且存在失败的请求,在重试的时候消息顺序不能保证。 int类型值,默认5。可选值:[1,…] |
reconnect.backoff.max.ms | 对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。 在计算退避增量之后,添加20%的随机抖动以避免连接风暴。 long型值,默认1000,可选值:[0,…] |
reconnect.backoff.ms | 尝试重连指定主机的基础等待时间。避免了到该主机的密集重连。该退避时间 应用于该客户端到Broker的所有连接。 long型值,默认50。可选值:[0,…] |
以上就是本文的全部内容。欢迎各位小伙伴积极留言交流~~~
文章评论