实习的这两个多月基于Kafka开发系统,主要学习参考了Kafka权威指南这本书籍。正好也是第一次看原版的技术书籍,针对最重要的几个章节记录一下笔记。
Chapter 3 Kafka Producers
Overview
通过创建一个ProducerRecord向Kafka发布消息,必须包括topic和value,可选指明key和/或者partition。
- 发布者会先序列化key和value对象成
ByteArrays以便在网络上传递。 - 数据被传递给partitioner。如果指定了partition,partitioner不会有任何操作并返回指定的partition。如果没有指定,partition会基于key选择一个partition。一旦选择了partition后,producer将会知道record发往的topic和partition,然后将在一批发往同一topic和partition的records中加入record。一个独立的线程负责把这些成批的records发往合适的Kafka集群。
- 集群收到消息,将会返回一个响应。如果消息成功写入Kafka,它会返回一个包含topic,partition和offset的
RecordMetadata对象。如果消息写入失败,它会返回一个error。当producer收到一个error,它可能会在放弃返回error前重试几次发送消息。
Constructing a Kafka Producer
- bootstrap.servers
- key.serializer
- value.serializer
|
|
创建了producer后,开始发送消息。主要有三种发送消息的方法
Fire-and-forget
发送消息到服务器并不在意是否成功到达,一些消息可能丢失
Synchronous send
send()方法返回一个Future对象,使用get()方法等待future查看send是否成功Asynchronous send
与
send()一起使用callback函数,当收到Kafka集群响应后被出发
Sending a Message to Kafka
|
|
Sending a Message Synchronously
|
|
Sending a Message Asynchronously
同步发送消息可能导致延迟,同时我们需要知道是否成功
|
|
Configuring Producers
acks
- ack=0:producer不会等待broker的响应,高吞吐率但不能得知消息是否丢失
- ack=1:producer在leader replica收到消息时得到来自broker的成功响应。如果消息不能被写入leader,producer会收到error响应并重试发送消息避免消息丢失。
- ack=all:producer在所有in-sync replica收到消息时得到来自broker的成功响应,延迟高
buffer.memory
设置producer用于缓存等待发往broker消息的缓存大小
compression.type
默认消息未被压缩发送
retries
放弃并通知问题前重试发送消息的次数
batch.size
当多个records被发往同一partition,producer将会分批发送。控制用于每一批消息内存大小的bytes数量。批次满时,批次内消息会被发送,但这未必意味着producer会等待批次满再发送。
linger.ms
发送当前批次消息前等待额外消息的时间。
KafkaProducer在当前批次消息满或者到达linger.ms限制时发送一批消息。默认在一有可用发送消息线程时就发送消息。max.request.size
producer发送请求的大小——最大可发送消息的大小和一次请求中可以发送消息的数量
Partitions
只使用topic和value创建ProducerRecord时,key被默认设置未null。Key有两个作用:与消息一起存储的额外信息并用于决定信息被写往的topic partition。所有相同key的消息将到相同的partition中,意味着如果一个进程只读取topic partitions的一个子集时,所有单个key的记录将被同个进程读取。
|
|
当key为null,将会使用默认的partitioner,记录会被随机发送可用的partition之一。轮询算法被用于partitions之间消息的平衡。当key存在且默认的partitioner被使用时,Kafka使用自己的hash算法对key进行hash,使用结果映射消息到特定的partition。因为一个key总是映射到同一partition,使用topic中所有partition计算映射,而不是可用的partitions。只有当topic的partition数量不变时,keys到partitions的映射是不变的。当topic中加入新的partition,新的记录会被写往不同的partition中。
Chapter 4 Kafka Consumers
Kafka Consumer Concepts
Consumers and Consumer Groups
Kafka consumers是consumer group的一部分。当同一consumer group中的多个consumers订阅了一个topic时,组内的每一个consumer将会收到topic不同partition子集的消息。
Partition Num > Consumer Num (1)
consumer会收到所有partition的消息
Partition Num > Consumer Num (> 1)
每一个consumer收到不同的partition子集的消息
Partition Num = Consumer Num
每一个consumer收到一个parition的消息
Partition Num < Consumer Num
部分consume收不到消息
consumer group num > 1
每一个组会单独收到partition所有的消息,组内负载均衡
Consumer Groups and Partition Rebalance
组内添加一个新的consumer或者一个consumer关闭或者崩溃时,会开始消费其他consumer之前消费的partition。从一个consumer移动partition的归属到另一个consumer叫做再平衡。再平衡时整个consumer group会暂时不可用。Consumer通过发送心跳到Kafka称为group coordinator的broker保持在一个consumer group的成员归属以及partition的归属。只要consumer以规律的时间间隔发送心跳,它就被认为是存活的并从partition处理消息。Consumer poll时发送消息,已经消费了就commit record。一旦consumer足够长时间内停止发送心跳,session就会超时,group coordinator就会认为它死亡并出发再平衡。新版本中有独立的心跳线程再poll之间发送心跳,允许分离心跳频率和poll的频率。
Creating a Kafka Consumer
- bootstrap.servers
- key.deserializer
- value.deserializer
- group.id 非必需
|
|
Subscribing to Topics
|
|
The Poll Loop
|
|
第一次使用新的consumer调用poll()时,负责找到GroupCoordinator,加入consumer group,收到partition的分配。
Thread safe
One consumer per thread is the rule.
Configuring Consumers
fetch.min.bytes
获取records时希望从broker获取的数据的最小量。如果新的records比min.fetch.bytes的byte数少时,broker会再发送records返回给consumer前等待更多可用的信息
fetch.max.wait.ms
默认Kafka等待500ms。
max.partition.fetch.bytes
服务器每个partition返回的最大bytes数量
session.timeout.ms
auto.offset.reset
enable.auto.commit
partition.assignment.strategy 随机/轮询
max.poll.records
单次调用poll
()返回的最大records数量receive.buffer.bytes and send.buffer.bytes
Commits and Offsets
我们称一个partition中更新当前位置的动作为commit。消费者生产一条带着每个partition的committed offset消息至Kafka__consumer_offsets的topic。如果一个消费者崩溃或者新的消费者加入了消费者组,将会触发rebalance。rebalance后,每个消费者可能被分配到新的一组partition,为了知道从哪里开始工作,消费者会从那里读取最新的每个parition的committed offset。
如果committed offset比起消费者处理的最后一条消息offset小,committed offset和最后处理的offset间的消息会被处理两次;
如果committed offset比起消费者实际处理的最后一条消息offset大,committed offset和最后处理的offset间的消息会被消费者组丢失。
Automatic Commit
如果配置了enable.auto.commit=true,每5秒钟消费者将会commit从poll()中收到的最大的offset。5秒钟的间隔是默认值并由auto.commit.interval.ms设置。自动commit由poll循环驱动。无论何时poll时,消费者检查是否是时间commit,如果是则commit上一次poll时返回的offset。
可以通过配置commit间隔使得更加频繁地commit并减少records重复的窗口,但不可能消除。通过使能自动commit,一次poll的调用总是返回上一次poll的最后一个offset,所以再次调用poll之前处理poll返回的所有event十分重要。
Commit Current Offset
设置auto.commit.offset=false,offsets只有在应用显式commit时才会被commit。最简单可靠的API是commitSync(),这个API会commit由poll()返回的最新的offset,一旦offset被commit就返回,如果commit由于某些原因失败了就抛出异常。
|
|
一旦处理完当前批次的records,在poll更多消息前调用commitSync()commit批次中最后一个offset。
Asynchronous Commit
|
|
commitAsync()将会重试commit直到要么成功要么遇到不可重试的失败。原因是commitAsync()收到来自服务器的响应时,可能有更晚一些的commit已经成功。
|
|
保证异步重试commit顺序正确的简单策略时使用递增序列数字。每次commit时增加序列号。当准备发送重试消息时,检查commit序列号是否等于实例变量。如果相等,没有更新的commit,重试是安全的。如果实例序列号更大则不要重试,因为更新的commit已经被发送。
Combining Synchronous and Asynchronous Commits
|
|
Commit Specified Offset
消费者API允许通过传递partition和希望commit的offset的映射调用commitSync()和commitAsync()。
|
|
读取每个record后,通过我们希望处理的下一个offset更新offset的映射,这是下次开始时开始读取的地方。
Consuming Records with Specific Offsets
如果想从partition的开始开始读取所有消息或者跳过所有消息到partition的尾开始只消费新的消息,使用seekToBeginning(TopicPartition tp)和seekToEnd(TopicPartition tp)。
或者使用seek()指定哪里开始读取。