我想构建一个应用程序,它对数据进行一些聚合,并使用计时器将聚合发送到一个异步步骤,该步骤将其转储到其他地方。在onTimer
函数中发送数据后,我清除状态。
比如说
@Override
public void onTimer(long timestamp, KeyedProcessFunction<KEY, IN, Aggregation>.OnTimerContext ctx, Collector<Aggregation> out) throws Exception {
out.collect(new Aggregation(ctx.getCurrentKey(), aggregation.get()));
aggregation.clear();
}
字符串
该流将被传递到AsyncDataStream
,如下所示:
SingleOutputStreamOperator<Aggregation> aggregations;
AsyncDataStream.unorderedWaitWithRetry(aggregations, new AsyncDatabaseRequest(), 10, TimeUnit.SECONDS, 1000, asyncRetryStrategy).addSink(new DiscardingSink<>());
型
在将聚合发送到目标步骤后清除状态是否安全?如果它无法将数据写入目标,会发生什么?
1条答案
按热度按时间3qpi33ja1#
如果工作流本身失败,那么Flink的exactly once模式(假设你已经正确配置了)将确保数据不会被丢弃。这可能意味着一条记录被多次写入外部服务,所以你必须处理这种情况。
因此,如果写入失败导致工作流失败,您应该没有问题。
如果您不希望写入失败导致工作流终止/重新启动,那么您可以自行决定如何不丢弃任何数据。例如,您仍然可以从BLOG函数生成结果,但带有错误信息,然后将BLOG函数的流拆分为OK & failure流,并对failure流进行特殊处理。