我正在研究Spark结构流,其中作业消耗Kafka消息,每10秒进行聚合并将数据保存在Apache hudi表中。下面的代码工作正常,但它覆盖了每个批处理的结果apache hudi表数据。我还不知道为什么会这样?是Spark结构流还是胡迪行为?我使用的是MERGE_ON_READ
,所以每次更新时表文件都不应该删除。但不知道为什么会这样?由于这个问题,我的另一个作业失败了,它读取了这个表。
spark.readStream
.format('kafka')
.option("kafka.bootstrap.servers",
"localhost:9092")
...
...
df1 = df.groupby('a', 'b', 'c').agg(sum('d').alias('d'))
df1.writeStream
.format('org.apache.hudi')
.option('hoodie.table.name', 'table1')
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
.option('hoodie.datasource.write.keygenerator.class', 'org.apache.hudi.keygen.ComplexKeyGenerator')
.option('hoodie.datasource.write.recordkey.field', "a,b,c")
.option('hoodie.datasource.write.partitionpath.field', 'a')
.option('hoodie.datasource.write.table.name', 'table1')
.option('hoodie.datasource.write.operation', 'upsert')
.option('hoodie.datasource.write.precombine.field', 'c')
.outputMode('complete')
.option('path', '/Users/lucy/hudi/table1')
.option("checkpointLocation",
"/Users/lucy/checkpoint/table1")
.trigger(processingTime="10 second")
.start()
.awaitTermination()
2条答案
按热度按时间axr492tv1#
根据你的配置,这个问题的解释可能是你在每一个批读取相同的键(相同的a,b,c,不同的d值),并且在你有一个upsert操作的地方,hudi用新的值替换旧的值。尝试使用insert而不是upsert,或者根据你想做的来修改hudi键。
snvhrwxg2#
您应该将“outputMode('complete ')”更改为“.outputMode('append')”