Best practices for implementing Micronaut/Kafka Streams with multiple KStream/KTable?

Posted by hpcdzsge on 2021-06-04 in Kafka

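There are a few details about the example Micronaut/Kafka Streams application that I don't understand. Below is the example class from the documentation (original link: https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkastreams).
My questions are:
Why do we return only the source stream?
If we have multiple source KStream objects, for example to perform a join, does each of them also need to be made a bean?
Do we also need to make each source KTable a bean?
What happens if we don't make a source KStream or KTable a bean? We currently have at least one project that does this, with no apparent problems.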

    import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
    import io.micronaut.context.annotation.Factory;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    import javax.inject.Named;
    import javax.inject.Singleton;
    import java.util.Arrays;
    import java.util.Locale;
    import java.util.Properties;

    @Factory
    public class WordCountStream {

        public static final String STREAM_WORD_COUNT = "word-count";
        public static final String INPUT = "streams-plaintext-input";
        public static final String OUTPUT = "streams-wordcount-output";
        public static final String WORD_COUNT_STORE = "word-count-store";

        @Singleton
        @Named(STREAM_WORD_COUNT)
        KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) {
            // set default serdes
            Properties props = builder.getConfiguration();
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KStream<String, String> source = builder
                    .stream(INPUT);

            KTable<String, Long> groupedByWord = source
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                    .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                    // Store the result in a store for lookup later
                    .count(Materialized.as(WORD_COUNT_STORE));

            groupedByWord
                    // convert to stream
                    .toStream()
                    // send to output using specific serdes
                    .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

            return source;
        }
    }

Edit: here is a version of our service that contains multiple streams, edited to remove identifying information.

    import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
    import io.micronaut.context.annotation.Factory;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    import javax.inject.Named;
    import javax.inject.Singleton;
    import java.util.Properties;
    // imports for TextualIntSerde, JsonSerde, and OutMessage are not shown in the original post

    @Factory
    public class TopologyCopy {

        private static class DataOut {}
        private static class DataInOne {}
        private static class DataInTwo {}
        private static class DataInThree {}

        @Singleton
        @Named("data")
        KStream<Integer, DataOut> dataStream(ConfiguredStreamBuilder builder) {
            Properties props = builder.getConfiguration();
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

            KStream<Integer, DataInOne> dataOneStream = builder.stream("data-one",
                    Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInOne.class)));
            KStream<Integer, DataInTwo> dataTwoStream = builder.stream("data-two",
                    Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));
            GlobalKTable<Integer, DataInThree> signalTable = builder.globalTable("data-three",
                    Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInThree.class)),
                    Materialized.as("data-three-store"));

            KTable<Integer, DataInTwo> dataTwoTable = dataTwoStream
                    .groupByKey()
                    .aggregate(() -> null, (key, device, storedDevice) -> device,
                            Materialized.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));

            dataOneStream
                    .transformValues(() -> /* MAGIC */)
                    .join(dataTwoTable, (data1, data2) -> /* MAGIC */)
                    .selectKey((something, msg) -> /* MAGIC */)
                    .to("topic-out", Produced.with(Serdes.UUID(), new JsonSerde<>(OutMessage.class)));

            return dataOneStream;
        }
    }
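
To make the first and last questions concrete, here is a toy model in plain Java of what I suspect is going on. It is not Micronaut or Kafka Streams code: ToyBuilder, ToyStream, and buildAfterResolving are hypothetical names invented for this sketch. The assumption, which I have not confirmed against the Micronaut sources, is that sources are registered on the builder as a side effect of calling the factory method, so the returned stream mainly gives the container a reason to call that method before the topology is built.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class LazyTopologySketch {

    /** Toy stand-in for a streams builder: it only records which topics were wired in. */
    static final class ToyBuilder {
        private final List<String> sources = new ArrayList<>();

        ToyStream stream(String topic) {
            sources.add(topic);              // side effect: the "topology" grows here
            return new ToyStream(topic);
        }

        List<String> build() {
            return new ArrayList<>(sources); // snapshot of whatever was registered so far
        }
    }

    /** Toy stand-in for a stream handle. */
    static final class ToyStream {
        final String topic;

        ToyStream(String topic) {
            this.topic = topic;
        }
    }

    /** One factory method that wires three sources but returns only one handle. */
    static ToyStream dataStream(ToyBuilder builder) {
        ToyStream one = builder.stream("data-one");
        builder.stream("data-two");
        builder.stream("data-three");
        return one;
    }

    /** Resolves each lazily created "bean" (which runs its factory method), then builds. */
    static List<String> buildAfterResolving(ToyBuilder builder, List<Supplier<ToyStream>> beans) {
        for (Supplier<ToyStream> bean : beans) {
            bean.get();
        }
        return builder.build();
    }

    public static void main(String[] args) {
        // Case 1: the single returned stream is a bean, so the factory method runs.
        ToyBuilder wired = new ToyBuilder();
        List<Supplier<ToyStream>> beans = Collections.singletonList(() -> dataStream(wired));
        System.out.println(buildAfterResolving(wired, beans));

        // Case 2: nothing is exposed as a bean, so the factory method never runs.
        ToyBuilder unwired = new ToyBuilder();
        System.out.println(buildAfterResolving(unwired, Collections.emptyList()));
    }
}
```

In this toy model, one returned handle is enough for all three sources to end up in the built result, which would match what we see in our project. When no handle is exposed at all, the built result is empty. Whether the real framework behaves the same way is exactly what I am asking.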
