我正在与flink合作一个实时项目,我需要通过计算交易的先前交易功能来丰富每张卡的状态,如下所示:
对于每一张卡,我都有一个统计过去24小时内交易次数的功能。另一方面,我有两个数据源:
首先是一个数据库表,其中存储了昨天结束前的卡片交易。
第二,今天的交易流。
因此,第一步是从数据库中获取每张卡片的昨天交易记录,并将其存储在卡片状态。然后,第二步是用今天的事务更新这个状态,这些事务是在流中出现的,并计算它们在过去24小时内的事务数。我试图以流的形式读取数据库数据,并将其连接到today事务。所以,为了达到上述目标,我使用了richflatmap函数。但是,由于数据库数据本身不是流,因此输出不正确。richflatmap函数如下所示:
transactionsHistory.connect(transactionsStream).flatMap(new
RichCoFlatMapFunction<History, Tuple2<String, Transaction>,
ExtractedFeatures>() {
private ValueState<History> history;
@Override
public void open(Configuration config) throws Exception {
this.history = getRuntimeContext().getState(new
ValueStateDescriptor<>("card history", History.class));
}
//historical data
@Override
public void flatMap1(History history,
Collector<ExtractedFeatures> collector) throws Exception {
this.history.update(history);
}
//new transactions from stream
@Override
public void flatMap2(Tuple2<String, Transaction>
transactionTuple, Collector<ExtractedFeatures> collector) throws
Exception {
History history = this.history.value();
Transaction transaction = transactionTuple.f1;
ArrayList<History> prevDayHistoryList =
history.prevDayTransactions;
// This function returns transactions which are in 24 hours
//window of the current transaction and their count.
Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple =
findHistoricalDate(prevDayHistoryList,
transaction.transactionLocalDate);
prevDayHistoryList = prevDayHistoryTuple.f0;
history.prevDayTransactions = prevDayHistoryList;
this.history.update(history);
ExtractedFeatures ef = new ExtractedFeatures();
ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
collector.collect(ef);
}
});
在flink流媒体程序中,实现上述丰富需求的正确设计模式是什么?我在stack overflow上发现了类似于我的问题的blow问题,但我无法解决我的问题,所以我决定寻求帮助:)
flink流媒体中使用静态数据集丰富数据流
任何帮助都将不胜感激。
1条答案
按热度按时间vlju58qv1#
但是,由于数据库数据本身不是流,因此输出不正确。
利用关系数据库中的信息丰富流数据无疑是可能的。然而,最棘手的是,如何确保在需要浓缩数据之前就将其摄取。通常,您可能需要缓冲要充实的流,直到充实数据被引导/摄取。例如,有时采取的一种方法是
在禁用要充实的流的情况下运行应用程序
一旦富集数据被完全摄取并以flink状态存储,就采取一个保存点
从保存点重新启动应用程序,并启用要充实的流
然而,在您描述的情况下,似乎一种更简单的方法会起作用。如果您只需要24小时的历史数据,那么为什么不忽略历史事务数据库呢?只需运行应用程序,直到它看到24小时的流数据,之后历史数据库就变得无关紧要了。
但是,如果您必须接收历史数据,并且不喜欢上面概述的基于保存点的方法,那么这里有几个其他的可能性:
在flink状态(例如liststate或mapstate)缓冲未富集的事件,直到历史流被摄取
编写一个自定义sourcefunction,在接收历史数据之前阻止主流
有关此主题的更深入的探讨,请参阅apache flink中的引导状态。
顺便说一句,计划在将来的版本中更好地支持这个用例。