使用posexplode分解带有索引的嵌套json

dzhpxtsq  于 2021-05-27  发布在  Spark
关注(0)|答案(4)|浏览(847)

我使用下面的函数来分解一个深度嵌套的json(具有嵌套的结构和数组)。

  1. # Flatten nested df
  2. def flatten_df(nested_df):
  3. for col in nested_df.columns:
  4. array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
  5. for col in array_cols:
  6. nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))
  7. nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
  8. if len(nested_cols) == 0:
  9. return nested_df
  10. flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
  11. flat_df = nested_df.select(flat_cols +
  12. [F.col(nc+'.'+c).alias(nc+'_'+c)
  13. for nc in nested_cols
  14. for c in nested_df.select(nc+'.*').columns])
  15. return flatten_df(flat_df)

我成功地爆炸了。但是我还想在分解的Dataframe中添加元素的顺序或索引。所以在上面的代码中我替换 explode_outer 函数到 posexplode_outer . 但我得到下面的错误

  1. An error was encountered:
  2. 'The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases'

我试着换衣服 nested_df.withColumnnested_df.select 但我没有成功。有谁能帮我分解嵌套的json,但同时保持数组元素在分解的dataframe中作为列的顺序。

uqjltbpv

uqjltbpv1#

以dataframe格式读取json数据并创建视图或表。在sparksql中,可以使用使用别名引用的number-of-laterviewexplode方法。如果json数据结构是struct类型,那么可以使用点来表示结构。级别1.2

nhaq1z21

nhaq1z212#

错误是因为posexplode\u outer返回两列pos和col,所以不能与column()一起使用。这可以在选择中使用,如下面的代码所示

  1. from pyspark.sql import functions as F
  2. from pyspark.sql.window import Window
  3. tst= sqlContext.createDataFrame([(1,7,80),(1,8,40),(1,5,100),(5,8,90),(7,6,50),(0,3,60)],schema=['col1','col2','col3'])
  4. tst_new = tst.withColumn("arr",F.array(tst.columns))
  5. expr = tst.columns
  6. expr.append(F.posexplode_outer('arr'))
  7. # %%
  8. tst_explode = tst_new.select(*expr)

结果:

  1. tst_explode.show()
  2. +----+----+----+---+---+
  3. |col1|col2|col3|pos|col|
  4. +----+----+----+---+---+
  5. | 1| 7| 80| 0| 1|
  6. | 1| 7| 80| 1| 7|
  7. | 1| 7| 80| 2| 80|
  8. | 1| 8| 40| 0| 1|
  9. | 1| 8| 40| 1| 8|
  10. | 1| 8| 40| 2| 40|
  11. | 1| 5| 100| 0| 1|
  12. | 1| 5| 100| 1| 5|
  13. | 1| 5| 100| 2|100|
  14. | 5| 8| 90| 0| 5|
  15. | 5| 8| 90| 1| 8|
  16. | 5| 8| 90| 2| 90|
  17. | 7| 6| 50| 0| 7|
  18. | 7| 6| 50| 1| 6|
  19. | 7| 6| 50| 2| 50|
  20. | 0| 3| 60| 0| 0|
  21. | 0| 3| 60| 1| 3|
  22. | 0| 3| 60| 2| 60|
  23. +----+----+----+---+---+

如果需要重命名列,可以使用.withcolumnrenamed()函数

  1. df_final=(tst_explode.withColumnRenamed('pos','position')).withColumnRenamed('col','column')
展开查看全部
wpcxdonn

wpcxdonn3#

替换 nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))nested_df = df.selectExpr("*", f"posexplode({col}) as (position,col)").drop(col) 您可能需要编写一些逻辑来将列名替换为original,但应该很简单

ddhy6vgd

ddhy6vgd4#

您可以尝试使用列表理解选择来分解现有代码中的arraytype列:

  1. for col in array_cols:
  2. nested_df = nested_df.select([ F.posexplode_outer(col).alias(col+'_pos', col) if c == col else c for c in nested_df.columns ])

例子:

  1. from pyspark.sql import functions as F
  2. df = spark.createDataFrame([(1,"n1", ["a", "b", "c"]),(2,"n2", ["foo", "bar"])],["id", "name", "vals"])
  3. # +---+----+----------+
  4. # | id|name| vals|
  5. # +---+----+----------+
  6. # | 1| n1| [a, b, c]|
  7. # | 2| n2|[foo, bar]|
  8. # +---+----+----------+
  9. col = "vals"
  10. df.select([F.posexplode_outer(col).alias(col+'_pos', col) if c == col else c for c in df.columns]).show()
  11. # +---+----+--------+----+
  12. # | id|name|vals_pos|vals|
  13. # +---+----+--------+----+
  14. # | 1| n1| 0| a|
  15. # | 1| n1| 1| b|
  16. # | 1| n1| 2| c|
  17. # | 2| n2| 0| foo|
  18. # | 2| n2| 1| bar|
  19. # +---+----+--------+----+
展开查看全部

相关问题