Flink supports the Hadoop FileSystem abstraction, and there is a GCS connector library that implements it on top of Google Cloud Storage. How can I use the code in this repo to create a Flink file source?
xuo3flqw1#
To do that, you need to:
1. Install and configure the GCS connector on your Flink cluster (a minimal configuration sketch follows the dependency list below).
2. Add the Hadoop and Flink dependencies (including the HDFS connector) to your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
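Regarding step 1: setting up the GCS connector typically means placing the connector JAR on the Flink classpath and registering it in Hadoop's core-site.xml. Here is a minimal sketch, assuming service-account key authentication; the key-file path is a placeholder, not from the original answer:

<!-- core-site.xml: register the GCS connector with Hadoop (sketch) -->
<configuration>
    <!-- Map the gs:// scheme to the GCS connector's FileSystem implementation -->
    <property>
        <name>fs.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    </property>
    <!-- Authenticate with a service-account key (path is a placeholder) -->
    <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
    </property>
    <property>
        <name>google.cloud.auth.service.account.json.keyfile</name>
        <value>/path/to/service-account-key.json</value>
    </property>
</configuration>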
3. Use them to create a data source with a GCS path:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class GcsSourceJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap Hadoop's TextInputFormat as a Flink input. Each record is a
        // (byte offset within the file, line contents) pair; the gs:// path
        // is resolved by the GCS connector configured above.
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class,
                "gs://bucket/path/some*pattern/"));
    }
}
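If you want to work with plain strings rather than Hadoop writables, you can map the tuples down to their text field. This is a usage sketch, not part of the original answer:

import org.apache.flink.api.common.functions.MapFunction;

// Drop the byte offsets and keep only the line text
DataSet<String> lines = input.map(
    new MapFunction<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String map(Tuple2<LongWritable, Text> record) {
            return record.f1.toString();
        }
    });
lines.first(10).print(); // print a small sample; print() also triggers execution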