hadoop—使用java和spark将本Map像中的序列文件写入hdfs

4zcjmb1e  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(351)

正如标题所说,这是我现在的目标。
我需要从一个目录加载一堆非文本文件
从中提取常用的文件信息(创建日期、作者、类型。。。那些人)
创建以下类型的序列文件
将新提取的信息放入.seq文件的键中
将它们全部存储在hdfs目录中。
我使用spark的原因是为了可伸缩性(需要处理数千个文件,并且我将有一个可用的worker集群),因为我正在考虑在image目录上实现sparkstreaming接收器,这样文件将被自动处理。这是我的初始代码:

  1. JavaPairRDD<String, String> imageRDD = jsc.wholeTextFiles("file:///home/cloudera/Pictures/");
  2. imageRDD.mapToPair(new PairFunction<Tuple2<String,String>, Text, Text>() {
  3. @Override
  4. public Tuple2<Text, Text> call(Tuple2<String, String> arg0)
  5. throws Exception {
  6. return new Tuple2<Text, Text>(new Text(arg0._1),new Text(arg0._2));
  7. }
  8. }).saveAsNewAPIHadoopFile("hdfs://localhost:8020/user/hdfs/sparkling/try.seq", Text.class, Text.class, SequenceFileOutputFormat.class);

在这里,我将图像作为文本文件加载,并从hadoop库中创建一个文本类型的元组。这是可行的,但是:
文件不是保存为单个文件,而是保存为包含分区的文件夹。
它不是字节数组,而是文件的文本表示形式。我们都知道从文本到图像(或其他任何形式)的转换是多么烦人
如果我像这样加载文件,会有办法提取所需的信息吗?
我试着把文件装成aa sparkContext.binaryFiles(<directory>) ,但我总是迷茫于如何提取信息以及如何保存它们。
我似乎在网上找不到答案:你们有人知道这件事吗?

iszxjhcz

iszxjhcz1#

我是这样做的:

  1. JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH);
  2. if(!imageByteRDD.isEmpty())
  3. imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
  4. @Override
  5. public void call(
  6. Iterator<Tuple2<String, PortableDataStream>> arg0)
  7. throws Exception {
  8. Configuration conf = new Configuration();
  9. conf.set("fs.defaultFS", HDFS_PATH);
  10. while(arg0.hasNext()){
  11. Tuple2<String,PortableDataStream>fileTuple = arg0.next();
  12. Text key = new Text(fileTuple._1());
  13. String fileName = key.toString().split(SEP_PATH)[key.toString().split(SEP_PATH).length-1].split(DOT_REGEX)[0];
  14. String fileExtension = fileName.split(DOT_REGEX)[fileName.split(DOT_REGEX).length-1];
  15. BytesWritable value = new BytesWritable( fileTuple._2().toArray());
  16. SequenceFile.Writer writer = SequenceFile.createWriter(
  17. conf,
  18. SequenceFile.Writer.file(new Path(DEST_PATH + fileName + SEP_KEY + getCurrentTimeStamp()+DOT+fileExtension)),
  19. SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new BZip2Codec()),
  20. SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(BytesWritable.class));
  21. key = new Text(key.toString().split(SEP_PATH)[key.toString().split(SEP_PATH).length-2] + SEP_KEY + fileName + SEP_KEY + fileExtension);
  22. writer.append(key, value);
  23. IOUtils.closeStream(writer);
  24. }
  25. }
  26. });
展开查看全部

相关问题