Flink sink 写入 Parquet 文件时,数据未被写入文件

72qzrwbm  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(615)

我试图在 Flink 中使用 AvroParquetWriter 作为 sink 来写入 Parquet 文件。文件已创建,但长度为 0(没有写入任何数据)。我做错了什么吗?我搞不清楚问题出在哪里。

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.io.Source
import org.apache.flink.streaming.api.scala._

/**
 * Minimal Flink job that writes one Avro `GenericRecord` to a Parquet file under `/tmp`.
 *
 * The record is written once directly and once more through a Flink sink that
 * forwards every stream element to the same `ParquetWriter`.
 *
 * The writer is closed in a `finally` block once the job has finished. Closing is
 * what flushes the pending records to disk; without it the file is created but
 * stays empty.
 */
object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  /** Current wall-clock time in milliseconds; used to make the output file name unique. */
  def now: Long = System.currentTimeMillis()

  val path = new Path(s"/tmp/test-$now.parquet")

  // Read the Avro schema from the classpath and release the underlying stream afterwards.
  val schemaString = {
    val schemaSource = Source.fromURL(getClass.getResource("/request_schema.avsc"))
    try schemaSource.mkString
    finally schemaSource.close()
  }
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()

  // The single test record that is pushed through the pipeline.
  val genericReocrd: GenericRecord = new GenericData.Record(schema)
  genericReocrd.put("name", "test_b")
  genericReocrd.put("code", "NoError")
  genericReocrd.put("ts", 100L)

  val stream = env.fromElements(genericReocrd)

  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  // NOTE(review): the sink function below uses a writer created on the client side.
  // Presumably this only works when the job executes in the same JVM (local
  // execution); for a cluster deployment the writer should be created and closed
  // inside the sink's own open()/close() lifecycle methods - confirm before reuse.
  try {
    writer.write(genericReocrd)
    stream.addSink { r =>
      writer.write(r)
    }
    env.execute()
  } finally {
    // Flushes buffered records to disk; this is the step the zero-length file was missing.
    writer.close()
  }
}
dtcbnfnu

dtcbnfnu1#

问题在于你没有关闭 ParquetWriter。关闭它是将尚未写出的挂起元素刷新到磁盘所必需的。你可以定义自己的 RichSinkFunction,并在它的 close 方法中关闭 ParquetWriter 来解决这个问题:

/**
 * Flink sink that writes every incoming Avro `GenericRecord` to a Parquet file.
 *
 * The `ParquetWriter` is created in `open` and closed in `close`, so pending
 * records are flushed to disk when the sink shuts down.
 *
 * Besides the Avro/Parquet/Hadoop imports, this snippet needs:
 * `org.apache.flink.configuration.Configuration` and
 * `org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}`.
 *
 * NOTE(review): every parallel instance opens the same `path` in OVERWRITE mode;
 * presumably this sink is meant to run with parallelism 1 - confirm with the caller.
 *
 * @param path                 target file path, passed to `new Path(path)`
 * @param schema               Avro schema as a JSON string; parsed in `open`
 * @param compressionCodecName Parquet compression codec to use
 * @param config               supplies page size, row-group size, dictionary and validation flags
 */
class ParquetWriterSink(val path: String, val schema: String, val compressionCodecName: CompressionCodecName, val config: ParquetWriterConfig) extends RichSinkFunction[GenericRecord] {
  // Created in open() and released in close(); null while the sink is not open.
  // Marked @transient so the writer is never part of the serialized function.
  @transient var parquetWriter: ParquetWriter[GenericRecord] = null

  /** Builds the Parquet writer from the constructor arguments. */
  override def open(parameters: Configuration): Unit = {
    parquetWriter = AvroParquetWriter.builder[GenericRecord](new Path(path))
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()
  }

  /**
   * Closes the writer, flushing pending records to disk.
   * Does nothing when no writer is open (open() failed or close() was already called).
   */
  override def close(): Unit = {
    if (parquetWriter != null) {
      try {
        parquetWriter.close()
      } finally {
        // Drop the reference even if close() throws, so a second close() is a no-op.
        parquetWriter = null
      }
    }
  }

  /** Writes one record to the Parquet file. */
  override def invoke(value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
    parquetWriter.write(value)
  }
}

相关问题