使用传递的字符串创建一个KStream

rkkpypqq  于 2022-10-07  发布在  Apache
关注(0)|答案(1)|浏览(107)

我有一根绳子\

字符串输入=“{{”Header“:”K“},
{“Body”:“sghd”}}“\

我将此输入发送到Kafka主题,现在我想将其作为KStream<字符串,JsonNode>使用,而不是字符串?\

我试过了
StreamsBuilder Builder=new StreamsBuilder();
KStream<字符串,JsonNode>inputStream=Builder.stream(主题);\

但它不起作用。有人能帮帮我吗?

toiithl6

toiithl61#

您必须包含您正在使用的两个类的Serdes。存在用于字符串的Serdes,但需要为JsonNode创建自定义Serdes。在此之后,您可以使用此代码。

KStream<String, Record> inputStream = streamsBuilder
            .stream(topic, Consumed.with(Serdes.String(), CustomSerdes.JsonNode()));

可用于SERDES的代码为:

public class CustomSerdes {
  private CustomSerdes() {}

  public static Serde<JsonNode> JsonNode() {
    JsonSerializer<JsonNode> serializer = new JsonSerializer<>();
    JsonDeserializer<JsonNode> deserializer = new JsonDeserializer<>(JsonNode.class);
    return Serdes.serdeFrom(serializer, deserializer);
}

相关问题