apache flink的bzip2压缩输入

vmpqdwk3  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(434)

我有一个用bzip2压缩的wikipedia转储文件(下载自http://dumps.wikimedia.org/enwiki/),但我不想将其解压:我想在动态解压缩时对其进行处理。
我知道可以用普通的java实现(比如java-read bz2文件和解压/解析),但是我想知道如何在ApacheFlink中实现它?我可能需要的是https://github.com/whym/wikihadoop 但对于flink,不是hadoop。

xiozqbni

xiozqbni1#

在apache flink中可以读取以下格式的压缩文件:

org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec

从包名可以看出,flink使用hadoop的inputformats来实现这一点。这是一个使用flink的scalaapi读取gz文件的示例:(至少需要flink0.8.1)

def main(args: Array[String]) {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val job = new JobConf()
  val hadoopInput = new TextInputFormat()
  FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
  val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)

  lines.print

  env.execute("Read gz files")
}

ApacheFlink只内置了对.deflate文件的支持。添加对更多压缩编解码器的支持很容易,但还没有完成。
与flink一起使用hadoopinputformats不会导致任何性能损失。flink内置了对hadoop的序列化支持 Writable 类型。

相关问题