apache-kafka 如果传入的消息只在值中引用了Kafka主题中的相关消息,如何更新这些消息?

sc4hvdpw  于 2022-11-01  发布在  Apache
关注(0)|答案(2)|浏览(114)

如果不同消息中的引用号(refNo)相同,我需要合并有效负载。我的限制是我只能使用KTable,如果键是偶数,我不需要合并有效负载。另外,传入消息的顺序不应该改变结果。
例如,如果我们有一个空主题,传入的消息为:

1: { key: "1", value: {refNo:1, payload:{data1}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} } // this one should be not effected and left how it is

预期结果:

1: { key: "1", value: {refNo:1, payload:{data1, data2}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} }

我能想到的唯一方法是使用两次.groupBy并再次加入到原始主题中。
1.首先将键更改为refNo,将键保存到值本身,然后在聚合期间加入有效负载。
1.其次.groupBy还原键到初始状态。
1.最后一步将所有内容都加入到原始主题中,因为我在分组时丢失了一条消息。
我很确定有一个更简单的方法来解决这个问题。什么是最优化和优雅的方法来解决这个问题?
编辑:其下游有输出题目,原文不编辑。

vjrehmav

vjrehmav1#

在KSQL中进行聚合可能正好可以实现这一点。
唯一的问题是你需要多大的窗口。你需要多久连接一次数据?
而且,感觉写回原来的主题就像一个大大的“不”。
1.您只是稍微更改了消息格式,其他下游使用者可能已经期望特定的消息格式。
1.写一个新的主题似乎是一个更好的途径,它还减少了额外的消费者需要消费的消息量。

gfttwv5a

gfttwv5a2#

目前我正在使用这个解决方案。它工作,但我不知道它将如何执行,或者它可以更优化,或者是否有更好的方法来解决我的问题。

KStream<String, Value> even = inputTopicStream.filter((key, value) -> value.isEven()));

inputTopicStream.toTable(Materialized.with(String.serdes, Value.serde))
   .groupBy(
      (key, value) -> KeyValue.pair(new Key(value.getRefNo(), addKeyToValue(key, value)),
      Grouped.with("aggregation-internal", String.serdes, Value.serde))
   .aggregate(
      Value::new,
      (key, value, agg) -> mergePayload(key, value, agg), // ensure that key is uneven after merge
      (key, value, agg) -> handleSplit(key, value, agg))
   .toStream()
      .selectKey((key, value) -> new Key(value.getKey())) // restore original key
      .merge(even) // need to merge even key stream, because they was lost during aggregation
      .to(OUTPUT_TOPIC, Produced.with(String.serde, Value.serde));

相关问题