I am running into an issue when joining a Spark Structured Streaming DataFrame with a batch DataFrame. In my scenario, I have an S3 stream that needs to be left anti joined against the historical data, returning only the records that are not present in history (i.e., the new records). I then append those records to the history as a new write, partitioned by columns on disk rather than in memory.
When I refresh the partitioned history DataFrame, the history DataFrame does not get updated.
Below are two pieces of code, one that works and one that does not.
The only difference between the working and the non-working code is the partitionBy clause.
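For background on why the partitioned write might behave differently on re-read, here is a minimal, self-contained sketch of what partitionBy does to the on-disk layout. It is my own illustration, not part of the job below; the object name, the /tmp/history_partitioned path and the sample row are made up for the example. partitionBy encodes the partition columns as directory names and leaves them out of the data files, and when Spark reads the directory back it recovers them from those names, appending them after the file columns.

import org.apache.spark.sql.SparkSession

object PartitionByLayoutSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("partitionBy-layout-sketch")
      .getOrCreate()
    import spark.implicits._

    // One sample row with the same six columns as the job below.
    val sample = Seq(("1", "acct-1", "2020-01-01", "ref-1", "2020-01-01", "10.0"))
      .toDF("spark_id", "account_id", "run_dt", "trxn_ref_id", "trxn_dt", "trxn_amt")

    // partitionBy turns the partition columns into directory names, e.g.
    //   /tmp/history_partitioned/spark_id=1/account_id=acct-1/run_dt=2020-01-01/part-*.csv
    // and the data files themselves contain only the remaining three columns.
    sample.write
      .mode("overwrite")
      .partitionBy("spark_id", "account_id", "run_dt")
      .csv("/tmp/history_partitioned")

    // On read, Spark recovers the partition columns from the directory names,
    // but they show up after the file columns in the resulting schema.
    spark.read.csv("/tmp/history_partitioned").printSchema()

    spark.stop()
  }
}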
Working code :- (history gets refreshed)
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val inputSchema = StructType(
  Array(
    StructField("spark_id", StringType),
    StructField("account_id", StringType),
    StructField("run_dt", StringType),
    StructField("trxn_ref_id", StringType),
    StructField("trxn_dt", StringType),
    StructField("trxn_amt", StringType)
  )
)

val historySchema = StructType(
  Array(
    StructField("spark_id", StringType),
    StructField("account_id", StringType),
    StructField("run_dt", StringType),
    StructField("trxn_ref_id", StringType),
    StructField("trxn_dt", StringType),
    StructField("trxn_amt", StringType)
  )
)
val source = spark.readStream
  .schema(inputSchema)
  .option("header", "false")
  .csv("src/main/resources/Input/")

val history = spark.read
  .schema(inputSchema)
  .option("header", "true")
  .csv("src/main/resources/history/")
  .withColumnRenamed("spark_id", "spark_id_2")
  .withColumnRenamed("account_id", "account_id_2")
  .withColumnRenamed("run_dt", "run_dt_2")
  .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
  .withColumnRenamed("trxn_dt", "trxn_dt_2")
  .withColumnRenamed("trxn_amt", "trxn_amt_2")

val readFilePersisted = history.persist()
readFilePersisted.createOrReplaceTempView("hist")

val recordsNotPresentInHist = source
  .join(
    history,
    source.col("account_id") === history.col("account_id_2") &&
      source.col("run_dt") === history.col("run_dt_2") &&
      source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
      source.col("trxn_dt") === history.col("trxn_dt_2") &&
      source.col("trxn_amt") === history.col("trxn_amt_2"),
    "leftanti"
  )
recordsNotPresentInHist.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Append the new records to the history location,
    // then re-read it and re-register the "hist" view.
    batchDF.write
      .mode(SaveMode.Append)
      //.partitionBy("spark_id", "account_id", "run_dt")
      .csv("src/main/resources/history/")
    val lkpChacheFileDf1 = spark.read
      .schema(inputSchema)
      .parquet("src/main/resources/history")
    val lkpChacheFileDf = lkpChacheFileDf1
    lkpChacheFileDf.unpersist(true)
    val histLkpPersist = lkpChacheFileDf.persist()
    histLkpPersist.createOrReplaceTempView("hist")
  }
  .start()
println("This is the kafka dataset:")
source
.withColumn("Input", lit("Input-source"))
.writeStream
.format("console")
.outputMode("append")
.start()
recordsNotPresentInHist
.withColumn("reject", lit("recordsNotPresentInHist"))
.writeStream
.format("console")
.outputMode("append")
.start()
spark.streams.awaitAnyTermination()
Not working :- (history is not refreshed)
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val inputSchema = StructType(
  Array(
    StructField("spark_id", StringType),
    StructField("account_id", StringType),
    StructField("run_dt", StringType),
    StructField("trxn_ref_id", StringType),
    StructField("trxn_dt", StringType),
    StructField("trxn_amt", StringType)
  )
)

val historySchema = StructType(
  Array(
    StructField("spark_id", StringType),
    StructField("account_id", StringType),
    StructField("run_dt", StringType),
    StructField("trxn_ref_id", StringType),
    StructField("trxn_dt", StringType),
    StructField("trxn_amt", StringType)
  )
)
val source = spark.readStream
  .schema(inputSchema)
  .option("header", "false")
  .csv("src/main/resources/Input/")

val history = spark.read
  .schema(inputSchema)
  .option("header", "true")
  .csv("src/main/resources/history/")
  .withColumnRenamed("spark_id", "spark_id_2")
  .withColumnRenamed("account_id", "account_id_2")
  .withColumnRenamed("run_dt", "run_dt_2")
  .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
  .withColumnRenamed("trxn_dt", "trxn_dt_2")
  .withColumnRenamed("trxn_amt", "trxn_amt_2")

val readFilePersisted = history.persist()
readFilePersisted.createOrReplaceTempView("hist")

val recordsNotPresentInHist = source
  .join(
    history,
    source.col("account_id") === history.col("account_id_2") &&
      source.col("run_dt") === history.col("run_dt_2") &&
      source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
      source.col("trxn_dt") === history.col("trxn_dt_2") &&
      source.col("trxn_amt") === history.col("trxn_amt_2"),
    "leftanti"
  )
recordsNotPresentInHist.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Same refresh logic as above; the only difference is the partitionBy clause.
    batchDF.write
      .mode(SaveMode.Append)
      .partitionBy("spark_id", "account_id", "run_dt")
      .csv("src/main/resources/history/")
    val lkpChacheFileDf1 = spark.read
      .schema(inputSchema)
      .parquet("src/main/resources/history")
    val lkpChacheFileDf = lkpChacheFileDf1
    lkpChacheFileDf.unpersist(true)
    val histLkpPersist = lkpChacheFileDf.persist()
    histLkpPersist.createOrReplaceTempView("hist")
  }
  .start()
println("This is the kafka dataset:")
source
.withColumn("Input", lit("Input-source"))
.writeStream
.format("console")
.outputMode("append")
.start()
recordsNotPresentInHist
.withColumn("reject", lit("recordsNotPresentInHist"))
.writeStream
.format("console")
.outputMode("append")
.start()
spark.streams.awaitAnyTermination()
Thanks, Sri