如何从flink runner上的google数据流(apache beam)向kafka发送消息

w8f9ii69  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(573)

我试着写一个概念证明,从Kafka那里获取信息,用光束在Flink上变换,然后把结果推到另一个Kafka主题上。
我使用了kafkawindowedwordcount示例作为起点,这是我想做的第一部分,但是它输出到文本文件,而不是kafka。flinkkafkaproducer08看起来很有希望,但我不知道如何将它插入管道。我在想,它会被一个无限链接槽,或一些这样的 Package ,但这似乎并不存在。
对我要做的事有什么建议或想法吗?
我正在运行最新的孵化器beam(截至昨晚从github),集群模式下的flink1.0.0和kafka 0.9.0.1,所有这些都在google计算引擎上(debian jessie)。

vfwfrxfs

vfwfrxfs1#

beam中当前没有无界Sink类。大多数无边界接收器都是使用 ParDo .
您可能希望查看Kafka约连接器。这是一个kafka读取器,在所有的梁运行程序中工作,并实现并行读取、检查点和其他功能 UnboundedSource API。这个pull请求还包括tophashtags示例管道中的一个原始接收器,它通过在 ParDo :

  1. class KafkaWriter extends DoFn<String, Void> {
  2. private final String topic;
  3. private final Map<String, Object> config;
  4. private transient KafkaProducer<String, String> producer = null;
  5. public KafkaWriter(Options options) {
  6. this.topic = options.getOutputTopic();
  7. this.config = ImmutableMap.<String, Object>of(
  8. "bootstrap.servers", options.getBootstrapServers(),
  9. "key.serializer", StringSerializer.class.getName(),
  10. "value.serializer", StringSerializer.class.getName());
  11. }
  12. @Override
  13. public void startBundle(Context c) throws Exception {
  14. if (producer == null) { // in Beam, startBundle might be called multiple times.
  15. producer = new KafkaProducer<String, String>(config);
  16. }
  17. }
  18. @Override
  19. public void finishBundle(Context c) throws Exception {
  20. producer.close();
  21. }
  22. @Override
  23. public void processElement(ProcessContext ctx) throws Exception {
  24. producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  25. }
  26. }

当然,我们希望在 KafkaIO 也。它实际上与 KafkaWriter 但使用起来要简单得多。

展开查看全部
llew8vvj

llew8vvj2#

2016年,apache beam/dataflow中添加了用于写入kafka的sink转换。有关详细信息,请参阅javadoc KafkaIO 以apachebeam为例。

相关问题