我有一个csv,我导入到databricks使用spark.read。这个大文件包含每日级别的记录/事务。我将Dataframe缩减为5列,并保持500000行不变。我正在尝试构建这个源文件的摘要表,它在一个月级别(聚合)表示这些记录/事务。
脚本有一个filter/groupby/sum命令,它返回一行,将数据汇总为一个月的计数。查询返回的行如下所示:
+---------+---------+-------+-------------+
| Country|StockCode|YYYY-MM|sum(Quantity)|
+---------+---------+-------+-------------+
|Singapore| M| 2011-4| 10|
+---------+---------+-------+-------------+
脚本在源Dataframe上迭代并每次返回。我很难使用这个脚本的输出(显示或csv导出)。在Pypark和pandas我都有问题。我不知道如何堆叠查询的结果,它应该是什么形式?
pandas如果我在pandas中执行,脚本生成文件需要很长时间(我相信pandas+me的执行效率不高会导致持续时间延长)~2.5小时。display和write.csv命令的工作速度相当快,只需几秒钟即可完成。
pyspark如果我在pyspark中这样做,脚本大约需要10分钟才能完成,但是显示和导出会崩溃。笔记本要么返回超时错误,要么重新启动,要么抛出崩溃错误。
方法应该是动态地创建一个列表列表,当这个列表完全构建好后,将它转换成一个Dataframe来使用吗?我一直在尝试我遇到的所有方法,但我似乎没有取得任何进展。
下面是生成结果的代码
# officeSummaryDFBefore
column_names = "Country|StockCode|YYYY-MM|Quantity"
monthlyCountsBeforeImpactDate = spark.createDataFrame(
[
tuple('' for i in column_names.split("|"))
],
column_names.split("|")
).where("1=0")
monthlyCountsBeforeImpacteDateRow = spark.createDataFrame(
[
tuple('' for i in column_names.split("|"))
],
column_names.split("|")
).where("1=0")
try :
for country in country_lookup :
country = country[0]
print(country_count, " country(s) left")
country_count = country_count - 1
for stockCode in stockCode_lookup :
stockCode = stockCode[0]
monthlyCountsBeforeImpacteDateRow = dataBeforeImpactDate.filter((col("Country").rlike(country)) & (col("StockCode").rlike(stockCode))).groupby("Country", "StockCode", "YYYY-MM").sum()
monthlyCountsBeforeImpacteDateRow.show()
dfsCountsBefore = [monthlyCountsBeforeImpacteDateRow, monthlyCountsBeforeImpactDate]
monthlyCountsBeforeImpactDate = reduce(DataFrame.union, dfsCountsBefore)
except Exception as e:
print(e)
我在循环中声明dfscountsbeforeimpactdate,这似乎不正确,但当它在循环外时,返回为null。
1条答案
按热度按时间4nkexdtk1#
iiuc您正在查找country和stock以限制行,然后对它们进行分组以生成聚合。
为什么不过滤df呢
这将是更快的方式,因为你不循环周围的过滤器,也不需要工会。