读取没有文件扩展名的压缩文件

svujldwt  于 2021-05-27  发布在  Spark
关注(0)|答案(4)|浏览(542)

我有一个s3存储桶,里面装满了没有文件扩展名的gz文件。例如
s3://mybucket/1234502827-34231 sc.textFile 使用该文件扩展名选择解码器。我发现了许多关于处理自定义文件扩展名的博客文章,但没有关于丢失文件扩展名的内容。
我认为解决办法可能是 sc.binaryFiles 手动解压缩文件。
另一种可能性是找出sc.textfile如何找到文件格式。我不清楚这些是什么 classOf[] 这叫工作。

def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
weylhg0b

weylhg0b1#

我找到了几个几乎符合我需要的例子。下面是我用来解析用gz压缩的文件的最后一段代码。

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractBSM(ps: PortableDataStream, n: Int = 1024) = Try {
  val gz = new GzipCompressorInputStream(ps.open)
  Stream.continually {
    // Read n bytes
    val buffer = Array.fill[Byte](n)(-1)
    val i = gz.read(buffer, 0, n)
    (i, buffer.take(i))
  }
  // Take as long as we've read something
  .takeWhile(_._1 > 0)
  .map(_._2)
  .flatten
  .toArray
}
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8)
val inputFile = "s3://my-bucket/157c96bd-fb21-4cc7-b340-0bd4b8e2b614"
val rdd = sc.binaryFiles(inputFile).flatMapValues(x => extractBSM(x).toOption).map( x => decode()(x._2) )
val rdd2 = rdd.flatMap { x => x.split("\n") }
rdd2.take(10).foreach(println)
aurhwmvo

aurhwmvo2#

您能尝试将下面的zip文件解决方案与gzipfileinputformat库结合起来吗?
这里-如何通过spark打开/stream.zip文件?您可以看到如何使用zip:

rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

gzip文件输入格式:
https://github.com/bsankaran/internet_routing/blob/master/hadoop-tr/src/main/java/edu/usc/csci551/tools/gzipfileinputformat.java
有关newapihadoopfile()的详细信息,请参见:http://spark.apache.org/docs/latest/api/python/pyspark.html

fykwrbwg

fykwrbwg3#

您可以创建自己的自定义编解码器来解码文件。您可以从扩展gzip代码开始,并重写getdefaultextension方法,在该方法中,您将空字符串作为扩展名返回。
编辑:由于compressioncodecfactory的实现方式,该解决方案不会在所有情况下都起作用。例如:默认情况下,加载.lz4的编解码器。这意味着,如果要加载的文件名以4结尾,则会选择该编解码器而不是自定义(不带扩展名)。由于该编解码器不匹配的扩展,它将得到后来抛弃,没有编解码器将被使用。
java 语:

package com.customcodec;

import org.apache.hadoop.io.compress.GzipCodec;

public class GzipCodecNoExtension extends GzipCodec {

    @Override
    public String getDefaultExtension() {
        return "";
    }
}

在spark应用程序中,您只需注册您的编解码器:

SparkConf conf = new SparkConf()
            .set("spark.hadoop.io.compression.codecs", "com.customcodec.GzipCodecNoExtension");
wf82jlnq

wf82jlnq4#

您可以读取二进制文件并使用map函数进行解压缩。

JavaRDD<Tuple2<String, PortableDataStream>> rawData = spark.sparkContext().binaryFiles(readLocation, 1).toJavaRDD();

JavaRDD<String> decompressedData = rawData.map((Function<Tuple2<String, PortableDataStream>, String>) stringPortableDataStreamTuple2 -> {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPInputStream s = new GZIPInputStream(new ByteArrayInputStream(stringPortableDataStreamTuple2._2.toArray()));
    IOUtils.copy(s, out);

    return new String(out.toByteArray());
});

对于json内容,您可以使用

Dataset co = spark.read().json(decompressedData);

相关问题