我有一个数据流,看起来像这样:
impressionId | id | name | eventType | timestamp
我需要过滤(忽略)类型为“click”的事件,它没有匹配类型为“impression”的“impressionid”(因此基本上忽略没有impression的clicks事件),然后计算我总共有多少个impression,以及在特定时间窗口中我有多少次单击(对于id/name对)。
我就是这样找到解决方案的:
[...]
Table eventsTable = tEnv.fromDataStream(eventStreamWithTimeStamp, "impressionId, id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
Table clicksTable = eventsTable
.where("eventType = 'click'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, id, name, eventType, minuteWindow")
.select("impressionId as clickImpressionId, eventType as clickEventType, concat(concat(id,'_'), name) as concatClickId, id as clickId, name as clickName, minuteWindow.rowtime as clickMinute");
Table impressionsTable = eventsTable
.where("eventType = 'impression'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, id, name, eventType, minuteWindow")
.select("impressionId as impressionImpressionId, eventType as impressionEventType, concat(concat(id,'_'), name) as concatImpId, id as impId, name as impName, minuteWindow.rowtime as impMinute");
Table filteredClickCount = clicksTable
.join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
.window(Slide.over("24.hour").every("1.minute").on("clickMinute").as("minuteWindow"))
.groupBy("concatClickId, clickMinute")
.select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
DataStream<Test3> result = tEnv.toAppendStream(filteredClickCount, Test3.class);
result.print();
我要做的只是创建两个表,一个有点击,一个有印象,'内部'连接点击到印象和一个被连接意味着他们的点击有一个匹配的印象。
现在这不管用,我也不知道为什么!?
最后一个联接表生成的计数不正确。它在第一分钟起作用,但在那之后计数几乎减少了一倍。
然后,我尝试修改最后一个表,如下所示:
Table clickWithMatchingImpression2 = clicksTable
.join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
.groupBy("concatClickId, clickMinute")
.select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
DataStream<Tuple3<Boolean, Tuple3>> result2 = tEnv.toRetractStream(clickWithMatchingImpression2, Test3.class);
result2.print();
还有。。。。这管用!?但是我不知道为什么,也不知道如何处理这个数据流<tuple3<boolean,test3>>格式。。。当table没有Windows时,Flink拒绝使用toappendstream。我想要一个只有最后数字的简单结构。
1)我的方法正确吗?有没有更简单的方法过滤没有印象的点击?
2)为什么我的解决方案中的计数不正确?
1条答案
按热度按时间0s7z1bwu1#
我不完全确定我是否正确理解了您的用例,一个带有一些数据点的示例在这里肯定会有所帮助。
让我解释一下你的代码在做什么。首先,这两个表计算了过去24小时内有多少次点击/印象。对于输入
您将获得窗口(array,window\u start,window\u end,rowtime):
因此,当你在id和name上分组时,你会得到如下结果:
如果您在24小时窗口中再次分组,您将多次计算具有相同id的每个事件。
如果我正确地理解了您的用例,并且您正在查找一次单击前后1分钟内发生了多少次印象,那么间隔连接可能就是您要查找的。您可以通过以下查询实现您的案例:
至于为什么flink有时不能产生追加流,而只能产生收回流,请参见。非常简单地说,如果基于时间属性的操作不起作用,当结果为“有效”时,就没有单一的时间点。因此,它必须发出更改流,而不是一个附加值。元组中的第一个字段告诉您记录是插入(true)还是收回/删除(false)。