Using GCS files from Flink (gs:// scheme)

hof1towb · posted 2021-06-21 in Flink

Flink supports the Hadoop file system abstraction, and there is a GCS connector library that implements it for Google Cloud Storage.
How can I create a Flink file source using the code from that repo?

xuo3flqw · Answer 1

To do this, you need to:

1. Install and configure the GCS connector on your Flink cluster (a configuration sketch follows the dependency list below).
2. Add the Hadoop and Flink dependencies (including the HDFS connector) to your project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
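
For the first step, the GCS connector jar must be on the Flink classpath and the gs:// scheme must be registered with Hadoop. Below is a minimal core-site.xml sketch, not part of the original answer: the property names are the ones documented for the Hadoop GCS connector, and the key file path is a placeholder.

<configuration>
    <!-- Register the GCS connector as the implementation of the gs:// scheme -->
    <property>
        <name>fs.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    </property>
    <!-- Authenticate with a service account; the key file path is a placeholder -->
    <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
    </property>
    <property>
        <name>google.cloud.auth.service.account.json.keyfile</name>
        <value>/path/to/service-account-key.json</value>
    </property>
</configuration>

Flink must also be pointed at this Hadoop configuration, for example via the HADOOP_CONF_DIR environment variable on each node.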

Then use them to create a data source that reads from a GCS path:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Wrap Hadoop's TextInputFormat so Flink can read the gs:// path;
// each record is a (byte offset, line of text) pair.
DataSet<Tuple2<LongWritable, Text>> input =
    env.createInput(
        HadoopInputs.readHadoopFile(
            new TextInputFormat(), LongWritable.class, Text.class, "gs://bucket/path/some*pattern/"));
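
Hadoop's TextInputFormat yields (byte offset, line) tuples, so you will usually keep only the value. As a minimal continuation of the snippet above (f1 is the Tuple2 field holding the Text value; print() also triggers execution in the DataSet API):

// Drop the byte-offset key and keep each line as a plain String
DataSet<String> lines = input.map(t -> t.f1.toString());

// print() collects the result and implicitly runs the batch job
lines.print();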
