我有一份工作,在那里我阅读了一些来自s3/alluxio的parquets,转换成avro,从schema registry捕获schema,并将其放入kafka。为此,在我的writer中,我使用trigger.once()。
但由于某种原因,当我检查我的Kafka主题时,第一次写的记录来自最后一次谈判。
即使禁用最新的第一个选项,也会发生这种情况。
我怎样才能解决这个问题?在窗口点菜?
求求你,救命!
val dataFrame = spark.readStream
.option("mergeSchema", "true")
.option("maxFilesPerTrigger", 5)
.schema(dfSchema)
// .parquet(s"alluxio://tenants/table/*")
.parquet(s"s3a://tenants/table/*")
dataFrame.withColumn("cdcTime", to_timestamp(col("cdcTime")))
val avroDF = dataFrame.
.withColumn("key", to_confluent_avro(col("key"), getSchemaRegistryConfigKey(table)))
.withColumn("value", when(col("Op") === "D", lit(null))
.otherwise(to_confluent_avro(col("value"),
valueJsonAvroSchema,
getSchemaRegistryConfigValue(table))))
.drop("Op")
avroDF.writeStream
.format("kafka")
.queryName(s"${table.schema}/${table.name}")
.option("kafka.bootstrap.servers", "mybrokerUrl")
.option("topic", table.topic)
.trigger(Trigger.Once())
.option("checkpointLocation", s"checkpoints/${table.schema}/${table.name}")
.start()
.awaitTermination()
我用abris把我的parquert转换成avro
暂无答案!
目前还没有任何答案,快来回答吧!