如何使用历史数据集来丰富flink数据流

j2cgzkjk  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(853)

我正在与flink合作一个实时项目,我需要通过计算交易的先前交易功能来丰富每张卡的状态,如下所示:
对于每一张卡,我都有一个统计过去24小时内交易次数的功能。另一方面,我有两个数据源:
首先是一个数据库表,其中存储了昨天结束前的卡片交易。
第二,今天的交易流。
因此,第一步是从数据库中获取每张卡片的昨天交易记录,并将其存储在卡片状态。然后,第二步是用今天的事务更新这个状态,这些事务是在流中出现的,并计算它们在过去24小时内的事务数。我试图以流的形式读取数据库数据,并将其连接到today事务。所以,为了达到上述目标,我使用了richflatmap函数。但是,由于数据库数据本身不是流,因此输出不正确。richflatmap函数如下所示:

  1. transactionsHistory.connect(transactionsStream).flatMap(new
  2. RichCoFlatMapFunction<History, Tuple2<String, Transaction>,
  3. ExtractedFeatures>() {
  4. private ValueState<History> history;
  5. @Override
  6. public void open(Configuration config) throws Exception {
  7. this.history = getRuntimeContext().getState(new
  8. ValueStateDescriptor<>("card history", History.class));
  9. }
  10. //historical data
  11. @Override
  12. public void flatMap1(History history,
  13. Collector<ExtractedFeatures> collector) throws Exception {
  14. this.history.update(history);
  15. }
  16. //new transactions from stream
  17. @Override
  18. public void flatMap2(Tuple2<String, Transaction>
  19. transactionTuple, Collector<ExtractedFeatures> collector) throws
  20. Exception {
  21. History history = this.history.value();
  22. Transaction transaction = transactionTuple.f1;
  23. ArrayList<History> prevDayHistoryList =
  24. history.prevDayTransactions;
  25. // This function returns transactions which are in 24 hours
  26. //window of the current transaction and their count.
  27. Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple =
  28. findHistoricalDate(prevDayHistoryList,
  29. transaction.transactionLocalDate);
  30. prevDayHistoryList = prevDayHistoryTuple.f0;
  31. history.prevDayTransactions = prevDayHistoryList;
  32. this.history.update(history);
  33. ExtractedFeatures ef = new ExtractedFeatures();
  34. ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
  35. collector.collect(ef);
  36. }
  37. });

在flink流媒体程序中,实现上述丰富需求的正确设计模式是什么?我在stack overflow上发现了类似于我的问题的blow问题,但我无法解决我的问题,所以我决定寻求帮助:)
flink流媒体中使用静态数据集丰富数据流
任何帮助都将不胜感激。

vlju58qv

vlju58qv1#

但是,由于数据库数据本身不是流,因此输出不正确。
利用关系数据库中的信息丰富流数据无疑是可能的。然而,最棘手的是,如何确保在需要浓缩数据之前就将其摄取。通常,您可能需要缓冲要充实的流,直到充实数据被引导/摄取。例如,有时采取的一种方法是
在禁用要充实的流的情况下运行应用程序
一旦富集数据被完全摄取并以flink状态存储,就采取一个保存点
从保存点重新启动应用程序,并启用要充实的流
然而,在您描述的情况下,似乎一种更简单的方法会起作用。如果您只需要24小时的历史数据,那么为什么不忽略历史事务数据库呢?只需运行应用程序,直到它看到24小时的流数据,之后历史数据库就变得无关紧要了。
但是,如果您必须接收历史数据,并且不喜欢上面概述的基于保存点的方法,那么这里有几个其他的可能性:
在flink状态(例如liststate或mapstate)缓冲未富集的事件,直到历史流被摄取
编写一个自定义sourcefunction,在接收历史数据之前阻止主流
有关此主题的更深入的探讨,请参阅apache flink中的引导状态。
顺便说一句,计划在将来的版本中更好地支持这个用例。

相关问题