我正在使用kafka connect从kafka代理(v0.10.2)获取消息,然后将其同步到下游服务。
目前,我有密码 SinkTask#put
这将处理 SinkRecord
&然后将其持久化到下游服务。
几个关键要求,
我们需要确保消息至少被持久化到下游服务一次。
如果下游服务抛出错误或说它没有处理消息,那么我们需要确保消息被重新读取。
所以我们认为我们可以依靠 SinkTask#flush
通过抛出异常或告诉connect不要提交偏移量,而是在下一次轮询中重试的内容,有效地退出提交已接收消息的特定轮询/周期的偏移量。
但我们发现 flush
实际上是基于时间的,或多或少独立于轮询,当达到某个时间阈值时,它将提交偏移量。
在0.10.2中 SinkTask#preCommit
是引进的,所以我们认为我们可以用它来达到我们的目的。但文件中没有提到 SinkTask#put
& SinkTask#preCommit
.
既然我们想 commit offsets
只要一个人 put succeeds
. 类似地,不提交偏移量,如果 put
失败。
如果不是通过 SinkTask#preCommit
?
1条答案
按热度按时间yeotifhr1#
正确进出Kafka的数据可能是一项挑战,而Kafka连接使这更容易,因为它使用了最佳实践并隐藏了许多复杂性。对于接收器连接器,kafka connect从主题读取消息,将它们发送到连接器,然后定期提交已读取和处理的各个主题分区的最大偏移量。
请注意,“将它们发送到连接器”对应于
put(Collection<SinkRecord>)
方法,并且在kafka connect提交偏移量之前可能会多次调用该方法。您可以控制kafka connect提交偏移量的频率,但kafka connect确保它仅在连接器成功处理消息时提交该消息的偏移量。当连接器名义上运行时,一切都很好,连接器只会看到每条消息一次,即使偏移量是周期性提交的。但是,如果连接器发生故障,则当它重新启动时,连接器将在最后提交的偏移量处启动。这可能意味着您的连接器会看到一些与崩溃前处理的消息相同的消息。如果您仔细编写连接器,使其至少具有一次语义,这通常不是问题。
为什么Kafka要周期性地连接提交偏移量而不是每个记录?因为它节省了大量的工作,而且当事情名义上进展的时候并不重要。只有当事情出错时,补偿滞后才是重要的。即使这样,如果你有Kafka连接句柄偏移,你的连接器需要准备好处理消息至少一次。可以只使用一次,但您的连接器需要做更多的工作(见下文)。
书写记录
编写连接器有很大的灵活性,这很好,因为这很大程度上取决于它所编写的外部系统的能力。让我们看看不同的实现方法
put
以及flush
.如果系统支持事务或可以处理一批更新,则连接器的
put(Collection<SinkRecord>)
可以使用单个事务/批处理写入该集合中的所有记录,并根据需要重试多次,直到事务/批处理完成或最终引发错误。在这种情况下,put
做所有的工作,要么成功,要么失败。如果成功,那么kafka connect知道所有记录都得到了正确的处理,因此可以(在某个时刻)提交偏移量。如果你的put
调用失败,则kafka connect假定不知道是否处理了任何记录,因此它不会更新其偏移量,并停止连接器。你的连接器坏了flush(...)
不需要做任何事情,因为Kafka连接正在处理所有的偏移。如果系统不支持事务,而您一次只能提交一个项目,那么您可能有连接器的
put(Collection<SinkRecord>)
尝试分别写出每条记录,阻塞直到成功,并在抛出错误之前根据需要重试每条记录。再一次,put
做所有的工作flush
方法可能不需要做任何事情。到目前为止,我的例子做了所有的工作
put
. 你总是可以选择put
只需缓冲这些记录,就可以在中完成所有向外部服务写入的工作flush
或者preCommit
. 你这样做的一个原因是你的写作是基于时间的,就像flush
以及preCommit
. 如果你不希望你的写作是基于时间的,你可能不想写的时间flush
或者preCommit
.是否记录偏移
如上所述,默认情况下,kafka connect会定期记录偏移量,以便重新启动时,连接器可以从上次关闭的位置开始。
然而,有时连接器需要记录外部系统中的偏移量,特别是当可以原子化地完成时。当这样一个连接器启动时,它可以在外部系统中查找上一次写入的偏移量,然后告诉kafka connect它想从哪里开始读取。通过这种方法,连接器可以对消息进行一次处理。
当sink连接器执行此操作时,它们实际上根本不需要kafka connect来提交任何偏移。这个
flush
方法只是一个让连接器知道kafka connect正在为您提交哪些偏移的机会,由于它不返回任何内容,因此它无法修改这些偏移或告诉kafka connect连接器正在处理哪些偏移。这就是
preCommit
方法进来了。它真的是一个替代品flush
(它实际上采用与flush
),但它应该返回kafka connect应该提交的偏移量。默认情况下,preCommit
只是打电话而已flush
然后返回传递给的相同偏移量preCommit
,这意味着kafka connect应该提交它通过preCommit
. 但是如果你的preCommit
返回一个空的偏移集,然后kafka connect将不记录任何偏移。因此,如果连接器要处理外部系统中的所有偏移,并且不需要kafka connect来记录任何内容,那么应该重写
preCommit
方法而不是flush
,并返回一个空的偏移集。