spark结构化流式处理批数据刷新问题(partition by子句)

nom7f22z  于 2021-07-09  发布在  Spark
关注(0)|答案(0)|浏览(169)

我在连接spark结构化流式Dataframe和批处理Dataframe时遇到了一个问题,在我的场景中,我有一个s3流,它需要对历史数据执行左反连接,返回历史中不存在的记录(找出新记录),然后我将这些记录作为新的附加(按列分区磁盘数据分区而不是内存)写入历史。
刷新已分区的历史Dataframe时,不会更新历史Dataframe。
下面是两段代码,一段有效,另一段无效。
工作代码和非工作代码之间的唯一区别是partition\ U by子句。
工作code:- (刷新历史记录)

import spark.implicits._

    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .mode(SaveMode.Append)
          //.partitionBy("spark_id", "account_id", "run_dt")
          .csv("src/main/resources/history/")

        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .parquet("src/main/resources/history")

        val lkpChacheFileDf = lkpChacheFileDf1
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    println("This is the kafka dataset:")
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    spark.streams.awaitAnyTermination()

不work:- (历史记录没有刷新)

import spark.implicits._

    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .mode(SaveMode.Append)
          .partitionBy("spark_id", "account_id","run_dt")
          .csv("src/main/resources/history/")

        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .parquet("src/main/resources/history")

        val lkpChacheFileDf = lkpChacheFileDf1
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    println("This is the kafka dataset:")
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    spark.streams.awaitAnyTermination()

谢谢,斯里

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题