我知道kafka streams允许根据指定的 predicate 将数据分发到多个主题,kafka streams绑定器使用 @StreamListener
以及 functional binding
接近。
...
// return type KStream<?, WordCount>[]
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input.
...
branch(isEnglish, isFrench, isSpanish);
我想知道如何在返回数据之前转换其中一个分支的键或值。假设我希望其中一个分支具有与其他分支不同的密钥类型。
Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo = (k, v) -> v.important.equals("2");
KStream<Key, Value>[] branches = input.branch(isOne, isTwo);
KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);
我想创建一个新的 KStream<?, Value>[]
具有两个流的数组,但由于泛型数组创建错误而无法创建。
我知道这是可能的,从下面的文档摘录中可以看出,可以为每个分支的 producer
.
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde
谢谢你的帮助。
1条答案
按热度按时间tpgth1q71#
一种选择是创建一个副标题。然后,每一个不同于
WordCount
你把主题发到那边去。仍然是WordCount
你继续讨论分支的主题。我使用kstream创建了这个基于spring云流示例的代码示例。它不起作用,因为这个想法是有效的。我有一个相似的例子,它消耗了不同的能量
Order
对象和发送错误Orders
错误主题。当您使用带有错误或空值的数据时,此用例是一种常见的方法,您希望将这些数据发送到一个辅助主题,即:一个错误主题。然后您仍然可以保存这些事件以在将来分析它们。