背景
我想把一个在DB上运行的任务移到Flink上,使系统工作得更“真实的”。这个任务每10秒会重新计算所有账户的“状态”,如果这个值满足某些条件,我们会通知用户。账户的“状态”是根据他们拥有的股票数量和这些股票的当前价格计算的。
我的解决方案
我的想法是用这些输入创建一个管道:
- 开始数据流:日开始数据(初始数据):最后一天的股票价格,每个帐户的股票数量。数据可以从DB表中读取或从Kafka主题中加载。
- 价格流:一个Kafka主题的每一条信息都包含这样一只股票的价格:{【品名】:【存货名称】,【价格】:当前价格}
- 库存流:一个Kafka主题的每一条信息都包含该账户买入或卖出的股票数量(正数或负数),如下所示:{“账号”:帐号,“股票”:“存货名称”,“数量”:更改号码}
我的解决方案:将BeginStream与StockStream连接,然后再与PriceStream连接(全部都是keyBy StockName),为了对BeginStream的每只股票进行计算,我将创建一个名为stockStates的ListState,它包含以下信息:股票数量和当前价格。
对于来自StockStream和PriceStream的每个事件,我将更新stockStates,然后计算“state”,如果该值满足某些条件,我们将向其他Kafka主题发送消息,并将该帐户从该ListState中删除
BeginStream
.keyby(StockName)
.connect(
StockStream.
.keyby(StockName))
.flatMap(new EnrichmentFucntion())
...
.connect(
PriceStream.
.keyby(StockName))
.flatMap(new EnrichmentFucntion())
系统包含约500.000个账户,1.000只股票,每个账户持有10-20只股票,PriceStream和StockStream的吞吐量约为1.000条消息/秒。
问题
我是Flink的新手,因此我不确定我的解决方案是否是一个好的方法?有没有类似问题的设计模式?对于大约1000个ListState(每个列表包含大约500.000*10/1000=5.000个帐户的状态),我应该使用RocksDB来存储状态吗?
如有任何建议,我们将不胜感激。
1条答案
按热度按时间ewm0tg9j1#
是,您的用例可以在flink中实现。RocksDB作为状态后端是一个不错的选择。