Kafka 在新的一批spark结构流上,每次都覆盖Hudi数据

dldeef67  于 2023-05-16  发布在  Apache
关注(0)|答案(2)|浏览(164)

我正在研究Spark结构流,其中作业消耗Kafka消息,每10秒进行聚合并将数据保存在Apache hudi表中。下面的代码工作正常,但它覆盖了每个批处理的结果apache hudi表数据。我还不知道为什么会这样?是Spark结构流还是胡迪行为?我使用的是MERGE_ON_READ,所以每次更新时表文件都不应该删除。但不知道为什么会这样?由于这个问题,我的另一个作业失败了,它读取了这个表。

spark.readStream
                .format('kafka')
                .option("kafka.bootstrap.servers",
                        "localhost:9092")
      ...
      ...                  
    df1 = df.groupby('a', 'b', 'c').agg(sum('d').alias('d'))
    df1.writeStream
              .format('org.apache.hudi')
              .option('hoodie.table.name', 'table1')
              .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
              .option('hoodie.datasource.write.keygenerator.class', 'org.apache.hudi.keygen.ComplexKeyGenerator')
              .option('hoodie.datasource.write.recordkey.field', "a,b,c")
              .option('hoodie.datasource.write.partitionpath.field', 'a')
              .option('hoodie.datasource.write.table.name', 'table1')
              .option('hoodie.datasource.write.operation', 'upsert')
              .option('hoodie.datasource.write.precombine.field', 'c')
              .outputMode('complete')
              .option('path', '/Users/lucy/hudi/table1')
              .option("checkpointLocation",
                      "/Users/lucy/checkpoint/table1")
              .trigger(processingTime="10 second")
              .start()
              .awaitTermination()
axr492tv

axr492tv1#

根据你的配置,这个问题的解释可能是你在每一个批读取相同的键(相同的a,b,c,不同的d值),并且在你有一个upsert操作的地方,hudi用新的值替换旧的值。尝试使用insert而不是upsert,或者根据你想做的来修改hudi键。

snvhrwxg

snvhrwxg2#

您应该将“outputMode('complete ')”更改为“.outputMode('append')”

相关问题