我如何获得在Apache光束Kafka约收到确认的记录?
基本上,我希望所有没有得到任何确认的记录都转到bigquery表中,以便稍后重试。我使用了以下文档中的代码片段
.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
// Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
// Rest of the settings are optional :
// you can further customize KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig. e.g :
.updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
.withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
// finally, if you don't need Kafka metadata, you can drop it.g
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>
1条答案
按热度按时间bvjveswy1#
默认情况下,beam ios被设计为一直尝试写入/读取/处理元素,直到(重复错误后,批处理管道将失败)
您所指的通常称为死信队列,用于获取失败的记录并将其添加到pcollection、pubsub主题、队列服务等。这通常是可行的,因为当遇到写入某些记录的错误时,它允许流式管道进行处理(而不是阻塞),但是允许成功的故事被写下来。
不幸的是,除非我弄错了,否则kafkaio中没有实现死信队列。也许可以修改Kafka约来支持这一点。对beam邮件列表进行了一些讨论,并提出了一些实现这一点的想法,这些想法可能有一些想法。
我怀疑有可能将此添加到kafkawriter,捕获失败的记录并将其输出到另一个pcollection。如果您选择实现此功能,还请联系beam社区邮件列表,如果您希望帮助将其合并到master中,他们将能够帮助确保更改涵盖必要的要求,以便可以将其合并并作为一个整体对beam有意义。
然后,您的管道可以在其他地方(即不同的源)写入这些内容。当然,如果该辅助源同时出现中断/问题,则需要另一个dlq。