This is the input Kafka topic, which contains ConnectionEvent records:
ConnectionEvent("John", "123", "CONNECTED")
ConnectionEvent("John", "123", "DISCONNECTED")
ConnectionEvent("Anna", "222", "CONNECTED")
ConnectionEvent("Rohan", "334", "CONNECTED")
ConnectionEvent("Anna", "199", "CONNECTED")
ConnectionEvent("Anna", "255", "CONNECTED")
ConnectionEvent("Anna", "255", "DISCONNECTED")
ConnectionEvent("Anna", "222", "DISCONNECTED")
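The question never shows the ConnectionEvent class itself. Below is a minimal sketch of the shape implied by the constructor calls above, assuming the three arguments are the user id, the session id and the state, all as strings. The field names, getUserId() and the SampleInput holder are hypothetical; getSessionId() and getState() match the getters the attempt in the question relies on.

```java
import java.util.List;

// Hypothetical shape of ConnectionEvent, inferred from the constructor calls:
// (userId, sessionId, state). The real class may differ.
final class ConnectionEvent {
    private final String userId;
    private final String sessionId;
    private final String state; // "CONNECTED" or "DISCONNECTED"

    ConnectionEvent(String userId, String sessionId, String state) {
        this.userId = userId;
        this.sessionId = sessionId;
        this.state = state;
    }

    String getUserId() { return userId; }       // hypothetical getter
    String getSessionId() { return sessionId; }
    String getState() { return state; }

    @Override
    public String toString() {
        return "ConnectionEvent(\"" + userId + "\", \"" + sessionId + "\", \"" + state + "\")";
    }
}

// Hypothetical holder for the eight sample events, in topic order.
final class SampleInput {
    static final List<ConnectionEvent> EVENTS = List.of(
        new ConnectionEvent("John", "123", "CONNECTED"),
        new ConnectionEvent("John", "123", "DISCONNECTED"),
        new ConnectionEvent("Anna", "222", "CONNECTED"),
        new ConnectionEvent("Rohan", "334", "CONNECTED"),
        new ConnectionEvent("Anna", "199", "CONNECTED"),
        new ConnectionEvent("Anna", "255", "CONNECTED"),
        new ConnectionEvent("Anna", "255", "DISCONNECTED"),
        new ConnectionEvent("Anna", "222", "DISCONNECTED"));
}
```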
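Stream and reduce logic
Each record in the topic is sent with the user id as its message key, for example "Anna".
The stream must be processed as follows: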
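John has only one session, 123, which connected and then disconnected, so he is offline.
Rohan has only one session, 334, which never disconnected, so he is online.
Anna has three sessions (222, 199, 255), two of which disconnected, so she is online.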
The KTable must contain the following data:
John Offline
Rohan Online
Anna Online
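The rule above boils down to two steps: keep the latest state of each (user, session) pair, then mark a user Online if at least one session's latest state is CONNECTED. Here is a minimal plain-Java reference version of that rule, with no Kafka involved, which can be handy for checking a topology's output. PresenceReference and its Event record are hypothetical names, and events are assumed to arrive in order for each user (which Kafka guarantees per partition when the user id is the key).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PresenceReference {
    // Hypothetical minimal event type for this sketch: (userId, sessionId, state).
    record Event(String userId, String sessionId, String state) {}

    // Computes "Online"/"Offline" per user from events processed in order.
    static Map<String, String> presence(List<Event> events) {
        // Step 1: latest state per (user, session); a later event overwrites an earlier one.
        Map<String, Map<String, String>> latest = new LinkedHashMap<>();
        for (Event e : events) {
            latest.computeIfAbsent(e.userId(), u -> new LinkedHashMap<>())
                  .put(e.sessionId(), e.state());
        }
        // Step 2: a user is Online if any session's latest state is CONNECTED.
        Map<String, String> result = new LinkedHashMap<>();
        latest.forEach((user, sessions) ->
            result.put(user, sessions.containsValue("CONNECTED") ? "Online" : "Offline"));
        return result;
    }
}
```

With the eight sample events this yields John Offline, Anna Online and Rohan Online, matching the expected KTable.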
What I tried:
KTable<String, ConnectionEvent> connectedSessions = stream
    .groupBy((userId, event) -> userId + ":" + event.getSessionId()) // group by user and then by sessionId (composite key "user:sessionId")
    .reduce((agg, newVal) -> newVal) // keep the latest value, i.e. reduce the records of each session to 1
    .filter((key, event) -> "CONNECTED".equals(event.getState())); // keep only sessions whose last state is CONNECTED
But now, how do I ungroup the composite key (user, sessionId) back to just the user, and then mark the user as online/offline based on the number of sessions whose latest state is CONNECTED?
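One possible route, sketched under assumptions: re-key the filtered KTable by user with KTable#groupBy and call count() on the resulting KGroupedTable. When a session's latest event is DISCONNECTED, filter forwards a delete (tombstone) for that "user:sessionId" key, and the grouped count subtracts the old value, so the per-user count tracks the number of currently connected sessions. The code below is a plain-Java simulation of those changelog semantics so that it runs without a Kafka dependency; RegroupedCount and the "user:sessionId" key format are illustrative assumptions, and the Kafka Streams chain in the comment omits the serdes a real repartition needs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Kafka dependency) of what this chain is meant to compute,
// assuming connectedSessions is keyed by "user:sessionId" (serdes omitted):
//   connectedSessions
//       .groupBy((key, event) -> KeyValue.pair(key.substring(0, key.indexOf(':')), event))
//       .count()
//       .mapValues(n -> n > 0 ? "Online" : "Offline");
final class RegroupedCount {
    record Event(String userId, String sessionId, String state) {}

    // Upstream table: "user:sessionId" -> latest event, present only while CONNECTED
    // (the filter turns a DISCONNECTED update into a delete).
    private final Map<String, Event> connectedSessions = new HashMap<>();
    // Downstream table: user -> number of currently connected sessions.
    private final Map<String, Long> counts = new HashMap<>();

    void apply(Event e) {
        String sessionKey = e.userId() + ":" + e.sessionId();
        Event old = connectedSessions.get(sessionKey);
        Event updated = "CONNECTED".equals(e.state()) ? e : null; // filter
        if (updated == null) connectedSessions.remove(sessionKey);
        else connectedSessions.put(sessionKey, updated);
        // The regrouped table subtracts the old value and adds the new one.
        if (old != null) counts.merge(old.userId(), -1L, Long::sum);
        if (updated != null) counts.merge(updated.userId(), 1L, Long::sum);
    }

    Map<String, String> presence() {
        Map<String, String> result = new HashMap<>();
        counts.forEach((user, n) -> result.put(user, n > 0 ? "Online" : "Offline"));
        return result;
    }
}
```

Because a repeated CONNECTED for the same session subtracts and re-adds the same value, the count stays correct even if a session reports CONNECTED more than once.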
Answer
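A user is online as long as the number of CONNECTED events for that user is greater than the number of DISCONNECTED events. So you can aggregate a connection count over the stream and check whether it is positive. For example: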
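A minimal sketch of that counting idea, assuming each CONNECTED counts as +1 and each DISCONNECTED as -1 per user key. The Kafka Streams chain in the comment is one plausible way to express it; NetConnectionCount and its Event record are hypothetical, and the class itself is a plain-Java simulation so it runs without Kafka.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a running net count per user, roughly:
//   stream.mapValues(e -> "CONNECTED".equals(e.getState()) ? 1L : -1L)
//         .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
//         .reduce(Long::sum)
//         .mapValues(n -> n > 0 ? "Online" : "Offline");
final class NetConnectionCount {
    record Event(String userId, String sessionId, String state) {}

    static Map<String, String> presence(List<Event> events) {
        // Running sum per user: +1 for CONNECTED, -1 for anything else (DISCONNECTED).
        Map<String, Long> net = new LinkedHashMap<>();
        for (Event e : events) {
            long delta = "CONNECTED".equals(e.state()) ? 1L : -1L;
            net.merge(e.userId(), delta, Long::sum);
        }
        // Online while the user has more connects than disconnects.
        Map<String, String> result = new LinkedHashMap<>();
        net.forEach((user, n) -> result.put(user, n > 0 ? "Online" : "Offline"));
        return result;
    }
}
```

With the sample events John nets 0 (Offline), Anna nets 1 (Online) and Rohan nets 1 (Online). This is simpler than tracking sessions, but it assumes every session emits exactly one CONNECTED and at most one DISCONNECTED: a duplicated or lost event shifts the counter permanently, whereas the per-session table from the question stays correct under repeated updates.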