I am trying to find a way to run a for loop to better optimize my case-statement script.
The script below runs without errors, but I find it too verbose, which could cause confusion during future maintenance.
from pyspark.sql.functions import when, col

df = df.withColumn(
    'Product',
    when(col('input_file_name').like('%CAD%'), 'Cash and Due')
    .when(col('input_file_name').like('%TP%'), 'Trade Product')
    .when(col('input_file_name').like('%LNS%'), 'Corp Loans')
    .when(col('input_file_name').like('%DBT%'), 'Debt')
    .when(col('input_file_name').like('%CRD%'), 'Retail Cards')
    .when(col('input_file_name').like('%MTG%'), 'Mortgage')
    .when(col('input_file_name').like('%OD%'), 'Overdraft')
    .when(col('input_file_name').like('%PLN%'), 'Retail Personal Loan')
    .when(col('input_file_name').like('%CLN%'), 'CLN')
    .when(col('input_file_name').like('%CAT%'), 'Custody and Trust')
    .when(col('input_file_name').like('%DEP%'), 'Deposits')
    .when(col('input_file_name').like('%STZ%'), 'Securitization')
    .when(col('input_file_name').like('%SECZ%'), 'Securities Securitization')
    .when(col('input_file_name').like('%SEC%'), 'Securities')
    .when(col('input_file_name').like('%MTSZ%'), 'Retail Mortgage Securitization')
    .when(col('input_file_name').like('%PLSZ%'), 'Retail Personal Loan Securitization')
    .when(col('input_file_name').like('%CCSZ%'), 'Retail Cards Securitization')
    .when(col('input_file_name').like('%CMN%'), 'Cash Management')
    .when(col('input_file_name').like('%OTC%'), 'Over-the-counter')
    .when(col('input_file_name').like('%SFT%'), 'Securities Financing Transactions')
    .when(col('input_file_name').like('%ETD%'), 'Exchange Traded Derivative')
    .when(col('input_file_name').like('%DEF%'), 'Default Products')
    .when(col('input_file_name').like('%FFS%'), 'Not Required')
    .when(col('input_file_name').like('%hdfs%'), 'Not Required')
    .otherwise(col('feed_name'))
)
I thought about running a loop instead; an example is given below (the script is not correct, it is for demonstration purposes only).
product_code = ['%CAD%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%OD%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']
## Both product_code & product_name have the same length
lastIndex = len(product_code)
for x in range(lastIndex):
    # Logic I had in mind:
    # df = df.withColumn('Product',
    #     when(col('input_file_name').like(product_code[x]), product_name[x]))
    if x == lastIndex - 1:
        pass  # otherwise(col('feed_name'))
I would appreciate some advice on whether it is possible in Spark to run a loop over when(...).otherwise(...) case conditions, or whether there is another approach or use case I can refer to.
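One loop-based option (a sketch of my own, not from the post) is to build a single SQL CASE expression string from the two parallel lists and hand it to F.expr. The helper below only builds the string, so it runs without Spark; the withColumn call that would apply it is shown in a comment.

```python
# Build one SQL CASE expression from the two parallel lists.
# Assumption: the lists line up index-for-index, as in the post.
product_code = ['%CAD%', '%TP%', '%LNS%']
product_name = ['Cash and Due', 'Trade Product', 'Corp Loans']

branches = ' '.join(
    f"WHEN input_file_name LIKE '{code}' THEN '{name}'"
    for code, name in zip(product_code, product_name)
)
case_expr = f"CASE {branches} ELSE feed_name END"

# With pyspark this would be applied as:
# df = df.withColumn('Product', F.expr(case_expr))
print(case_expr)
```

Adding a new mapping then means adding one entry to each list, not another when() line.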
Update
I have implemented the approach mentioned above. The query returns correctly for rows that match the conditions, but I want to know why it does not fall back to the value from lit() in the script below, and instead drops the rows that do not satisfy any condition.
Sample DF:
product_code = ['%CMN%','%TP%','%LNS%']
product_name = ['Cash and Due','Trade Product']
inp_file = ['sdasdasdasd','_CMN_BD','_CMN_BD_WS']
feed_name = ['bob','arshad','jimmy']
df = spark.createDataFrame(
    list(zip(inp_file, feed_name)),
    ['input_file_name', 'feed_name']
)
+---------------+---------+
|input_file_name|feed_name|
+---------------+---------+
|    sdasdasdasd|      bob|
|        _CMN_BD|   arshad|
|     _CMN_BD_WS|    jimmy|
+---------------+---------+
product_code = ['%CAD_%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%_OD_%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']
## -- Create a Spark DataFrame from the zipped list of tuples
## -- lit() is used to add a new constant column
product_ref_df = spark.createDataFrame(
list(zip(product_code, product_name)),
["product_code", "product_name"]
)
def tempDF(df, targetField, columnTitle, condition, targetResult, show=False):
    product_ref_df = spark.createDataFrame(
        list(zip(condition, targetResult)),
        ["condition", "target_result"]
    )
    (df.join(broadcast(product_ref_df), expr(targetField + " like condition"))
       .withColumn(columnTitle, coalesce(col("target_result"), lit("feed_name")))
       .drop('condition', 'target_result')
       .show())
    return df
product_ref_df = tempDF(df,'input_file_name','Product',product_code,product_name)
When the script is run there are no errors, and the result returned is shown below.
+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+
Since we are not deleting any rows, shouldn't the result also include the first row, like this?
+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|    sdasdasdasd|      bob|         bob|
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+
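For what it is worth, my reading (not from the post) is that two things cause this: join defaults to an inner join, which silently drops rows with no matching pattern, and lit("feed_name") is the literal string "feed_name" rather than the column col("feed_name"). A plain-Python model of the left-join-plus-coalesce behavior the expected table describes (like() here is a toy stand-in for a '%substring%' LIKE pattern):

```python
# Plain-Python stand-in for the pyspark logic; no Spark needed to run it.
def like(value, pattern):
    # model SQL LIKE '%X%' as a substring test
    return pattern.strip('%') in value

def left_join_product(rows, ref):
    # rows: (input_file_name, feed_name); ref: (pattern, product_name)
    out = []
    for fname, feed in rows:
        match = next((name for pat, name in ref if like(fname, pat)), None)
        # coalesce: fall back to feed_name when no pattern matched
        out.append((fname, feed, match if match is not None else feed))
    return out

rows = [('sdasdasdasd', 'bob'), ('_CMN_BD', 'arshad'), ('_CMN_BD_WS', 'jimmy')]
ref = [('%CMN%', 'Cash and Due'), ('%TP%', 'Trade Product')]
print(left_join_product(rows, ref))
```

In pyspark terms the fix would be join(..., how='left') plus coalesce(col("target_result"), col("feed_name")).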
2 Answers

Answer from lhcgjxsq1:
You can use coalesce to combine all the when statements into one. coalesce picks the first non-null column, and a when without an otherwise clause yields a non-null value only when its condition matches; otherwise it yields null.

Answer from idv4meu8:
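A plain-Python model of that idea (no pyspark here; when and coalesce below are toy stand-ins for the real pyspark.sql.functions of the same names):

```python
def when(value, pattern, result):
    # a when() with no otherwise(): None when the condition fails
    return result if pattern.strip('%') in value else None

def coalesce(*args):
    # first non-null argument, like SQL COALESCE
    return next((a for a in args if a is not None), None)

row = {'input_file_name': '_CMN_BD', 'feed_name': 'arshad'}
product = coalesce(
    when(row['input_file_name'], '%CAD%', 'Cash and Due'),
    when(row['input_file_name'], '%CMN%', 'Cash Management'),
    row['feed_name'],  # the final column plays the role of otherwise()
)
print(product)
```

Because each unmatched when contributes null, putting feed_name last gives the fallback without an explicit otherwise.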
You can create a new DataFrame from these product-name references and join it with the original DF to get the product name. Or chain the case/when conditions with functools.reduce, as follows:
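The answer's code did not survive the page scrape, so here is a sketch of the functools.reduce pattern it names, modeled with plain functions so it runs without Spark. In pyspark each fold step would be .when(col('input_file_name').like(code), name) and the seed would be the first F.when(...).

```python
from functools import reduce

product_code = ['%CMN%', '%TP%']
product_name = ['Cash Management', 'Trade Product']

def add_when(fallback, pair):
    # wrap the accumulated chain in one more "when": test this pattern,
    # defer to the rest of the chain when it does not match
    pattern, result = pair
    return lambda value: result if pattern.strip('%') in value else fallback(value)

# fold from the last pair to the first so the first pattern is tested first,
# matching the priority of a when().when()...otherwise() chain
chain = reduce(
    add_when,
    list(zip(product_code, product_name))[::-1],
    lambda value: None,  # plays the role of otherwise(None)
)
print(chain('_CMN_BD'))
```

The loop body never repeats itself, so adding a mapping is again a one-entry change to each list.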