对于下面的代码,stream1和stream2单独运行良好,我可以看到输出,但是连接的流根本不记录任何内容。我觉得这和join窗口有关,但是来自两个流的数据几乎是同时进入的。
val stream = builder.stream(stringSerde, byteArraySerde, "topic")
val stream1 = stream
.filter((key, value) => somefilter(key, value))
.through(stringSerde, byteArraySerde, "topic1")
val stream2 = stream
.filter((key, value) => someotherfilter(key, value))
.through(stringSerde, byteArraySerde, "topic2")
val joinedStream = stream1
.join(stream2, (value1: Array[Byte], value2: Array[Byte]) => {
println("wont print anything")
return somerandomdata
},
JoinWindows.of("othertopic").within(10000L),
stringSerde, byteArraySerde, byteArraySerde)
1条答案
按热度按时间wb1gzix01#
两个主题的键不应该相同吗?
我认为javadoc解释了这一点:https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/joinwindows.html
这可能也是一个有趣的读物:https://cwiki.apache.org/confluence/display/kafka/kafka+streams+join+semantics