将Pyspark Dataframe 中的字典拆分为单独的列

ao218c7q  于 2023-03-01  发布在  Spark
关注(0)|答案(3)|浏览(223)

我有一个 Dataframe (在Pyspark中),其中一个行值作为字典:

df.show()

它看起来像:

+----+---+-----------------------------+
|name|age|info                         |
+----+---+-----------------------------+
|rob |26 |{color: red, car: volkswagen}|
|evan|25 |{color: blue, car: mazda}    |
+----+---+-----------------------------+

根据评论给出更多:

df.printSchema()

类型为字符串

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- dict: string (nullable = true)

是否可以从字典中获取键(颜色和汽车),并将它们作为 Dataframe 中的列,然后将值作为这些列的行?
预期结果:

+----+---+-----------------------------+
|name|age|color |car                   |
+----+---+-----------------------------+
|rob |26 |red   |volkswagen            |
|evan|25 |blue  |mazda                 |
+----+---+-----------------------------+

我不知道我必须使用df. withColumn(),并以某种方式迭代字典来选择每一个,然后从中创建一列?到目前为止,我试图找到一些答案,但大多数人使用的是Pandas,而不是Spark,所以我不确定我是否可以应用同样的逻辑。

zz2j4svz

zz2j4svz1#

您的字符串:

"{color: red, car: volkswagen}"
"{color: blue, car: mazda}"

不是python友好的格式。它们不能用json.loads解析,也不能用ast.literal_eval计算。
但是,如果您事先知道密钥,并且可以假设字符串始终采用这种格式,则应该能够使用pyspark.sql.functions.regexp_extract
例如:

from pyspark.sql.functions import regexp_extract

df.withColumn("color", regexp_extract("info", "(?<=color: )\w+(?=(,|}))", 0))\
    .withColumn("car", regexp_extract("info", "(?<=car: )\w+(?=(,|}))", 0))\
    .show(truncate=False)
#+----+---+-----------------------------+-----+----------+
#|name|age|info                         |color|car       |
#+----+---+-----------------------------+-----+----------+
#|rob |26 |{color: red, car: volkswagen}|red  |volkswagen|
#|evan|25 |{color: blue, car: mazda}    |blue |mazda     |
#+----+---+-----------------------------+-----+----------+

模式为:

  • (?<=color: ):文字字符串"color: "的正向后查找
  • \w+:一个或多个单词字符
  • (?=(,|})):对原义逗号或右大括号的正向前查找。

下面介绍如何将此方法推广到两个以上的键,并处理字符串中不存在键的情况。

from pyspark.sql.functions import regexp_extract, when, col
from functools import reduce

keys = ["color", "car", "year"]
pat = "(?<=%s: )\w+(?=(,|}))"

df = reduce(
    lambda df, c: df.withColumn(
        c,
        when(
            col("info").rlike(pat%c),
            regexp_extract("info", pat%c, 0)
        )
    ),
    keys,
    df
)

df.drop("info").show(truncate=False)
#+----+---+-----+----------+----+
#|name|age|color|car       |year|
#+----+---+-----+----------+----+
#|rob |26 |red  |volkswagen|null|
#|evan|25 |blue |mazda     |null|
#+----+---+-----+----------+----+

在本例中,我们使用pyspark.sql.functions.whenpyspark.sql.Column.rlike来测试字符串是否包含模式,然后再尝试提取匹配项。
如果您事先不知道这些键,则必须编写自己的解析器或尝试修改上游数据。

f0brbegy

f0brbegy2#

从printSchema函数中可以看出,Spark将字典理解为字符串,分割字符串并创建新列的函数是split(),因此这个问题的简单解决方案是。

  • 创建能够执行以下操作的UDF:
  • 将字典字符串转换为逗号分隔的字符串(从字典中删除键,但保持值的顺序)
  • 应用拆分,并根据字典的新格式创建两个新列

代码:

@udf()
def transform_dict(dict_str):
    str_of_dict_values = dict_str.\
        replace("}", "").\
        replace("{", ""). \
        replace("color:", ""). \
        replace(" car: ", ""). \
        strip()
    # output example: 'red,volkswagen'
    return str_of_dict_values

# Create new column with our UDF with the dict values converted to str
df = df.withColumn('info_clean', clean("info"))
# Split these values and store in a tmp variable 
split_col = split(df['info_clean'], ',')

# Create new columns with the split values
df = df.withColumn('color', split_col.getItem(0))
df = df.withColumn('car', split_col.getItem(1))

只有当我们假设字典元素总是以相同的顺序出现,并且键是固定的,这个解决方案才是正确的。对于其他更复杂的情况,我们可以在UDF函数中创建一个字典,并通过显式调用每个字典键来形成值列表的字符串,这样我们就可以确保输出链中的顺序得到维护。

p8h8hvxi

p8h8hvxi3#

我觉得最具伸缩性的解决方案是下面这个,使用通过lambda函数传递的通用键:

from pyspark.sql.functions import explode,map_keys,col

keysDF = df.select(explode(map_keys(df.info))).distinct()
keysList = keysDF.rdd.map(lambda x:x[0]).collect()
keyCols = list(map(lambda x: col("info").getItem(x).alias(str(x)), keysList))
df.select(df.name, df.age, *keyCols).show()

相关问题