Kafka团队:定制jsonserge

xiozqbni  于 2021-07-14  发布在  Java
关注(0)|答案(0)|浏览(250)

我有一个代码,实现了kafkastream在springboot应用程序中的工作:

public Consumer<KStream<String, ContentModel>> stream() {
    return in -> in.groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(ContentModel.class)))      
}

private ContentModel mergeObjects(final ContentModel reducer, final ContentModel materialized) {
    return reducer;
}

问题是我的应用程序在其他情况下使用自己的objectmapper bean(例如,读取包含json内容的文件)。由于jsonserge还包含“内部”自己的objectmapper,所以2个objectmapper的示例会引起“混乱”。
我想办法把两个物体Map器放在两边。
为此,我尝试定制用于kafkastream处理的jsonserge:
在配置中:

@Bean
public JsonSerde<ContentModel> jsonSerde() {
    ObjectMapper kafkaObjectMapper = new ObjectMapper();
    return new JsonSerde<>(ContentModel.class, kafkaObjectMapper);
}

上课时间:

@Autowired
private JsonSerde<ContentModel> jsonSerde;

public Consumer<KStream<String, ContentModel>> stream() {
    return in -> in.groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .reduce(this::mergeObjects, Materialized.as(readMessagesStore.getStoreName()));
}

但是,我收到以下错误:

java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunning(KafkaStreams.java:294) ~[kafka-streams-2.3.1.jar!/:?]
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1066) ~[kafka-streams-2.3.1.jar!/:?]

我做错什么了?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题