如何将pysparkDataframe导出到csv文件?

cxfofazt  于 2021-06-27  发布在  Hive
关注(0)|答案(0)|浏览(454)

我在将pysparkDataframe导出到csv时遇到问题。也许我误解了spark的工作原理。
我尝试过将Dataframe导出到一个Dataframe,然后再导出到一个csv,但没有成功:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext, SQLContext
from datetime import datetime
import time
import sys
import subprocess

start = time.time()
start_time = datetime.now()

myhost = sys.argv[1]
oracle_cnx = sys.argv[2]
thrift_host = "thrift://"+ myhost+":9083"
print thrift_host

# Initialise Hive metastore

SparkContext.setSystemProperty("hive.metastore.uris", thrift_host)

# Create Spark Session

spark = (SparkSession
            .builder
            .appName('Pyspark-read-and-write-from-hive')
            .config("hive.metastore.uris", thrift_host)
            .enableHiveSupport()
            .getOrCreate())

hive_query = """
select  cast(u.policy_key as int) , cast(u.endorsement_cde as int) from (
    select policy_key, endorsement_cde
    from v_od_genpolicyshadow_listall_perm
union
    select policy_key, endorsement_cde
    from v_od_genpolicy_listall_perm
) u
left join v_od_genpolicyshadow_listall_perm s
on s.policy_key = u.policy_key
and s.endorsement_cde = u.endorsement_cde
where s.policy_key is NULL
"""

print('Start the HIVE query')
df_hive = spark.sql(hive_query)

# print "Number of rows in HIVE final policies DF"

# print df_hive.count()   #Produces an error

from pyspark.sql.functions import *

# print df_hive.printSchema()

# Start the oracle query, it's a python script that loads two columns

# in a pandas dataframe then saves the dataframe into a csv file.

# (The file is populated after the oracle_query.py script finishes)

subprocess.call(['python', 'oracle_query.py', oracle_cnx])

# Read the oracle csv file

df_oracle = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.option('delimiter', '|')\
.csv("data/oracle_query.csv")

df_merge = df_hive.join(df_oracle, (df_hive.policy_key == df_oracle.policy_key_g) & (df_hive.endorsement_cde == df_oracle.endorsement_cde_g), how='left')

df_merge = df_merge.toPandas().collect()
df_final = df_merge.to_csv('data/test.csv', sep="|", encoding='utf-8')

我得到以下错误:

Start the HIVE query
Start the oracle query
Insert into csv
Conversion to Pandas dataframe                                                  
root
 |-- policy_key: integer (nullable = true)
 |-- endorsement_cde: integer (nullable = true)
 |-- policy_key_g: integer (nullable = true)
 |-- endorsement_cde_g: integer (nullable = true)

None
2019-01-17 17:37:55 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

Traceback (most recent call last):
  File "main.py", line 77, in <module>
    df_merge = df_merge.toPandas().collect()
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1966, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 466, in collect
    port = self._jdf.collectToPython()
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a,**kw)
  File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o55.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:

日志还很长,但我认为主要是这个
我也试过:

df_merge.coalesce(1).write.csv('data/test.csv', sep="|", header=True)

但是我没有创建一个文件,而是创建了一个名为 test.csv ,其中没有文件。
我基本上有两个Dataframe,我正在合并,并希望在一个csv文件的输出。我错过了什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题