加入同一个流以过滤某些事件

agyaoht7  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(332)

我有一个数据流,看起来像这样:

  1. impressionId | id | name | eventType | timestamp

我需要过滤(忽略)类型为“click”的事件,它没有匹配类型为“impression”的“impressionid”(因此基本上忽略没有impression的clicks事件),然后计算我总共有多少个impression,以及在特定时间窗口中我有多少次单击(对于id/name对)。
我就是这样找到解决方案的:

  1. [...]
  2. Table eventsTable = tEnv.fromDataStream(eventStreamWithTimeStamp, "impressionId, id, name, eventType, eventTime.rowtime");
  3. tEnv.registerTable("Events", eventsTable);
  4. Table clicksTable = eventsTable
  5. .where("eventType = 'click'")
  6. .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
  7. .groupBy("impressionId, id, name, eventType, minuteWindow")
  8. .select("impressionId as clickImpressionId, eventType as clickEventType, concat(concat(id,'_'), name) as concatClickId, id as clickId, name as clickName, minuteWindow.rowtime as clickMinute");
  9. Table impressionsTable = eventsTable
  10. .where("eventType = 'impression'")
  11. .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
  12. .groupBy("impressionId, id, name, eventType, minuteWindow")
  13. .select("impressionId as impressionImpressionId, eventType as impressionEventType, concat(concat(id,'_'), name) as concatImpId, id as impId, name as impName, minuteWindow.rowtime as impMinute");
  14. Table filteredClickCount = clicksTable
  15. .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
  16. .window(Slide.over("24.hour").every("1.minute").on("clickMinute").as("minuteWindow"))
  17. .groupBy("concatClickId, clickMinute")
  18. .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
  19. DataStream<Test3> result = tEnv.toAppendStream(filteredClickCount, Test3.class);
  20. result.print();

我要做的只是创建两个表,一个有点击,一个有印象,'内部'连接点击到印象和一个被连接意味着他们的点击有一个匹配的印象。
现在这不管用,我也不知道为什么!?
最后一个联接表生成的计数不正确。它在第一分钟起作用,但在那之后计数几乎减少了一倍。
然后,我尝试修改最后一个表,如下所示:

  1. Table clickWithMatchingImpression2 = clicksTable
  2. .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
  3. .groupBy("concatClickId, clickMinute")
  4. .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
  5. DataStream<Tuple3<Boolean, Tuple3>> result2 = tEnv.toRetractStream(clickWithMatchingImpression2, Test3.class);
  6. result2.print();

还有。。。。这管用!?但是我不知道为什么,也不知道如何处理这个数据流<tuple3<boolean,test3>>格式。。。当table没有Windows时,Flink拒绝使用toappendstream。我想要一个只有最后数字的简单结构。
1)我的方法正确吗?有没有更简单的方法过滤没有印象的点击?
2)为什么我的解决方案中的计数不正确?

0s7z1bwu

0s7z1bwu1#

我不完全确定我是否正确理解了您的用例,一个带有一些数据点的示例在这里肯定会有所帮助。
让我解释一下你的代码在做什么。首先,这两个表计算了过去24小时内有多少次点击/印象。对于输入

  1. new Event("1", "1", "ABC", "...", 1),
  2. new Event("1", "2", "ABC", "...", 2),
  3. new Event("1", "3", "ABC", "...", 3),
  4. new Event("1", "4", "ABC", "...", 4)

您将获得窗口(array,window\u start,window\u end,rowtime):

  1. [1], 1969-12-31-01T00:01:00.000, 1970-01-01T00:01:00.000, 1970-01-01T00:00:59.999
  2. [1, 2], 1969-12-31-01T00:02:00.000, 1970-01-01T00:02:00.000, 1970-01-01T00:01:59.999
  3. [1, 2, 3], 1969-12-31-01T00:03:00.000, 1970-01-01T00:03:00.000, 1970-01-01T00:02:59.999
  4. ...

因此,当你在id和name上分组时,你会得到如下结果:

  1. 1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:00:59.999
  2. 1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:01:59.999
  3. 1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:02:59.999
  4. ...

如果您在24小时窗口中再次分组,您将多次计算具有相同id的每个事件。
如果我正确地理解了您的用例,并且您正在查找一次单击前后1分钟内发生了多少次印象,那么间隔连接可能就是您要查找的。您可以通过以下查询实现您的案例:

  1. Table clicks = eventsTable
  2. .where($("eventType").isEqual("click"))
  3. .select(
  4. $("impressionId").as("clickImpressionId"),
  5. concat($("id"), "_", $("name")).as("concatClickId"),
  6. $("id").as("clickId"),
  7. $("name").as("clickName"),
  8. $("eventTime").as("clickEventTime")
  9. );
  10. Table impressions = eventsTable
  11. .where($("eventType").isEqual("impression"))
  12. .select(
  13. $("impressionId").as("impressionImpressionId"),
  14. concat($("id"), "_", $("name")).as("concatImpressionId"),
  15. $("id").as("impressionId"),
  16. $("name").as("impressionName"),
  17. $("eventTime").as("impressionEventTime")
  18. );
  19. Table table = impressions.join(
  20. clicks,
  21. $("clickImpressionId").isEqual($("impressionImpressionId"))
  22. .and(
  23. $("clickEventTime").between(
  24. $("impressionEventTime").minus(lit(1).minutes()),
  25. $("impressionEventTime"))
  26. ))
  27. .select($("concatClickId"), $("impressionEventTime"));
  28. table
  29. .window(Slide.over("24.hour").every("1.minute").on("impressionEventTime").as("minuteWindow"))
  30. .groupBy($("concatClickId"), $("minuteWindow"))
  31. .select($("concatClickId"), $("concatClickId").count())
  32. .execute()
  33. .print();

至于为什么flink有时不能产生追加流,而只能产生收回流,请参见。非常简单地说,如果基于时间属性的操作不起作用,当结果为“有效”时,就没有单一的时间点。因此,它必须发出更改流,而不是一个附加值。元组中的第一个字段告诉您记录是插入(true)还是收回/删除(false)。

展开查看全部

相关问题