我正在写一个应用程序,使用Kafka流。它读取主题a,进行一些转换,然后写入主题b。在转换过程中,值按键分组,因此输出键、值类型与输入值类型不同。kafka流使用特定类型的serdes(例如string serdes序列化和反序列化字符串)进行序列化和反序列化,因此在转换数据后它将不起作用。如何在streams api中定义不同的序列化程序和反序列化程序?
gab6jxml1#
当然可以当您创建流、调用groupby或将输出写入某个主题时,您可以提供 Serde 或者 Serialized . 例子:
Serde
Serialized
Serde<String> stringSerde = Serdes.String(); Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); Produced<String, YourCustomItem> produced = Produced.with(stringSerde, new JsonSerde<>(YourCustomItem.class)); KStream<String, String> kStream = streamsBuilder.stream("sourceTopicName", consumed); KStream<String, YourCustomItem> transformedKStream = kStream.mapValues((key, value) -> new YourCustomItem()); transformedKStream.to("destinationTopicName", produced); transformedKStream.groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(YourCustomItem.class)));
哪里 JsonSerde 来自 spring-kafka 附属国。或者你可以用下面的 Serde :
JsonSerde
spring-kafka
Serializer<JsonNode> jsonSerializer = new JsonSerializer(); Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
1条答案
按热度按时间gab6jxml1#
当然可以
当您创建流、调用groupby或将输出写入某个主题时,您可以提供
Serde
或者Serialized
. 例子:哪里
JsonSerde
来自spring-kafka
附属国。或者你可以用下面的Serde
: