Apache·Flink:具有大型状态的管道的最佳做法

pinkon5k  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(107)

背景
我想把一个在DB上运行的任务移到Flink上,使系统工作得更“真实的”。这个任务每10秒会重新计算所有账户的“状态”,如果这个值满足某些条件,我们会通知用户。账户的“状态”是根据他们拥有的股票数量和这些股票的当前价格计算的。

我的解决方案

我的想法是用这些输入创建一个管道:

  • 开始数据流:日开始数据(初始数据):最后一天的股票价格,每个帐户的股票数量。数据可以从DB表中读取或从Kafka主题中加载。
  • 价格流:一个Kafka主题的每一条信息都包含这样一只股票的价格:{【品名】:【存货名称】,【价格】:当前价格}
  • 库存流:一个Kafka主题的每一条信息都包含该账户买入或卖出的股票数量(正数或负数),如下所示:{“账号”:帐号,“股票”:“存货名称”,“数量”:更改号码}

我的解决方案:将BeginStream与StockStream连接,然后再与PriceStream连接(全部都是keyBy StockName),为了对BeginStream的每只股票进行计算,我将创建一个名为stockStates的ListState,它包含以下信息:股票数量和当前价格。
对于来自StockStream和PriceStream的每个事件,我将更新stockStates,然后计算“state”,如果该值满足某些条件,我们将向其他Kafka主题发送消息,并将该帐户从该ListState中删除

BeginStream
  .keyby(StockName)
  .connect(
      StockStream.
        .keyby(StockName))
  .flatMap(new EnrichmentFucntion())
...
  .connect(
      PriceStream.
        .keyby(StockName))
  .flatMap(new EnrichmentFucntion())

系统包含约500.000个账户,1.000只股票,每个账户持有10-20只股票,PriceStream和StockStream的吞吐量约为1.000条消息/秒。

问题

我是Flink的新手,因此我不确定我的解决方案是否是一个好的方法?有没有类似问题的设计模式?对于大约1000个ListState(每个列表包含大约500.000*10/1000=5.000个帐户的状态),我应该使用RocksDB来存储状态吗?
如有任何建议,我们将不胜感激。

ewm0tg9j

ewm0tg9j1#

是,您的用例可以在flink中实现。RocksDB作为状态后端是一个不错的选择。

相关问题