如何在批处理模式下执行join in flink作业

6ju8rftf  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(199)

我的用例是在Flink中连接流并将输出保存到S3。我正在使用Flink阅读Kafka数据。来自Kafka的消息包含两种类型的数据,因此我需要在S3中连接这两种类型。

Table tr = tableEnv.sqlQuery("select " +
            " a.x," +
            " a.y," +
            " b.z", +
            " b.z1" +
            " from " +
            " (select * from table1" +
            " where type='machine1') a" +
            " full outer join " +
            " (select * from table2" +
            " where type='machine12') b" +
            " on a.id=b.id ");

在flink中,输出是连续的,状态存储在内存中。我的目标是在批处理模式下运行程序,这意味着每分钟无论什么事件进入都将应用于连接并保存到S3,无论连接成功还是失败。做Spark流很容易,但我不知道如何用Flink做它?

ilmyapht

ilmyapht1#

如果你想让Flink Kafka SQL连接器以批处理模式运行,你需要指定Flink需要使用事件的偏移量。正如https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#connector-options中所述,你可以为scan.startup.specific-offsets参数提供正确的值。你唯一需要确定的是你从哪个偏移量开始使用事件,以及你最后读取的偏移量是多少。

相关问题