我需要我的flink作业以指定的时间间隔从数据库中提取记录,并在处理后将其存档。我已经实现了sourcefunction来从数据库中获取所需的记录,并添加了sourcefunction作为streamexecutionenvironment的源。如何指定streamexecutionenvironment需要每隔10分钟使用sourcefunction从数据库获取记录?
源函数:
public class MongoDBSourceFunction implements SourceFunction<List<Book>>{
public void cancel() {
// TODO Auto-generated method stub
}
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {
List<Book> books = getBooks();
context.collect(books);
}
public List<Book> getBooks() {
List<Book> books = new ArrayList<Book>();
//fetch all books from database
return books;
}
}
处理器:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ArchiveJob {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MongoDBSourceFunction()).print();
}
}
1条答案
按热度按时间mwecs4sa1#
您需要将此功能添加到
MongoDBSourceFunction
它自己。例如,可以示例化ScheduledExecutorService
在open
方法并使用此执行器安排读取任务。请注意,在发送记录时保持检查点锁很重要。