我有一个Beam管道来消费流事件,它有多个阶段(PTransforms)来处理它们。
pipeline.apply("Read Data from Stream", StreamReader.read())
.apply("Decode event and extract relevant fields", ParDo.of(new DecodeExtractFields()))
.apply("Deduplicate process", ParDo.of(new Deduplication()))
.apply("Conversion, Mapping and Persisting", ParDo.of(new DataTransformer()))
.apply("Build Kafka Message", ParDo.of(new PrepareMessage()))
.apply("Publish", ParDo.of(new PublishMessage()))
.apply("Commit offset", ParDo.of(new CommitOffset()));
使用KafkaIO和StreamReader.read()
方法实现读取的流事件是这样的:
public static KafkaIO.Read<String, String> read() {
return KafkaIO.<String, String>read()
.withBootstrapServers(Constants.BOOTSTRAP_SERVER)
.withTopics(Constants.KAFKA_TOPICS)
.withConsumerConfigUpdates(Constants.CONSUMER_PROPERTIES)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class);
}
在我们通过KafkaIO读取流事件/消息后,我们可以提交偏移量。我需要做的是手动提交偏移量,在执行所有先前PTransform时的最后一个Commit offset
PTransform内。
原因是,我正在管道中间执行一些转换、Map和持久化,当所有事情都顺利完成时,我需要提交偏移量。通过这样做,如果处理在中间失败,我可以再次使用相同的记录/事件并进行处理。
我的问题是,我如何手动提交偏移?如果可以共享资源/示例代码,请表示感谢。
1条答案
按热度按时间rm5edbpk1#
当然,有
Read.commitOffsetsInFinalize()
方法,它应该在完成检查点时提交偏移量,还有AUTO_COMMIT
消费者配置选项,它用于自动提交Kafka消费者的读取记录。但是,在您的情况下,这将不起作用,您需要手动地将同一个主题/分区a/窗口的偏移量分组,并在
CommitOffset
DoFn中创建一个新的Kafka客户端示例,该示例将提交这些偏移量。您需要按分区对偏移量进行分组,否则在不同的工作线程上提交同一个分区的偏移量可能会出现争用情况。