我对spark/pyspark还很陌生,我正在尝试将一些pandas代码转换成pyspark。
简言之,问题是:如何在行方向转换sparkDataframe的一些数字列的同时保留行索引值。
我有一个dataframe,其中有几列作为索引,其余的都是数字数据,我需要对这些数据进行几次转换
i0 i1 c0 c1 c2
0 0 A 1.764052 -0.977278 0.144044
1 1 B 0.400157 0.950088 1.454274
2 2 C 0.978738 -0.151357 0.761038
3 3 D 2.240893 -0.103219 0.121675
4 4 E 1.867558 0.410599 0.443863
所以列i0和i1是索引,c0-c2是数据。
我想做的是对数值列(按行)应用一些转换,但保留索引信息。
在下面的例子中,我使用“行平均值减去”作为一个例子,我需要做的实际操作是多种多样的,需要任意函数。我知道你不需要用一个函数来减去sparkDataframe的平均值,我只是把它作为一个简化。
设置代码如下:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.master("local").getOrCreate()
def pd_norm(df):
return df.sub(df.mean(axis=1), axis=0)
def pd_someop(it):
for df in it:
yield pd_norm(df)
np.random.seed(0)
nrows = 5
ncols = 3
v = pd.DataFrame({'i0' : range(nrows), 'i1' : [chr(65 + i) for i in range(nrows)]})
v = v.join(pd.DataFrame({'c' + str(x) : np.random.normal(size=nrows) for x in range(ncols)}))
vdf = spark.createDataFrame(v)
我可以像这样跑
vdf.select([F.col(x) for x in vdf.columns[2:]]).mapInPandas(pd_someop, schema=vdf.schema[2:]).show()
它将应用转换,但是不能保证按顺序返回行,所以我不知道如何用它们的索引列值获得转换后的值。
我无法传递索引列,因为我不希望它们包含在转换计算中。它们可以是日期或字符串,而不是简单的整数索引/行号。
在Pandas中,我会做一些
v.iloc[:,:2].join(pd_norm(v.iloc[:,2:]))
这给了
i0 i1 c0 c1 c2
0 0 A 1.453780 -1.287551 -0.166229
1 1 B -0.534683 0.015249 0.519434
2 2 C 0.449265 -0.680830 0.231565
3 3 D 1.487777 -0.856335 -0.631441
4 4 E 0.960218 -0.496741 -0.463477
i、 我有原始索引的转换数值列。
我也有相当数量的列(10到1000),所以我显式硬编码列名的解决方案是不可行的。
我真的在寻找一种通用的使用模式,在这种模式中,我有几个列组成一个索引,几百个列需要通过一些任意的行函数进行转换。
我考虑过将列元数据添加到spark数据框中,以指示该列是否是索引,但此元数据不能用于pandas函数,因此我无法在那里过滤索引列。
我希望这是清楚的。就像我说的我是一个新的Spark,所以我不知道我是否只是错过了一些明显的东西。谢谢。
1条答案
按热度按时间wwtsj6pe1#
你可以改变主意
yield
分开如下,改变你的呼叫方式mapInPandas
: