我正在使用spark版本2.3,并尝试将spark中的配置单元表读取为:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
df = spark.table("emp.emptable")
在这里,我添加了一个新的列,其中当前日期来自系统,并添加到现有的Dataframe中
import pyspark.sql.functions as F
newdf = df.withColumn('LOAD_DATE', F.current_date())
现在面临一个问题,当我试图将这个Dataframe编写为配置单元表时
newdf.write.mode("overwrite").saveAsTable("emp.emptable")
pyspark.sql.utils.AnalysisException: u'Cannot overwrite table emp.emptable that is also being read from;'
因此,我检查Dataframe以打破谱系,因为我从同一个Dataframe读写
checkpointDir = "/hdfs location/temp/tables/"
spark.sparkContext.setCheckpointDir(checkpointDir)
df = spark.table("emp.emptable").coalesce(1).checkpoint()
newdf = df.withColumn('LOAD_DATE', F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")
这样工作正常,新列已添加到配置单元表中。但是每次创建检查点文件时我都要删除它。有没有最好的方法来中断沿袭,用更新的列详细信息编写相同的Dataframe,并将其保存到hdfs位置或作为配置单元表。
或者有没有办法为检查点目录指定一个临时位置,它将在spark会话完成后被删除。
1条答案
按热度按时间ddhy6vgd1#
正如我们在这篇文章中所讨论的,设置下面的属性是一个不错的选择。
这个问题有不同的背景。我们想保留
checkpointed
所以数据集不关心添加清理解决方案。设置上述属性有时是可行的(经过scala、java和python测试),但很难依赖它。官方文件说,通过设置这个属性
Controls whether to clean checkpoint files if the reference is out of scope.
我不知道它到底是什么意思,因为我的理解是,一旦spark会话/上下文停止,它应该清除它。如果有人能在上面遮光就太好了。关于
有没有什么最好的办法来打破血统
看看这个问题,@bis找到了一些方法,用
createDataFrame(RDD, Schema)
方法。不过,我还没有亲自测试过。仅供参考,我通常不依赖上述属性并删除
checkpointed
代码中的目录本身是安全的。我们可以得到
checkpointed
目录如下:斯卡拉:
Pypark公司: