我遇到了一个关于PySpark DataFrame中包含一系列json字符串的行的难题。
这个问题围绕着每一行都可能包含不同的模式,所以当我想在PySpark中将所述行转换为可订阅的数据类型时,我需要有一个“统一的”模式。
例如,考虑以下 Dataframe
import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))
注意,每行包含不同版本的json字符串。
import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame
def convert_to_row(d: dict) -> Row:
"""Convert a dictionary to a SparkRow.
Parameters
----------
d : dict
Dictionary to convert.
Returns
-------
Row
"""
return Row(**OrderedDict(sorted(d.items())))
def get_schema_from_dictionary(the_dict: dict):
"""Create a schema from a dictionary.
Parameters
----------
the_dict : dict
Returns
-------
schema
Schema understood by PySpark.
"""
return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema
def get_universal_schema(df: SparkDataFrame, column: str):
"""Given a dataframe, retrieve the "global" schema for the column.
NOTE: It does this by merging across all the rows, so this will
take a long time for larger dataframes.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
Returns
-------
schema
Schema understood by PySpark.
"""
col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
mega_dict = {}
for value in col_values:
mega_dict = {**mega_dict, **value}
return get_schema_from_dictionary(mega_dict)
def get_sample_schema(df, column):
"""Given a dataframe, sample a single value to convert.
NOTE: This assumes that the dataframe has the same schema
over all rows.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
Returns
-------
schema
Schema understood by PySpark.
"""
return get_universal_schema(df.limit(1), column)
def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
"""Convert json-string column to a subscriptable object.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
manual_schema : PysparkSchema, optional
Schema understood by PySpark, by default None
merge : bool, optional
Parse the whole dataframe to extract a global schema, by default False
Returns
-------
SparkDataFrame
"""
if manual_schema is None or manual_schema == {}:
if merge:
schema = get_universal_schema(df, column)
else:
schema = get_sample_schema(df, column)
else:
schema = manual_schema
return df.withColumn(column, fcn.from_json(column, schema))
然后,我可以简单地执行以下操作,以获得具有统一模式的新 Dataframe
df = from_json(df, column='B', merge=True)
df.printSchema()
root
|-- A: long (nullable = true)
|-- B: struct (nullable = true)
| |-- a: long (nullable = true)
| |-- b: string (nullable = true)
| |-- c: long (nullable = true)
| |-- d: double (nullable = true)
| |-- f: struct (nullable = true)
| | |-- maybe_this: long (nullable = true)
| | |-- some_other: struct (nullable = true)
| | | |-- A: long (nullable = true)
现在我们来看看问题的症结所在,因为我在col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
中这样做,所以我受限于主节点上的内存量。
如何执行类似的过程,以便在收集到主节点之前,将更多的工作分配给每个工作者?
2条答案
按热度按时间wrrgggsh1#
如果我理解正确的话,由于我们可以使用RDD作为spark.read.json()方法的
path
参数,并且RDD是分布式的,可以减少在大型数据集上使用collect()
方法时可能出现的OOM问题,因此您可以尝试将函数get_universal_schema
调整为:并保持两个功能:
get_sample_schema()
和from_json()
按原样提供。hgb9j2n62#
Spark
DataFrame
旨在处理具有模式的数据。DataFrame
API公开了对具有已定义模式的数据有用的方法,如groupBy
acolumn,或对column进行操作的聚合函数等。考虑到问题中提出的需求,在我看来,输入数据中似乎没有固定的模式,您不会从
DataFrame
API中受益,事实上,它可能会添加更多的约束。我认为最好将此数据视为“无模式”并使用较低级别的API -
RDD
。RDD根据定义分布在整个集群中。因此,使用RDD API,您可以首先预处理数据(将其作为文本使用),然后将其转换为DataFrame
。