I created a Databricks notebook in PySpark and am trying to build a dynamic JSON (TMSL) to process a cube in Analysis Services.
In the first part of the code I define the partitions and the fact tables; regardless of which partitions I pass in, it dynamically builds a JSON, and that part is created correctly:
json输出1:{"table": "Analysis Financial", "partition": "IT_2022"}, {"table": "Analysis Financial", "partition": "ES_2022"}, {"table": "Analysis Financial", "partition": "BE_2022"}, {"table": "Analysis Financial", "partition": "PT_2022"}, {"table": "Analysis Product", "partition": "IT"}, {"table": "Analysis Product", "partition": "ES"}, {"table": "Analysis Product", "partition": "PT"}, {"table": "Analysis Product", "partition": "BE"}
Then I want to append another JSON containing the dimension tables. This part is static, because those tables do not need to be processed dynamically by partition:
To append the two, I tried the code below (the same snippet appears as the last cell of the notebook further down):
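# Reproduced from the last cell of the notebook shown further down --
# this is the append attempt that produces the incorrect output:
json_content1 = b
json_content2 = output
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))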
but the output is not correct:
{"MaxParallelism": 1, "Objects": [{"table": "Analysis Measures"}, {"table": "Bank"}, {"table": "Channel"}], "RetryCount": 2, "Type": "Full"}, {"partition": "IT_2022", "table": "Analysis Financial"}, {"partition": "ES_2022", "table": "Analysis Financial"}, {"partition": "BE_2022", "table": "Analysis Financial"}, {"partition": "PT_2022", "table": "Analysis Financial"}, {"partition": "IT", "table": "Analysis Product"}, {"partition": "ES", "table": "Analysis Product"}, {"partition": "PT", "table": "Analysis Product"}, {"partition": "BE", "table": "Analysis Product"}
The output I would expect in this case is:
{
  "Type": "Full",
  "MaxParallelism": 1,
  "RetryCount": 2,
  "Objects": [
    { "table": "Analysis Measures" },
    { "table": "Bank" },
    { "table": "Channel" },
    { "table": "Analysis Financial", "partition": "IT_2022" },
    { "table": "Analysis Financial", "partition": "ES_2022" },
    { "table": "Analysis Financial", "partition": "BE_2022" },
    { "table": "Analysis Financial", "partition": "PT_2022" },
    { "table": "Analysis Product", "partition": "IT" },
    { "table": "Analysis Product", "partition": "ES" },
    { "table": "Analysis Product", "partition": "PT" },
    { "table": "Analysis Product", "partition": "BE" }
  ]
}
The notebook I am using:
# Databricks notebook source
from pyspark.sql.types import *
from pyspark.sql.functions import *
# from pyspark.sql.functions.sequence import *
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pathlib import Path
from functools import reduce
import traceback
import pyodbc
import uuid
import sys
spark.conf.set("spark.databricks.delta.targetFileSize", 33554432)
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("spark.databricks.io.cache.enabled", "true")
sqlContext.clearCache()
# COMMAND ----------
dbutils.widgets.text("cubeName", "Consumer", "cubeName")
dbutils.widgets.text("countryPartition", '["PT","IT","ES","BE"]', "countryPartition")
dbutils.widgets.text("yearPartition", '["2022"]', "yearPartition")
dbutils.widgets.text("partitionColumn", '["Analysis Financial|country_year","Analysis Product|country"]', "partitionColumn")
cubeName = dbutils.widgets.get('cubeName')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
partitionColumn = dbutils.widgets.get('partitionColumn')
print(cubeName)
print(countryPartition)
print(yearPartition)
print(partitionColumn)
# COMMAND ----------
cp_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('countryPartition'))],schema=['country'])
y_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('yearPartition'))],schema=['year'])
# p_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('partitionColumn'))],schema=['partition_col'])
p_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('partitionColumn'))],schema=['partition_col'])
# applying cross join to get all combination results.
from pyspark.sql.functions import broadcast
# final_df= broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)
final_df= broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(p_df)
from pyspark.sql.functions import split
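# split 'table|partitionColumn' into separate table and partitionColumn columns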
fdf = final_df.select('country','year',split(final_df['partition_col'],'[|]').getItem(0).alias('table'),split(final_df['partition_col'],'[|]').getItem(1).alias('partitionColumn')).distinct()
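# build the partition name: the country alone, or country_year, depending on the partition column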
fdf = fdf.withColumn('partition', when(col('partitionColumn') == "country", lit(col('country'))).otherwise(concat(col('country'), lit('_'), col('year'))))
fdf = fdf.select('table', 'partition').distinct()
# # display(fdf)
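# collect the rows into a Python list of dicts, one per table/partition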
output = [eval(i) for i in fdf.toJSON().collect()]
import json
print(json.dumps(output))
# COMMAND ----------
b = {
"Type": "Full",
"MaxParallelism": 1,
"RetryCount": 2,
"Objects": [
{
"table": "Analysis Measures"
},
{
"table": "Bank"
},
{
"table": "Channel"
}
]
}
# COMMAND ----------
json_content1 = b
json_content2 = output
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
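# read the combined list back through Spark as JSON (the merge attempt described above)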
df = spark.read.json(sc.parallelize(json_list))
# display(df)
df = [eval(i) for i in df.toJSON().collect()]
import json
dbutils.notebook.exit(json.dumps(df))
Can someone help me achieve this?
Thank you.
1 Answer
66bbxpm51#
Check the code that appends the two JSON structures. spark.read.json(sc.parallelize(json_list)) turns b into one record and every dict in output into its own record, so you get a flat collection of rows instead of a single document with the partitions nested under "Objects". Append the dynamic entries directly in Python with
['Objects'].extend
and you will get the required output (the structure shown above as the expected output).
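A minimal sketch of that fix, reusing the b and output variables from the notebook above (output is assumed to be the Python list of table/partition dicts built in the first part):

import json

# merge in plain Python instead of going through Spark:
# extend the static "Objects" list with the dynamically built partition entries
b['Objects'].extend(output)

# b is now a single document with the expected structure
dbutils.notebook.exit(json.dumps(b))

After the extend, json.dumps(b) emits the JSON shown above as the expected output.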