我有一个s3存储桶,里面装满了没有文件扩展名的gz文件。例如
s3://mybucket/1234502827-34231 sc.textFile
使用该文件扩展名选择解码器。我发现了许多关于处理自定义文件扩展名的博客文章,但没有关于丢失文件扩展名的内容。
我认为解决办法可能是 sc.binaryFiles
手动解压缩文件。
另一种可能性是找出sc.textfile如何找到文件格式。我不清楚这些是什么 classOf[]
这叫工作。
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
4条答案
按热度按时间weylhg0b1#
我找到了几个几乎符合我需要的例子。下面是我用来解析用gz压缩的文件的最后一段代码。
aurhwmvo2#
您能尝试将下面的zip文件解决方案与gzipfileinputformat库结合起来吗?
这里-如何通过spark打开/stream.zip文件?您可以看到如何使用zip:
gzip文件输入格式:
https://github.com/bsankaran/internet_routing/blob/master/hadoop-tr/src/main/java/edu/usc/csci551/tools/gzipfileinputformat.java
有关newapihadoopfile()的详细信息,请参见:http://spark.apache.org/docs/latest/api/python/pyspark.html
fykwrbwg3#
您可以创建自己的自定义编解码器来解码文件。您可以从扩展gzip代码开始,并重写getdefaultextension方法,在该方法中,您将空字符串作为扩展名返回。
编辑:由于compressioncodecfactory的实现方式,该解决方案不会在所有情况下都起作用。例如:默认情况下,加载.lz4的编解码器。这意味着,如果要加载的文件名以4结尾,则会选择该编解码器而不是自定义(不带扩展名)。由于该编解码器不匹配的扩展,它将得到后来抛弃,没有编解码器将被使用。
java 语:
在spark应用程序中,您只需注册您的编解码器:
wf82jlnq4#
您可以读取二进制文件并使用map函数进行解压缩。
对于json内容,您可以使用