如何使用历史数据集来丰富flink数据流

j2cgzkjk  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(778)

我正在与flink合作一个实时项目,我需要通过计算交易的先前交易功能来丰富每张卡的状态,如下所示:
对于每一张卡,我都有一个统计过去24小时内交易次数的功能。另一方面,我有两个数据源:
首先是一个数据库表,其中存储了昨天结束前的卡片交易。
第二,今天的交易流。
因此,第一步是从数据库中获取每张卡片的昨天交易记录,并将其存储在卡片状态。然后,第二步是用今天的事务更新这个状态,这些事务是在流中出现的,并计算它们在过去24小时内的事务数。我试图以流的形式读取数据库数据,并将其连接到today事务。所以,为了达到上述目标,我使用了richflatmap函数。但是,由于数据库数据本身不是流,因此输出不正确。richflatmap函数如下所示:

transactionsHistory.connect(transactionsStream).flatMap(new         
RichCoFlatMapFunction<History, Tuple2<String, Transaction>,         
ExtractedFeatures>() {
    private ValueState<History> history;
    @Override
    public void open(Configuration config) throws Exception {
        this.history = getRuntimeContext().getState(new 
    ValueStateDescriptor<>("card history", History.class));
    }
    //historical data 
    @Override
    public void flatMap1(History history, 
    Collector<ExtractedFeatures> collector) throws Exception {
        this.history.update(history);
    }
    //new transactions from stream 
    @Override
    public void flatMap2(Tuple2<String, Transaction> 
    transactionTuple, Collector<ExtractedFeatures> collector) throws 
    Exception {
        History history = this.history.value();
        Transaction transaction = transactionTuple.f1;
        ArrayList<History> prevDayHistoryList = 
        history.prevDayTransactions;

        // This function returns transactions which are in 24 hours 
        //window of the current transaction and their count.
        Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple = 
        findHistoricalDate(prevDayHistoryList,
                transaction.transactionLocalDate);
        prevDayHistoryList = prevDayHistoryTuple.f0;
        history.prevDayTransactions = prevDayHistoryList;
        this.history.update(history);
        ExtractedFeatures ef = new ExtractedFeatures();
        ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
        collector.collect(ef);
    }
});

在flink流媒体程序中,实现上述丰富需求的正确设计模式是什么?我在stack overflow上发现了类似于我的问题的blow问题,但我无法解决我的问题,所以我决定寻求帮助:)
flink流媒体中使用静态数据集丰富数据流
任何帮助都将不胜感激。

vlju58qv

vlju58qv1#

但是,由于数据库数据本身不是流,因此输出不正确。
利用关系数据库中的信息丰富流数据无疑是可能的。然而,最棘手的是,如何确保在需要浓缩数据之前就将其摄取。通常,您可能需要缓冲要充实的流,直到充实数据被引导/摄取。例如,有时采取的一种方法是
在禁用要充实的流的情况下运行应用程序
一旦富集数据被完全摄取并以flink状态存储,就采取一个保存点
从保存点重新启动应用程序,并启用要充实的流
然而,在您描述的情况下,似乎一种更简单的方法会起作用。如果您只需要24小时的历史数据,那么为什么不忽略历史事务数据库呢?只需运行应用程序,直到它看到24小时的流数据,之后历史数据库就变得无关紧要了。
但是,如果您必须接收历史数据,并且不喜欢上面概述的基于保存点的方法,那么这里有几个其他的可能性:
在flink状态(例如liststate或mapstate)缓冲未富集的事件,直到历史流被摄取
编写一个自定义sourcefunction,在接收历史数据之前阻止主流
有关此主题的更深入的探讨,请参阅apache flink中的引导状态。
顺便说一句,计划在将来的版本中更好地支持这个用例。

相关问题