Hudi creates small 0 KB files in the base path when using Spark Structured Streaming

0s7z1bwu · posted 2021-05-29 · in Spark
  val query = newDf.coalesce(1).writeStream
    .outputMode("append")
    .format("hudi")
    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
    .option("hoodie.datasource.write.recordkey.field", "value,score")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "region,year")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "year")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.ComplexKeyGenerator")
    .option("hoodie.index.type", "GLOBAL_BLOOM")
    .option("hoodie.bloom.index.update.partition.path", "true")
    .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
    .option("hoodie.keep.max.commits", "2")
    .option("hoodie.keep.min.commits", "1")
    .option("hoodie.cleaner.commits.retained", "0")
    .option("checkpointLocation", "/demo/app2")
    .option("path", "/tmp/kafka")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.clean.automatic", "true")
    .start("/tmp/kafka")
  query.awaitTermination()

The Parquet files created under the kafka folder are a reasonable size, but on every micro-batch trigger a small, empty 0 KB file is also written into the tmp folder.
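No answer has been posted yet, but a useful first diagnostic step is to identify exactly which files are empty, since their names usually reveal what is writing them (Hudi commit/marker files under `.hoodie`, Spark streaming metadata, etc.). Below is a minimal sketch in plain Scala, assuming the base path is on the local filesystem (for HDFS or S3 you would use the Hadoop `FileSystem` API instead); `zeroByteFiles` is a hypothetical helper for illustration, not part of Hudi or Spark:

```scala
import java.io.File

// Recursively collect zero-length regular files under `dir`.
// Their names and locations indicate the writer, e.g. Hudi's
// .hoodie commit files vs. Spark streaming metadata files.
// Hypothetical helper; assumes a local-filesystem base path.
def zeroByteFiles(dir: File): Seq[File] = {
  val entries = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
  entries.filter(f => f.isFile && f.length == 0L) ++
    entries.filter(_.isDirectory).flatMap(zeroByteFiles)
}
```

Running something like `zeroByteFiles(new File("/tmp"))` right after a micro-batch completes would show whether the 0 KB files are Hudi markers, checkpoint artifacts, or something else entirely.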
