Passing a variable number of arguments to pyspark.sql.functions.udf

nsc4cvqm  asked on 2024-01-06  in Spark

I am building a Spark Structured Streaming application with PySpark and want to emit each row of data as a JSON packet. I am using a UDF as shown below.

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    import json

    def create_json_packet(when, ip, mac):
        json_dict = {
            'When': when.timestamp(),
            'IP': ip,
            'MAC': mac
        }
        return json.dumps(json_dict)

    def construct_output_packet(data_frame):
        json_udf = udf(create_json_packet, StringType())
        out_df = data_frame.select(json_udf(data_frame.when, data_frame.ip, data_frame.mac).alias("output_json"))
        return out_df

This works fine and I get nicely formatted JSON, just as I wanted:

    +------------------------------------------------------------------------+
    |output_json                                                             |
    +------------------------------------------------------------------------+
    |{"When": 1704204003.0, "IP": "10.14.6.11", "MAC": "3C:A3:08:4D:91:71"}  |
    |{"When": 1704204003.0, "IP": "10.18.11.98", "MAC": "02:3F:3B:94:8F:E0"} |
    |{"When": 1704204003.0, "IP": "10.13.21.51", "MAC": "F0:3C:07:95:34:C5"} |
    +------------------------------------------------------------------------+


The problem is that the data_frame passed to construct_output_packet actually has more than 20 columns, and I don't want to spell them all out by hand and change create_json_packet to take 20 parameters. Is there some way to do this with a list / compact for loop and **kwargs, so that the column names of data_frame become the **kwargs keys that I can use to build the JSON object?


xesrikrc1#

You can build a list of columns and pass it to the udf inside withColumn:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    import json

    spark = SparkSession.builder.getOrCreate()

    # Sample data
    data = [
        ("2022-01-01", "192.168.1.1", "00:1A:2B:3C:4D:5E"),
        ("2022-01-02", "192.168.1.2", "11:22:33:44:55:66"),
        # Add more rows with similar structure
    ]

    # Column names for the DataFrame
    columns = ["when", "ip", "mac"]

    # Create the DataFrame
    df = spark.createDataFrame(data, columns)

    # Define a UDF that builds a dictionary from a variable number of arguments
    @udf(StringType())
    def create_json_packet(*args):
        subset_dict = dict(zip(subset_columns, args))
        return json.dumps(subset_dict)

    # Specify the subset of columns you want to include in the dictionary
    subset_columns = ["when", "ip"]

    df \
        .withColumn("output_json", create_json_packet(*subset_columns)) \
        .show(10, truncate=False)
Output:

    +----------+-----------+-----------------+-------------------------------------------+
    |when      |ip         |mac              |output_json                                |
    +----------+-----------+-----------------+-------------------------------------------+
    |2022-01-01|192.168.1.1|00:1A:2B:3C:4D:5E|{"when": "2022-01-01", "ip": "192.168.1.1"}|
    |2022-01-02|192.168.1.2|11:22:33:44:55:66|{"when": "2022-01-02", "ip": "192.168.1.2"}|
    +----------+-----------+-----------------+-------------------------------------------+
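For the 20-plus columns in the original question, the same pattern should work without listing every column by hand: take the names straight from the DataFrame and unpack them into the UDF call. A minimal sketch, assuming you want every column of df in the JSON packet:

    # Sketch: include every column of the DataFrame in the JSON packet
    subset_columns = df.columns  # e.g. ["when", "ip", "mac", ...]

    df \
        .withColumn("output_json", create_json_packet(*subset_columns)) \
        .show(10, truncate=False)

The UDF picks up subset_columns from the enclosing scope, so the dictionary keys stay in sync with whatever column list you unpack into the call.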

Alternatively, if you prefer, you can also pass the subset_columns list into the udf as an argument:

    from pyspark.sql.functions import lit

    # Define a UDF that builds a dictionary from the column-name list plus the values
    @udf(StringType())
    def create_json_packet(subset_columns, *args):
        subset_dict = dict(zip(subset_columns, args))
        return json.dumps(subset_dict)

    df \
        .withColumn("output_json", create_json_packet(lit(subset_columns), *subset_columns)) \
        .show(10, truncate=False)
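Note that passing a plain Python list to lit() is, as far as I know, only supported on newer Spark releases (3.4+). On older versions you can build the array column explicitly, for example:

    from pyspark.sql.functions import array, lit

    # On older Spark versions, wrap each name in lit() and combine them with array()
    cols_array = array(*[lit(c) for c in subset_columns])

    df \
        .withColumn("output_json", create_json_packet(cols_array, *subset_columns)) \
        .show(10, truncate=False)

Inside the Python UDF the array column arrives as a regular list of strings, so dict(zip(...)) works the same way.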
