如何在dataflow/beam中将流数据与大型历史数据集相结合

wmtdaxz3  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(400)

我正在调查通过googledataflow/apachebeam处理web用户会话的日志,需要将用户的日志与上个月的用户会话历史相结合。
我研究了以下方法:
使用30天固定窗口:最有可能的一个大窗口,以适应内存,我不需要更新用户的历史,只是参考它
使用cogroupbykey连接两个数据集,但这两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),这在我的情况下是不正确的(24小时对30天)
使用side输入检索给定会话的用户会话历史记录 elementprocessElement(ProcessContext processContext) 我的理解是 .withSideInputs(pCollectionView) 需要融入记忆。我知道我可以将单个用户的所有会话历史记录放入内存,但不能将所有会话历史记录都放入内存。
我的问题是,是否有一种方法可以从只与当前用户会话相关的侧输入加载/流式传输数据?
我设想了一个pardo函数,通过指定用户的id从侧面输入加载用户的历史会话,但是只有当前用户的历史会话可以放入内存;通过侧输入加载所有历史记录会话将太大。
一些伪代码来说明:

  1. public static class MetricFn extends DoFn<LogLine, String> {
  2. final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
  3. public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
  4. this.pHistoryView = historyView;
  5. }
  6. @Override
  7. public void processElement(ProcessContext processContext) throws Exception {
  8. Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
  9. final LogLine currentLogLine = processContext.element();
  10. final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
  11. final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
  12. processContext.output(outputMetric);
  13. }
  14. }
nszi6y05

nszi6y051#

目前还没有一种方法可以访问流中的每键输入,但它肯定会像您描述的那样有用,我们正在考虑实现它。
一种可能的解决方法是使用side输入来分发指向实际会话历史的指针。生成24小时会话历史记录的代码可以将它们上传到gcs/bigquery/etc,然后将位置作为侧输入发送到加入代码。

相关问题