我们有一个用例,在这个用例中,我们必须将一些消息读入kstream,然后转换消息并有条件地将其生成到另一个主题。
在我们的用例中,对于对象的转换,我们进行一个下游api调用。如果api调用成功,则生成到newtopic1,否则生成到newtopic2。如何才能达到同样的效果??
到目前为止,我们正在使用streams api提供的to方法为新的kafka主题生成丰富的(即转换对象)样式。
KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to('newTopic1', Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));
感谢您对此的回复。
1条答案
按热度按时间6tqwzwtp1#
使用您使用的dsl api
KStream::filter
或者KStream:to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced)
.如果两种格式相同,示例代码如下所示: