使用复制命令-pyspark将Spark Dataframe 复制到Postgres

gj3fmq9x  于 2022-11-16  发布在  Apache
关注(0)|答案(3)|浏览(319)

我需要将一个spark Dataframe 写入Postgres DB。

df.write
.option("numPartitions",partions)
.option("batchsize",batchsize)
.jdbc(url=url, table="table_name", mode=append, properties=properties)

这工作正常,但是,我想比较性能与'复制'命令
已尝试以下操作

output = io.StringIO() 

 csv_new.write
.format("csv")
.option("header", "true")
.save(path=output)

output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_pivot_table', null="") \\using psycopg2 
con_bb.commit()

这似乎不起作用,因为错误“type”对象不可迭代
与Pandas数据框架配合良好

output= io.StringIO()
df.to_csv(path_or_buf=output,sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_ts_devicedatacollection_aggregate', null="")  
con_bb.commit()

关于如何在Pyspark中实现Pandas等价物的任何线索。它的性能关键,因此转换到SparkdfPandasdf是不是一个选项。任何帮助将不胜感激

pod7payv

pod7payv1#

目前对我来说非常有效的方法(100- 200 GB的csv文件,大约有1.000.000.000行)是将psycopg 2与多处理结合使用
可用内核:200
首先,我将spark Dataframe 导出到多个文件中,这些文件是可用内核的倍数

filepath="/base_path/psql_multiprocessing_data"

df.repartition(400) \
    .write \
    .mode("overwrite") \
    .format("csv") \ # even faster using binary format, but ok with csv
    .save(filepath,header='false')

然后,我通过并行迭代文件夹中的所有文件

import glob
import psycopg2   
from multiprocessing import Pool, cpu_count

file_path_list=sorted(glob.glob("/base_path/psql_multiprocessing_data/*.csv"))

def psql_copy_load(fileName):
    con = psycopg2.connect(database="my_db",user="my_user",password="my_password",host="my_host",port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        # next(f)  # in case to skip the header row.
        cursor.copy_from(f, 'my_schema.my_table', sep=",")
    
    con.commit()
    con.close()
    return (fileName)
    

with Pool(cpu_count()) as p:
        p.map(psql_copy_load,file_path_list)

print("parallelism (cores): ",cpu_count())
print("files processed: ",len(file_path_list))

我没有进一步尝试将数据导出为二进制,因为它变得复杂与正确的标题和数据类型,我很高兴与运行时间约25-30分钟(6列)

0wi1tuuw

0wi1tuuw2#

据我所知,Spark没有提供在内部使用 copy 命令的方法。
如果你想从hdfs加载postgres,你可能会对Sqoop感兴趣。它允许导出存储在hdfs上的csv。而且,它能够产生多个 copy 语句。在我的实验中,添加4个Map器比只添加一个Map器加快了2倍。这应该比使用spark jdbc的方式快得多。
步骤如下:
1.文件夹名称

  1. sqoop导出--连接“jdbc:postgresql://postgres_host/postgres_db”--用户名--密码文件文件:///home/$USER/.密码--导出目录my_csv_table --表-m 4 --直接--以'\n'终止的行--以','终止的字段----架构

相关问题