Removing selected JSON objects from a JSON array using PySpark

vawmfj5a · published 2021-07-13 · in Spark
Follow (0) | Answers (1) | Views (566)

I want to remove several JSON objects from a JSON array; my source JSON format is shown below. I have a Python list containing the device ids that should be kept in the JSON array; all other objects must be removed. For example, as shown in my source JSON, I have 3 dev_id values: 100010100, 200020200 and 300030300.
My Python list is device_id_list = [200020200, 300030300], so the final JSON array should contain only 2 objects, and the object with dev_id = 100010100 should be dropped, as shown in the output JSON.
I tried an approach that is probably not optimal: I read the JSON as plain text rather than as JSON, as shown below.

  df = spark.read.text("path\\iot-sensor.json")
  # df: pyspark.sql.dataframe.DataFrame
  # value: string

I wrote a UDF that removes the JSON objects whose dev_id is not present in device_id_list. It removes those objects correctly and returns the JSON as a string.
I now need this string DataFrame df2 to be converted back to JSON using the same schema as the source JSON (df2: pyspark.sql.dataframe.DataFrame = [iot_station: array], the source schema), because the source and output JSON should have identical schemas. If there is a better solution, please share it.
UDF:

  import json
  from pyspark.sql.functions import udf
  from pyspark.sql.types import StringType

  def drop_dev_id(jsonResponse, dict_keys):
      try:
          data = json.loads(jsonResponse)
          i = 0
          n = len(data['iot_station'])
          while i < n:
              if data['iot_station'][i]["dev_id"] not in dict_keys:
                  data['iot_station'].pop(i)
                  n -= 1
              else:
                  i += 1
          # serialize back so the StringType UDF returns a string, not a dict
          return json.dumps(data)
      except Exception as e:
          print('Exception --> ' + str(e))

  def drop_dev_id_udf(dict_keys):
      return udf(lambda row: drop_dev_id(row, dict_keys), StringType())

  df2 = df.select(drop_dev_id_udf(dict_keys)('value').alias('value'))
  # df2: pyspark.sql.dataframe.DataFrame
  # value: string
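The drop logic inside drop_dev_id can be sanity-checked in plain Python before registering it as a UDF. A minimal sketch with a toy payload (the dev_id values come from the question; the rest of the structure is trimmed for brevity, and the while/pop loop is replaced by an equivalent list comprehension):

```python
import json

def keep_dev_ids(jsonResponse, dict_keys):
    # keep only entries whose dev_id is in dict_keys; same effect as the
    # while/pop loop in the UDF above
    data = json.loads(jsonResponse)
    data['iot_station'] = [
        d for d in data['iot_station'] if d["dev_id"] in dict_keys
    ]
    return json.dumps(data)

sample = json.dumps({
    "iot_station": [
        {"dev_id": 100010100},
        {"dev_id": 200020200},
        {"dev_id": 300030300},
    ]
})

result = json.loads(keep_dev_ids(sample, [200020200, 300030300]))
print([d["dev_id"] for d in result["iot_station"]])
# [200020200, 300030300]
```
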

Source JSON:

  {
    "iot_station": [
      {
        "dev_id": 100010100,
        "device1": dev_val1,
        "device2": "dev_val2",
        "device3": dev_val3,
        "device4": "dev_val4",
        "stationid": [
          {
            "id": id_val,
            "idrs": idrs_val,
            "idrq": "idrq_val",
            "idrx": "idrx_val"
          }
        ],
        "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      },
      {
        "dev_id": 200020200,
        "device1": dev_val1,
        "device2": "dev_val2",
        "device3": dev_val3,
        "device4": "dev_val4",
        "stationid": [
          {
            "id": id_val,
            "idrs": idrs_val,
            "idrq": "idrq_val",
            "idrx": "idrx_val"
          }
        ],
        "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      },
      {
        "dev_id": 300030300,
        "device1": dev_val1,
        "device2": "dev_val2",
        "device3": dev_val3,
        "device4": "dev_val4",
        "stationid": [
          {
            "id": id_val,
            "idrs": idrs_val,
            "idrq": "idrq_val",
            "idrx": "idrx_val"
          }
        ],
        "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      }
    ]
  }

Output JSON:

  {
    "iot_station": [
      {
        "dev_id": 200020200,
        "device1": dev_val1,
        "device2": "dev_val2",
        "device3": dev_val3,
        "device4": "dev_val4",
        "stationid": [
          {
            "id": id_val,
            "idrs": idrs_val,
            "idrq": "idrq_val",
            "idrx": "idrx_val"
          }
        ],
        "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      },
      {
        "dev_id": 300030300,
        "device1": dev_val1,
        "device2": "dev_val2",
        "device3": dev_val3,
        "device4": "dev_val4",
        "stationid": [
          {
            "id": id_val,
            "idrs": idrs_val,
            "idrq": "idrq_val",
            "idrx": "idrx_val"
          }
        ],
        "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      }
    ]
  }

swvgeqrz1#

You don't need a UDF to achieve this. Just load the file as regular JSON instead of text and use the filter function to filter the array column iot_station:

  from pyspark.sql import functions as F

  df = spark.read.json("path/iot-sensor.json", multiLine=True)

  device_id_list = [str(i) for i in [200020200, 300030300]]

  df1 = df.withColumn(
      "iot_station",
      F.expr(f"""
          filter(
              iot_station,
              x -> x.dev_id in ({','.join(device_id_list)})
          )
      """)
  )

  # check the filtered json
  df1.select(F.col("iot_station").getItem("dev_id").alias("dev_id")).show(truncate=False)
  # +----------------------+
  # |dev_id                |
  # +----------------------+
  # |[200020200, 300030300]|
  # +----------------------+
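The lambda passed to filter is just part of a SQL expression string that is assembled in Python before F.expr ever sees it, so the assembled string can be checked without a Spark session. A minimal sketch using the same names as the answer above:

```python
# build the same expression string that F.expr receives in the answer
device_id_list = [str(i) for i in [200020200, 300030300]]
expr_str = f"filter(iot_station, x -> x.dev_id in ({','.join(device_id_list)}))"
print(expr_str)
# filter(iot_station, x -> x.dev_id in (200020200,300030300))
```
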
