如何将一个PandasDataframe保存到hdfs?

nwsw7zdq  于 2021-06-01  发布在  Hadoop
关注(0)|答案(4)|浏览(865)

我与Pandas和SparkDataframe工作。Dataframe总是非常大(>20gb),标准的spark函数对于这些大小是不够的。目前,我正在将PandasDataframe转换为sparkDataframe,如下所示:

dataframe = spark.createDataFrame(pandas_dataframe)

我这样做是因为spark将Dataframe写入hdfs非常简单:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")

但是对于大于2GB的Dataframe,转换失败了。如果我将sparkDataframe转换为Pandas,我可以使用pyarrow:

// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)

这是一个从spark到pandas的快速对话,它也适用于大于2GB的Dataframe。我却找不到另一种方法。意思是有一个PandasDataframe,我转换为Spark的帮助下pyarrow。问题是我真的找不到如何将PandasDataframe写入hdfs。
我的Pandas版本:0.19.0

kuuvgm7e

kuuvgm7e1#

从https://issues.apache.org/jira/browse/spark-6235
支持并行化大于2gb的r data.frame
已解决。
从https://pandas.pydata.org/pandas-docs/stable/r_interface.html
将Dataframe转换为r对象
您可以将Dataframe转换为r data.frame
所以也许转换Pandas->r->Spark->hdfs?

mnemlml8

mnemlml82#

黑客可以从大Dataframe创建n个pandasDataframe(每个小于2GB)(水平分区),并创建n个不同的sparkDataframe,然后合并(合并)它们以创建最后一个Dataframe写入hdfs。我假设你的主机器是强大的,但你也有一个集群,你运行的Spark。

vatpfxk5

vatpfxk53#

另一种方法是将pandasDataframe转换为sparkDataframe(使用pyspark)并使用save命令将其保存到hdfs。例子

df = pd.read_csv("data/as/foo.csv")
    df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
    sc = SparkContext(conf=conf)
    sqlCtx = SQLContext(sc)
    sdf = sqlCtx.createDataFrame(df)

在这里 astype 更改列的类型 objectstring . 这将使您免于引发其他异常,因为spark无法找出异常类型 object . 但要确保这些列确实是string类型。
现在要在hdfs中保存df:

sdf.write.csv('mycsv.csv')
jpfvwuh4

jpfvwuh44#

意思是有一个PandasDataframe,我转换为Spark的帮助下pyarrow。 pyarrow.Table.fromPandas 您需要的功能是:

Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)

Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa

pdf = ...  # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf)  # type: pyarrow.lib.Table

结果可以直接写入parquet/hdfs,而无需通过spark传递数据:

import pyarrow.parquet as pq

fs  = pa.hdfs.connect()

with fs.open(path, "wb") as fw
    pq.write_table(adf, fw)

另请参见
@wesmckinneyanswer使用pyarrow从hdfs读取Parquet文件。
在中读写apache parquet格式 pyarrow 文档。
python中的本机hadoop文件系统(hdfs)连接
spark注解:
此外,由于spark 2.3(当前主控)箭头直接支持 createDataFrame (spark-20791-使用apachearrow从pandas.dataframe改进spark createdataframe)。它使用 SparkContext.defaultParallelism 计算块的数量,以便轻松控制单个批的大小。
最后 defaultParallelism 可用于控制使用标准 _convert_from_pandas ,有效地减小了切片的大小,使其更易于管理。
不幸的是,这些不太可能解决您当前的内存问题。两者都取决于 parallelize ,因此将所有数据存储在驱动程序节点的内存中。切换到箭头或调整配置只能加快进程或地址块大小限制。
在实践中,我不认为有任何理由在这里切换到Spark,只要你使用当地的Pandas DataFrame 作为输入。在这种情况下,最严重的瓶颈是驱动程序的网络i/o,分发数据无法解决这个问题。

相关问题