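Hello, I have written code for a streaming job whose source and target are both PostgreSQL databases. I use JDBCInputFormat/JDBCOutputFormat to read and write records (following the example). Code: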
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Source: reads rows from PostgreSQL with a single SELECT query.
JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername(JDBCConfig.DRIVER_CLASS)
    .setDBUrl(JDBCConfig.DB_URL)
    .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
    .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
    // Use the date in field 2 as the event timestamp.
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
        @Override
        public long extractAscendingTimestamp(Row row) {
            Date dt = (Date) row.getField(2);
            return dt.getTime();
        }
    })
    // Key by field 0 and keep the last row of each 5-second window.
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .fold(null, new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row row1, Row row) throws Exception {
            return row;
        }
    });

// Sink: writes the windowed rows back to PostgreSQL.
source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername(JDBCConfig.DRIVER_CLASS)
    .setDBUrl(JDBCConfig.DB_URL)
    .setQuery("insert into tablename(id, name) values (?,?)")
    .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
    .finish());
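This code executes correctly, but it does not run continuously on the Flink server (the SELECT query is executed only once). I expect it to keep running continuously on the Flink server.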
1 Answer
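You will probably have to define your own Flink source or JDBCInputFormat, because the source you are using here stops the SourceTask once it has fetched all results from the DB. One way to solve this is to write your own input format based on JDBCInputFormat and re-execute the SQL query in nextRecord when the last row has been read from the DB.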
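To illustrate the "re-execute the query when the last row has been read" idea, here is a minimal, Flink-independent sketch in plain Java. Everything in it is hypothetical and only for illustration: the class RequeryingReader, the poll interval, and the in-memory list that stands in for the source table. It also assumes each row has an increasing id, so that every re-query only returns rows the reader has not seen yet; plain re-execution of the same SELECT would return the same rows again. In a real job the same loop would sit inside a custom input format (around nextRecord and reachedEnd) or a custom source function, and the query function would run a JDBC statement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.ToLongFunction;

// Hypothetical helper, not part of Flink: it keeps re-running a query instead
// of reporting end-of-input once the current result set is exhausted.
class RequeryingReader<T> {
    // Stand-in for "SELECT ... WHERE id > ?": returns the rows after the given id.
    private final LongFunction<List<T>> query;
    // Extracts the id that is used as the offset for the next query (assumption).
    private final ToLongFunction<T> idOf;
    private final long pollIntervalMillis;
    private Iterator<T> current = Collections.emptyIterator();
    private long lastId;

    RequeryingReader(LongFunction<List<T>> query, ToLongFunction<T> idOf,
                     long initialId, long pollIntervalMillis) {
        this.query = query;
        this.idOf = idOf;
        this.lastId = initialId;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // Blocks until a row is available; never signals that the input has ended.
    T nextRecord() {
        while (!current.hasNext()) {
            // The last row of the previous result set was consumed: query again.
            current = query.apply(lastId).iterator();
            if (!current.hasNext()) {
                try {
                    Thread.sleep(pollIntervalMillis); // nothing new yet, back off
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while polling", e);
                }
            }
        }
        T row = current.next();
        lastId = idOf.applyAsLong(row);
        return row;
    }

    // Small demo: an in-memory list of ids plays the role of the source table.
    static List<Long> demo() {
        List<Long> table = new ArrayList<>(Arrays.asList(1L, 2L));
        RequeryingReader<Long> reader = new RequeryingReader<>(
            afterId -> {
                List<Long> rows = new ArrayList<>();
                for (Long id : table) {
                    if (id > afterId) {
                        rows.add(id);
                    }
                }
                return rows;
            },
            Long::longValue, 0L, 10L);

        List<Long> seen = new ArrayList<>();
        seen.add(reader.nextRecord());
        seen.add(reader.nextRecord());
        table.add(3L); // a row inserted after the first result set was consumed
        seen.add(reader.nextRecord());
        return seen;
    }
}
```

In the demo, the third call finds the first result set exhausted, runs the query again and picks up the row that was added in the meantime. Note that a reader like this never finishes on its own, so a real source would also need a way to be cancelled.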