我们使用flink对每个事件进行http调用,这需要存储在db中的特定数据。这些数据大约每周更新一次。此更新必须提交给操作员。当我们试图在我们的体系结构中保持较低的流的数量时,并且由于数据中的变化频繁,有没有任何方法可以在不使用广播流的情况下在运营商内部更新这些数据?
68de4m5k1#
可能的选项:a) 你只需要一个 ProcessFunction 带计时器,每x分钟拉一次。b) 如果您的状态很小,并且重新启动不是太关键:如果您不更新数据,服务器请求可能会失败(403?)。然后你就可以把数据加载进去 open 当你收到403并恢复时,操作失败。编辑:a)如何工作的一个例子。假设你有源(记录)->myasyncfunc(输出)->接收器(输出)我去添加另一个函数source(record)->conffetcher(tuple2(record,conf))->myasyncfunc(output)->sink(output)编辑2:正如您在评论中指出的,flink计时器绑定到键控状态。然而,对于这个用例,我们根本不需要使用任何flink计时器,只需要使用java计时器。
ProcessFunction
open
private static class PullConfig<T> extends RichMapFunction<T, Tuple2<T, Conf>> { private transient ScheduledExecutorService service = Executors.newScheduledThreadPool(1); private transient volatile Conf conf; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); service.scheduleWithFixedDelay(this::pullConfig, 0, 1, TimeUnit.HOURS); } void pullConfig() { conf = ... } @Override public Tuple2<T, Conf> map(T value) throws Exception { return new Tuple2(value, conf); } ... }
1条答案
按热度按时间68de4m5k1#
可能的选项:
a) 你只需要一个
ProcessFunction
带计时器,每x分钟拉一次。b) 如果您的状态很小,并且重新启动不是太关键:如果您不更新数据,服务器请求可能会失败(403?)。然后你就可以把数据加载进去
open
当你收到403并恢复时,操作失败。编辑:
a)如何工作的一个例子。假设你有
源(记录)->myasyncfunc(输出)->接收器(输出)
我去添加另一个函数
source(record)->conffetcher(tuple2(record,conf))->myasyncfunc(output)->sink(output)
编辑2:
正如您在评论中指出的,flink计时器绑定到键控状态。然而,对于这个用例,我们根本不需要使用任何flink计时器,只需要使用java计时器。