stream静态连接:如何定期刷新(取消持久化/持久化)静态Dataframe

vd2z7a6w  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(411)

我正在构建一个spark结构化流应用程序,在这里我正在进行批处理流连接。批处理数据的源会定期更新。
因此,我计划定期对批处理数据进行持久化/非持久化。
下面是我用来持久化和取消持久化批处理数据的示例代码。
流量:
读取批次数据
保留批处理数据
每隔一小时,取消数据持久化,读取批处理数据并再次持久化。
但是,我没有看到每小时都刷新批处理数据。
代码:

var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  refreshedTime = Instant.now()
  batchDF.unpersist(false)
  batchDF =  handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
}

在spark结构化流媒体作业中,有没有更好的方法来实现这个场景?

e0bqpujr

e0bqpujr1#

您可以通过使用结构化流提供的流调度功能来实现这一点。
通过创建一个人工的“速率”流定期刷新静态Dataframe,可以触发静态Dataframe的刷新(unpersist->load->persist)。其目的是:
初始加载静态Dataframe并保持 var 定义刷新静态Dataframe的方法
使用在所需间隔(例如1小时)触发的“速率”流
读取实际流数据并使用静态Dataframe执行连接操作
在这个速率流中有一个 foreachBatch 调用刷新方法的接收器
以下代码在spark 3.0.1、scala 2.12.10和delta 0.7.0中运行良好。

// 1. Load the staticDataframe initially and keep as `var`
  var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()

  //  2. Define a method that refreshes the static Dataframe
  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }

  // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]

  // 4. Read actual streaming data and perform join operation with static Dataframe
  // As an example I used Kafka as a streaming source
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

  val joinDf = streamingDf.join(staticDf, "id")

  val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()

  // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()

为了获得完整的示例,创建了delta表,并使用新值进行了更新,如下所示:

val deltaPath = "file:///tmp/delta/table"

  import spark.implicits._
  val df = Seq(
    (1L, "static1"),
    (2L, "static2")
  ).toDF("id", "deltaField")

  df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)

相关问题