Kafka流在转换后有条件地产生消息

xlpyo6sf  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(393)

我们有一个用例,在这个用例中,我们必须将一些消息读入kstream,然后转换消息并有条件地将其生成到另一个主题。
在我们的用例中,对于对象的转换,我们进行一个下游api调用。如果api调用成功,则生成到newtopic1,否则生成到newtopic2。如何才能达到同样的效果??
到目前为止,我们正在使用streams api提供的to方法为新的kafka主题生成丰富的(即转换对象)样式。

KStream<String, Customer> transformedStream = sourceKafkaStream
                .mapValues(cust -> {
                    try {
                        logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
                        Thread.sleep(7000);
                        return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return cust;
                });
                .to('newTopic1', Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));

感谢您对此的回复。

6tqwzwtp

6tqwzwtp1#

使用您使用的dsl api KStream::filter 或者 KStream:to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced) .
如果两种格式相同,示例代码如下所示:

KStream<String, Customer> transformedStream = sourceKafkaStream
                .mapValues(cust -> {
                    try {
                        logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
                        Thread.sleep(7000);
                        return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return cust;
                });
                .to((key, value, recordContext) -> topicNameCalculation(key, value), Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));
``` `topicNameCalculation(...)` 会根据关键字和值选择合适的题目。
在Kafka流中进行外部调用通常不是一个好的方法。

相关问题