Best practices for implementing Micronaut/Kafka Streams with multiple KStream/KTable?

Posted by hpcdzsge on 2021-06-04 in Kafka

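There are a few details I don't understand about the example Micronaut/Kafka Streams application. Below is the example class from the documentation (original link: https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkastreams).
My questions are:
Why do we return only the source stream?
If we have multiple source KStream objects, for example to perform a join, does each of them also need to be made a bean?
Do we also need to make each source KTable a bean?
What happens if we do not make a source KStream or KTable a bean? We currently have at least one project that does this, with no apparent problems.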

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

@Factory
public class WordCountStream {

    public static final String STREAM_WORD_COUNT = "word-count";
    public static final String INPUT = "streams-plaintext-input"; 
    public static final String OUTPUT = "streams-wordcount-output"; 
    public static final String WORD_COUNT_STORE = "word-count-store";

    @Singleton
    @Named(STREAM_WORD_COUNT)
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { 
        // set default serdes
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStream<String, String> source = builder
                .stream(INPUT);

        KTable<String, Long> groupedByWord = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                //Store the result in a store for lookup later
                .count(Materialized.as(WORD_COUNT_STORE)); 

        groupedByWord
                //convert to stream
                .toStream()
                //send to output using specific serdes
                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

        return source;
    }

}

Edit: here is a version of our service that contains multiple streams, edited to remove identifying information.

@Factory
public class TopologyCopy {
    private static class DataOut {}
    private static class DataInOne {}
    private static class DataInTwo {}
    private static class DataInThree {}

    @Singleton
    @Named("data")
    KStream<Integer, DataOut> dataStream(ConfiguredStreamBuilder builder) {
        Properties props = builder.getConfiguration();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

        KStream<Integer, DataInOne> dataOneStream = builder.stream("data-one",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInOne.class)));
        KStream<Integer, DataInTwo> dataTwoStream = builder.stream("data-two",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));
        GlobalKTable<Integer, DataInThree> signalTable = builder.globalTable("data-three",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInThree.class)),
                Materialized.as("data-three-store"));
        KTable<Integer, DataInTwo> dataTwoTable = dataTwoStream
                .groupByKey()
                .aggregate(() -> null, (key, device, storedDevice) -> device,
                        Materialized.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));

        dataOneStream
                .transformValues(() -> /* MAGIC */)
                .join(dataTwoTable, (data1, data2) -> /* MAGIC */)
                .selectKey((something, msg) ->  /* MAGIC */)
                .to("topic-out", Produced.with(Serdes.UUID(), new JsonSerde<>(OutMessage.class)));

        return dataOneStream;
    }

}
