hudi在使用spark结构化流时在基本路径中创建小的0kb文件

0s7z1bwu  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(270)
val query = newDf.coalesce(1).writeStream.
      outputMode("append").format("hudi")
    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
      .option("hoodie.datasource.write.recordkey.field", "value,score")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"region,year")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "year")
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.ComplexKeyGenerator")
      .option("hoodie.index.type","GLOBAL_BLOOM")
      .option("hoodie.bloom.index.update.partition.path","true")
      .option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS")
      .option("hoodie.keep.max.commits","2")
      .option("hoodie.keep.min.commits","1")
      .option("hoodie.cleaner.commits.retained","0")
      .option("checkpointLocation","/demo/app2")
      .option("path","/tmp/kafka")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option("hoodie.clean.automatic","true")
      .start("/tmp/kafka")

    query.awaitTermination()

虽然在kafka文件夹中创建的Parquet文件大小合适,但对于每个微批触发器,一个小的空0kb文件将写入tmp文件夹

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题