Pyspark - converting a flat structure to a nested struct type

Asked by fafcakar on 2023-11-16 in Spark

I need to convert a flat structure into a nested struct type in PySpark. The input looks like this:

    root
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- created_at: timestamp (nullable = true)

I need to convert this dynamically (in reality there are 20+ columns) into the structure below, changing the data types as well:

    root
     |-- payload: struct (nullable = true)
     |    |-- after: struct (nullable = true)
     |    |    |-- id: long (nullable = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- created_at: long (nullable = true)


Please note: the timestamp data type needs to be cast to long.
How can I do this dynamically in PySpark?

oknwwptz 1#

Use the struct function to nest the columns:

    from pyspark.sql.functions import col, unix_timestamp, struct

    df.select(
        struct(
            struct(col("id"), col("name"), unix_timestamp(col("created_at"))).alias("after")
        ).alias("payload")
    ).printSchema()

    root
     |-- payload: struct (nullable = false)
     |    |-- after: struct (nullable = false)
     |    |    |-- id: integer (nullable = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- col3: long (nullable = true)

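Note that the converted column loses its original name here (it shows up as col3, since unnamed expressions inside struct are numbered by position). A minimal variation of the same select, assuming you want to keep the name created_at:

    from pyspark.sql.functions import col, struct, unix_timestamp

    # Alias the converted column so the struct field keeps its original name.
    nested = df.select(
        struct(
            struct(
                col("id"),
                col("name"),
                unix_timestamp(col("created_at")).alias("created_at"),
            ).alias("after")
        ).alias("payload")
    )
    nested.printSchema()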

fzsnzjdm 2#

Building on @Srinivas's idea (using struct to create the nested structure), but doing it dynamically:

    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col, struct, lit

    # Initialize Spark Session
    spark = SparkSession.builder.getOrCreate()

    def nest_dataframe(df: DataFrame) -> DataFrame:
        # Step 1: Identify column names and data types
        schema = df.schema
        col_names = [f.name for f in schema.fields]
        col_types = {f.name: f.dataType for f in schema.fields}
        # Step 2: Convert to desired data types
        for col_name in col_names:
            if col_types[col_name].typeName() == "timestamp":
                df = df.withColumn(col_name, col(col_name).cast("long"))
        # Step 3: Create the nested structure using struct
        nested_cols = [col(name) for name in col_names]
        after_struct = struct(nested_cols).alias("after")
        payload_struct = struct(after_struct).alias("payload")
        # Step 4: Apply nested structure
        new_df = df.select(payload_struct)
        return new_df

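As an aside, the same transformation can also be expressed as a single select. This is only a sketch under the same assumption (timestamps are the only columns that need casting), and nest_dataframe_compact is a hypothetical name, not part of the original answer:

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col, struct

    def nest_dataframe_compact(df: DataFrame) -> DataFrame:
        # Cast timestamp columns to long (epoch seconds), keep the rest as-is,
        # then wrap all columns into payload.after in one pass.
        cols = [
            col(f.name).cast("long").alias(f.name)
            if f.dataType.typeName() == "timestamp"
            else col(f.name)
            for f in df.schema.fields
        ]
        return df.select(struct(struct(*cols).alias("after")).alias("payload"))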
We can test the original function with the following code:

    # Sample DataFrame
    data = [("1", "Alice", "2021-09-30 12:34:56"), ("2", "Bob", "2021-10-01 01:45:30")]
    columns = ["id", "name", "created_at"]
    df = spark.createDataFrame(data, columns)
    df = df.withColumn("created_at", col("created_at").cast("timestamp"))
    df = df.withColumn("yet_another_column", lit(3))

    # Use the function to get the nested DataFrame
    nested_df = nest_dataframe(df)
    nested_df.printSchema()


The output should be:

    root
     |-- payload: struct (nullable = false)
     |    |-- after: struct (nullable = false)
     |    |    |-- id: string (nullable = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- created_at: long (nullable = true)
     |    |    |-- yet_another_column: integer (nullable = false)
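To check the values as well as the schema, you can flatten the nested struct back out or render a row as JSON (a quick sanity check, not part of the original answer):

    nested_df.select("payload.after.*").show(truncate=False)
    nested_df.toJSON().first()  # one row serialized in the target payload shape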


Try it with your actual DataFrame and let us know if it works as expected.

