FOR LOOP WITH CASE STATEMENT

cbwuti44 · asked 2022-10-07 in Spark
Answers (2) · Views (169)

I'm trying to find a way to run a for loop to streamline my script of chained case statements.

The script shown below runs without errors, but I find it far too verbose, which is likely to cause confusion at the next round of maintenance.

from pyspark.sql.functions import col, when

df = df.withColumn('Product', when(col('input_file_name').like('%CAD%'), 'Cash and Due')
                   .when(col('input_file_name').like('%TP%'), 'Trade Product')
                   .when(col('input_file_name').like('%LNS%'), 'Corp Loans')
                   .when(col('input_file_name').like('%DBT%'), 'Debt')
                   .when(col('input_file_name').like('%CRD%'), 'Retail Cards')
                   .when(col('input_file_name').like('%MTG%'), 'Mortgage')
                   .when(col('input_file_name').like('%OD%'), 'Overdraft')
                   .when(col('input_file_name').like('%PLN%'), 'Retail Personal Loan')
                   .when(col('input_file_name').like('%CLN%'), 'CLN')
                   .when(col('input_file_name').like('%CAT%'), 'Custody and Trust')
                   .when(col('input_file_name').like('%DEP%'), 'Deposits')
                   .when(col('input_file_name').like('%STZ%'), 'Securitization')
                   .when(col('input_file_name').like('%SECZ%'), 'Securities Securitization')
                   .when(col('input_file_name').like('%SEC%'), 'Securities')
                   .when(col('input_file_name').like('%MTSZ%'), 'Retail Mortgage Securitization')
                   .when(col('input_file_name').like('%PLSZ%'), 'Retail Personal Loan Securitization')
                   .when(col('input_file_name').like('%CCSZ%'), 'Retail Cards Securitization')
                   .when(col('input_file_name').like('%CMN%'), 'Cash Management')
                   .when(col('input_file_name').like('%OTC%'), 'Over-the-counter')
                   .when(col('input_file_name').like('%SFT%'), 'Securities Financing Transactions')
                   .when(col('input_file_name').like('%ETD%'), 'Exchange Traded Derivative')
                   .when(col('input_file_name').like('%DEF%'), 'Default Products')
                   .when(col('input_file_name').like('%FFS%'), 'Not Required')
                   .when(col('input_file_name').like('%hdfs%'), 'Not Required')
                   .otherwise(col('feed_name')))

I thought about running a loop; an example is given below (the script is not correct, it is for demonstration purposes only):

product_code = ['%CAD%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%OD%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']

## Both product_code and product_name have the same length

# pseudocode, for illustration only
lastIndex = len(product_code)
for x in range(lastIndex):
    # logic I had in mind:
    # df = df.withColumn('Product', when(col('input_file_name').like(product_code[x]), product_name[x]))
    if x == lastIndex - 1:
        pass  # otherwise(col('feed_name')) as the final fallback
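
For completeness, one way such a loop can actually be written is to build the chained when expression as a plain Python variable first and attach it with a single withColumn. A minimal sketch, assuming the product_code and product_name lists above and a feed_name column on df:

from pyspark.sql.functions import col, when

product_col = None
for code, name in zip(product_code, product_name):
    cond = col('input_file_name').like(code)
    # The first pair starts the chain; later pairs extend it with .when().
    product_col = when(cond, name) if product_col is None else product_col.when(cond, name)

# Fall back to the feed_name column when no pattern matches.
df = df.withColumn('Product', product_col.otherwise(col('feed_name')))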

I'd appreciate advice on whether running a loop over when(...).otherwise(...) case statements like this is possible in Spark, or whether there is another approach or a reference use case I could follow.

UPDATE

I've implemented the approach suggested below. The query returns the right value for rows that satisfy a condition, but I'd like to understand why, instead of falling back to the known lit() value in the script below, it drops the rows that don't satisfy any condition.

Sample DF:

product_code = ['%CMN%','%TP%','%LNS%']
product_name = ['Cash and Due','Trade Product']
inp_file = ['sdasdasdasd','_CMN_BD','_CMN_BD_WS']   # values as in the table below
feed_name = ['bob','arshad','jimmy']

df = spark.createDataFrame(
     list(zip(inp_file, feed_name)),
     ['input_file_name', 'feed_name']
)

+---------------+---------+
|input_file_name|feed_name|
+---------------+---------+
|sdasdasdasd    |bob      |
|_CMN_BD        |arshad   |
|_CMN_BD_WS     |jimmy    |
+---------------+---------+

product_code = ['%CAD_%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%_OD_%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']

## Create a Spark DataFrame from the zipped list of tuples

## lit() supplies the fallback value for the new column

product_ref_df = spark.createDataFrame(
     list(zip(product_code, product_name)),
     ["product_code", "product_name"]
)

from pyspark.sql.functions import broadcast, coalesce, col, expr, lit

def tempDF(df, targetField, columnTitle, condition, targetResult, show=False):
    # Build a small reference DataFrame of (LIKE pattern, product name) pairs.
    product_ref_df = spark.createDataFrame(
         list(zip(condition, targetResult)),
         ["condition", "target_result"]
    )

    # Join on "<targetField> like condition" and show the result.
    df.join(broadcast(product_ref_df), expr(targetField + " like condition")) \
      .withColumn(columnTitle, coalesce(col("target_result"), lit("feed_name"))) \
      .drop('condition', 'target_result') \
      .show()

    return df

product_ref_df = tempDF(df,'input_file_name','Product',product_code,product_name)

When the script is run there are no errors, and it returns the result shown below.

+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+

Since we aren't meant to drop any rows, shouldn't the result also include the first row, like this?

+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|    sdasdasdasd|      bob|         bob|
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+
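
For what it's worth, the dropped row follows from the join type: DataFrame.join defaults to an inner join, so rows whose input_file_name matches no pattern are discarded before coalesce ever runs, while a "left" join (as used in the second answer below) keeps them. Separately, lit("feed_name") produces the literal string 'feed_name', not the column; the fallback shown in the expected output needs col("feed_name"). A minimal sketch of the corrected lines inside tempDF:

df.join(broadcast(product_ref_df), expr(targetField + " like condition"), "left") \
  .withColumn(columnTitle, coalesce(col("target_result"), col("feed_name"))) \
  .drop('condition', 'target_result') \
  .show()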

lhcgjxsq · Answer 1

You can use coalesce to combine all the when statements into one. coalesce picks the first non-null column, and a when without an otherwise yields a non-null value only when its condition matches; otherwise it yields null.

product_code = ['%CAD%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%OD%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']

import pyspark.sql.functions as F

df2 = df.withColumn(
    'Product',
    F.coalesce(
        *[F.when(F.col('input_file_name').like(code), F.lit(name))
          for (code, name) in zip(product_code, product_name)],
        F.col('feed_name')  # final fallback, standing in for otherwise(col('feed_name'))
    )
)
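
To see the null behaviour described above, here is a minimal demonstration (assuming a SparkSession named spark):

demo = spark.createDataFrame([('_CMN_BD',), ('xyz',)], ['input_file_name'])
demo.select(
    # when() with no otherwise(): null whenever the condition fails
    F.when(F.col('input_file_name').like('%CMN%'), F.lit('matched')).alias('result')
).show()

# +-------+
# | result|
# +-------+
# |matched|
# |   null|
# +-------+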

idv4meu8 · Answer 2

You can create a new DataFrame from these product-name references and join it with the original DF to get the product name:

from pyspark.sql.functions import expr, col, broadcast, coalesce

product_ref_df = spark.createDataFrame(
     list(zip(product_code, product_name)),
     ["product_code", "product_name"]
)

df.join(broadcast(product_ref_df), expr("input_file_name like product_code"), "left") \
  .withColumn("Product", coalesce(col("product_name"), col("feed_name"))) \
  .drop("product_code", "product_name") \
  .show()
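
Note that broadcast() is a hint that ships the small reference table to every executor so the join avoids a shuffle, and the "left" join type is what keeps rows whose file name matches no pattern, letting coalesce fall back to feed_name. One caveat relative to the when/otherwise version: if a file name happens to match more than one pattern (say %SECZ% and %SEC%), the join emits one output row per match rather than taking the first.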

Or chain the case/when conditions using functools.reduce, like this:

import functools

from pyspark.sql.functions import lit, col, when

case_conditions = list(zip(product_code, product_name))

product_col = functools.reduce(
    # x is a (pattern, name) pair: match on the pattern, return the name
    lambda acc, x: acc.when(col("input_file_name").like(x[0]), lit(x[1])),
    case_conditions[1:],
    when(col("input_file_name").like(case_conditions[0][0]), lit(case_conditions[0][1]))
).otherwise(col("feed_name"))

df.withColumn("Product", product_col).show()
