I'm having trouble exporting a PySpark DataFrame to CSV. Maybe I'm misunderstanding how Spark works.
I tried converting the DataFrame to a pandas DataFrame and then writing it out to a CSV, but it didn't work:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext, SQLContext
from datetime import datetime
import time
import sys
import subprocess
start = time.time()
start_time = datetime.now()
myhost = sys.argv[1]
oracle_cnx = sys.argv[2]
thrift_host = "thrift://"+ myhost+":9083"
print thrift_host
# Initialise Hive metastore
SparkContext.setSystemProperty("hive.metastore.uris", thrift_host)
# Create Spark Session
spark = (SparkSession
.builder
.appName('Pyspark-read-and-write-from-hive')
.config("hive.metastore.uris", thrift_host)
.enableHiveSupport()
.getOrCreate())
hive_query = """
select cast(u.policy_key as int) , cast(u.endorsement_cde as int) from (
select policy_key, endorsement_cde
from v_od_genpolicyshadow_listall_perm
union
select policy_key, endorsement_cde
from v_od_genpolicy_listall_perm
) u
left join v_od_genpolicyshadow_listall_perm s
on s.policy_key = u.policy_key
and s.endorsement_cde = u.endorsement_cde
where s.policy_key is NULL
"""
print('Start the HIVE query')
df_hive = spark.sql(hive_query)
# print "Number of rows in HIVE final policies DF"
# print df_hive.count() #Produces an error
from pyspark.sql.functions import *
# print df_hive.printSchema()
# Start the oracle query: it's a Python script that loads two columns
# into a pandas dataframe and then saves that dataframe to a csv file.
# (The file is populated once the oracle_query.py script finishes;
#  a rough sketch of that script is shown after this code block.)
subprocess.call(['python', 'oracle_query.py', oracle_cnx])
# Read the oracle csv file
df_oracle = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.option('delimiter', '|')\
.csv("data/oracle_query.csv")
df_merge = df_hive.join(df_oracle, (df_hive.policy_key == df_oracle.policy_key_g) & (df_hive.endorsement_cde == df_oracle.endorsement_cde_g), how='left')
df_merge = df_merge.toPandas().collect()
df_final = df_merge.to_csv('data/test.csv', sep="|", encoding='utf-8')
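For context, oracle_query.py boils down to something like the following. This is only a rough sketch: the real query, table name and connection handling are different, and cx_Oracle plus the _g column aliases are shown here just for illustration.

import sys
import pandas as pd
import cx_Oracle

# The Oracle connection string is passed in from main.py as the first argument
conn = cx_Oracle.connect(sys.argv[1])
# Load the two columns the later join needs into a pandas dataframe
df = pd.read_sql(
    "select policy_key as policy_key_g, endorsement_cde as endorsement_cde_g "
    "from policy_table",  # placeholder table name
    conn,
)
conn.close()
# Write a pipe-delimited CSV with a header, matching the read options in main.py
df.to_csv('data/oracle_query.csv', sep='|', index=False)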
When I run main.py, I get the following error:
Start the HIVE query
Start the oracle query
Insert into csv
Conversion to Pandas dataframe
root
|-- policy_key: integer (nullable = true)
|-- endorsement_cde: integer (nullable = true)
|-- policy_key_g: integer (nullable = true)
|-- endorsement_cde_g: integer (nullable = true)
None
2019-01-17 17:37:55 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Traceback (most recent call last):
File "main.py", line 77, in <module>
df_merge = df_merge.toPandas().collect()
File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1966, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 466, in collect
port = self._jdf.collectToPython()
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a,**kw)
File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o55.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
The log goes on for a while, but I think this is the relevant part.
I also tried:
df_merge.coalesce(1).write.csv('data/test.csv', sep="|", header=True)
But instead of a single file, this created a folder named test.csv with no file inside it.
Basically, I have two DataFrames that I'm joining, and I want the result written out as a single CSV file. What am I missing?
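To make the question concrete, here is a condensed version of the two export attempts from the script above (df_hive and df_oracle are the DataFrames built in main.py; the stray .collect() is dropped here since toPandas already returns a pandas DataFrame):

# Attempt 1: pull the joined result into pandas and write one CSV
# (this is where the Py4JJavaError above is raised, inside toPandas)
df_merge = df_hive.join(
    df_oracle,
    (df_hive.policy_key == df_oracle.policy_key_g)
    & (df_hive.endorsement_cde == df_oracle.endorsement_cde_g),
    how='left')
df_merge.toPandas().to_csv('data/test.csv', sep='|', encoding='utf-8', index=False)

# Attempt 2: let Spark write the CSV itself
# (runs, but produces a directory called data/test.csv rather than a single file)
df_merge.coalesce(1).write.csv('data/test.csv', sep='|', header=True)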