关于micronaut/kafka流应用程序的示例,有几个细节我不了解。下面是文档中的示例类(原始链接:https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkastreams).
我的问题是:
为什么我们只返回源流?
如果我们有多个来源 KStream
对象,如要进行连接,是否还需要将其制作为bean?
我们是否也需要制作每个来源 KTable
一颗豆子?
如果我们找不到来源怎么办 KStream
或者 KTable
一颗豆子?我们目前至少有一个项目,这样做,但没有明显的问题。
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
@Factory
public class WordCountStream {
public static final String STREAM_WORD_COUNT = "word-count";
public static final String INPUT = "streams-plaintext-input";
public static final String OUTPUT = "streams-wordcount-output";
public static final String WORD_COUNT_STORE = "word-count-store";
@Singleton
@Named(STREAM_WORD_COUNT)
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) {
// set default serdes
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStream<String, String> source = builder
.stream(INPUT);
KTable<String, Long> groupedByWord = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
//Store the result in a store for lookup later
.count(Materialized.as(WORD_COUNT_STORE));
groupedByWord
//convert to stream
.toStream()
//send to output using specific serdes
.to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
return source;
}
}
编辑:这是我们服务的一个版本,包含多个流,编辑以删除标识信息。
@Factory
public class TopologyCopy {
private static class DataOut {}
private static class DataInOne {}
private static class DataInTwo {}
private static class DataInThree {}
@Singleton
@Named("data")
KStream<Integer, DataOut> dataStream(ConfiguredStreamBuilder builder) {
Properties props = builder.getConfiguration();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
KStream<Integer, DataInOne> dataOneStream = builder.stream("data-one",
Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInOne.class)));
KStream<Integer, DataInTwo> dataTwoStream = builder.stream("data-two",
Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));
GlobalKTable<Integer, DataInThree> signalTable = builder.globalTable("data-three",
Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInThree.class)),
Materialized.as("data-three-store"));
KTable<Integer, DataInTwo> dataTwoTable = dataTwoStream
.groupByKey()
.aggregate(() -> null, (key, device, storedDevice) -> device,
Materialized.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));
dataOneStream
.transformValues(() -> /* MAGIC */))
.join(dataTwoTable, (data1, data2) -> /* MAGIC */)
.selectKey((something, msg) -> /* MAGIC */)
.to("topic-out", Produced.with(Serdes.UUID(), new JsonSerde<>(OutMessage.class)));
return dataOneStream;
}
}
暂无答案!
目前还没有任何答案,快来回答吧!