apacheflink:如何使用sourcefunction以指定的间隔执行任务?

tmb3ates  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(811)

我需要我的flink作业以指定的时间间隔从数据库中提取记录,并在处理后将其存档。我已经实现了sourcefunction来从数据库中获取所需的记录,并添加了sourcefunction作为streamexecutionenvironment的源。如何指定streamexecutionenvironment需要每隔10分钟使用sourcefunction从数据库获取记录?
源函数:

public class MongoDBSourceFunction implements SourceFunction<List<Book>>{

    public void cancel() {
        // TODO Auto-generated method stub
    }

    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {

        List<Book> books = getBooks();

        context.collect(books);

    }

    public List<Book> getBooks() {
        List<Book> books = new ArrayList<Book>();

        //fetch all books from database     
        return books;
    }

}

处理器:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArchiveJob {

    public static void main(String[] args) {

        final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new MongoDBSourceFunction()).print();
    }

}
mwecs4sa

mwecs4sa1#

您需要将此功能添加到 MongoDBSourceFunction 它自己。例如,可以示例化 ScheduledExecutorServiceopen 方法并使用此执行器安排读取任务。
请注意,在发送记录时保持检查点锁很重要。

相关问题