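I'm running a simple word count program in which a Kafka topic (fed by a producer) is the input source, and I apply a ParDo to count the words. Now I need help writing the words to different topics based on their frequency. Say all words with the same frequency go to topic 1 and the rest go to topic 2. Can anyone help me with an example?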
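This can be done with KafkaIO's writeRecords() method, which accepts ProducerRecord<key, value> elements. Create each record with new ProducerRecord<>("topic_name", "key", "value"); because every record names its own topic, the sink writes each word count to whichever topic you picked for it, so withTopic() can be left unset. Here is the code: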
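import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.producer.ProducerRecord;

// The classes below are nested inside the pipeline's main class.

// Splits each line into words and records some line metrics.
static class ExtractWordsFn extends DoFn<String, String> {
  private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
  private final Distribution lineLenDist =
      Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> receiver) {
    lineLenDist.update(element.length());
    if (element.trim().isEmpty()) {
      emptyLines.inc();
    }
    // Split the line into words.
    String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
    // Output every non-empty word.
    for (String word : words) {
      if (!word.isEmpty()) {
        receiver.output(word);
      }
    }
  }
}

// Turns each (word, count) pair into a ProducerRecord whose topic depends on the count:
// even counts go to topic "test", odd counts to topic "copy".
public static class FormatAsTextFn
    extends SimpleFunction<KV<String, Long>, ProducerRecord<String, String>> {
  @Override
  public ProducerRecord<String, String> apply(KV<String, Long> input) {
    String topic = input.getValue() % 2 == 0 ? "test" : "copy";
    return new ProducerRecord<>(topic, input.getKey(), input.getKey() + " " + input.getValue());
  }
}

// Extracts words from lines and counts the occurrences of each word (per window).
public static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
    PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
    return wordCounts;
  }
}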
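import com.google.common.collect.ImmutableMap; // Guava
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

// `p` is the Pipeline, e.g. Pipeline.create(options).
p.apply("ReadLines", KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        // Caution: "copy" is also where odd counts are written below, so those output records
        // would be read back and counted again; their String keys would also generally fail
        // LongDeserializer. Reading from a separate input topic avoids this loop.
        .withTopic("copy") // use withTopics(List<String>) to read from multiple topics.
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
        .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
        .withLogAppendTime() // overridden by withProcessingTime() below; both set the timestamp policy
        .withReadCommitted()
        .commitOffsetsInFinalize()
        .withProcessingTime()
        .withoutMetadata())
    .apply(Values.create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(new CountWords())
    .apply(MapElements.via(new FormatAsTextFn())) // PCollection<ProducerRecord<String, String>>
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
    .apply("WriteCounts", KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("localhost:9092")
        // No withTopic(): each ProducerRecord already names its topic ("test" or "copy").
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));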
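The routing rule in FormatAsTextFn can be checked on its own, without a Kafka broker or a Beam runner. Below is a minimal plain-Java sketch, assuming the same even/odd rule and the topic names "test" and "copy" used in the answer; the TopicRouter class and its methods are hypothetical helpers, not part of Beam or Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the per-record routing rule from FormatAsTextFn,
// pulled out so it can be tested without Beam or Kafka.
final class TopicRouter {
    static final String EVEN_TOPIC = "test"; // topic names taken from the answer
    static final String ODD_TOPIC = "copy";

    private TopicRouter() {}

    // Even counts go to EVEN_TOPIC, odd counts to ODD_TOPIC.
    static String topicFor(long count) {
        return count % 2 == 0 ? EVEN_TOPIC : ODD_TOPIC;
    }

    // Same value layout FormatAsTextFn writes: "<word> <count>".
    static String valueFor(String word, long count) {
        return word + " " + count;
    }

    // Groups one window's (word, count) pairs by destination topic,
    // i.e. what the pipeline would write to each topic for that window.
    static Map<String, Map<String, Long>> route(Map<String, Long> counts) {
        Map<String, Map<String, Long>> byTopic = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            byTopic.computeIfAbsent(topicFor(e.getValue()), t -> new LinkedHashMap<>())
                   .put(e.getKey(), e.getValue());
        }
        return byTopic;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("kafka", 4L);
        counts.put("beam", 3L);
        counts.put("word", 2L);
        // prints {test={kafka=4, word=2}, copy={beam=3}}
        System.out.println(route(counts));
    }
}
```

Inside FormatAsTextFn.apply the same helpers could be used as new ProducerRecord<>(TopicRouter.topicFor(count), word, TopicRouter.valueFor(word, count)). Keeping the rule in one method makes it easy to swap in a different criterion, such as a count threshold instead of parity, without touching the pipeline wiring.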