我正在尝试处理大量的数据流(source=kinesis stream)并将其放入postgres db。在这样做的时候,我需要首先用postgresdb中已经存在的一些主数据加入传入的流。
我正在从传入的kinesis流创建一个键控流,并使用jdbc catalog使用flink table api创建第二个流。我的数据库接收器设置如下:
public class PosgresSink extends RichSinkFunction <Clazz> implements CheckpointedFunction, CheckpointListener { .. }
所以每次flinks创建一个检查点接收器时都会触发。
但是,当我与来自jdbc源代码的传入流进行连接时,我得到以下信息:
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint triggering task Source TableSourceScan(table ...
...
(1/4) of job 3961ccdf0d2a7da504f61e094a74fa5f is not in state RUNNING but FINISHED instead. Aborting checkpoint
每次检查点都会被中止,这会阻塞我的接收器。
我的jdbc源代码似乎很早就完成了,当flink尝试检查点时,它找不到任何正在运行的作业,并中止检查点。flink似乎有一个限制,即它只在所有操作符/任务仍在运行时设置检查点
https://issues.apache.org/jira/browse/flink-2491
我正在设置jdbc流,如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
// Register a JDBC catalog
static JdbcCatalog registerJdbcCatalog(StreamTableEnvironment bsTableEnv) {
String name = "<>";
String defaultDatabase = "<>";
String username = "<>";
String password = "<>";
String baseUrl = "jdbc:postgresql://localhost:5432/";
JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
bsTableEnv.registerCatalog("catalogName", jdbcCatalog);
bsTableEnv.useCatalog("catalogName");
return jdbcCatalog;
}
// get the table
Table table= bsTableEnv.from("table")
// create a data stream from table
DataStream<Table> myStream= bsTableEnv.toAppendStream(table, Table.class);
这是正确的理解吗?有办法吗?
暂无答案!
目前还没有任何答案,快来回答吧!