如何使用kafka流连接操作处理一对多关系

qnakjoqk  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(379)

你能帮助我如何实现这个使用Kafka流?
场景:将订单数据的所有发票分组。在实时流媒体中,可能会延迟接收发票。所以我们想等20分钟把所有的发票分组后再加入。
示例:订单“x”有3张发票,预计将在20分钟内收到。
预期输出:订单和3张发票应作为输出主题中的单个数据提供。
我们有下面的拓扑结构来实现这一点。
我们分别有订单流和发票流
我们正在根据订单密钥对发票进行分组。我们设置了20分钟的摇窗时间
将订单数据与生成的发票组关联
将输出写入新主题
问题:步骤3未等待步骤2完成。加入收到订单后立即执行的操作。所以我们没有得到预期的产出。
我们尝试使用joinwindows实现同样的功能。但是由于连接窗口是滑动窗口,所以我们在输出主题中得到了重复的数据。
在上面的例子中,如果我们使用join窗口而不是tumbling窗口,我们将得到3个输出数据,订单分别有1个发票、2个发票和3个发票。
请帮助我解决这个问题或建议任何替代方法
代码段:

KTable<Windowed<String>, List<InvoiceList>> invoiceList= invoiceStream
                .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(1200)))
                .aggregate(() -> new ArrayList<InvoiceList>(),
                        (key, newValue, agg) -> {
                            new KeyValue<>(key, agg.add(newValue));
                            return agg;
                        },
                        Materialized.as("invoice-list").with(Serdes.String(), new ArrayListSerde<InvoiceList>(AppSerdes.InvoiceList())))

KStream<String, Order> orderOutput=

                orderStream.join(invoiceList, Joiner);

        orderOutput.to(AppConfig.OutputTopic.OUTPUT_ORDER,Produced.with(Serdes.String(), AppSerdes.Order()));
uidvcgyl

uidvcgyl1#

这种连接力在我们的情况下起作用。因此,我们将其作为两个独立的流接收,并在使用者上添加自定义逻辑来处理我们的用例。
谢谢!

a2mppw5e

a2mppw5e2#

我想,订单先到,发票再到,而不是相反。如果我的假设是对的,那么你的逻辑就行不通了。因为当订单进入您的kstream时,可能没有发票,因此join不会获取任何发票。请记住,kstream ktable连接是非窗口连接,可以像查找ktable(changelog流)一样使用。

相关问题