PySpark - Append two JSON structures

Asked by yvgpqqbh on 2022-11-01 in Spark

I created a Databricks notebook in PySpark and am trying to build a dynamic JSON (TMSL) to process the cubes in Analysis Services.
In the first part of the code I define the partitions and the fact tables; whatever partitions I pass in, it dynamically builds a JSON, and that part works fine.
JSON output 1: {"table": "Analysis Financial", "partition": "IT_2022"}, {"table": "Analysis Financial", "partition": "ES_2022"}, {"table": "Analysis Financial", "partition": "BE_2022"}, {"table": "Analysis Financial", "partition": "PT_2022"}, {"table": "Analysis Product", "partition": "IT"}, {"table": "Analysis Product", "partition": "ES"}, {"table": "Analysis Product", "partition": "PT"}, {"table": "Analysis Product", "partition": "BE"}
Then I want to append another JSON containing the dimension tables. This part is static, because those tables do not need to be processed dynamically by partition.
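For reference, the static JSON with the dimension tables is the dictionary b defined later in the notebook:

    {
        "Type": "Full",
        "MaxParallelism": 1,
        "RetryCount": 2,
        "Objects": [
            {"table": "Analysis Measures"},
            {"table": "Bank"},
            {"table": "Channel"}
        ]
    }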

To append the two, I tried the approach in the last cell of the notebook below, but the output is not correct:

  1. {"MaxParallelism": 1, "Objects": [{"table": "Analysis Measures"}, {"table": "Bank"}, {"table": "Channel"}], "RetryCount": 2, "Type": "Full"}, {"partition": "IT_2022", "table": "Analysis Financial"}, {"partition": "ES_2022", "table": "Analysis Financial"}, {"partition": "BE_2022", "table": "Analysis Financial"}, {"partition": "PT_2022", "table": "Analysis Financial"}, {"partition": "IT", "table": "Analysis Product"}, {"partition": "ES", "table": "Analysis Product"}, {"partition": "PT", "table": "Analysis Product"}, {"partition": "BE", "table": "Analysis Product"}

In this case, the output I would expect is:

    {
        "Type": "Full",
        "MaxParallelism": 1,
        "RetryCount": 2,
        "Objects": [
            {
                "table": "Analysis Measures"
            },
            {
                "table": "Bank"
            },
            {
                "table": "Channel"
            },
            {
                "table": "Analysis Financial",
                "partition": "IT_2022"
            },
            {
                "table": "Analysis Financial",
                "partition": "ES_2022"
            },
            {
                "table": "Analysis Financial",
                "partition": "BE_2022"
            },
            {
                "table": "Analysis Financial",
                "partition": "PT_2022"
            },
            {
                "table": "Analysis Product",
                "partition": "IT"
            },
            {
                "table": "Analysis Product",
                "partition": "ES"
            },
            {
                "table": "Analysis Product",
                "partition": "PT"
            },
            {
                "table": "Analysis Product",
                "partition": "BE"
            }
        ]
    }

The notebook I am using:

    # Databricks notebook source
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    # from pyspark.sql.functions.sequence import *
    from pyspark.sql.window import Window
    from datetime import datetime, timedelta
    from pathlib import Path
    from functools import reduce
    import traceback
    import pyodbc
    import uuid
    import sys

    spark.conf.set("spark.databricks.delta.targetFileSize", 33554432)
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    spark.conf.set("spark.databricks.io.cache.enabled", "true")
    sqlContext.clearCache()

    # COMMAND ----------

    dbutils.widgets.text("cubeName", "Consumer", "cubeName")
    dbutils.widgets.text("countryPartition", '["PT","IT","ES","BE"]', "countryPartition")
    dbutils.widgets.text("yearPartition", '["2022"]', "yearPartition")
    dbutils.widgets.text("partitionColumn", '["Analysis Financial|country_year","Analysis Product|country"]', "partitionColumn")

    cubeName = dbutils.widgets.get('cubeName')
    countryPartition = dbutils.widgets.get('countryPartition')
    yearPartition = dbutils.widgets.get('yearPartition')
    partitionColumn = dbutils.widgets.get('partitionColumn')
    print(cubeName)
    print(countryPartition)
    print(yearPartition)
    print(partitionColumn)

    # COMMAND ----------

    # one-column DataFrames for the countries, years and "Table|partitionColumn" pairs
    cp_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('countryPartition'))], schema=['country'])
    y_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('yearPartition'))], schema=['year'])
    p_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('partitionColumn'))], schema=['partition_col'])

    # applying cross join to get all combination results
    from pyspark.sql.functions import broadcast
    final_df = broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(p_df)

    # split "Table|partitionColumn" into a table name and a partition column name
    from pyspark.sql.functions import split
    fdf = final_df.select('country', 'year',
                          split(final_df['partition_col'], '[|]').getItem(0).alias('table'),
                          split(final_df['partition_col'], '[|]').getItem(1).alias('partitionColumn')).distinct()
    # partition name is either the country alone or "country_year", depending on the partition column
    fdf = fdf.withColumn('partition', when(col('partitionColumn') == "country", lit(col('country')))
                                      .otherwise(concat(col('country'), lit('_'), col('year'))))
    fdf = fdf.select('table', 'partition').distinct()
    # display(fdf)

    # list of {"table": ..., "partition": ...} dicts -> JSON output 1
    output = [eval(i) for i in fdf.toJSON().collect()]
    import json
    print(json.dumps(output))

    # COMMAND ----------

    # static JSON with the dimension tables
    b = {
        "Type": "Full",
        "MaxParallelism": 1,
        "RetryCount": 2,
        "Objects": [
            {"table": "Analysis Measures"},
            {"table": "Bank"},
            {"table": "Channel"}
        ]
    }

    # COMMAND ----------

    # attempt to append the two structures by round-tripping them through a Spark DataFrame
    json_content1 = b
    json_content2 = output
    json_list = []
    json_list.append(json_content1)
    json_list.append(json_content2)
    df = spark.read.json(sc.parallelize(json_list))
    # display(df)
    df = [eval(i) for i in df.toJSON().collect()]
    import json
    dbutils.notebook.exit(json.dumps(df))

Can anyone help me achieve this?
Thank you.


66bbxpm5:

To append the two JSON structures, check the code below: using ['Objects'].extend you will get the desired output.

    json_content1 = b
    json_content2 = output
    # extend the Objects list of the static JSON with the dynamic partition dicts
    json_content1['Objects'].extend(json_content2)
    dbutils.notebook.exit(json.dumps(json_content1))
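
This works because json_content2 is a plain Python list of dicts: list.extend adds each dict to the Objects list individually, whereas list.append would insert the whole list as a single nested element. A minimal standalone sketch of the difference (the sample data here is illustrative only):

    objects = [{"table": "Bank"}]
    partitions = [{"table": "Analysis Product", "partition": "IT"},
                  {"table": "Analysis Product", "partition": "ES"}]

    # append nests the whole list as one element:
    # [{'table': 'Bank'}, [{'table': 'Analysis Product', 'partition': 'IT'}, ...]]
    appended = objects.copy()
    appended.append(partitions)

    # extend adds each dict as its own element:
    # [{'table': 'Bank'}, {'table': 'Analysis Product', 'partition': 'IT'},
    #  {'table': 'Analysis Product', 'partition': 'ES'}]
    extended = objects.copy()
    extended.extend(partitions)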

Output: the merged JSON contains the dimension tables and the dynamic partitions in a single Objects array, matching the expected structure shown in the question.
