pyspark 在Spark Dataframe中跨多行json字符串统一模式

ifsvaxew  于 2023-01-12  发布在  Spark
关注(0)|答案(2)|浏览(152)

我遇到了一个关于PySpark DataFrame中包含一系列json字符串的行的难题。
这个问题围绕着每一行都可能包含不同的模式,所以当我想在PySpark中将所述行转换为可订阅的数据类型时,我需要有一个“统一的”模式。
例如,考虑以下 Dataframe

import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))

注意,每行包含不同版本的json字符串。

import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame

def convert_to_row(d: dict) -> Row:
    """Convert a dictionary to a SparkRow.

    Parameters
    ----------
    d : dict
        Dictionary to convert.

    Returns
    -------
    Row

    """
    return Row(**OrderedDict(sorted(d.items())))

def get_schema_from_dictionary(the_dict: dict):
    """Create a schema from a dictionary.

    Parameters
    ----------
    the_dict : dict

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema

def get_universal_schema(df: SparkDataFrame, column: str):
    """Given a dataframe, retrieve the "global" schema for the column.

    NOTE: It does this by merging across all the rows, so this will
          take a long time for larger dataframes.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
    mega_dict = {}
    for value in col_values:
        mega_dict = {**mega_dict, **value}

    return get_schema_from_dictionary(mega_dict)

def get_sample_schema(df, column):
    """Given a dataframe, sample a single value to convert.

    NOTE: This assumes that the dataframe has the same schema
          over all rows.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return get_universal_schema(df.limit(1), column)

def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
    """Convert json-string column to a subscriptable object.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.
    manual_schema : PysparkSchema, optional
        Schema understood by PySpark, by default None
    merge : bool, optional
        Parse the whole dataframe to extract a global schema, by default False

    Returns
    -------
    SparkDataFrame

    """
    if manual_schema is None or manual_schema == {}:
        if merge:
            schema = get_universal_schema(df, column)
        else:
            schema = get_sample_schema(df, column)
    else:
        schema = manual_schema

    return df.withColumn(column, fcn.from_json(column, schema))

然后,我可以简单地执行以下操作,以获得具有统一模式的新 Dataframe

df = from_json(df, column='B', merge=True)
df.printSchema()
root
 |-- A: long (nullable = true)
 |-- B: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: long (nullable = true)
 |    |-- d: double (nullable = true)
 |    |-- f: struct (nullable = true)
 |    |    |-- maybe_this: long (nullable = true)
 |    |    |-- some_other: struct (nullable = true)
 |    |    |    |-- A: long (nullable = true)

现在我们来看看问题的症结所在,因为我在col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]中这样做,所以我受限于主节点上的内存量。
如何执行类似的过程,以便在收集到主节点之前,将更多的工作分配给每个工作者?

wrrgggsh

wrrgggsh1#

如果我理解正确的话,由于我们可以使用RDD作为spark.read.json()方法的path参数,并且RDD是分布式的,可以减少在大型数据集上使用collect()方法时可能出现的OOM问题,因此您可以尝试将函数get_universal_schema调整为:

def get_universal_schema(df: SparkDataFrame, column: str):
    return spark.read.json(df.select(column).rdd.map(lambda x: x[0])).schema

并保持两个功能:get_sample_schema()from_json()按原样提供。

hgb9j2n6

hgb9j2n62#

Spark DataFrame旨在处理具有模式的数据。DataFrame API公开了对具有已定义模式的数据有用的方法,如groupBy acolumn,或对column进行操作的聚合函数等。
考虑到问题中提出的需求,在我看来,输入数据中似乎没有固定的模式,您不会从DataFrame API中受益,事实上,它可能会添加更多的约束。
我认为最好将此数据视为“无模式”并使用较低级别的API -RDD。RDD根据定义分布在整个集群中。因此,使用RDD API,您可以首先预处理数据(将其作为文本使用),然后将其转换为DataFrame

相关问题