我正在遍历3个大文件并执行一系列统计计算。
我每个执行器有55gb的可用内存,8v内核,除了1个内核和1个主节点之外,还有多达10个任务节点可用。
以下是我实际代码的伪代码:
# Load MyConfigMeta file- this is a small file and will be a couple of times in the code
MyConfigMeta=spark.read.parquet("s3://path/MyConfigMeta.parquet")
MyConfigMeta=MyConfigMeta.persist(StorageLevel.MEMORY_AND_DISK)
# Very Large timeseries files
modules=["s3://path/file1.parquet",
"s3://path/file2.parquet",
"s3://path/file3.parquet"]
for file in modules:
out_filename=1
df1=spark.read.parquet(file)
df1=df1.join(MyConfigMeta, on=["key"], how="inner")
#Find out latest column values based on Timestamp
lim_max=df1.groupBy('key')\
.agg(f.max('TIME_STAMP').alias('TIME_STAMP'))
temp=df1.select('TIME_STAMP','key',''UL','LL')
lim_max=lim_max.join(temp, on=['TIME_STAMP','key'], how="left")\
.drop('TIME_STAMP')\
.distinct()
lim_max=lim_max.persist(StorageLevel.MEMORY_AND_DISK)
df1=df1.drop('UL,'LL')\
.join(lim_max, on=['key'], how="left")\
withColumn('out_clip', when(col('RESULT').between(col('LL'),col('UL')), 0).otherwise(1))\
df1=df1.persist(StorageLevel.MEMORY_AND_DISK) # This is a very large dataframe and will later be used for simulation
df2=df1.filter(col('out_clip')==0)\
.groupBy('key')\
.agg(f.round(expr('percentile(RESULT, 0.9999)'),4).alias('UPPER_PERCENTILE'),
f.round(expr('percentile(RESULT, 0.0001)'),4).alias('LOWER_PERCENTILE'))\
.withColumn('pcnt_clip', when(col('RESULT').between(col('LOWER_PERCENTILE'),col('UPPER_PERCENTILE')), 0).otherwise(1))\
.filter(col('pcnt_clip')==0)
stats=df2.groupBy('key')\
.agg(#Perform a bunch of statistical calculations (mean, avg, kurtosis, skew))
stats=stats.join(lim_max, on=['key'], how="left") #get back the columns from lim_max
lim_max=lim_max.unpersist()
stats=stats.withColumn('New_UL', #formula to calculate new limits)\
.withColumn('New_LL', #formula to calculate new limits)\
.join(MyConfigMeta, on=['key'], how="left")
#Simulate data
df_sim=df1.join(stats, on=['key'], how="inner")\
.withColumn('newOOC', when ((col('RESULT')<col('New_LL')) | (col('RESULT')>col('New_UL')), 1).otherwise(0))
df3=df_sim.groupBy('key')\
.agg(f.sum('newOOC').alias('simulated result'))
#Join back with stats to get statistcal data, context data along with simulated data
df4=df3.join(stats, on=['key'], how="inner")
#Write output file
df4.write.mode('overwrite').parquet("s3://path/sim_" +out_filename+ ".parquet")
df1=df1.unpersist()
spark.catalog.clearCache()
我的配置是6 executor-cores
以及 driver-cores
,41 GB executor-memory
,41 GB driver-memory
,14 GB spark.executor.memoryOverhead
以及 9
num executors。 当我查看ganglia中的内存图表时,我注意到第一个文件完成得很好,但是后续文件的计算失败了,因为它不断遇到丢失节点的问题 executorlostfailure(executor 5退出,与正在运行的任务无关)原因:容器标记为失败。诊断:丢失节点上释放的容器。 ![](https://i.stack.imgur.com/l1p37.png) 自从我取消持久化后,我本以为高速缓存会明显清除
df1Dataframe和使用
spark.catalog.clearCache()` . 但记忆似乎在不断增加而没有被清除。但是,如果我运行单独的文件,它似乎工作良好。
在这里,很大一部分内存被清除,只是因为10名执行者死亡并被列入黑名单。
有没有办法让记忆在Spark中冲走?还是我不断丢失节点的另一个原因?
1条答案
按热度按时间fbcarpbf1#
可以使用以下函数刷新sparkcontext中的所有持久化数据集。它列出rdd并调用unpersist方法。在函数内部创建df时,它特别有用。
为了监视持久化的Dataframe,请检查sparkui中的storage选项卡。不要担心ganglia stats中的空闲内存,实际上这可能是您的资源没有得到充分利用的迹象。spark明智地管理内存。
关于丢失的节点,如果您使用的是像databricks这样的托管服务,它将在集群的事件日志中显示终止节点的原因。