Pyspark Dataframe使用country-converter将国家名称转换为ISO代码

sxpgvts3  于 2023-03-30  发布在  Apache
关注(0)|答案(3)|浏览(162)

我有一个spark数据框,位置列包含国家名称。我需要将这些名称转换为ISO 3代码。我知道有一个python库的国家转换器,但我不知道如何应用它,所以它只转换我的数据框中一列的值。
Dataframe 示例:
| 用户名|地点|
| --------------|--------------|
| 亚当|美国|
| 阿纳斯塔西亚|联合 Realm |
我可以将国家名称从列转换为代码,方法是将它们转换为RDD,然后再次转换为DF:

import country_converter as coco

out_format = "ISO3"

countries = df.select("Location").rdd.collect()
countries = coco.convert(names=countries, to=out_format, not_found=None)
countriesDF = spark.createDataFrame(countries, StringType())

输出:
| 价值|
| --------------|
| 美国|
| GBR|
但是,这段代码有两个问题:
1.因此,我创建了完全不同的dataframe,并丢失了有关UserName的信息。我需要输出如下所示:
预期产出:
| 用户名|地点|
| --------------|--------------|
| 亚当|美国|
| 阿纳斯塔西亚|GBR|
1.一些结果是这样的:Row(Countries ='London,UK'),我如何才能摆脱这个?我使用下面的代码,但我想知道是否有更快的方法比手动为每一行:

countriesDF.replace({"Row(Countries='London, UK')" : "GBR"})
6rqinv9w

6rqinv9w1#

更新:
如果数据量很大,可以使用字典来Map所有的值。

import country_converter as coco
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, lit, collect_set, create_map
from itertools import chain

data = [['United States'],['United Kingdom'],['Not a country']]*200000
df = spark.createDataFrame(data,['countries'])

#Create a {country:contry formatted} dictionary by only using unique values.
unique_countries = df.select("countries").distinct().rdd.flatMap(lambda x: x).collect() 
unique_countries_formatted = coco.convert(unique_countries,to='ISO3', not_found=None)
uc_dict = dict(zip(unique_countries,unique_countries_formatted))

#create a map to apply on df
mapping_expr = create_map([lit(x) for x in chain(*uc_dict.items())])

#apply the df
df = df.withColumn('countries_formatted', mapping_expr[df.countries])

df.show()

命令耗时8.85秒。20万条记录在8.5秒内完成
您应该使用UDF来执行此操作。

import country_converter as coco
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, lit

df = spark.createDataFrame([['United States'],
                            ['United Kingdom'],
                            ['Not a country'],
                            [None]],['countries'])

def country_converter(country): # define function here
  if country:
    return coco.convert(country, to='ISO3', not_found=None)
  return None

cc_udf = udf(country_converter, StringType()) #register udf
df = df.withColumn("countries_formatted",cc_udf(df.countries))
df.show()

输出:

+--------------+-------------------+
|     countries|countries_formatted|
+--------------+-------------------+
| United States|                USA|
|United Kingdom|                GBR|
| Not a country|      Not a country|
|          null|               null|
+--------------+-------------------+
rjjhvcjd

rjjhvcjd2#

对于大数据,我发现它似乎有效:

def get_UNregion( iso2):
    global cc_all
    if 'cc_all' not in globals():
        cc_all = coco.CountryConverter(include_obsolete=True)
    return cc_all.convert(names=iso2, to='UNregion')
oxiaedzo

oxiaedzo3#

请注意,country_converter仅支持英语国家名称。如果您需要支持多种语言的解决方案,请考虑探索countrywrangler,它提供34种语言的支持,并为提高速度效率而开发。
下面是一个简单的例子:

import countrywrangler as cw

alpha2 = cw.Normalize.name_to_alpha2("Germany")
print(alpha2)

>>> DE

CountryWrangler包括一个模糊搜索,能够检测几乎所有国家,无论格式风格或拼写错误的变化。它比正常功能慢100倍,但与其他库相比仍然很快。

import countrywrangler as cw

alpha2 = cw.Normalize.name_to_alpha2("Germany Federal Republic of", use_fuzzy=True)
print(alpha2)

>>> DE

完整的文档可以在这里找到:https://countrywrangler.readthedocs.io/en/latest/normalize/country_name/

披露:作为CountryWrangler的作者,我想澄清一下,这个答案并不是为了阻止使用country_converter,而是为了为某些用例提供一种替代解决方案。

相关问题