正在读取正在flink中追加的文件

inn6fuwd  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(296)

我们有一个遗留应用程序,它将结果作为记录写入一些本地文件。我们希望实时处理这些记录,因此我们计划使用flink作为引擎。我知道我可以用 StreamingExecutionEnvironment#readFile . 看来我们需要类似的东西 PROCESS_CONTINUOUSLY 但是这个标志会导致在每次更改时重新处理整个文件,这不是我们想要的。
当然,我可以编写自定义源代码来保存每个文件状态下的记录数。但我想这种检查点之类的方法可能会有一些问题——我的理由是,如果这很容易可靠地实现,那么它就已经在flink中实现了。
有什么建议吗?

k5ifujac

k5ifujac1#

使用自定义源代码可以很容易地做到这一点,只要您希望从单个文件(每个源代码示例)读取内容。您将需要使用操作符状态并实现检查点。状态处理和检查点如下所示:

public class CheckpointedFileSource implements SourceFunction<Event>, ListCheckpointed<Long> {
    private long eventCnt = 0;

    public void run(SourceContext<Event> sourceContext) throws Exception {
        final Object lock = sourceContext.getCheckpointLock();

        // skip over previously emitted events
        ...

        while (not cancelled) {
            read event from file;

            synchronized (lock) {
                eventCnt++;
                sourceContext.collectWithTimestamp(event, timestamp);
            }

        }
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return Collections.singletonList(eventCnt);
    }

    @Override
    public void restoreState(List<Long> state) throws Exception {
        for (Long s : state)
            this.eventCnt = s;
    }

}

有关完整的示例,请参阅flink培训练习中使用的检查点出租车乘坐数据源。您必须对它进行一些调整,因为它是为读取静态文件而设计的,而不是附加到其中的文件。

相关问题