如何在flink中读取hadoop序列文件?我用下面的方法遇到了多个问题。
我有:
DataSource<String> source = env.readFile(new SequenceFileInputFormat(config), filePath);
和
public static class SequenceFileInputFormat extends FileInputFormat<String> {
...
@Override
public void setFilePath(String filePath) {
org.apache.hadoop.conf.Configuration config = HadoopUtils.getHadoopConfiguration(configuration);
logger.info("Initializing:"+filePath);
org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(filePath);
try {
reader = new SequenceFile.Reader(hadoopPath.getFileSystem(config), hadoopPath, config);
key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), config);
} catch (IOException e) {
logger.error("sequence file creation failed.", e);
}
}
}
问题之一:无法读取用户代码 Package 器:sequencefileinputformat。
1条答案
按热度按时间qxgroojn1#
一旦你得到一个
InputFormat
,您可以拨打ExecutionEnvironment.createInput(<input format>)
创建您的DataSource
.为了
SequenceFile
s、 数据的类型总是Tuple2<key, value>
,所以您必须使用map函数来转换为您试图读取的任何类型。我用这个代码来读
SequenceFile
包含层叠元组的。。。