Writing to multiple Kafka topics in Apache Beam?

vdzxcuhz, asked 2021-06-04 in Kafka

I am working on a simple word-count program that uses a Kafka topic as its input source (via a producer) and then applies a ParDo to count the words. Now I need help writing the words out to different topics based on their frequency. Say all words with an even count go to topic 1 and the rest go to topic 2.
Can someone help me with an example?

zu0ti5jz (answer 1)

This can be done with the KafkaIO.writeRecords() method, which takes a PCollection of ProducerRecord<Key, Value>; the destination topic is chosen per record by constructing new ProducerRecord<>("topic_name", "key", "value").
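Note that KafkaIO and ProducerRecordCoder live in the beam-sdks-java-io-kafka module, so that artifact (plus the Kafka client library) must be on the classpath; pick the version matching your Beam release.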
Here is the code:

import com.google.common.collect.ImmutableMap;

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

// Splits each input line into individual words, recording metrics as it goes.
static class ExtractWordsFn extends DoFn<String, String> {
    // Same regex as ExampleUtils.TOKENIZER_PATTERN in the Beam examples module,
    // inlined here so the snippet does not depend on that module.
    private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
            Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
        lineLenDist.update(element.length());
        if (element.trim().isEmpty()) {
            emptyLines.inc();
        }
        // Split the line into words and emit each non-empty word.
        String[] words = element.split(TOKENIZER_PATTERN, -1);
        for (String word : words) {
            if (!word.isEmpty()) {
                receiver.output(word);
            }
        }
    }
}

// Turns each (word, count) pair into a ProducerRecord, choosing the topic by
// the parity of the count: even counts go to "test", odd counts go to "copy".
public static class FormatAsTextFn
        extends SimpleFunction<KV<String, Long>, ProducerRecord<String, String>> {
    @Override
    public ProducerRecord<String, String> apply(KV<String, Long> input) {
        String value = input.getKey() + " " + input.getValue();
        if (input.getValue() % 2 == 0) {
            return new ProducerRecord<>("test", input.getKey(), value);
        } else {
            return new ProducerRecord<>("copy", input.getKey(), value);
        }
    }
}

// Composite transform: tokenize the lines, then count occurrences of each word.
public static class CountWords
        extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
        PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
        return words.apply(Count.perElement());
    }
}
p.apply("ReadLines", KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("copy") // use withTopics(List<String>) to read from multiple topics
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
        // commitOffsetsInFinalize() manages offsets itself, so leave
        // enable.auto.commit off rather than setting it to true.
        .withReadCommitted()
        .commitOffsetsInFinalize()
        // Timestamp policy: processing time (withLogAppendTime() is the
        // alternative; only one policy can be in effect at a time).
        .withProcessingTime()
        .withoutMetadata())
        .apply(Values.create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn())) // PCollection<ProducerRecord<String, String>>
        .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .apply("WriteCounts", KafkaIO.<String, String>writeRecords()
                .withBootstrapServers("localhost:9092")
                // No withTopic() here: each ProducerRecord already carries its topic.
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class));
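For reference, here is a minimal sketch of the scaffolding the snippet assumes: the p object is a Beam Pipeline, created and run roughly as below. The options handling is illustrative, not part of the original answer.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public static void main(String[] args) {
    // Parse runner options from the command line (direct runner by default).
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // ... apply the ReadLines / CountWords / WriteCounts chain shown above ...

    p.run().waitUntilFinish();
}

Because writeRecords() takes the destination from each ProducerRecord, words with even counts land on the "test" topic and the rest on "copy"; no separate branch or second sink is needed in the pipeline itself.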
