Pypark在循环中重复

bmp9r5qi  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(429)

我在pyspark中有两个Dataframe,我检查dataframea中的数据,如果列为null,那么用dataframeb中的同一列替换null数据。
两个Dataframe都有唯一的id列,根据我加入的Dataframe和下面的代码工作正常。

updated_data = TABLE_BY_updated_date_unique.select('name_id_forwarded','name_id','name_id_org','first','last','passport','PHONE','EMAIL')
most_attributes_data = Most_attributes.select('name_id_forwarded','name_id','name_id_org','first','last','passport','PHONE','EMAIL')

final_df = updated_data.alias('a').join(most_attributes_data.alias('b'), on=['name_id_forwarded'], how='left')\
    .select(
        'a.name_id_forwarded','a.name_id','a.name_id_org',
        f.when(f.isnull(f.col('a.first')),f.col('b.first')).otherwise(f.col('a.first')).alias('first'),      
  f.when(f.isnull(f.col('a.last')),f.col('b.last')).otherwise(f.col('a.last')).alias('last'),
  f.when(f.isnull(f.col('a.passport')),f.col('b.passport')).otherwise(f.col('a.passport')).alias('passport'),
  f.when(f.isnull(f.col('a.PHONE')),f.col('b.PHONE')).otherwise(f.col('a.PHONE')).alias('PHONE'),
  f.when(f.isnull(f.col('a.EMAIL')),f.col('b.EMAIL')).otherwise(f.col('a.EMAIL')).alias('EMAIL')
  )

我有40多列,我不想为每一列重复下面的代码。f、 when(f.isnull(f.col('a.email')),f.col('b.email'))。否则(f.col('a.email'))。别名('email')
你能帮我循环一下这个语法吗?这样我就可以不重复地读所有的列了*

vktxenjb

vktxenjb1#

定义列表

'''

col_list_1 = ['a.name_id','a.SUM','a.full_name','a.updated']

col_list_2 = ['first_name', 'last_name', 'email', 'phone_number']

colExpr = col_list_1 + list(map(lambda x: "nvl(a.{},b.{}) as {}".format(x,x,x),col_list_2))

Unique_With_AllCols = TABLE_BY_updated_date_unique.alias('a').\
                   join(Most_attributes.alias('b'), on=['name_id_forwarded'], 
                   how='left').selectExpr(*colExpr)

'''

izj3ouym

izj3ouym2#

使用 coalesce 函数动态生成表达式,然后将其与 .select . Example: ```
from pyspark.sql.types import *
from pyspark.sql.functions import *

df=spark.createDataFrame([(1,'a'),(2,None),(3,10000)],['id','name','salary'])
df.show()

+---+----+------+

| id|name|salary|

+---+----+------+

| 1| a| 10|

| 2|null| 100|

| 3| b| 10000|

+---+----+------+

df1=spark.createDataFrame([(1,'a',20),(2,'b',None),(3,None,100)],['id','name','salary'])

df1.show()

+---+----+------+

| id|name|salary|

+---+----+------+

| 1| a| 20|

| 2| b| null|

| 3|null| 100|

+---+----+------+

df.alias("df").join(df1.alias("df1"),['id'],'left').select('id',*expr).show()

expr=[i for i in df.columns if i=='id'] + [coalesce(f'df1.{i}',f'df.{i}').alias(f'{i}') for i in df.columns if i !='id']

['id', Column<b'coalesce(df1.name, df.name) AS name'>, Column<b'coalesce(df1.salary, df.salary) AS salary'>]

df.alias("df").
join(df1.alias("df1"),['id'],'left').
select(*expr).
show()

+---+----+------+

| id|name|salary|

+---+----+------+

| 1| a| 20|

| 3| b| 100|

| 2| b| 100|

+---+----+------+

``` UPDATE: 我们使用coalesce函数来替换第一个非空值。
在这种情况下,你必须 b 如果值为null,则替换dataframe值,否则 a 如果不为null,则返回值。
在coalesce中我们需要提到coalesce(b.first,a.first)
如果b.first值为空,则将使用a.first值。
如果不是b,则使用第一个值。
使用列表理解 [coalesce(f'df1.{i}',f'df.{i}').alias(f'{i}') for i in df.columns if i !='id'] 动态创建具有df1(b)、df(a)Dataframe的合并表达式 id 当我们加入这个专栏的时候。
然后添加 id 列添加到列表中 [i for i in df.columns if i=='id'] 我们现在使用 .select 我们在join之后执行上面步骤中准备的表达式 .select(*expr) .

相关问题