主题1到主题2使用Kafka流

2hh7jdfx  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(398)

我是新的Kafka流,我想读一个主题,写在一个新的主题使用Kafka流api的一部分。我的键是string,值是avro有我可以使用的文档/示例吗?
编辑:

final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
    final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
    newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    streams.start();

在子主题中,我有:
key:{“id”:“145”}时间戳:2019年3月14日17:52:23.43偏移量:12分区:0
我的输入主题:
“id”是“145”,“时间戳”15525252538545,“周”是“145”的“星期”的“周”的“周”的“本”的“消息”的“id”是“145”的“145”,“时间戳”是“145”,“时间戳”1552525252525252538545,“时间戳”155252525252525255,“星期”周“周”这一“周”的“来源”是“源”的“字符串”字符串“tmp},”身体““,”身体:“{“字符串”字符串“{“字符串”字符串““{”操作操作操作操作操作的操作类型类型,操作操作操作类型的类型\“::“插入”是“插入”的“插入”,此外,“旧”和“旧的”和“时间”以及“旧的”以及“旧的”以及“时间”以及“旧的”以及“时间”的”以及“旧的”的”以及“此外,据据据据据据据据据据据据据据据据据据*\“,\“siren\u siret\”:null}}“}”,键入“action”:{“string”:“insert”}}
如何在新主题中添加body中的其他字段?例子:
{“id”:“145”,“timestamp”:1552585938545,“week”:“\u0000”,“source”:{“string”:“tmp”},“body”:{“string”:“{“operation\u type\”:\“insert\”,\“old\”:{“row\u id\”:null,\“last\u upd\”:null},\“new\”:{“row\u id\”:\“170309-********\”,\“last\u upd\”:\“2019-03-14t17:52:18\”},“type\u action”:{“string”:“insert”}

lpwwtiir

lpwwtiir1#

您可以简单地将主题作为流使用,并使用.map()/.mapvalues()函数修改value/keyvalues。
示例:假设您想从avro记录中选取一列并发布到新的输出主题。

// If you are using Schema registry, make sure to add the schema registry url 
// in streamConfiguration. Also specify the AvroSerde for VALUE_SERDE

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("inputTopic");
final KStream<String, String> newStream = userProfiles.mapValues(value -> value.get("fieldName").toString());
subStream.to("outputTopic",Produced.with(Serdes.String(),Serdes.String());
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

此外,您还可以查看github上的示例:
https://github.com/confluentinc/kafka-streams-examples/blob/5.1.2-post/src/main/java/io/confluent/examples/streams/wikipediafeedavroexample.java

相关问题