使用检查点

gpnt7bae  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(607)

我正在使用spark版本2.3,并尝试将spark中的配置单元表读取为:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
df = spark.table("emp.emptable")

在这里,我添加了一个新的列,其中当前日期来自系统,并添加到现有的Dataframe中

import pyspark.sql.functions as F
newdf = df.withColumn('LOAD_DATE', F.current_date())

现在面临一个问题,当我试图将这个Dataframe编写为配置单元表时

newdf.write.mode("overwrite").saveAsTable("emp.emptable")

pyspark.sql.utils.AnalysisException: u'Cannot overwrite table emp.emptable that is also being read from;'

因此,我检查Dataframe以打破谱系,因为我从同一个Dataframe读写

checkpointDir = "/hdfs location/temp/tables/"
spark.sparkContext.setCheckpointDir(checkpointDir)
df = spark.table("emp.emptable").coalesce(1).checkpoint()
newdf = df.withColumn('LOAD_DATE', F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")

这样工作正常,新列已添加到配置单元表中。但是每次创建检查点文件时我都要删除它。有没有最好的方法来中断沿袭,用更新的列详细信息编写相同的Dataframe,并将其保存到hdfs位置或作为配置单元表。
或者有没有办法为检查点目录指定一个临时位置,它将在spark会话完成后被删除。

ddhy6vgd

ddhy6vgd1#

正如我们在这篇文章中所讨论的,设置下面的属性是一个不错的选择。

spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")

这个问题有不同的背景。我们想保留 checkpointed 所以数据集不关心添加清理解决方案。
设置上述属性有时是可行的(经过scala、java和python测试),但很难依赖它。官方文件说,通过设置这个属性 Controls whether to clean checkpoint files if the reference is out of scope. 我不知道它到底是什么意思,因为我的理解是,一旦spark会话/上下文停止,它应该清除它。如果有人能在上面遮光就太好了。
关于
有没有什么最好的办法来打破血统
看看这个问题,@bis找到了一些方法,用 createDataFrame(RDD, Schema) 方法。不过,我还没有亲自测试过。
仅供参考,我通常不依赖上述属性并删除 checkpointed 代码中的目录本身是安全的。
我们可以得到 checkpointed 目录如下:
斯卡拉:

//Set directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

//It gives String so we can use org.apache.hadoop.fs to delete path

Pypark公司:

// Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t 
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

# notice 'u' at the start which means It returns unicode object use str(t)

# Below are the steps to get hadoop file system object and delete

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
False

相关问题