日志存储概述
Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
每个主题又可以分为一个或多个分区。
每个分区各自存在一个记录消息数据的日志文件。
图中,创建了一个tp_demo_01
主题,其存在3个Parition
,对应的每个Parition
下存在一个[Topic-Parition]
命名的消息日志文件。在理想情况下,数据流量分摊到各个Parition
中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比如:.index
、.timestamp
、.log
、.snapshot
等。
其中,文件名一致的文件集合就称为LogSement
。
LogSegment
- 分区日志文件中包含很多的
LogSegment
- Kafka 日志追加是顺序写入的
LogSegment
可以减小日志文件的大小- 进行日志删除的时候和数据查找的时候可以快速定位
ActiveLogSegment
是活跃的日志分段,拥有文件拥有写入权限,其余的LogSegment
只有只读的权限
日志文件存在多种后缀文件,重点需要关注.index
、.timestamp
、.log
三种类型。
类别作用
后缀名 | 说明 |
.index | 偏移量索引文件 |
.timestamp | 时间戳索引文件 |
.log | 日志文件 |
.snapshot | 快照文件 |
.deleted | |
.cleaned | 日志清理时临时文件 |
.swap | 日志压缩之后的临时文件 |
leader-epoch-checkpoint |
每个LogSegment
都有一个基准偏移量,表示当前LogSegment
中第一条消息的offset
。
偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。
如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是121(偏移量从0开始)。
日志与索引文件
配置条目 | 默认值 | 说明 |
log.index.interval.bytes | 4096(4K) | 增加索引项字节间隔密度,会影响索引文件中的区间密度和查询效率 |
log.segment.bytes | 1073741824(1G) | 日志文件最大值 |
log.roll.ms | 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,单位毫秒 | |
log.roll.hours | 168(7天) | 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,单位小时 |
log.index.size.max.bytes | 10485760(10MB) | 触发偏移量索引文件或时间戳索引文件分段字节限额 |
配置项默认值说明
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。
时间戳索引文件则根据时间戳查找对应的偏移量。
Kafka中的索引文件是以稀疏索引的方式构造消息的索引,并不保证每一个消息在索引文件中都有对应的索引项。
每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
通过修改log.index.interval.bytes
的值,改变索引项的密度。
切分文件
当满足如下几个条件中的其中之一,就会触发文件的切分:
- 当前日志分段文件的大小超过了Broker端参数
log.segment.bytes
配置的值。log.segment.bytes
参数的默认值为 1073741824,即1GB - 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于
log.roll.ms
或log.roll.hours
参数配置的值。如果同时配置了log.roll.ms
和log.roll.hours
参数,那么log.roll.ms
的优先级高。默认情况下,只配置了log.roll.hours
参数,其值为168,即7天 - 偏移量索引文件或时间戳索引文件的大小达到Broker端参数
log.index.size.max.bytes
配置的值。log.index.size.max.bytes
的默认值为10485760,即10MB - 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于
Integer.MAX_VALUE
,即要追加的消息的偏移量不能转变为相对偏移量
为什么是Integer.MAX_VALUE
1024 * 1024 * 1024=1073741824
在偏移量索引文件中,每个索引项共占用8个字节,并分为两部分。
相对偏移量和物理地址:
- 相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
- 物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
4个字节刚好对应Integer.MAX_VALUE
,如果大于Integer.MAX_VALUE
,则不能用4个字节进行表示了。
索引文件切分过程
索引文件会根据log.index.size.max.bytes
值进行预先分配空间,即文件创建的时候就是最大值。
当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。
这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。
日志存储
索引
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。
查看一个Topic分区目录下的内容,发现有log、index和timeindex三个文件:
- log文件名是以文件中第一条message的offset来命名的,实际offset长度是64位,但是这里只使用了20位,应付生产是足够的
- 一组index+log+timeindex文件的名字是一样的,并且log文件默认写满1G后,会进行log rolling形成一个新的组合来记录消息,这个是通过Broker端
log.segment.bytes =1073741824
指定的 - index和timeindex在刚使用时会分配10M的大小,当进行log rolling后,它会修剪为实际的大小
创建主题:
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --
topic tp_demo_05 --partitions 1 --replication-factor 1 --config
segment.bytes=104857600
创建消息文件:
[root@node1 ~]# for i in `seq 10000000`; do echo "hello rubin $i" >> nmm.txt;
done
将文本消息生产到主题中:
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic
tp_demo_05 < nmm.txt
如果想查看这些文件,可以使用kafka提供的shell来完成,几个关键信息如下:
- offset:逐渐增加的整数,每个offset对应一个消息的偏移量
- position:消息批字节数,用于计算物理地址
- CreateTime:时间戳
- magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型
- compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1-GZIP、2-snappy、3-lz4
- crc:对所有字段进行校验后的crc值
[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --
files 00000000000000000000.log --print-data-log | head
Dumping 00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 716 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false
position: 0 CreateTime: 1596513421661 isvalid: true size: 16380 magic: 2
compresscodec: NONE crc: 2973274901
baseOffset: 717 lastOffset: 1410 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 16380 CreateTime: 1596513421715 isvalid: true size: 16371
magic: 2 compresscodec: NONE crc: 1439993110
baseOffset: 1411 lastOffset: 2092 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 32751 CreateTime: 1596513421747 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 3528903590
baseOffset: 2093 lastOffset: 2774 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 49116 CreateTime: 1596513421791 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 763876977
baseOffset: 2775 lastOffset: 3456 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 65481 CreateTime: 1596513421795 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 2218198476
baseOffset: 3457 lastOffset: 4138 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 81846 CreateTime: 1596513421798 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 4018065070
baseOffset: 4139 lastOffset: 4820 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 98211 CreateTime: 1596513421802 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 3073882858
baseOffset: 4821 lastOffset: 5502 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 114576 CreateTime: 1596513421819 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 207330377
时间戳索引文件,它的作用是可以让用户查询某个时间段内的消息,它一条数据的结构是时间戳(8byte)+相对offset(4byte),如果要使用这个索引文件,首先需要通过时间范围,找到对应的相对offset,然后再去对应的index文件找到position信息,然后才能遍历log文件,它也是需要使用上面说的index文件的。
但是由于producer生产消息可以指定消息的时间戳,这可能将导致消息的时间戳不一定有先后顺序,因此尽量不要生产消息时指定时间戳。
偏移量
- 位置索引保存在index文件中
- log日志默认每写入4K(
log.index.interval.bytes
设定的),会写入一条索引信息到index文件中,因此索引文件是稀疏索引,它不会为每条日志都建立索引信息 - log文件中的日志,是顺序写入的,由message+实际offset+position组成
- 索引文件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相对第一个消息的相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计算回实际的offset,这对用户是透明的
稀疏索引,索引密度不高,但是offset有序,二分查找的时间复杂度为O(logN),如果从头遍历时间复杂度是O(N)。
示意图如下:
偏移量索引由相对偏移量和物理地址组成。
可以通过如下命令解析.index文件:
kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000000000000.index --print-data-log | head
注意:offset与position没有直接关系,因为会删除数据和清理日志。
[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --
files 00000000000003925423.log --print-data-log | head
Dumping 00000000000003925423.log
Starting offset: 3925423
baseOffset: 3925423 lastOffset: 3926028 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 0 CreateTime: 1596513434779 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc: 4049330741
baseOffset: 3926029 lastOffset: 3926634 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 16359 CreateTime: 1596513434786 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 2290699169
baseOffset: 3926635 lastOffset: 3927240 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 32718 CreateTime: 1596513434787 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 368995405
baseOffset: 3927241 lastOffset: 3927846 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 49077 CreateTime: 1596513434788 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 143415655
baseOffset: 3927847 lastOffset: 3928452 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 65436 CreateTime: 1596513434789 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 572340120
baseOffset: 3928453 lastOffset: 3929058 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 81795 CreateTime: 1596513434790 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 1029643347
baseOffset: 3929059 lastOffset: 3929664 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 98154 CreateTime: 1596513434791 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 2163818250
baseOffset: 3929665 lastOffset: 3930270 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 114513 CreateTime: 1596513434792 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 3747213735
注意:timestamp文件中的offset与index文件中的relativeOffset不是一一对应的。因为数据的写入是各自追加。
思考:如何查看偏移量为23的消息?
Kafka 中存在一个ConcurrentSkipListMap
来保存在每个日志分段,通过跳跃表方式,定位到在00000000000000000000.index,通过二分法在偏移量索引文件中找到不大于23的最大索引项,即offset 20那栏,然后从日志分段文件中的物理位置为320开始顺序查找偏移量为23的消息。
时间戳
在偏移量索引文件中,索引数据都是顺序记录offset,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在Kafka 0.11.0.0以后,消息信息中存在若干的时间戳信息。如果Broker端参数 log.message.timestamp.type
设置为LogAppendTime
,那么时间戳必定能保持单调增长。反之如果是CreateTime
则无法保证顺序。
通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。
时间戳索引索引格式:前八个字节表示时间戳,后四个字节表示偏移量。
思考:如何查找时间戳为1557554753430开始的消息?
- 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最大时间戳
largestTimeStamp
逐一对比,直到找到不小于1557554753430所对应的日志分段。日志分段中的largestTimeStamp
的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间 - 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址
- 日志文件中从320的物理位置开始查找不小于1557554753430数据
清理
Kafka提供两种日志清理策略:
- 日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除
- 日志压缩:针对每个消息的Key进行整合,对于有相同Key的不同Value值,只保留最后一个版本
Kafka提供log.cleanup.policy
参数进行相应配置,默认值:delete
,还可以选择compact
。
主题级别的配置项是cleanup.policy
。
日志删除
基于时间
日志删除任务会根据log.retention.hours/log.retention.minutes/log.retention.ms
设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是7天,log.retention.ms
优先级最高。
Kafka依据日志分段中最大的时间戳进行定位。
首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取最近修改时间。为什么不直接选最近修改时间呢?因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。
删除过程如下:
- 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作
- 这些日志分段所有文件添加上
.delete
后缀 - 交由一个以"delete-file"命名的延迟任务来删除这些
.delete
为后缀的文件。延迟执行时间可以通过file.delete.delay.ms
进行设置
如果活跃的日志分段中也存在需要删除的数据时,怎么处理呢?
Kafka会先切分出一个新的日志分段作为活跃日志分段,该日志分段不删除,删除原来的日志分段。也就是说先腾出地方,再删除。
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定值。设定项为log.retention.bytes
,单个日志分段的大小由 log.segment.bytes
进行设定。
删除过程如下:
- 计算需要被删除的日志总大小 (当前日志文件大小(所有分段)减去retention值)
- 从日志文件第一个
LogSegment
开始查找可删除的日志分段的文件集合 - 执行删除
基于偏移量
根据日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段。
注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。
删除过程如下:
- 从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为21,小于
logStartOffset
,将日志分段1加入到删除队列中 - 日志分段2的下一个日志分段的起始偏移量为35,小于
logStartOffset
,将 日志分段2加入到删除队列中 - 日志分段3的下一个日志分段的起始偏移量为57,小于
logStartOffset
,将日志分段3加入删除集合中 - 日志分段4的下一个日志分段的其实偏移量为71,大于logStartOffset,则不进行删除
日志压缩策略
概念
日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的保留。
对于具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除。
应用场景
日志压缩特性,就实时计算来说,可以在异常容灾方面有很好的应用途径。比如,我们在Spark、Flink中做实时计算时,需要长期在内存里面维护一些数据,这些数据可能是通过聚合了一天或者一周的日志得到的,这些数据一旦由于异常因素(内存、网络、磁盘等)崩溃了,从头开始计算需要很长的时间。一个比较有效可行的方式就是定时将内存里的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。
使用日志压缩来替代这些外部存储有哪些优势及好处呢?这里为大家列举并总结了几点:
- Kafka即是数据源又是存储工具,可以简化技术栈,降低维护成本
- 使用外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使用这些Key将数据取回,实现起来有一定的工程难度和复杂度。使用Kafka的日志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读回到内存就可以了
- Kafka对于磁盘的读写做了大量的优化工作,比如磁盘顺序读写。相对于外部存储介质没有索引查询等工作量的负担,可以实现高性能。同时,Kafka的日志压缩机制可以充分利用廉价的磁盘,不用依赖昂贵的内存来处理,在性能相似的情况下,实现非常高的性价比(这个观点仅仅针对于异常处理和容灾的场景来说)
日志压缩方式的实现细节
主题的cleanup.policy
需要设置为compact
。
Kafka的后台线程会定时将Topic遍历两次:
- 记录每个key的hash值最后一次出现的偏移量
- 第二次检查每个offset对应的Key是否在后面的日志中出现过,如果出现了就删除对应的日志
日志压缩允许删除,除最后一个key之外,删除先前出现的所有该key对应的记录。在一段时间后从日志中清理,以释放空间。
注意:日志压缩与key有关,确保每个消息的key不为null。
压缩是在Kafka后台通过定时重新打开Segment来完成的,Segment的压缩细节如下图所示:
日志压缩可以确保:
- 任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使用Topic的
min.compaction.lag.ms
属性来保证消息在被压缩之前必须经过的最短时间。也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的max.compaction.lag.ms
属性来保证从收到消息到消息符合压缩条件之间的最大延时 - 消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已
- 消息的偏移量永远不会改变,它是日志中位置的永久标识符
- 从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使用者在比Topic的
log.cleaner.delete.retention.ms
短的时间内到达日志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24小时
默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志清理器,这里为大家总结了以下几点:
log.cleanup.policy
设置为compact
,Broker的配置,影响集群中所有的Topiclog.cleaner.min.compaction.lag.ms
,用于防止对更新超过最小消息进行压缩,如果没有设置,除最后一个Segment之外,所有Segment都有资格进行压缩log.cleaner.max.compaction.lag.ms
,用于防止低生产速率的日志在无限制的时间内不压缩
Kafka的日志压缩原理并不复杂,就是定时把所有的日志读取两遍,写一遍,而CPU的速度超过磁盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就是可以接受的。
磁盘存储
零拷贝
Kafka高性能,是多方面协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“无所不用其极”的高效利用磁盘/操作系统特性。
零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。
nginx的高性能也有零拷贝的身影。
传统IO比如:读取文件,socket发送.
传统方式实现:先读取、再发送,实际经过1~4四次copy。
buffer = File.read
Socket.send(buffer)
- 第一次:将磁盘文件,读取到操作系统内核缓冲区
- 第二次:将内核缓冲区的数据,copy到application应用程序的buffer
- 第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区)
- 第四次:将socket buffer的数据,copy到网络协议栈,由网卡进行网络传输
实际IO读写,需要进行IO中断,需要CPU响应中断(内核态到用户态转换),尽管引入DMA(Direct Memory Access,直接存储器访问)来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。
实际上并不需要第二个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。
Kafka的两个过程:
- 网络数据持久化到磁盘(Producer到Broker)
- 磁盘文件通过网络发送(Broker到Consumer)
数据落盘通常都是非实时的,Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。
磁盘文件通过网络发送(Broker 到 Consumer)。磁盘数据通过DMA(Direct Memory Access,直接存储器访问)拷贝到内核态Buffer。直接通过DMA拷贝到NIC Buffer(socket buffer),无需CPU拷贝。
除了减少数据拷贝外,整个读文件 ==> 网络发送由一个sendfile
调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
Java NIO对sendfile
的支持就是FileChannel.transferTo()/transferFrom()
。
fileChannel.transferTo(position, count, socketChannel);
把磁盘文件读取OS内核缓冲区后的fileChannel
,直接转给socketChannel
发送,底层就是sendfile
。消费者从Broker读取数据,就是由此实现。
具体来看,Kafka的数据传输通过TransportLayer
来完成,其子类PlaintextTransportLayer
通过Java NIO的FileChannel
的transferTo
和transferFrom
方法实现零拷贝。
注:transferTo
和transferFrom
并不保证一定能使用零拷贝,需要操作系统支持。Linux 2.4+ 内核通过sendfile
系统调用,提供了零拷贝。
页缓存
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
Kafka接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用mmap
内存文件映射。
Memory Mapped Files简称mmap
,简单描述其作用就是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。
它的工作原理是直接利用操作系统的Page来实现磁盘文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过mmap
,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。
mmap
也有一个很明显的缺陷:不可靠,写到mmap
中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush
的时候才把数据真正的写到硬盘。
Kafka提供了一个参数producer.type
来控制是不是主动flush
。如果Kafka写入到mmap
之后就立即flush
然后再返回Producer叫同步(sync),写入mmap
之后立即返回Producer不调用flush
叫异步(async)。
Java NIO,提供了一个MappedByteBuffer
类可以用来实现内存映射。
MappedByteBuffer
只能通过调用FileChannel
的map()
取得,再没有其他方式。
FileChannel.map()
是抽象方法,具体实现是在FileChannelImpl.map()
可自行查看JDK源码,其map0()
方法就是调用了Linux内核的mmap的API。
使用MappedByteBuffer
类要注意的是:mmap
的文件映射,在FULL GC时才会进行释放。当close时,需要手动清除内存映射文件,可以反射调用sun.misc.Cleaner
方法。
当一个进程准备读取磁盘上的文件内容时:
- 操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作
- 如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程
如果一个进程需要将数据写入磁盘:
- 操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页
- 被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性
对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式, 否则页缓存很难被禁止。
当使用页缓存的时候,即使Kafka服务重启, 页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。
Kafka中大量使用了页缓存,这是Kafka实现高吞吐的重要因素之一。
消息先被写入页缓存,由操作系统负责刷盘任务。
顺序写入
操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存) 和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。
Kafka在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算Kafka使用磁盘作为存储介质,也能承载非常大的吞吐量。
mmap和sendfile:
- Linux内核提供、实现零拷贝的API
sendfile
是将读到内核空间的数据,转到socket buffer,进行网络发送mmap
将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上RocketMQ
在消费消息时,使用了mmap
。kafka使用了sendFile
Kafka速度快是因为:
- partition顺序读写,充分利用磁盘特性,这是基础
- Producer生产的数据持久化到Broker,采用
mmap
文件映射,实现顺序的快速写入 - Customer从Broker读取数据,采用
sendfile
,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
文章评论