pyspark 使用pandas_API和toPandas将Spark转换为Pandas

0sgqnhkj  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(170)
df = spark.table("data").limit(100)
df = df.toPandas()

这种使用.toPandas的转换工作得很好,因为df.limit只有几行。如果我去掉limit并对整个df执行toPandas,我会得到一个错误 “Job aborted due to stage failure”
我一直在使用.pandas_api(),它一直工作得很好,但我不能在sklearn函数上使用它。我试着将一个列传入fit_transform,得到错误:* “方法pd.Series.__iter__()未实现。"*
如果我限制数据集,使用toPandas,那么fit_transform就可以正常工作。
我该怎么做?
我试过了,它很有效:

df = spark.table("data").limit(100)
df = df.toPandas()
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
encoder = LabelEncoder()
df["p"] = encoder.fit_transform(df["p"])

去掉限制,我就不能转换成Pandas了相反,我尝试了API:

df = df.pandas_api()

它转换了,但是我不能把列传递给fit_transform

z2acfund

z2acfund1#

我会尽力回答你的问题,但我不确定是否有解决方案-原因如下:
您尝试使用toPandas()调用所做的是将spark Dataframe转换为pandas DataFrame。此操作实际上需要将所有数据存储在pandas DataFrame中,这可能会使您的执行耗尽内存-因此需要Job aborted due to stage failure。虽然错误不是明确的,但它通常是一个迹象,表明内存方面出现了问题。还有其他原因,但这似乎是最有可能的,因为当您调用.limit函数时,您的执行工作正常(在您的情况下,它将您的嵌套框限制为100行)。
好了,那么.toPandas()调用应该返回一个pandas DataFrame.pandas_api()是什么意思好吧,在scikit-learn的上下文中,并不是你想要的。Here's.pandas_api()函数文档,但请注意,它返回一个PandasOnSparkDataFrame对象,而scikit-learn可能需要一个pandas Dataframe或一些numpy array
话虽如此,一种选择是使用limit调用将大型spark Dataframe拆分为多个pandas Dataframes-即尝试获取一半的数据,看看是否失败如果是这样,尝试获得25%,等等。您可以保存生成的pandas DataFrames作为CSV文件,在for循环中将每个生成的CSV文件阅读到pandas DataFrames中,并顺序应用相同的Label Encoder。或者你可以尝试将所有部分pandas DataFrames保存为numpy数组pkl对象,然后连接所有矩阵来重构数据。
不是完美的解决方案,但它们应该让你找到一个适合你的解决方案。

相关问题