结构化流kafka先写最新文件,即使禁用了latestfirst

zbq4xfa0  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(349)

我有一份工作,在那里我阅读了一些来自s3/alluxio的parquets,转换成avro,从schema registry捕获schema,并将其放入kafka。为此,在我的writer中,我使用trigger.once()。
但由于某种原因,当我检查我的Kafka主题时,第一次写的记录来自最后一次谈判。
即使禁用最新的第一个选项,也会发生这种情况。
我怎样才能解决这个问题?在窗口点菜?
求求你,救命!

  1. val dataFrame = spark.readStream
  2. .option("mergeSchema", "true")
  3. .option("maxFilesPerTrigger", 5)
  4. .schema(dfSchema)
  5. // .parquet(s"alluxio://tenants/table/*")
  6. .parquet(s"s3a://tenants/table/*")
  7. dataFrame.withColumn("cdcTime", to_timestamp(col("cdcTime")))
  8. val avroDF = dataFrame.
  9. .withColumn("key", to_confluent_avro(col("key"), getSchemaRegistryConfigKey(table)))
  10. .withColumn("value", when(col("Op") === "D", lit(null))
  11. .otherwise(to_confluent_avro(col("value"),
  12. valueJsonAvroSchema,
  13. getSchemaRegistryConfigValue(table))))
  14. .drop("Op")
  15. avroDF.writeStream
  16. .format("kafka")
  17. .queryName(s"${table.schema}/${table.name}")
  18. .option("kafka.bootstrap.servers", "mybrokerUrl")
  19. .option("topic", table.topic)
  20. .trigger(Trigger.Once())
  21. .option("checkpointLocation", s"checkpoints/${table.schema}/${table.name}")
  22. .start()
  23. .awaitTermination()

我用abris把我的parquert转换成avro

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题