pyspark对象框架添加一个列,如果它不存在

tct7dpnv  于 2023-10-15  发布在  Spark
关注(0)|答案(4)|浏览(118)

我在不同的json文件中有json数据,并且键可以在行中不同,例如

{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}

我想聚合列'b','c','d '和'f'上的数据,这些数据不存在于给定的json文件中,但可能存在于其他文件中。因此,由于列'f'不存在,我们可以为该列取空字符串。
我正在阅读输入文件,并像下面这样聚合数据

import pyspark.sql.functions as f
df =  spark.read.json(inputfile)
df2 =df.groupby("b","c","d","f").agg(f.sum(df["a"]))

这是我想要的最终输出

{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}

谁能帮帮忙?提前感谢!

syqv5f0l

syqv5f0l1#

您可以检查列是否在Xtrame中可用,并仅在必要时修改df

if 'f' not in df.columns:
   df = df.withColumn('f', f.lit(''))

对于嵌套模式,您可能需要像下面这样使用df.schema

>>> df.printSchema()
root
 |-- a: struct (nullable = true)
 |    |-- b: long (nullable = true)

>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False
daolsyd0

daolsyd02#

如果有人在Scala中需要这个:

if (!df.columns.contains("f")) {
  val newDf = df.withColumn("f", lit(""))
}
r8uurelv

r8uurelv3#

这个功能对我来说。

def detect_data(column, df, data_type):
          if not column in df.columns:
            ret = lit(None).cast(data_type)
          else:
            ret = col(column).cast(data_type)
            
          return ret

    df = df.withColumn('f', detect_data('f', df, StringType()))
5lhxktic

5lhxktic4#

下面是一个可以在df.transform(f)中使用的spark函数:

def addMissingColumn(
      colName: String,
      defaultColumn: Column = lit(null).cast(StringType)
    ): DataFrame => DataFrame = { df =>
      val noInfoPresent = !df.columns.toSeq.contains(colName)
      val dfUpdated = if (noInfoPresent) {
        df.withColumn(colName, defaultColumn)
      } else { df }

      dfUpdated

    }

相关问题