我有一个spark结构的流媒体工作,我把parquet转换成avro,把sink转换成kafka,但在我的sink过程中,我的记录没有按正确的顺序下沉。在我的整个过程中,我认为唯一可以改变顺序的地方是当我使用withcolumn转换tumbstone事件时。
那么,withcolumn.的顺序是否有其他保证?
我使用withcolumn.otherwise()的代码的一部分
dataframe.withColumn("key", to_confluent_avro(col("key"), getSchemaRegistryConfigKey(table)))
.withColumn("value", when(col("Op") === "D", lit(null))
.otherwise(to_confluent_avro(col("value"),
valueJsonAvroSchema,
getSchemaRegistryConfigValue(table))))
.drop("Op")
我的Flume:
DataFrame.writeStream
.format("kafka")
.queryName(s"${table.schema}/${table.name}")
.option("kafka.bootstrap.servers", "MyKafka")
.option("topic", table.topic)
.trigger(Trigger.Once())
.option("checkpointLocation", s"checkpoints/${table.schema}/${table.name}")
.start()
.awaitTermination()
暂无答案!
目前还没有任何答案,快来回答吧!