我是新的Kafka流,我想读一个主题,写在一个新的主题使用Kafka流api的一部分。我的键是string,值是avro有我可以使用的文档/示例吗?
编辑:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
在子主题中,我有:
key:{“id”:“145”}时间戳:2019年3月14日17:52:23.43偏移量:12分区:0
我的输入主题:
“id”是“145”,“时间戳”15525252538545,“周”是“145”的“星期”的“周”的“周”的“本”的“消息”的“id”是“145”的“145”,“时间戳”是“145”,“时间戳”1552525252525252538545,“时间戳”155252525252525255,“星期”周“周”这一“周”的“来源”是“源”的“字符串”字符串“tmp},”身体““,”身体:“{“字符串”字符串“{“字符串”字符串““{”操作操作操作操作操作的操作类型类型,操作操作操作类型的类型\“::“插入”是“插入”的“插入”,此外,“旧”和“旧的”和“时间”以及“旧的”以及“旧的”以及“时间”以及“旧的”以及“时间”的”以及“旧的”的”以及“此外,据据据据据据据据据据据据据据据据据据*\“,\“siren\u siret\”:null}}“}”,键入“action”:{“string”:“insert”}}
如何在新主题中添加body中的其他字段?例子:
{“id”:“145”,“timestamp”:1552585938545,“week”:“\u0000”,“source”:{“string”:“tmp”},“body”:{“string”:“{“operation\u type\”:\“insert\”,\“old\”:{“row\u id\”:null,\“last\u upd\”:null},\“new\”:{“row\u id\”:\“170309-********\”,\“last\u upd\”:\“2019-03-14t17:52:18\”},“type\u action”:{“string”:“insert”}
1条答案
按热度按时间lpwwtiir1#
您可以简单地将主题作为流使用,并使用.map()/.mapvalues()函数修改value/keyvalues。
示例:假设您想从avro记录中选取一列并发布到新的输出主题。
此外,您还可以查看github上的示例:
https://github.com/confluentinc/kafka-streams-examples/blob/5.1.2-post/src/main/java/io/confluent/examples/streams/wikipediafeedavroexample.java