在flink中读取hadoop序列文件

vsmadaxz  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(451)

如何在flink中读取hadoop序列文件?我用下面的方法遇到了多个问题。
我有:

DataSource<String> source = env.readFile(new SequenceFileInputFormat(config), filePath);

public static class SequenceFileInputFormat extends FileInputFormat<String> {
    ...
    @Override
    public void setFilePath(String filePath) {
        org.apache.hadoop.conf.Configuration config = HadoopUtils.getHadoopConfiguration(configuration);
        logger.info("Initializing:"+filePath);
        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(filePath);

        try {
            reader = new SequenceFile.Reader(hadoopPath.getFileSystem(config), hadoopPath, config);
            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
            value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), config);
        } catch (IOException e) {
            logger.error("sequence file creation failed.", e);
        }
    }

}
问题之一:无法读取用户代码 Package 器:sequencefileinputformat。

qxgroojn

qxgroojn1#

一旦你得到一个 InputFormat ,您可以拨打 ExecutionEnvironment.createInput(<input format>) 创建您的 DataSource .
为了 SequenceFile s、 数据的类型总是 Tuple2<key, value> ,所以您必须使用map函数来转换为您试图读取的任何类型。
我用这个代码来读 SequenceFile 包含层叠元组的。。。

Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path(directory));
env.createInput(HadoopInputs.createHadoopInput(new SequenceFileInputFormat<Tuple, Tuple>(), Tuple.class, Tuple.class, job);

相关问题