How to run an Apache Flink streaming job continuously on the Flink server

kmbjn2e3 posted on 2021-06-25 in Flink

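Hi, I wrote a streaming job in which both the source and the sink are PostgreSQL databases. I use JDBCInputFormat/JDBCOutputFormat to read and write the records (following an example). Code: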

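import java.sql.Types;
import java.util.Date;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

// Class name is illustrative; JDBCConfig (driver, URL, query, row type) is the poster's own constants class, not shown
public class PostgresStreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Bounded source: JDBCInputFormat runs the SELECT once and ends after the last row
        JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(JDBCConfig.DRIVER_CLASS)
                .setDBUrl(JDBCConfig.DB_URL)
                .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
                .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

        SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
                // Field 2 carries the event time; the cast assumes a java.util.Date subtype (e.g. java.sql.Timestamp)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
                    @Override
                    public long extractAscendingTimestamp(Row row) {
                        Date dt = (Date) row.getField(2);
                        return dt.getTime();
                    }
                })
                // Key by field 0 and keep only the last row per key in each 5-second window
                .keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(null, new FoldFunction<Row, Row>() {
                    @Override
                    public Row fold(Row row1, Row row) throws Exception {
                        return row;
                    }
                });

        source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername(JDBCConfig.DRIVER_CLASS)
                .setDBUrl(JDBCConfig.DB_URL)
                .setQuery("insert into tablename(id, name) values (?,?)")
                .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
                .finish());

        // Submit the job (the job name is arbitrary); nothing runs until execute() is called
        environment.execute("postgres-streaming-job");
    }
}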

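This code executes correctly, but it does not run continuously on the Flink server: the SELECT query is executed only once. I expected the job to keep running on the Flink server.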

Answer by pn9klfpd:


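You probably have to define your own Flink source or your own JDBCInputFormat, because the source you are using here stops the source task as soon as it has fetched all results from the database. One way to solve this is to write an input format based on JDBCInputFormat that re-executes the SQL query in nextRecord once the last row has been read from the database.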
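The re-execution idea from this answer can be sketched in plain Java as follows. This is a minimal sketch under stated assumptions, not Flink's API: the class name RepeatingQueryReader, the Supplier that stands in for running the JDBC query, and the poll interval are all hypothetical, and the method names only loosely mirror reachedEnd()/nextRecord() from Flink's InputFormat. The key difference from a bounded JDBC source is that reachedEnd() never becomes true on its own; the reader simply runs the query again when the current result is exhausted.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Sketch of "re-execute the query after the last row". Method names loosely mirror
// Flink's InputFormat (reachedEnd / nextRecord), but this class does NOT implement any
// Flink interface; the Supplier is a hypothetical stand-in for running the SELECT.
class RepeatingQueryReader<T> {
    private final Supplier<List<T>> runQuery;   // hypothetical: executes the query, returns all rows
    private final long pollIntervalMillis;       // pause before each re-execution
    private Iterator<T> current = Collections.emptyIterator();
    private volatile boolean cancelled = false;  // may be set from another thread to stop the reader
    private int executions = 0;

    public RepeatingQueryReader(Supplier<List<T>> runQuery, long pollIntervalMillis) {
        this.runQuery = runQuery;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // A bounded JDBC source reports its end after the last row; this one ends only when cancelled.
    public boolean reachedEnd() {
        return cancelled;
    }

    // Returns the next row, re-running the query whenever the current result is exhausted.
    public T nextRecord() {
        while (!current.hasNext()) {
            if (cancelled) {
                throw new NoSuchElementException("reader was cancelled");
            }
            if (executions > 0 && pollIntervalMillis > 0) {
                try {
                    Thread.sleep(pollIntervalMillis); // avoid hammering the database
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to re-run the query", e);
                }
            }
            current = runQuery.get().iterator();
            executions++;
        }
        return current.next();
    }

    public void cancel() {
        cancelled = true;
    }

    public int executions() {
        return executions;
    }
}
```

In a real job, this loop would be driven either from a JDBCInputFormat subclass, as the answer proposes, or from a custom source such as a RichSourceFunction whose run(SourceContext) method keeps calling nextRecord() and emits rows with ctx.collect(...) until cancel() is invoked. Note that re-running an unchanged SELECT returns rows that were already emitted, so unless the query only selects new rows (for example, by filtering on an increasing id or timestamp column), downstream operators will see duplicates.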
