Kafka权威指南 笔记

实习的这两个多月基于Kafka开发系统,主要学习参考了Kafka权威指南这本书籍。正好也是第一次看原版的技术书籍,针对最重要的几个章节记录一下笔记。

Chapter 3 Kafka Producers

Overview

通过创建一个ProducerRecord向Kafka发布消息,必须包括topic和value,可选指明key和/或者partition。

  1. 发布者会先序列化key和value对象成ByteArrays以便在网络上传递。
  2. 数据被传递给partitioner。如果指定了partition,partitioner不会有任何操作并返回指定的partition。如果没有指定,partition会基于key选择一个partition。一旦选择了partition后,producer将会知道record发往的topic和partition,然后将在一批发往同一topic和partition的records中加入record。一个独立的线程负责把这些成批的records发往合适的Kafka集群。
  3. 集群收到消息,将会返回一个响应。如果消息成功写入Kafka,它会返回一个包含topic,partition和offset的RecordMetadata对象。如果消息写入失败,它会返回一个error。当producer收到一个error,它可能会在放弃返回error前重试几次发送消息。

Constructing a Kafka Producer

  • bootstrap.servers
  • key.serializer
  • value.serializer
1
2
3
4
5
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);

创建了producer后,开始发送消息。主要有三种发送消息的方法

  • Fire-and-forget

    发送消息到服务器并不在意是否成功到达,一些消息可能丢失

  • Synchronous send

    send()方法返回一个Future对象,使用get()方法等待future查看send是否成功

  • Asynchronous send

    send()一起使用callback函数,当收到Kafka集群响应后被出发

Sending a Message to Kafka

1
2
3
4
5
6
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}

Sending a Message Synchronously

1
2
3
4
5
6
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}

Sending a Message Asynchronously

同步发送消息可能导致延迟,同时我们需要知道是否成功

1
2
3
4
5
6
7
8
9
10
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

Configuring Producers

  • acks

    • ack=0:producer不会等待broker的响应,高吞吐率但不能得知消息是否丢失
    • ack=1:producer在leader replica收到消息时得到来自broker的成功响应。如果消息不能被写入leader,producer会收到error响应并重试发送消息避免消息丢失。
    • ack=all:producer在所有in-sync replica收到消息时得到来自broker的成功响应,延迟高
  • buffer.memory

    设置producer用于缓存等待发往broker消息的缓存大小

  • compression.type

    默认消息未被压缩发送

  • retries

    放弃并通知问题前重试发送消息的次数

  • batch.size

    当多个records被发往同一partition,producer将会分批发送。控制用于每一批消息内存大小的bytes数量。批次满时,批次内消息会被发送,但这未必意味着producer会等待批次满再发送。

  • linger.ms

    发送当前批次消息前等待额外消息的时间。KafkaProducer在当前批次消息满或者到达linger.ms限制时发送一批消息。默认在一有可用发送消息线程时就发送消息。

  • max.request.size

    producer发送请求的大小——最大可发送消息的大小和一次请求中可以发送消息的数量

Partitions

只使用topic和value创建ProducerRecord时,key被默认设置未null。Key有两个作用:与消息一起存储的额外信息并用于决定信息被写往的topic partition。所有相同key的消息将到相同的partition中,意味着如果一个进程只读取topic partitions的一个子集时,所有单个key的记录将被同个进程读取。

1
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

当key为null,将会使用默认的partitioner,记录会被随机发送可用的partition之一。轮询算法被用于partitions之间消息的平衡。当key存在且默认的partitioner被使用时,Kafka使用自己的hash算法对key进行hash,使用结果映射消息到特定的partition。因为一个key总是映射到同一partition,使用topic中所有partition计算映射,而不是可用的partitions。只有当topic的partition数量不变时,keys到partitions的映射是不变的。当topic中加入新的partition,新的记录会被写往不同的partition中。

Chapter 4 Kafka Consumers

Kafka Consumer Concepts

Consumers and Consumer Groups

Kafka consumers是consumer group的一部分。当同一consumer group中的多个consumers订阅了一个topic时,组内的每一个consumer将会收到topic不同partition子集的消息。

  • Partition Num > Consumer Num (1)

    consumer会收到所有partition的消息

  • Partition Num > Consumer Num (> 1)

    每一个consumer收到不同的partition子集的消息

  • Partition Num = Consumer Num

    每一个consumer收到一个parition的消息

  • Partition Num < Consumer Num

    部分consume收不到消息

  • consumer group num > 1

    每一个组会单独收到partition所有的消息,组内负载均衡

Consumer Groups and Partition Rebalance

组内添加一个新的consumer或者一个consumer关闭或者崩溃时,会开始消费其他consumer之前消费的partition。从一个consumer移动partition的归属到另一个consumer叫做再平衡。再平衡时整个consumer group会暂时不可用。Consumer通过发送心跳到Kafka称为group coordinator的broker保持在一个consumer group的成员归属以及partition的归属。只要consumer以规律的时间间隔发送心跳,它就被认为是存活的并从partition处理消息。Consumer poll时发送消息,已经消费了就commit record。一旦consumer足够长时间内停止发送心跳,session就会超时,group coordinator就会认为它死亡并出发再平衡。新版本中有独立的心跳线程再poll之间发送心跳,允许分离心跳频率和poll的频率。

Creating a Kafka Consumer

  • bootstrap.servers
  • key.deserializer
  • value.deserializer
  • group.id 非必需
1
2
3
4
5
6
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

Subscribing to Topics

1
2
consumer.subscribe(Collections.singletonList("customerCountries"));
consumer.subscribe("test.*");

The Poll Loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.debug("topic = %s, partition = %s, offset = %d,customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close();
}

第一次使用新的consumer调用poll()时,负责找到GroupCoordinator,加入consumer group,收到partition的分配。

Thread safe

One consumer per thread is the rule.

Configuring Consumers

  • fetch.min.bytes

    获取records时希望从broker获取的数据的最小量。如果新的records比min.fetch.bytes的byte数少时,broker会再发送records返回给consumer前等待更多可用的信息

  • fetch.max.wait.ms

    默认Kafka等待500ms。

  • max.partition.fetch.bytes

    服务器每个partition返回的最大bytes数量

  • session.timeout.ms

  • auto.offset.reset

  • enable.auto.commit

  • partition.assignment.strategy 随机/轮询

  • max.poll.records

    单次调用poll()返回的最大records数量

  • receive.buffer.bytes and send.buffer.bytes

Commits and Offsets

我们称一个partition中更新当前位置的动作为commit。消费者生产一条带着每个partition的committed offset消息至Kafka__consumer_offsets的topic。如果一个消费者崩溃或者新的消费者加入了消费者组,将会触发rebalance。rebalance后,每个消费者可能被分配到新的一组partition,为了知道从哪里开始工作,消费者会从那里读取最新的每个parition的committed offset。

如果committed offset比起消费者处理的最后一条消息offset小,committed offset和最后处理的offset间的消息会被处理两次;

如果committed offset比起消费者实际处理的最后一条消息offset大,committed offset和最后处理的offset间的消息会被消费者组丢失。

Automatic Commit

如果配置了enable.auto.commit=true,每5秒钟消费者将会commit从poll()中收到的最大的offset。5秒钟的间隔是默认值并由auto.commit.interval.ms设置。自动commit由poll循环驱动。无论何时poll时,消费者检查是否是时间commit,如果是则commit上一次poll时返回的offset。

可以通过配置commit间隔使得更加频繁地commit并减少records重复的窗口,但不可能消除。通过使能自动commit,一次poll的调用总是返回上一次poll的最后一个offset,所以再次调用poll之前处理poll返回的所有event十分重要。

Commit Current Offset

设置auto.commit.offset=false,offsets只有在应用显式commit时才会被commit。最简单可靠的API是commitSync(),这个API会commit由poll()返回的最新的offset,一旦offset被commit就返回,如果commit由于某些原因失败了就抛出异常。

1
2
3
4
5
6
7
8
9
10
11
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}

一旦处理完当前批次的records,在poll更多消息前调用commitSync()commit批次中最后一个offset。

Asynchronous Commit

1
2
3
4
5
6
7
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}

commitAsync()将会重试commit直到要么成功要么遇到不可重试的失败。原因是commitAsync()收到来自服务器的响应时,可能有更晚一些的commit已经成功。

1
2
3
4
5
6
7
8
9
10
11
12
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets, Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}

保证异步重试commit顺序正确的简单策略时使用递增序列数字。每次commit时增加序列号。当准备发送重试消息时,检查commit序列号是否等于实例变量。如果相等,没有更新的commit,重试是安全的。如果实例序列号更大则不要重试,因为更新的commit已经被发送。

Combining Synchronous and Asynchronous Commits

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}

Commit Specified Offset

消费者API允许通过传递partition和希望commit的offset的映射调用commitSync()commitAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new
OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0) consumer.commitAsync(currentOffsets, null);
count++;
}
}

读取每个record后,通过我们希望处理的下一个offset更新offset的映射,这是下次开始时开始读取的地方。

Consuming Records with Specific Offsets

如果想从partition的开始开始读取所有消息或者跳过所有消息到partition的尾开始只消费新的消息,使用seekToBeginning(TopicPartition tp)seekToEnd(TopicPartition tp)

或者使用seek()指定哪里开始读取。