PySpark: how to conditionally apply a function to Spark DataFrame columns and fill null values

Posted by xmjla07d on 2021-05-27 in Spark

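I have a Spark DataFrame with a column called location_string that I want to decompose into three columns: country, region, and city. I then want to combine these with the existing country, region, and city columns so that null values get filled. In other words, wherever city, region, or country is null, I want to apply my function to location_string to try to fill it in.
Sample dataset: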

+--------------------+-----------------+------+-------+
|     location_string|             city|region|country|
+--------------------+-----------------+------+-------+
|Jonesboro, AR, US...|             NULL|    AR|   NULL|
|Lake Village, AR,...|     Lake Village|    AR|    USA|
|Little Rock, AR, ...|      Little Rock|    AR|    USA|
|Little Rock, AR, ...|      Little Rock|    AR|    USA|
|Malvern, AR, US, ...|          Malvern|  NULL|    USA|
|Malvern, AR, US, ...|          Malvern|    AR|    USA|
|Morrilton, AR, US...|        Morrilton|    AR|    USA|
|Morrilton, AR, US...|        Morrilton|    AR|    USA|
|N. Little Rock, A...|North Little Rock|    AR|    USA|
|N. Little Rock, A...|North Little Rock|    AR|    USA|
|Ozark, AR, US, 72949|            Ozark|    AR|    USA|
|Ozark, AR, US, 72949|            Ozark|    AR|    USA|
|Palestine, AR, US...|             NULL|    AR|    USA|
|Pine Bluff, AR, U...|       Pine Bluff|    AR|   NULL|
|Pine Bluff, AR, U...|       Pine Bluff|    AR|    USA|
|Prescott, AR, US,...|         Prescott|    AR|    USA|
|Prescott, AR, US,...|         Prescott|    AR|    USA|
|Searcy, AR, US, 7...|           Searcy|    AR|    USA|
|Searcy, AR, US, 7...|           Searcy|    AR|    USA|
|West Memphis, AR,...|     West Memphis|  NULL|    USA|
+--------------------+-----------------+------+-------+

Example of a function that decomposes the location string:

import geocoder

def geocoder_decompose_location(location_string):
    # Nothing to look up: return an all-null result
    if not location_string:
        return {'country': None, 'state': None, 'city': None}
    GOOGLE_GEOCODE_API_KEY = "<API KEY HERE>"
    result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
    # Note: the geocoder's 'state' corresponds to the DataFrame's 'region' column
    return {'country': result.country, 'state': result.state, 'city': result.city}

Answer #1 (vtwuwzda)

Scala pseudocode.
First, we need to remove all duplicates from the DataFrame (this reduces the number of API calls to the Google service).

    import spark.implicits._

    case class Data(location_string: String, city: String, region: String, country: String)

    // Pseudocode: geocoder.google stands in for whatever geocoding client you use.
    // Returns None if the lookup throws.
    val cleaner = (location_string: String) => {
      try {
        val GOOGLE_GEOCODE_API_KEY = "<API KEY HERE>"
        val result = geocoder.google(location_string, key = GOOGLE_GEOCODE_API_KEY)
        Some(result)
      } catch {
        case error: Exception => println(error); None
      }
    }

    output.as[Data].dropDuplicates("location_string").map(x => {
      // can also add a blank check with StringUtils.isBlank
      val toCheck = x.city == null || x.country == null || x.region == null
      if (toCheck) {
        cleaner(x.location_string) match {
          case Some(result) =>
            // use the geocoded value when it is non-null, else keep the existing one
            val city = if (result.city != null) result.city else x.city
            val country = if (result.country != null) result.country else x.country
            val region = if (result.state != null) result.state else x.region
            // argument order must match the case class: city, region, country
            Data(x.location_string, city, region, country)
          case None => x // lookup failed, keep the row unchanged
        }
      } else x
    })
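
Since the question is about PySpark, here is a rough plain-Python sketch of the same per-row logic: look up a location only when one of the three fields is null, keep any value that already exists, and cache lookups so each distinct location_string is decomposed at most once. The names fill_missing and split_decompose are made up for illustration, and split_decompose is only an offline stand-in (a naive comma split) for the asker's geocoder_decompose_location, so treat this as a sketch under those assumptions rather than a finished solution.

```python
from functools import lru_cache
from typing import Callable, Dict, List, Optional

Row = Dict[str, Optional[str]]
FIELDS = ("city", "region", "country")


def fill_missing(rows: List[Row], decompose: Callable[[Optional[str]], Row]) -> List[Row]:
    """Fill null city/region/country from location_string.

    `decompose` is called at most once per distinct location_string.
    """
    cached = lru_cache(maxsize=None)(decompose)  # dedupe the lookups
    out = []
    for row in rows:
        if all(row[f] is not None for f in FIELDS):
            out.append(dict(row))  # nothing to fill, so no lookup
            continue
        parsed = cached(row["location_string"])
        out.append({
            "location_string": row["location_string"],
            # keep the existing value; use the parsed one only when it is null
            "city": row["city"] if row["city"] is not None else parsed.get("city"),
            # the decomposer calls this field 'state', the table calls it 'region'
            "region": row["region"] if row["region"] is not None else parsed.get("state"),
            "country": row["country"] if row["country"] is not None else parsed.get("country"),
        })
    return out


def split_decompose(location_string: Optional[str]) -> Row:
    """Hypothetical offline stand-in for geocoder_decompose_location."""
    if not location_string:
        return {"city": None, "state": None, "country": None}
    parts: List[Optional[str]] = [p.strip() for p in location_string.split(",")]
    parts += [None] * (3 - len(parts))  # pad short strings with None
    return {"city": parts[0], "state": parts[1], "country": parts[2]}
```

In PySpark itself, the same idea would usually be expressed by wrapping the decomposer in a UDF and combining columns with `pyspark.sql.functions.coalesce(existing_col, parsed_col)`, which returns the first non-null argument.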

We can also run orderBy(desc("city"), desc("country"), desc("region")) before dropping duplicates, so that when duplicates exist, the rows with null values are the ones that get removed.
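
The intent of that ordering step (keep the most complete row per location) can be sketched in plain Python as follows. The function name keep_most_complete is made up for illustration, and this is a sketch of the idea rather than Spark code.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]
FIELDS = ("city", "region", "country")


def count_nulls(row: Row) -> int:
    """Number of null values among city, region and country."""
    return sum(row[f] is None for f in FIELDS)


def keep_most_complete(rows: List[Row]) -> List[Row]:
    """For each location_string, keep the row with the fewest nulls.

    Ties are resolved in favour of the row seen first.
    """
    best: Dict[Optional[str], Row] = {}
    for row in rows:
        key = row["location_string"]
        if key not in best or count_nulls(row) < count_nulls(best[key]):
            best[key] = row
    return list(best.values())
```

In Spark, as far as I know, dropDuplicates does not document which of the duplicate rows it keeps, so if the choice matters, a window with `row_number()` partitioned by location_string and ordered by completeness is the more explicit way to select one row per group.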
