如何从pyspark上的另一个Dataframe中完全删除/减去/删除一个Dataframe并导出到csv

ee7vknir  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(339)

我知道关于一个类似的主题有几个问题,我都复习了,都试过了。仍然出错/不工作。所以我贴出这个问题来寻求更多的解决方案。
我有两个数据集(所有数据都是我自己做的,不是真正的数据):

df 1:
ID    week_id  month_id  school cd class code
1     20200103  202001       A        103
1     20200110  202001       A        105
1     20200103  202001       B        202
2     20200103  202001       B        205
2     20200103  202001       C        202

df 2:
ID    week_id  month_id  school cd class code
1     20200103  202001       A        103
2     20200103  202001       C        202

The output I want is df1 - df2
ID    week_id  month_id  school cd class code
1     20200110  202001       A        105
1     20200103  202001       B        202
2     20200103  202001       B        205

I used a couple methods:
1. df1.substrac(df2) 
2. also use left anti with df1.ID = d2.ID and df1.school_cd != df2.school_cd
3. also use spark.sql with not in

可能是因为这两个数据都很大,在我做了减法之后,我需要每周做一次id计数,所以我习惯于分组和gag(count distinct),然后我需要将计数导出到csv文件中。这就是错误发生的部分。我对spark还不太了解,我在google上搜索了一下,试图增加内存和maxresultsize。但仍然会出错:
我还想知道是否有更快或更容易看到的计数,通常代码运行非常快,但当我想看到计数,并运行.show(),这将需要永远,并通常以作业失败和红色错误弹出结束。
这是一个错误的副本:我不知道这是否有帮助。
我仍然是新的Spark,任何建议将不胜感激!谢谢您!

Jan20200103_cnt.toPandas().to_csv("wk_cnt_20200103.csv")

Py4JJavaError: An error occurred while calling o2250.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(school_cd#5854, week_id#3610, 200)
+- *(21) HashAggregate(keys=[school_cd#5854, week_id#3610], functions=[partial_count(distinct id#3614L)], output=[school_cd#5854, week_id#3610, count#16086L])
   +- *(21) HashAggregate(keys=[school_cd#5854, week_id#3610, id#3614L], functions=[], output=[school_cd#5854, week_id#3610, id#3614L])
      +- *(21) HashAggregate(keys=[school_cd#5854, week_id#3610, id#3614L], functions=[], output=[school_cd#5854, week_id#3610, id#3614L])

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题