我正在用python开发一个函数,然后将其注册为spark udf并将其应用于一个列。实际上,它所做的是从包含xml字符串的配置单元表中获取一列( xml_str
),将其解析为字典列表,然后我需要将这些字典作为附加列展开。
这里是我到目前为止所做的(假设函数已经定义为 parse_xml
):
xml_df = hive_context.table('db.table')
spark.udf.register("parse_xml_udf", parse_xml)
df = xml_df.withColumn('parsed_xml', parse_xml_udf(xml_df['xml_str']))
XML有许多不同的模型,我想为每种模型做的是:
new = data[data['alert_id'] == 'MODEL1']
s = new['parsed xml'].explode()
t = (new.join(pd.DataFrame(s.tolist(),index=s.index).groupby(level=0).agg(lambda x: x.dropna().tolist()), lsuffix = '_x', rsuffix = '_y')).applymap(lambda x: x[0] if (type(x)==list and len(x)==1) else x)
在pandas中,这将创建一个新的df,其中包含从xml解析的值的附加列。
我的主要问题是,我能否简单地创建一个新的spark函数来分解解析后的xml(比如 explode_parsed_xml
)使用
df2 = df.withColumn('explode_parsed_xml', explode_parsed_xml(df['parsed_xml']))
问题是我的explode函数将创建多个附加列,这些列将在字典中的键上重新命名。
暂无答案!
目前还没有任何答案,快来回答吧!