spark:基于模式填充空值的干净方法

qlfbtfca  于 2021-05-18  发布在  Spark
关注(0)|答案(2)|浏览(523)

我的avro文件格式如下:

|Some col|Some other col|          body         |
|--------|--------------|-----------------------|
|some val|   some val   |   some json string    |
|  ...   |     ...      |         ...           |

我用英语读它们 spark.read.format("avro").load(file_path) 我想从这个json字符串中选择所有的值,但是这个json被修剪了,所以只给出值不是none的列。
我有一个模式,它包含这些json文件中所有可能的值(都可以为null)。
问:有没有一种干净的方法来选择json字符串中的所有列+模式中不在json字符串中的所有列(插入值为none)?

bzzcjhmw

bzzcjhmw1#

In [1]: from pyspark.sql.types import StructField, StructType, StringType
   ...: from pyspark.sql.functions import col, from_json

In [2]: schema = StructType([
   ...:     StructField("a", StringType()),
   ...:     StructField("b", StringType()),
   ...:     StructField("c", StringType()),
   ...:     StructField("d", StringType()),
   ...: ])

In [3]: df = spark.createDataFrame([("1", '{"a": 1, "b": 2}'),
   ...:                             ("2", '{"a": 3, "c": 4}')],
   ...:                            schema=["Some col", "body"])

In [4]: df.show()
+--------+----------------+
|Some col|            body|
+--------+----------------+
|       1|{"a": 1, "b": 2}|
|       2|{"a": 3, "c": 4}|
+--------+----------------+

In [5]: df.select(from_json(col("body"), schema).alias("data")).select("data.*").show()
+---+----+----+----+
|  a|   b|   c|   d|
+---+----+----+----+
|  1|   2|null|null|
|  3|null|   4|null|
+---+----+----+----+
5lhxktic

5lhxktic2#

不一定能完全理解这个问题,但这似乎是一个问题 df 已经接通了 spark.read.format("avro").load(file_path) . 其中一列 dfbody 那一栏是一系列词典。一种可能的解决方案是:

import pandas as pd
import json

entry_1 = {'a': 1, 'b': 1}
entry_2 = {'b': 2, 'c': 2}
entry_3 = {'c': 3, 'd': 3}

df = pd.DataFrame({'body': [entry_1, entry_2, entry_3]})  # 1
series_of_dict_to_df = pd.DataFrame(df.body.to_dict()).T  # 2
df = pd.concat([df, series_of_dict_to_df], axis=1) 

# string version

entry_1 = "{\"a\": 1, \"b\": 1}"
entry_2 = "{\"b\": 2, \"c\": 2}"
entry_3 = "{\"c\": 3, \"d\": 3}"

df_with_strings = pd.DataFrame({'body': [entry_1, entry_2, entry_3]})  
df_with_strings.body = df_with_strings.body.map(json.loads)  # 1
series_of_dict_to_df = pd.DataFrame(df_with_strings.body.to_dict()).T  #2
df_with_strings = pd.concat([df_with_strings, series_of_dict_to_df], axis=1)

两个Dataframe版本的输出为:

body    a    b    c    d
0  {'a': 1, 'b': 1}  1.0  1.0  NaN  NaN
1  {'b': 2, 'c': 2}  NaN  2.0  2.0  NaN
2  {'c': 3, 'd': 3}  NaN  NaN  3.0  3.0

缺少的列数据将自动用nan填充(注意标有 #1 以及 #2 在代码中是那些可能与解决问题相关的代码。剩下的代码用于演示。

相关问题