pyspark转换Dataframe列的子集,但保留索引

pw136qt2  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(333)

我对spark/pyspark还很陌生,我正在尝试将一些pandas代码转换成pyspark。
简言之,问题是:如何在行方向转换sparkDataframe的一些数字列的同时保留行索引值。
我有一个dataframe,其中有几列作为索引,其余的都是数字数据,我需要对这些数据进行几次转换

i0 i1        c0        c1        c2
0   0  A  1.764052 -0.977278  0.144044
1   1  B  0.400157  0.950088  1.454274
2   2  C  0.978738 -0.151357  0.761038
3   3  D  2.240893 -0.103219  0.121675
4   4  E  1.867558  0.410599  0.443863

所以列i0和i1是索引,c0-c2是数据。
我想做的是对数值列(按行)应用一些转换,但保留索引信息。
在下面的例子中,我使用“行平均值减去”作为一个例子,我需要做的实际操作是多种多样的,需要任意函数。我知道你不需要用一个函数来减去sparkDataframe的平均值,我只是把它作为一个简化。
设置代码如下:

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").getOrCreate()

def pd_norm(df):
        return df.sub(df.mean(axis=1), axis=0)

def pd_someop(it):
        for df in it:
                yield pd_norm(df)

np.random.seed(0)
nrows = 5
ncols = 3
v = pd.DataFrame({'i0' : range(nrows), 'i1' : [chr(65 + i) for i in range(nrows)]})
v = v.join(pd.DataFrame({'c' + str(x) : np.random.normal(size=nrows) for x in range(ncols)}))
vdf = spark.createDataFrame(v)

我可以像这样跑

vdf.select([F.col(x) for x in vdf.columns[2:]]).mapInPandas(pd_someop, schema=vdf.schema[2:]).show()

它将应用转换,但是不能保证按顺序返回行,所以我不知道如何用它们的索引列值获得转换后的值。
我无法传递索引列,因为我不希望它们包含在转换计算中。它们可以是日期或字符串,而不是简单的整数索引/行号。
在Pandas中,我会做一些

v.iloc[:,:2].join(pd_norm(v.iloc[:,2:]))

这给了

i0 i1        c0        c1        c2
0   0  A  1.453780 -1.287551 -0.166229
1   1  B -0.534683  0.015249  0.519434
2   2  C  0.449265 -0.680830  0.231565
3   3  D  1.487777 -0.856335 -0.631441
4   4  E  0.960218 -0.496741 -0.463477

i、 我有原始索引的转换数值列。
我也有相当数量的列(10到1000),所以我显式硬编码列名的解决方案是不可行的。
我真的在寻找一种通用的使用模式,在这种模式中,我有几个列组成一个索引,几百个列需要通过一些任意的行函数进行转换。
我考虑过将列元数据添加到spark数据框中,以指示该列是否是索引,但此元数据不能用于pandas函数,因此我无法在那里过滤索引列。
我希望这是清楚的。就像我说的我是一个新的Spark,所以我不知道我是否只是错过了一些明显的东西。谢谢。

wwtsj6pe

wwtsj6pe1#

你可以改变主意 yield 分开如下,改变你的呼叫方式 mapInPandas :

def pd_norm(df):
        return df.sub(df.mean(axis=1), axis=0)

def pd_someop(it):
        for df in it:
                yield df.iloc[:,:2].join(pd_norm(df.iloc[:,2:]))

vdf.mapInPandas(pd_someop, schema=vdf.schema).show()
+---+---+------------------+--------------------+--------------------+
| i0| i1|                c0|                  c1|                  c2|
+---+---+------------------+--------------------+--------------------+
|  0|  A|1.4537796668836203| -1.2875505589604548|-0.16622910792316567|
|  1|  B|-0.534682502584706|0.015248706573660176|  0.5194337960110459|
|  2|  C| 0.449265150454061|  -0.680830041949376| 0.23156489149531523|
|  3|  D|1.4877767445678818|  -0.856335306427134| -0.6314414381407477|
|  4|  E|0.9602180818720457|-0.49674140633954944| -0.4634766755324961|
+---+---+------------------+--------------------+--------------------+

相关问题