stream静态连接:如何定期刷新(取消持久化/持久化)静态Dataframe

vd2z7a6w  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(505)

我正在构建一个spark结构化流应用程序,在这里我正在进行批处理流连接。批处理数据的源会定期更新。
因此,我计划定期对批处理数据进行持久化/非持久化。
下面是我用来持久化和取消持久化批处理数据的示例代码。
流量:
读取批次数据
保留批处理数据
每隔一小时,取消数据持久化,读取批处理数据并再次持久化。
但是,我没有看到每小时都刷新批处理数据。
代码:

  1. var batchDF = handler.readBatchDF(sparkSession)
  2. batchDF.persist(StorageLevel.MEMORY_AND_DISK)
  3. var refreshedTime: Instant = Instant.now()
  4. if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  5. refreshedTime = Instant.now()
  6. batchDF.unpersist(false)
  7. batchDF = handler.readBatchDF(sparkSession)
  8. .persist(StorageLevel.MEMORY_AND_DISK)
  9. }

在spark结构化流媒体作业中,有没有更好的方法来实现这个场景?

e0bqpujr

e0bqpujr1#

您可以通过使用结构化流提供的流调度功能来实现这一点。
通过创建一个人工的“速率”流定期刷新静态Dataframe,可以触发静态Dataframe的刷新(unpersist->load->persist)。其目的是:
初始加载静态Dataframe并保持 var 定义刷新静态Dataframe的方法
使用在所需间隔(例如1小时)触发的“速率”流
读取实际流数据并使用静态Dataframe执行连接操作
在这个速率流中有一个 foreachBatch 调用刷新方法的接收器
以下代码在spark 3.0.1、scala 2.12.10和delta 0.7.0中运行良好。

  1. // 1. Load the staticDataframe initially and keep as `var`
  2. var staticDf = spark.read.format("delta").load(deltaPath)
  3. staticDf.persist()
  4. // 2. Define a method that refreshes the static Dataframe
  5. def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
  6. staticDf.unpersist()
  7. staticDf = spark.read.format("delta").load(deltaPath)
  8. staticDf.persist()
  9. println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  10. }
  11. // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  12. val staticRefreshStream = spark.readStream
  13. .format("rate")
  14. .option("rowsPerSecond", 1)
  15. .option("numPartitions", 1)
  16. .load()
  17. .selectExpr("CAST(value as LONG) as trigger")
  18. .as[Long]
  19. // 4. Read actual streaming data and perform join operation with static Dataframe
  20. // As an example I used Kafka as a streaming source
  21. val streamingDf = spark.readStream
  22. .format("kafka")
  23. .option("kafka.bootstrap.servers", "localhost:9092")
  24. .option("subscribe", "test")
  25. .option("startingOffsets", "earliest")
  26. .option("failOnDataLoss", "false")
  27. .load()
  28. .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")
  29. val joinDf = streamingDf.join(staticDf, "id")
  30. val query = joinDf.writeStream
  31. .format("console")
  32. .option("truncate", false)
  33. .option("checkpointLocation", "/path/to/sparkCheckpoint")
  34. .start()
  35. // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  36. staticRefreshStream.writeStream
  37. .outputMode("append")
  38. .foreachBatch(foreachBatchMethod[Long] _)
  39. .queryName("RefreshStream")
  40. .trigger(Trigger.ProcessingTime("5 seconds"))
  41. .start()

为了获得完整的示例,创建了delta表,并使用新值进行了更新,如下所示:

  1. val deltaPath = "file:///tmp/delta/table"
  2. import spark.implicits._
  3. val df = Seq(
  4. (1L, "static1"),
  5. (2L, "static2")
  6. ).toDF("id", "deltaField")
  7. df.write
  8. .mode(SaveMode.Overwrite)
  9. .format("delta")
  10. .save(deltaPath)
展开查看全部

相关问题