从hdfs读取xml文件,用lxml.etree在pyspark中进行解析

vfwfrxfs  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(452)

我已经使用lxml.etree在python中编写了一个解析器,现在我正尝试在hadoop集群上运行这个解析器。当我在本地运行该函数时,它会按预期工作,但是当我尝试将其应用于集群上的文件时,我收到以下错误(我正在pyspark shell python3中执行以下操作)

xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)

OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity 
"/file_path/date_directory/example_one.xml"

我运行时可以看到文件 hdfs dfs -ls /file_path/date_directory/example_one.xml 在候机楼。
有两个方面我很感激你的帮助-
如何使用pyspark将xml文件从集群加载到lxml.etree.parse()方法中?
我如何才能最好地扩大规模,以有效地运行在Spark?我想使用我的python解析器解析集群上数百万个xml文件——下面的修改是否有效,或者是否有更好的方法来Parralize和大规模运行解析器?一般来说,我应该如何在spark配置中设置参数以获得最佳结果(大量执行者、多个驱动程序等)?


# Same as above but with wildcards to parse millions of XML files

xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)

我已经为此工作了一段时间,非常感谢大家的帮助。谢谢你们

lvjbypge

lvjbypge1#

mapvalues()函数被证明是有用的。sark配置的xml解析器(例如pubmed解析器)也提供了有用的样板代码,例如:

path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
    parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x),**pp.parse_pubmed_xml(x)))
    pubmed_oa_df = parse_results_rdd.toDF()
    pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
                                     'file_name', 'pmc', 'pmid',
                                     'publication_year', 'publisher_id',
                                     'journal', 'subjects']]
    pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
                                   mode='overwrite')

https://github.com/titipata/pubmed_parser/blob/master/scripts/pubmed_oa_spark.py
使用fs.globstatus可以检索一个子目录中的多个xml文件。

相关问题