我正在使用pyspark创建一个spark结构的流应用程序,并希望将每一行的数据输出为json包。我正在使用udf,如下所示。
from pyspark.sql.functions import udf
1 def create_json_packet(when, ip, mac):
2 json_dict = {
3 'When': when.timestamp(),
4 'IP': ip,
5 'MAC': mac
6 }
7 return json.dumps(json_dict)
8 def construct_output_packet(data_frame)
9 json_udf = udf(create_json_packet, StringType())
10 out_df = data_frame.select(json_udf(data_frame.when, data_frame.ip, data_frame.mac)).alias("output_json"))
11 return out_df
字符串
这是工作正常,我得到了一个很好的格式化JSON的方式,我想如下所示
+------------------------------------------------------------------------+
|output_json |
+------------------------------------------------------------------------+
|{"When": 1704204003.0, "IP": "10.14.6.11", "MAC": "3C:A3:08:4D:91:71"} |
|{"When": 1704204003.0, "IP": "10.18.11.98", "MAC": "02:3F:3B:94:8F:E0"} |
|{"When": 1704204003.0, "IP": "10.13.21.51", "MAC": "F0:3C:07:95:34:C5"} |
+------------------------------------------------------------------------+
型
现在的问题是data_frame
在第10行有超过20列,我不想手动展开所有这些并更新create_json_packet
,它输入了20个参数。有没有什么方法可以使用某种列表/compact for循环和**kwargs
来实现这一点,data_frame
中的列名变成了**kwargs
的keys
,我可以用它来生成json对象。
1条答案
按热度按时间xesrikrc1#
您可以创建一个列数组,并将其传递给
withColumn
中的udf
。字符串
或者,如果你愿意,你也可以把
subset_columns
数组作为参数传递给udf
:型