我对Kafka还比较陌生,我想用一个唯一的密钥加入一堆流。我有两个不同的流,我从听两个不同的主题。我重新分区并通过相应的键将它们连接起来。然后,我听另一个主题c,并将之前连接的数据与新的streamc连接起来。总之
A-B=>AB
C->
AB-C>ABC
这工作,并加入数据完全如我所愿,我很高兴。出于某种原因,如果只有数据到达主题c,比如说,它将从状态存储中获取数据(我相信它会将以前的数据写入状态存储)。因此,它将完成与旧记录的连接。据我所知,kafka有自己的缓存机制和状态存储,所以它使它更有效,但在我的情况下,我的数据是相当动态和变化的,所以我不希望它使用旧记录加入。本质上,我想要的是,只有当所有这些主题都分别获得了一个新的有效负载,并且每个主题也完成了自己的连接时,才能连接这些数据。-在任何情况下都不使用任何以前的记录。
这是相关代码
protected Topology createStreamTopology(){
StreamsBuilder builder = new StreamsBuilder();
//Stream A
KStream<String, A> streamA = builder.stream(
aTopic,
Consumed.with(Serdes.String(), aSerde)
).selectKey((k, v) -> A.getOrderId(v));
//Stream B
//Make a table of the salesforce order line item events
KStream<String, B> streamB = builder.stream(
bTopic,
Consumed.with(Serdes.String(), bSerde)
).filter(
(k1, v1) -> v1.getPayload().getPlatformC() != null
).selectKey(
(k, v) -> v.getOrder()
);
//Join stream a and stream b
KStream<String, AB> abData = streamA.join(
streamB,
joinedAB::new,
JoinWindows.of(Duration.ofHours(2)),
Joined.with(Serdes.String(), aSerde, bSerde)
).selectKey(
(k, v) -> v.getPayload.getUniqueID());
// Stream c
KStream<String, C>stream streamC=builder.stream(cTopic, Consumed.with(Serdes.String(), serde))
.mapValues((k,v)->transformToC(v))
.filter((key, value) -> Objects.nonNull(value))
.selectKey((key, value) -> value.getID());
//join stream c and joined abData
KStream<String, abcData> abcData = streamC.join(
abData,
joinedABC::new,
JoinWindows.of(Duration.ofHours(2)),
Joined.with(Serdes.String(), cSerde, abSerde)
);
}
暂无答案!
目前还没有任何答案,快来回答吧!