从postgres表创建flink数据流

abithluo  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(416)

我正在尝试处理大量的数据流(source=kinesis stream)并将其放入postgres db。在这样做的时候,我需要首先用postgresdb中已经存在的一些主数据加入传入的流。
我正在从传入的kinesis流创建一个键控流,并使用jdbc catalog使用flink table api创建第二个流。我的数据库接收器设置如下:

public class PosgresSink extends RichSinkFunction <Clazz> implements CheckpointedFunction, CheckpointListener { .. }

所以每次flinks创建一个检查点接收器时都会触发。
但是,当我与来自jdbc源代码的传入流进行连接时,我得到以下信息:

org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source TableSourceScan(table ...
    ...
   (1/4) of job 3961ccdf0d2a7da504f61e094a74fa5f is not in state RUNNING but FINISHED instead. Aborting checkpoint

每次检查点都会被中止,这会阻塞我的接收器。
我的jdbc源代码似乎很早就完成了,当flink尝试检查点时,它找不到任何正在运行的作业,并中止检查点。flink似乎有一个限制,即它只在所有操作符/任务仍在运行时设置检查点
https://issues.apache.org/jira/browse/flink-2491
我正在设置jdbc流,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

// Register a JDBC catalog 

static JdbcCatalog registerJdbcCatalog(StreamTableEnvironment bsTableEnv)   {
   String name = "<>";
   String defaultDatabase = "<>";
   String username = "<>";
   String password = "<>";
   String baseUrl = "jdbc:postgresql://localhost:5432/";

   JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
   bsTableEnv.registerCatalog("catalogName", jdbcCatalog);
   bsTableEnv.useCatalog("catalogName");
   return jdbcCatalog;
 }

// get the table
Table table= bsTableEnv.from("table")

// create a data stream from table
DataStream<Table> myStream= bsTableEnv.toAppendStream(table, Table.class);

这是正确的理解吗?有办法吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题