// Streams `newDf` into the Hudi table `tableName` rooted at /tmp/kafka using
// upserts, then blocks the calling thread until the streaming query stops.
val query = newDf
  .coalesce(1) // collapse each micro-batch to a single partition before writing
  .writeStream
  .outputMode("append")
  .format("hudi")
  // Table storage type, record key, partitioning and pre-combine configuration.
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option("hoodie.datasource.write.recordkey.field", "value,score")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "region,year")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "year")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  // Record key and partition path each list two fields, hence the complex key generator.
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.ComplexKeyGenerator")
  // Index configuration.
  .option("hoodie.index.type", "GLOBAL_BLOOM")
  .option("hoodie.bloom.index.update.partition.path", "true")
  // Cleaning and commit-retention configuration.
  .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
  .option("hoodie.keep.max.commits", "2")
  .option("hoodie.keep.min.commits", "1")
  .option("hoodie.cleaner.commits.retained", "0")
  .option("checkpointLocation", "/demo/app2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option("hoodie.clean.automatic", "true")
  // The sink path is supplied only through start(). Also setting a "path" option
  // is redundant, and Spark 3.1+ rejects the combination unless the legacy
  // path-option behaviour is enabled.
  .start("/tmp/kafka")

// Block until the streaming query terminates (or fails with an exception).
query.awaitTermination()
虽然 kafka 文件夹(/tmp/kafka)中生成的 Parquet 文件大小正常,但每次微批触发时,都会有一个 0 KB 的空小文件被写入 tmp 文件夹。
暂无答案!
目前还没有任何答案,快来回答吧!