如何将spark流检查点位置存储到s3中?

wqsoz72f  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(459)

我对spark流应用程序(spark v2.3.2)感兴趣,它提供s3Parquet数据并将Parquet数据写入s3。应用程序的Dataframe流利用 groupByKey() 以及 flatMapGroupsWithState() 利用 GroupState .
是否可以将其配置为使用s3检查点位置?例如:

val stream = myDataset.writeStream
    .format("parquet")
    .option("path", s3DataDestination)
    .option("checkpointLocation", s3CheckpointPath)
    .option("truncate", false)
    .option(Trigger.Once)
    .outputMode(OutputMode.Append)
stream.start().awaitTermination()

我确认以上是能够成功写入数据的 s3DataDestination .
但是,在写入s3检查点位置时会引发异常:

java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta

这需要s3的定制实现吗 StateStoreProvider ? 或者,检查点位置是否需要写入hdfs?

klr1opcd

klr1opcd1#

问题是写入和读取的并发频率太高。awss3不提供这种特性。
解决方案:
我们必须切换到本地安装的持久磁盘来进行Spark检查
s3guard:这将使s3的读写更加一致(注意:这是实验性的,我个人从未见过它的实际应用)
使用hdfs

相关问题