如何在pysparkDataframe中对单个列执行整形操作?

nfzehxib  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(343)

我有一个很长的pysparkDataframe,如下所示:

+------+
|number|
+------+
|12.4  |
|13.4  |
|42.3  |
|33.4  |
|42.3  |
|32.4  |
|44.2  |
|12.3  |
|45.4  |
+------+

理想情况下,我希望这是重塑成一个 nxn 矩阵在哪里 nsqrt(length of pyspark dataframe) .
但有一个解决方案是将其转换为numpy数组,然后将其重塑为 nxn 但我想在Pypark完成。因为我的数据是超长的(大约1亿行)。
因此,我要寻找的预期产出大致如下:

+------+------+------+
|12.4  | 13.4 | 42.3 |
|33.4  | 42.3 | 32.4 |
|44.2  | 12.3 | 45.4 |
+------+------+------+

虽然我能够正确地做它转换成Pandas,然后到numpy,然后做整形手术。但我想在pyspark中进行这种转换。因为下面的代码只适用于几千行。

covarianceMatrix_pd = covarianceMatrix_df.toPandas()
nrows = np.sqrt(len(covarianceMatrix_pd))
covarianceMatrix_pd = covarianceMatrix_pd.to_numpy().reshape((int(nrows),int(nrows)))
covarianceMatrix_pd
p4tfgftt

p4tfgftt1#

一种方法是使用 row_number 在Dataframe计数之后使用pivot:

from pyspark.sql import functions as F, Window
from math import sqrt
c = int(sqrt(df.count())) #this gives 3
rnum = F.row_number().over(Window.orderBy(F.lit(1)))

out = (df.withColumn("Rnum",((rnum-1)/c).cast("Integer"))
 .withColumn("idx",F.row_number().over(Window.partitionBy("Rnum").orderBy("Rnum")))
.groupby("Rnum").pivot("idx").agg(F.first("number")))
out.show()

+----+----+----+----+
|Rnum|   1|   2|   3|
+----+----+----+----+
|   0|12.4|13.4|42.3|
|   1|33.4|42.3|32.4|
|   2|44.2|12.3|45.4|
+----+----+----+----+

相关问题