Kafka制作人apache beam

vdzxcuhz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(442)

我如何获得在Apache光束Kafka约收到确认的记录?
基本上,我希望所有没有得到任何确认的记录都转到bigquery表中,以便稍后重试。我使用了以下文档中的代码片段

.apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)

       // Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>

       // Rest of the settings are optional :

       // you can further customize KafkaConsumer used to read the records by adding more
       // settings for ConsumerConfig. e.g :
       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))

       // set event times and watermark based on LogAppendTime. To provide a custom
       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
       .withLogAppendTime()

       // restrict reader to committed messages on Kafka (see method documentation).
       .withReadCommitted()

       // offset consumed by the pipeline can be committed back.
       .commitOffsetsInFinalize()

       // finally, if you don't need Kafka metadata, you can drop it.g
       .withoutMetadata() // PCollection<KV<Long, String>>
    )
    .apply(Values.<String>create()) // PCollection<String>
bvjveswy

bvjveswy1#

默认情况下,beam ios被设计为一直尝试写入/读取/处理元素,直到(重复错误后,批处理管道将失败)
您所指的通常称为死信队列,用于获取失败的记录并将其添加到pcollection、pubsub主题、队列服务等。这通常是可行的,因为当遇到写入某些记录的错误时,它允许流式管道进行处理(而不是阻塞),但是允许成功的故事被写下来。
不幸的是,除非我弄错了,否则kafkaio中没有实现死信队列。也许可以修改Kafka约来支持这一点。对beam邮件列表进行了一些讨论,并提出了一些实现这一点的想法,这些想法可能有一些想法。
我怀疑有可能将此添加到kafkawriter,捕获失败的记录并将其输出到另一个pcollection。如果您选择实现此功能,还请联系beam社区邮件列表,如果您希望帮助将其合并到master中,他们将能够帮助确保更改涵盖必要的要求,以便可以将其合并并作为一个整体对beam有意义。
然后,您的管道可以在其他地方(即不同的源)写入这些内容。当然,如果该辅助源同时出现中断/问题,则需要另一个dlq。

相关问题