How can I read compressed Spark event logs?

rdlzhqv9 asked on 2021-05-27 in Spark

When I try to read a Spark 2.4.4 event log compressed with LZ4, I get an empty DataFrame:

cd /opt/spark-2.4.4-bin-hadoop2.7
bin/spark-shell --master=local --conf spark.eventLog.enabled=true --conf spark.eventLog.compress=true --conf spark.io.compression.codec=lz4 --driver-memory 4G --driver-library-path=/opt/hadoop-2.7.1/lib/native/

// Trying to read an event log from a previous session
spark.read.option("compression", "lz4").json(s"file:///tmp/spark-events/local-1589202668377.lz4")

// res0: org.apache.spark.sql.DataFrame = []
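
A quick way to see what framing the file actually uses (a diagnostic sketch on the same path as above): Spark's event-log compression goes through org.apache.spark.io.LZ4CompressionCodec, which wraps lz4-java's LZ4BlockOutputStream, and that block format starts with an 8-byte ASCII magic header.

import java.io.FileInputStream

// Peek at the file header. Spark's LZ4CompressionCodec writes lz4-java's
// block format, which begins with the ASCII magic "LZ4Block", a framing
// that the codec behind spark.read's "compression" option does not parse.
val fis = new FileInputStream("/tmp/spark-events/local-1589202668377.lz4")
val magic = new Array[Byte](8)
fis.read(magic)
fis.close()
println(new String(magic, "US-ASCII")) // prints "LZ4Block" for Spark-compressed logs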

However, reading an uncompressed event log works fine:

bin/spark-shell --master=local --conf spark.eventLog.enabled=true --conf spark.eventLog.compress=false
spark.read.json(s"file:///tmp/spark-events/${sc.applicationId}.inprogress").printSchema

//root
// |-- App ID: string (nullable = true)
// |-- App Name: string (nullable = true)
// |-- Block Manager ID: struct (nullable = true)
// |    |-- Executor ID: string (nullable = true)

I also tried reading an event log compressed with Snappy, with the same result.


exdqitrt1#

Try setting the codec before the read:

spark.read.json("dbfs:/tmp/compress/part-00000.lz4")
spark.conf.set("spark.io.compression.codec","org.apache.spark.io.LZ4CompressionCodec")

If that doesn't work, there is a good chance your lz4 build is incompatible with org.apache.hadoop.io.compress.Lz4Codec. Below is a link to an open issue about the same lz4 incompatibility between the OS and Hadoop.
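
One way to test that hypothesis directly (a sketch, using the path from the question) is to ask Hadoop's codec factory for the codec it maps to the .lz4 extension and try to read through it; a failure or garbage here, while Spark's own codec succeeds, points to the mismatch:

import java.io.FileInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Resolve the codec Hadoop associates with the .lz4 suffix
val factory = new CompressionCodecFactory(new Configuration())
val hadoopCodec = factory.getCodec(new Path("/tmp/spark-events/local-1589202668377.lz4"))
// Attempt to decompress with it; an exception here (framing mismatch,
// or missing native libraries) is consistent with the incompatibility above
val in = hadoopCodec.createInputStream(new FileInputStream("/tmp/spark-events/local-1589202668377.lz4"))
println(scala.io.Source.fromInputStream(in).getLines().take(1).mkString)
in.close()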


mu0hgdu02#

It turns out I can first decompress the file using Spark's own codec, then read the decompressed file:

import java.io.{FileInputStream, FileOutputStream}
import org.apache.commons.io.IOUtils
import org.apache.spark.io.LZ4CompressionCodec

val inFile = "/tmp/spark-events/local-1589202668377.lz4"
val outFile = "/tmp/spark-events/local-1589202668377" 
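// Spark's own codec understands the LZ4 block framing used for event logs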
val codec = new LZ4CompressionCodec(sc.getConf)
val is = codec.compressedInputStream(new FileInputStream(inFile))
val os = new FileOutputStream(outFile)
IOUtils.copyLarge(is, os)
os.close()
is.close()

spark.read.json(outFile).printSchema

It would be even better if I could read inFile directly without the temporary storage, but at least I can access the data.
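
For reference, a sketch of that direct read, reusing inFile and codec from above and assuming the log fits in driver memory: decompress in memory into a Dataset[String] and hand that straight to spark.read.json, skipping the temporary file.

import scala.io.Source
import spark.implicits._

// Stream-decompress with Spark's codec and parse the JSON lines in memory
val stream = codec.compressedInputStream(new FileInputStream(inFile))
val lines = try Source.fromInputStream(stream, "UTF-8").getLines().toList finally stream.close()
spark.read.json(spark.createDataset(lines)).printSchema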
