我对spark流应用程序(spark v2.3.2)感兴趣,它提供s3Parquet数据并将Parquet数据写入s3。应用程序的Dataframe流利用 groupByKey()
以及 flatMapGroupsWithState()
利用 GroupState
.
是否可以将其配置为使用s3检查点位置?例如:
val stream = myDataset.writeStream
.format("parquet")
.option("path", s3DataDestination)
.option("checkpointLocation", s3CheckpointPath)
.option("truncate", false)
.option(Trigger.Once)
.outputMode(OutputMode.Append)
stream.start().awaitTermination()
我确认以上是能够成功写入数据的 s3DataDestination
.
但是,在写入s3检查点位置时会引发异常:
java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta
这需要s3的定制实现吗 StateStoreProvider
? 或者,检查点位置是否需要写入hdfs?
1条答案
按热度按时间klr1opcd1#
问题是写入和读取的并发频率太高。awss3不提供这种特性。
解决方案:
我们必须切换到本地安装的持久磁盘来进行Spark检查
s3guard:这将使s3的读写更加一致(注意:这是实验性的,我个人从未见过它的实际应用)
使用hdfs