pyspark中的pickle错误

2skhul33  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(448)

我试图在pyspark中解析xml。我有一个目录,里面有许多小的xml文件,我想解析所有的xml,并将其放入hdfs中,因为我已经编写了下面的代码。
代码:

import xml.etree.ElementTree as ET
from subprocess import Popen, PIPE
import pickle
filenme = sc.wholeTextFiles("/user/root/CD")
dumpoff1 = Popen(["hadoop", "fs", "-put", "-", "/user/cloudera/Demo/Demo.txt"],stdin=PIPE)

def getname(filenm):
   return filenm[1]

def add_hk(filenm):
   source=[]
   global dumpoff1 
   doc = ET.fromstring(filenm)
   for elem1 in doc.findall('.//documentInfo/source'):
       source.append(elem1.text)
       print source[0]
       dumpoff1.stdin.write("%s\n" % source[0]) 

filenme.map(getname).foreach(add_hk)

但当我运行这个我得到下面的错误。
错误:
文件“/opt/cloudera/parcels/cdh-5.11.0-1.cdh5.11.0.p0.34/lib/spark/python/pyspark/cloudpickle.py”,第582行,在save\u file raise pickle.picklingerror(“cannot pickle files that not opened for reading”)pickle.picklingerror:cannot pickle files that not opened for reading
我试图写popen内添加香港然后我没有得到pickle错误,但demo.txt正在被覆盖,只有最新的文件值。请帮忙。

fiei3ece

fiei3ece1#

你应该加载你的 xml 使用spark sql的文件,然后将它们写入hdfs:
假设 /user/root/CD/ 是本地路径(否则请删除 file:// ):

df = spark.read.format('com.databricks.spark.xml').options(rowTag='page').load('file:///user/root/CD/*')

你可以把它写成 parquet :

df.write.parquet([HDFS path])

相关问题