pyspark 如何从SQL DB上的连接查询创建多个JSON文件

vngu2lb8  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(174)

我们有3个相关的表:PublicationObject、Source、SourceObjectDetails(在CSV中维护);我们执行连接并获得结果集:Join
x1c 0d1x的数据
我们希望为每个表创建JSON文件(并将这些数据保持为非规范化形式),如下所示:



我们有500多个出版物对象,每个PO平均有近25个栏目。
所以我们需要为每个PublicationObject创建一个JSON
我们需要使用python或pyspark(在Azure Databricks上)
请分享技巧和解决方案,如何执行此操作

nue99wik

nue99wik1#

您可以通过对列进行分组并将其收集为对象列表来使用以下转换。首先,您需要读取CSV文件并执行连接操作。我们将从生成的连接数据开始。

  1. from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType
  2. schema = StructType([
  3. StructField("TableName", StringType(), True),
  4. StructField("DC", StringType(), True),
  5. StructField("Owner", StringType(), True),
  6. StructField("LT", StringType(), True),
  7. StructField("Sour", StringType(), True),
  8. StructField("SourceType", StringType(), True),
  9. StructField("SourceConn", StringType(), True),
  10. StructField("ColumnName", StringType(), True),
  11. StructField("ColPos", IntegerType(), True),
  12. StructField("DataType", StringType(), True),
  13. StructField("IsPrimary", IntegerType(), True),
  14. ])
  15. data = [
  16. ("A", "Confidential", "HR", "Full", "S1", "SQL DB", "<XYZ>", "A1", 1, "string",1),
  17. ("A", "Confidential", "HR", "Full", "S1", "SQL DB", "<XYZ>", "A2", 2, "int",0),
  18. ("A", "Confidential", "HR", "Full", "S1", "SQL DB", "<XYZ>", "A3", 3, "date",1),
  19. ("A", "Confidential", "HR", "Full", "S1", "SQL DB", "<XYZ>", "A4", 4, "timestamp",0),
  20. ]
  21. df = spark.createDataFrame(data, schema=schema)
  22. df.display()

字符串
要获得预期的结果,请使用以下转换查询。

  1. from pyspark.sql.functions import collect_list, struct, col
  2. result_df = (
  3. df.groupBy("TableName", "LT", "DC", "Owner", "Sour", "SourceType", "SourceConn")
  4. .agg(
  5. collect_list(
  6. struct("ColumnName", "ColPos", "DataType", "IsPrimary")
  7. ).alias("SourceObjectDetails")
  8. )
  9. .select(
  10. "TableName",
  11. "LT",
  12. "DC",
  13. "Owner",
  14. struct(
  15. col("Sour").alias("Name"),
  16. col("SourceType").alias("Type"),
  17. col("SourceConn").alias("ConnectDetails"),
  18. "SourceObjectDetails"
  19. ).alias("Source")
  20. )
  21. )
  22. result_df.display()


要为列表中的每个PublicationObject获取JSON对象,请使用以下代码。

  1. import json
  2. jsons = [json.loads(i) for i in result_df.toJSON().collect()]


输出量:

  1. [{'TableName': 'A',
  2. 'LT': 'Full',
  3. 'DC': 'Confidential',
  4. 'Owner': 'HR',
  5. 'Source': {'Name': 'S1',
  6. 'Type': 'SQL DB',
  7. 'ConnectDetails': '<XYZ>',
  8. 'SourceObjectDetails': [{'ColumnName': 'A1',
  9. 'ColPos': 1,
  10. 'DataType': 'string',
  11. 'IsPrimary': 1},
  12. {'ColumnName': 'A2', 'ColPos': 2, 'DataType': 'int', 'IsPrimary': 0},
  13. {'ColumnName': 'A3', 'ColPos': 3, 'DataType': 'date', 'IsPrimary': 1},
  14. {'ColumnName': 'A4',
  15. 'ColPos': 4,
  16. 'DataType': 'timestamp',
  17. 'IsPrimary': 0}]}}]

展开查看全部

相关问题