df = spark.table("data").limit(100)
df = df.toPandas()
这种使用.toPandas
的转换工作得很好,因为df.limit
只有几行。如果我去掉limit并对整个df执行toPandas
,我会得到一个错误 “Job aborted due to stage failure”
我一直在使用.pandas_api()
,它一直工作得很好,但我不能在sklearn函数上使用它。我试着将一个列传入fit_transform
,得到错误:* “方法pd.Series.__iter__()
未实现。"*
如果我限制数据集,使用toPandas
,那么fit_transform
就可以正常工作。
我该怎么做?
我试过了,它很有效:
df = spark.table("data").limit(100)
df = df.toPandas()
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
encoder = LabelEncoder()
df["p"] = encoder.fit_transform(df["p"])
去掉限制,我就不能转换成Pandas了相反,我尝试了API:
df = df.pandas_api()
它转换了,但是我不能把列传递给fit_transform
。
1条答案
按热度按时间z2acfund1#
我会尽力回答你的问题,但我不确定是否有解决方案-原因如下:
您尝试使用
toPandas()
调用所做的是将spark Dataframe
转换为pandas DataFrame
。此操作实际上需要将所有数据存储在pandas DataFrame
中,这可能会使您的执行耗尽内存-因此需要Job aborted due to stage failure
。虽然错误不是明确的,但它通常是一个迹象,表明内存方面出现了问题。还有其他原因,但这似乎是最有可能的,因为当您调用.limit
函数时,您的执行工作正常(在您的情况下,它将您的嵌套框限制为100行)。好了,那么
.toPandas()
调用应该返回一个pandas DataFrame
。.pandas_api()
是什么意思好吧,在scikit-learn
的上下文中,并不是你想要的。Here's.pandas_api()
函数文档,但请注意,它返回一个PandasOnSparkDataFrame
对象,而scikit-learn
可能需要一个pandas Dataframe
或一些numpy array
。话虽如此,一种选择是使用
limit
调用将大型spark Dataframe
拆分为多个pandas Dataframes
-即尝试获取一半的数据,看看是否失败如果是这样,尝试获得25%,等等。您可以保存生成的pandas DataFrames
作为CSV文件,在for循环中将每个生成的CSV文件阅读到pandas DataFrames
中,并顺序应用相同的Label Encoder。或者你可以尝试将所有部分pandas DataFrames
保存为numpy数组pkl
对象,然后连接所有矩阵来重构数据。不是完美的解决方案,但它们应该让你找到一个适合你的解决方案。