我有两个数据源-一个S3存储桶和一个postgres数据库表。两个数据源都有格式相同的记录,这些记录都有一个uuid类型的唯一标识符。S3存储桶中的一些记录不是postgres表的一部分,我们的目的是找到那些丢失的记录。数据是有界的,因为它是按s3存储桶中的每一天分区的。
阅读s3-source(我认为此操作是以批处理模式读取数据,因为我没有提供monitorContinuously()参数)-
final FileSource<GenericRecord> source = FileSource.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(schema), path).build();
final DataStream<GenericRecord> avroStream = env.fromSource(
source, WatermarkStrategy.noWatermarks(), "s3-source");
DataStream<Row> s3Stream = avroStream.map(x -> Row.of(x.get("uuid").toString()))
.returns(Types.ROW_NAMED(new String[] {"uuid"}, Types.STRING));
Table s3table = tableEnv.fromDataStream(s3Stream);
tableEnv.createTemporaryView("s3table", s3table);
为了阅读Postgres,我创建了一个目录-
PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
catalogName,
defaultDatabase,
username,
pwd,
baseUrl);
tableEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
tableEnv.useCatalog(postgresCatalog.getName());
Table dbtable = tableEnv.sqlQuery("select cast(uuid as varchar) from `localschema.table`");
tableEnv.createTemporaryView("dbtable", dbtable);
我的意图是简单地执行左连接并从dbtable中找到丢失的记录。
Table resultTable = tableEnv.sqlQuery("SELECT * FROM s3table LEFT JOIN dbtable ON s3table.uuid = dbtable.uuid where dbtable.uuid is null");
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
resultStream.print();
但是,UUID列类型似乎还不受支持,因为我得到了下面的异常。
Caused by: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'uuid' yet
at org.apache.flink.connector.jdbc.dialect.psql.PostgresTypeMapper.mapping(PostgresTypeMapper.java:171)
作为一种替代方法,我尝试按如下方式读取数据库表-
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
BasicTypeInfo.of(String.class)
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://127.0.0.1:5432/localdatabase")
.setQuery("select cast(uuid as varchar) from localschema.table")
.setUsername("postgres")
.setPassword("postgres")
.setRowTypeInfo(rowTypeInfo)
.finish();
DataStream<Row> dbStream = env.createInput(jdbcInputFormat);
Table dbtable = tableEnv.fromDataStream(dbStream).as("uuid");
tableEnv.createTemporaryView("dbtable", dbtable);
只是这一次,我在执行左连接时遇到了以下异常(如上所述)-
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$3*' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin]
如果我调整resultStream来发布changeLogStream,它就可以工作-
Table resultTable = tableEnv.sqlQuery("SELECT * FROM s3table LEFT JOIN dbtable ON s3table.sync_id = dbtable.sync_id where dbtable.sync_id is null");
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
resultStream.print();
Sample O/P
+I[9cc38226-bcce-47ce-befc-3576195a0933, null]
+I[a24bf933-1bb7-425f-b1a7-588fb175fa11, null]
+I[da6f57c8-3ad1-4df5-9636-c6b36df2695f, null]
+I[2f3845c1-6444-44b6-b1e8-c694eee63403, null]
-D[9cc38226-bcce-47ce-befc-3576195a0933, null]
-D[a24bf933-1bb7-425f-b1a7-588fb175fa11, null]
然而,我不希望接收器将插入和删除分开。我只希望得到丢失uuid的最终列表。我猜这是因为我用DataStream<Row> dbStream = env.createInput(jdbcInputFormat);
创建的Postgres源是一个流源。如果我试图以批处理模式执行整个应用程序,我会得到以下异常-
org.apache.flink.table.api.ValidationException: Querying an unbounded table '*anonymous_datastream_source$2*' in batch mode is not allowed. The table source is unbounded.
有可能拥有一个绑定的JDBC源吗?如果没有,我如何使用流API来实现这一点?(使用Flink version -1. 15. 2)
我相信这种情况将是一个常见的用例,可以实现与Flink,但显然我错过了一些东西。任何线索将不胜感激。
1条答案
按热度按时间zbdgwd5y1#
目前常用的方法是将resultStream发送到一个表中,这样你就可以调度一个任务来截断这个表,然后执行Apache Flink任务,然后从这个表中读取结果。
我还注意到Apache Flink Table Store 0.3.0刚刚发布。他们在0.4.0的路线图上有物化视图。这可能也是一个解决方案。非常令人兴奋。