我正在使用kstreamsapi for java。我正在尝试将搜索查询与结果集上的结果单击连接起来。一个查询可以导致0到n次单击。单击和查询分别记录在一个主题中,可以通过请求id进行连接。当我连接它们时,连接的对首先在包含单击信息的部分中为空(很明显,因为点击需要时间)。我将这些记录输出到cassandra,以便在之后对它们进行聚合(我知道我不是个好孩子)。不管怎样,我不希望每次查询都有一个“空连接”,我只希望在超过joinwindow之后得到结果。那么,有没有一种方法可以在窗口结束之前抑制join的输出呢?
这是连接的(kotlin)代码:
// Consuming the query log topic with message key = request_id
val queryLogs = streamBuilder.stream("query_logs",
Consumed.with(stringSerdes, querySerdes))
// Consuming the click log topic with message key = request_id
val clickLogs = streamBuilder.stream("click_logs",
Consumed.with(stringSerdes, clickSerdes))
// Joining the click and the query log on request id to get the information which queries resulted in which clicks
val outerJoin = queryLogs.outerJoin(clickLogs, QueryClickJoiner(),
JoinWindows.of(Duration.ofMinutes(30)),
Joined.with(stringSerdes, clickSerdes, querySerdes))
outerJoin.to("joined_clicks", Produced.with(stringSerdes,queryClickSerdes!!))
结果“查询点击”将有一个查询日志字段和一个结果点击列表。我只希望在连接窗口的30分钟结束后得到结果,而不是每次单击与查询连接时的更新。
谨致问候
暂无答案!
目前还没有任何答案,快来回答吧!