PySpark - converting a flat structure to a nested struct type

fafcakar · posted 2023-11-16 in Spark

I need to convert a flat structure into a nested struct type in PySpark. The input looks like this:

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- created_at: timestamp (nullable = true)

I need to convert it dynamically (in reality there are 20+ columns) into the structure below, with the data types changed as well:

root
 |-- payload: struct (nullable = true)
 |    |-- after: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- created_at: long (nullable = true)


Note that the timestamp data type needs to be converted to long.
How can I do this dynamically in PySpark?

oknwwptz · Answer 1

Nest the columns with the struct function.

from pyspark.sql.functions import col, unix_timestamp, struct

>>> df.select(struct(struct(col("id"), col("name"), unix_timestamp(col("created_at")).alias("created_at")).alias("after")).alias("payload")).printSchema()
root
 |-- payload: struct (nullable = false)
 |    |-- after: struct (nullable = false)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- created_at: long (nullable = true)


fzsnzjdm · Answer 2

Building on @Srinivas's idea (using struct to create the nested structure), but doing it dynamically:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, struct, lit

# Initialize Spark Session
spark = SparkSession.builder.getOrCreate()

def nest_dataframe(df: DataFrame) -> DataFrame:
    # Step 1: Identify column names and data types
    schema = df.schema
    col_names = [f.name for f in schema.fields]
    col_types = {f.name: f.dataType for f in schema.fields}

    # Step 2: Convert to desired data types
    for col_name in col_names:
        if col_types[col_name].typeName() == "timestamp":
            df = df.withColumn(col_name, col(col_name).cast("long"))

    # Step 3: Create the nested structure using struct
    nested_cols = [col(name) for name in col_names]
    after_struct = struct(nested_cols).alias("after")
    payload_struct = struct(after_struct).alias("payload")

    # Step 4: Apply nested structure
    new_df = df.select(payload_struct)
    
    return new_df

We can test the function with the following code:

# Sample DataFrame
data = [("1", "Alice", "2021-09-30 12:34:56"), ("2", "Bob", "2021-10-01 01:45:30")]
columns = ["id", "name", "created_at"]

df = spark.createDataFrame(data, columns)
df = df.withColumn("created_at", col("created_at").cast("timestamp"))
df = df.withColumn("yet_another_column", lit(3))

# Use the function to get the nested DataFrame
nested_df = nest_dataframe(df)
nested_df.printSchema()


The output should be:

root
 |-- payload: struct (nullable = false)
 |    |-- after: struct (nullable = false)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- created_at: long (nullable = true)
 |    |    |-- yet_another_column: integer (nullable = false)


Try it with your actual DataFrame and let us know if it works as expected.
