深入理解kafka学习笔记

kafka起初由LinkedIn公司采用Scala语言开发的一个多分区、多副本基于zookeeper协调的分布式消息系统,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛应用。
本文通过生产者、消费者、主题分区、日志存储等几个方面最kafka的基本用法和原理进行阐述,旨在以后方面回顾。文中大部分内容提取自《深入理解Kakfa 核心设计与实践原理》。

1. 初识kafka

1.1 基本概念

kakfa体系结构

  • 生产者
  • 消费者
  • Broker,可以简单看做是一个独立的kafka服务节点或者服务实例。

  • 主题,kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题,消费者订阅主题并进行消费

  • 分区,一个分区只属于单个主题,每个主题可以包含多个分区,分区可以分布在不同的broker上,同一主题下不同分区包含的消息是不同的

kakfa消息追加写入

分区在存储层面可以看做是一个可追加的日志本间。每条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。分区规则如果设置的合理,那么消息会被均匀的分配到不同的分区中。

kafka为分区引入了多副本(Replica)机制,通过增加副本的数量可以提升容灾能力。副本之间是一主多从的关系,leader副本负责处理读写请求,follower副本负责和leader副本消息同步。副本处于不同的broker中,
当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。kafka通过多副本机制实现了故障的自动转移,当kafka集群中某个broker失效时仍能保证服务可用。

kakfa多副本架构

分区中所有的副本统称AR(Assigned Replicas), 所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)。消息会先发送到leader副本,然后follower副本从leader副本中拉取消息进行同步。

kakfa HW LEO

  • LEO: 高水位,副本最大位移
  • HW: 低水位,ISR列表中最小的LEO,是消费者可消费的最大位移

2. 生产者

最基本的生产者代码

1
2
3
4
5
6
7
8
9
10
11
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();
  • bootstrap.servers: 设置集群broker地址清单,具体格式:host1:port1, host2:port2 可以设置一个或者多个,不需要设置所有的broker地址,生产者可以从给定的broker里面查到其他的broker信息,不过建议最少设置两个以上,其中一个宕机时,生产者还可以连到集群上
  • key.serializer、value.serializer用来序列化ProducerRecord
  • asks: 这个参数用来指定分区中必须要有多少副本收到这条消息,之后生产者才会认为这条消息是成功写入的, asks的设置会影响吞吐量
    • asks == 1. 即只要leader副本成功写入消息,那么生产端就会收到服务端的成功响应
    • asks == 0. 生产者不需要等待任何服务服务端的响应
    • asks == -1 or asks == all. 需要等待ISR列表中的所有副本都成功写入,服务器端才能响应成功

发送数据三种模式,发后即忘,同步和异步

同步:

1
2
3
4
5
6
try{
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get()
} catch(...) {
...
}

异步

1
2
3
4
5
6
producer.send(record, new Callback(){
@Override
public void onCompletion(RecordMetadata metadata, Exception exception){
...
}
});

kakfa 生产端的整体架构

消息缓存到batch,分批次发送,减少网络传输的资源消耗
send时,生产者并不会立即发送记录到broker,会先对记录做缓存。从图中可以看出,send会先发消息放到一个batch中,当一个batch满了之后,再申请一个新的batch填充,
kafka消费端维护了一个batch池,以便batch可以循环利用。此外消费者客户端会有一个send线程,实时去队列中获取已经满了的batch,发送到broker

3. 消费者

kafka中的消费是基于拉取的,消费者会重复的调用poll方法拉取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
  • group.id: 消费者隶属的消费组名称, 消费组可以包含多个消费者,每个分片只能被一个消费组中的一个消费者消费

kakfa consumer group

  • enable.auto.commit: 消费者消费完数据,需要告诉服务端消费的offset,以便下次poll数据时,服务端知道从哪里开始下发数据, 这个字段告诉消费者自动提交offset,
    消费者后台线程会定期自动提交offset,周期可以通过auto.commit.interval.ms设置,默认5s

kakfa consumer offset

但是自动提交会产生漏数据或者重复消费的问题。加入消费者一次poll到10条数据,在消费到第五条的时候,后台线程提交了offset,提交后消费者线程挂掉,
此时服务端会认为消费者已经将这10条数据消费,消费者重启,再poll数据时,会直接拉取10条后的数据。
不过因为kafka数据存储实在磁盘上的,会周期性的持久化,如果知道漏数据的offset,可以通过offset重新拉取数据
如果消费者在消费完数据后,提交位移前挂掉,会发生重复消费的问题。如果消费者业务是幂等的,重复消费不会产生太多问题,否则不太乐观
kafka同时提供了手动提交位移的api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//一次poll消费完统一提交位移
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
insertIntoDb(records);
consumer.commitSync();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//每消费完一个partition记录就提交
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}

4. 主题分区

5. 日志存储

kafka以log日志的形式将数据存储到磁盘上,每个partition可以看做是一个整体的log文件,但为了不使单个log文件太大,kafka引入了日志分段(LogSegment),将log日志切分成多个LogSegment,
相当于被平分为多个较小的文件

kakfa log partition logsegment

kakfa 日志结构

index 中存储了索引以及物理偏移量。 log 存储了消息的内容。索引文件的元数据执行对应数据文件中message 的物理偏移地址。举个简单的案例来说,以
[4053,80899]为例,在 log 文件中,对应的是第 4053 条记录,物理偏移量(position)为 80899. position 是ByteBuffer 的指针位置

kakfa 日志结构

查找算法

  • 根据offset的值,查找segment段中的index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文件。
  • 找到索引文件后,根据 offset 进行定位,找到索引文件中的符合范围的索引。(跳表)
  • 得到 position 以后,再到对应的 log 文件中,从 position出开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息

顺序写,页缓存和零拷贝

  • 顺序写: 磁盘的顺序写速度要远大于随机写,kafka在设计时采用了文件追加的方式写入消息,即只能在日志文件的尾部追加新的消息并且也不允许修改已经写入的消息,这种方式属于典型的顺序写盘的操作
  • 页缓存:
    • kafka采用mmap直接操作页缓存pagecache的方式,最终数据的读写基本都是操作内存,脏页数据由操作系统统一刷到磁盘,
    • kafka提供了参数可以强制刷盘。页缓存存在的风险是,如果机器断电,页缓存中脏页数据没有刷到磁盘,会导致丢数据。但这个问题仅在机器断电时发生,多副本机制可以保障数据的可靠性。
    • Linux会使用磁盘的一部分作为swap分区,这样可以进行进程的调度,把当前非活跃的进程调入swap分区,以此把内存空出来让给活跃的进程。但是对kafka来说,如果大量的页缓存被置换到swap区,会极大的降低kafka的性能,但是如果不设置swap分区,当内存不够用时,会导致OOM的发生。建议swap设置的小一些。
    • 使用页缓存使同时可以避免在JVM内部缓存数据,降低内存消耗
  • 零拷贝: 在从磁盘读数据发送给消费者时,传统的做法被将数据从磁盘拷贝的内核空间,再由内核拷贝到用户空间,然后由用户控件拷贝到内核态的socket buffer中,最后内核态的buffer中拷贝到网卡,这中间需要四次复制过程,零拷贝技术将拷贝次数降低到两次,
    • 将文件拷贝到kernel buffer中;
    • 向socket buffer中追加当前要发生的数据在kernel buffer中的位置和偏移量;
    • 根据socket buffer中的位置和偏移量直接将kernel buffer的数据copy到网卡设备(protocol engine)中;

kakfa 传统拷贝
kakfa 零拷贝

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×