我的用例是在Flink中连接流并将输出保存到S3。我正在使用Flink阅读Kafka数据。来自Kafka的消息包含两种类型的数据,因此我需要在S3中连接这两种类型。
Table tr = tableEnv.sqlQuery("select " +
" a.x," +
" a.y," +
" b.z", +
" b.z1" +
" from " +
" (select * from table1" +
" where type='machine1') a" +
" full outer join " +
" (select * from table2" +
" where type='machine12') b" +
" on a.id=b.id ");
在flink中,输出是连续的,状态存储在内存中。我的目标是在批处理模式下运行程序,这意味着每分钟无论什么事件进入都将应用于连接并保存到S3,无论连接成功还是失败。做Spark流很容易,但我不知道如何用Flink做它?
1条答案
按热度按时间ilmyapht1#
如果你想让Flink Kafka SQL连接器以批处理模式运行,你需要指定Flink需要使用事件的偏移量。正如https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#connector-options中所述,你可以为
scan.startup.specific-offsets
参数提供正确的值。你唯一需要确定的是你从哪个偏移量开始使用事件,以及你最后读取的偏移量是多少。