Flattening a schema

xdnvmnnf posted on 2021-05-27 in Spark

How can I flatten the schema of only certain specific columns (rather than all columns) in Spark?

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

// Recursively flattens every array and struct column in the DataFrame.
def flattenDataframe(df: DataFrame): DataFrame = {
  val fields = df.schema.fields
  val fieldNames = fields.map(_.name)

  for (field <- fields) {
    val fieldName = field.name
    field.dataType match {
      case _: ArrayType =>
        // Explode the array into one row per element; explode_outer keeps rows whose array is null or empty.
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
        // val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName.*"))
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)
      case structType: StructType =>
        // Promote each struct child to a top-level column named parent_child.
        val childFieldNames = structType.fieldNames.map(childName => fieldName + "." + childName)
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val renamedCols = newFieldNames.map(x => col(x).as(x.replace(".", "_").replace("$", "").replace(" ", "_").replace("-", "_")))
        val flattenedDf = df.select(renamedCols: _*)
        return flattenDataframe(flattenedDf)
      case _ => // Not an array or struct: leave the column as it is.
    }
  }
  df
}

I am using the code above, but it flattens all columns.
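
One possible way to restrict the flattening is to pass in the set of top-level column names that should be flattened and skip everything else. The following is only a sketch under assumptions, not a tested solution: the object name `SelectiveFlatten`, the function `flattenColumns`, the `targets` parameter, and the `sanitize` helper are all hypothetical names introduced here. It also assumes that column names contain no backticks and that the generated `parent_child` names do not collide with existing columns.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{ArrayType, StructType}

object SelectiveFlatten {

  // Same character replacements as the renaming step in the question's code.
  private def sanitize(name: String): String =
    name.replace(".", "_").replace("$", "").replace(" ", "_").replace("-", "_")

  // Hypothetical helper: flattens only the top-level columns listed in `targets`.
  // Columns not listed keep their original (possibly nested) type.
  def flattenColumns(df: DataFrame, targets: Set[String]): DataFrame = {
    // First targeted column that is still an array or a struct, if any.
    val next = df.schema.fields.find { f =>
      targets.contains(f.name) && (f.dataType match {
        case _: ArrayType | _: StructType => true
        case _                            => false
      })
    }

    next match {
      case None =>
        // Nothing left to flatten among the targeted columns.
        df

      case Some(field) =>
        field.dataType match {
          case _: ArrayType =>
            // Replace the array column in place with one row per element.
            val exploded =
              df.withColumn(field.name, explode_outer(col(s"`${field.name}`")))
            flattenColumns(exploded, targets)

          case struct: StructType =>
            // Build one (flatName, column) pair per child of the struct.
            val children = struct.fieldNames.map { child =>
              val flatName = sanitize(s"${field.name}_$child")
              flatName -> col(s"`${field.name}`.`$child`").as(flatName)
            }
            // Keep every other column untouched, in its original position.
            val cols = df.columns.flatMap { c =>
              if (c == field.name) children.map(_._2).toSeq
              else Seq(col(s"`$c`"))
            }
            // The new child columns become targets so nested levels are flattened too.
            val newTargets = (targets - field.name) ++ children.map(_._1)
            flattenColumns(df.select(cols: _*), newTargets)

          case _ =>
            df
        }
    }
  }
}
```

With a DataFrame `df` that has nested columns `address` and `orders`, a call such as `SelectiveFlatten.flattenColumns(df, Set("address"))` would be expected to expand `address` into `address_*` columns while leaving `orders` nested. Adding the flattened child names back into `targets` is a design choice made here so that structs nested inside a selected column are flattened as well; dropping that step would flatten only one level.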
