Updating a map-type column of a Spark DataFrame with the required keys and values

Posted by dpiehjr4 on 2021-05-27 in Spark

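I have the Spark DataFrame below, in which every column except the primary key column emp_id holds a map whose keys 'from' and 'to' can have null values. For each column other than emp_id, I want to compare 'from' and 'to' and add a new key named 'change' to the map, whose value is:
a) 'insert' if the 'from' value is null and 'to' is not null
b) 'delete' if the 'to' value is null and 'from' is not null
c) 'update' if 'from' and 'to' are both not null and the 'from' value differs from the 'to' value
Note: columns that are null remain unchanged.
Important: these columns are not of type map[string,string] but map[string,any], which means the value can be another struct object.
How can we achieve this in Scala?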

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                   |emp_site                          |

|1     |null                 |[from -> Will, to -> Watson]|null                 |[from -> 1000, to -> 8000]|[from ->, to -> Seattle]          |
|3     |null                 |[from -> Norman, to -> Nate]|null                 |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4     |[from ->, to -> Iowa]|[from ->, to -> Ian]        |[from ->, to -> 1004]|[from ->, to -> 8000]     |[from ->, to -> Des Moines]       |

Expected:

|emp_id|emp_city                               |emp_name                                      |emp_phone                              |emp_sal                                     |emp_site                                            |

|1     |null                                   |[from -> Will, to -> Watson, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert]          |
|3     |null                                   |[from -> Norman, to -> Nate, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4     |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert]        |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert]     |[from ->, to -> Des Moines, change -> insert]       |

Answer #1 (dojqjjoe)

You can achieve this with the row mapper function below; the explanation is given inline as code comments.

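import org.apache.spark.sql.{Row, SparkSession}

object MapUpdater {

  def main(args: Array[String]): Unit = {

    // The original answer obtained the session from a project-specific helper
    // (Constant.getSparkSess); a local session is assumed here instead.
    val spark = SparkSession.builder().appName("MapUpdater").master("local[*]").getOrCreate()

    import spark.implicits._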

    // Load your data (sample rows mirroring the table in the question)
    val df = List(
      (1, null, Map("from" -> "Will", "to" -> "Watson"), null, Map("from" -> "1000", "to" -> "8000"), Map("from" -> null, "to" -> "Seattle")),
      (3, null, Map("from" -> "Norman", "to" -> "Nate"), null, Map("from" -> "1000", "to" -> "8000"), Map("from" -> "CherryHill", "to" -> "Newark")),
      (4, Map("from" -> null, "to" -> "Iowa"), Map("from" -> null, "to" -> "Ian"), Map("from" -> null, "to" -> "1004"), Map("from" -> null, "to" -> "8000"), Map("from" -> null, "to" -> "Des Moines"))
    ).toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")

    // Transform each row, rebuilding every map column (indices 1 to 5)
    df.map(row => {

      val new_emp_city = mapUpdater(row,1)
      val new_emp_name = mapUpdater(row,2)
      val new_emp_phone = mapUpdater(row,3)
      val new_emp_sal = mapUpdater(row,4)
      val new_emp_site = mapUpdater(row,5)

      (row.getInt(0),new_emp_city,new_emp_name,new_emp_phone,new_emp_sal,new_emp_site)

    }).toDF("emp_id","emp_city","emp_name","emp_phone","emp_sal","emp_site")
      .show(false)

  }

  // Row mapper function: adds a "change" key derived from "from" and "to".
  // A column whose map is null is returned unchanged (as null).
  private def mapUpdater(row: Row, colId: Int): Map[String, String] = {
    val oldMap = row.getAs[Map[String, String]](colId)
    if (oldMap == null) oldMap
    else {
      // Option(...) turns both a missing key and a null value into None
      val from = oldMap.get("from").flatMap(Option(_))
      val to = oldMap.get("to").flatMap(Option(_))
      (from, to) match {
        case (None, Some(_))              => oldMap + ("change" -> "insert")
        case (Some(_), None)              => oldMap + ("change" -> "delete")
        case (Some(f), Some(t)) if f != t => oldMap + ("change" -> "update")
        case _                            => oldMap // both null, or equal values
      }
    }
  }
}
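
The question mentions that the values may not be plain strings. As a minimal sketch only, the three rules from the question (insert, delete, update) can be isolated from Spark and written against a plain Scala Map[String, Any]. The names ChangeClassifier, classify and withChange are hypothetical helpers introduced for illustration and are not part of the answer above; how such maps are actually read from a DataFrame column depends on the column's real value type, which is not shown in the question.

```scala
// Hypothetical helper for illustration; not part of the original answer.
object ChangeClassifier {

  // Returns the label to store under "change", or None when no key should be added.
  def classify(m: Map[String, Any]): Option[String] = {
    // Option(...) maps both a missing key and an explicit null value to None
    val from = m.get("from").flatMap(Option(_))
    val to = m.get("to").flatMap(Option(_))
    (from, to) match {
      case (None, Some(_))              => Some("insert")
      case (Some(_), None)              => Some("delete")
      case (Some(f), Some(t)) if f != t => Some("update")
      case _                            => None // both null, or equal values
    }
  }

  // A null map stays null; otherwise "change" is added only when a label applies.
  def withChange(m: Map[String, Any]): Map[String, Any] =
    if (m == null) m
    else classify(m).fold(m)(label => m + ("change" -> label))
}
```

Because the comparison uses `!=` on Any, it relies on the values' own equals method, so nested values compare structurally only if their types define equality that way.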
