ApacheKafka—SpringCloud流中具有不同键/值的多个输出绑定(又称分支)

7lrncoxx  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(349)

我知道kafka streams允许根据指定的 predicate 将数据分发到多个主题,kafka streams绑定器使用 @StreamListener 以及 functional binding 接近。

...
// return type KStream<?, WordCount>[]

Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

return input.
    ... 
    branch(isEnglish, isFrench, isSpanish);

我想知道如何在返回数据之前转换其中一个分支的键或值。假设我希望其中一个分支具有与其他分支不同的密钥类型。

Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo =  (k, v) -> v.important.equals("2");

KStream<Key, Value>[] branches = input.branch(isOne, isTwo);

KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);

我想创建一个新的 KStream<?, Value>[] 具有两个流的数组,但由于泛型数组创建错误而无法创建。
我知道这是可能的,从下面的文档摘录中可以看出,可以为每个分支的 producer .

spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde

谢谢你的帮助。

tpgth1q7

tpgth1q71#

一种选择是创建一个副标题。然后,每一个不同于 WordCount 你把主题发到那边去。仍然是 WordCount 你继续讨论分支的主题。
我使用kstream创建了这个基于spring云流示例的代码示例。它不起作用,因为这个想法是有效的。我有一个相似的例子,它消耗了不同的能量 Order 对象和发送错误 Orders 错误主题。

return input -> {
            KStream<?, WordCount> intermediateStream = input
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                    .groupBy((key, value) -> value)
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(6)))
                    .count(Materialized.as("WordCounts-1"))
                    .toStream()
                    .map((key, value) -> new KeyValue<>(null,
                            new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));

            // here we return to the SIDE_TOPIC records with JsonSerde
            intermediateStream
                    .filter((k, v) -> `create another filter`)
                    .map((k, v) -> `transform only this stream`)
                    .to(SIDE_TOPIC, Produced.with(CustomSerdes.String(), new JsonSerde(....)));

            // here we keep using the branch serializer.
            intermediateStream.branch(isEnglish, isFrench, isSpanish);
        }

当您使用带有错误或空值的数据时,此用例是一种常见的方法,您希望将这些数据发送到一个辅助主题,即:一个错误主题。然后您仍然可以保存这些事件以在将来分析它们。

相关问题