This is my code for filtering out, in a Spark DataFrame, all rows where certain columns contain rarely occurring values. The code works and the row count does drop (I need this for OneHotEncoder), but after it runs the DataFrame itself becomes strange: an ordinary df.show() takes 2-3 minutes, and I didn't even wait for the model training itself to finish. Why? The problems start after this code; before it, all transformations ran quickly.
from pyspark.sql.functions import col, count
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA, StringIndexer, VectorAssembler

columns_to_check = ['is_zero', 'enc_loans', 'enc_limit', 'ratio_overdue']
for column in columns_to_check:
    # Count how often each value occurs, join the counts back onto df,
    # then drop rows whose value appears fewer than 20 times.
    count_column = df.groupBy(column).agg(count("*").alias("count"))
    df = df.join(count_column, on=column, how="left")
    df = df.filter(col("count") >= 20).drop("count")
enc_paym = [i for i in df.columns if 'enc_paym' in i]
enc_zero_loans = [i for i in df.columns if 'is_zero_loans' in i]
enc_zero = [i for i in df.columns if 'is_zero' in i and 'loans' not in i]
enc_limit = [i for i in df.columns if 'pre_over' in i or 'pre_maxover' in i]
enc_loans = [i for i in df.columns if 'enc_loans' in i]
pre_loans = [i for i in df.columns if 'pre_loans' in i]
pre_since = ['pre_since_opened', 'pre_since_confirmed']
pre_tern = ['pre_pterm', 'pre_fterm']
pre_close = ['pre_till_pclose', 'pre_till_fclose']
list_of_pca = [enc_paym, enc_zero, enc_limit, enc_loans, pre_loans, enc_zero_loans,
pre_since, pre_tern, pre_close]
def retname(i):
    # Map a group of related column names to a single output column name.
    flag = 0
    for j in i:
        if 'enc_paym' in j:
            flag = 1
        elif 'is_zero_loans' in j:
            flag = 2
        elif 'pre_over' in j or 'pre_maxover' in j:
            flag = 3
        elif 'enc_loans' in j:
            flag = 4
        elif 'pre_loans' in j:
            flag = 5
        elif 'is_zero' in j and 'loans' not in j:
            flag = 6
        elif 'pre_since_opened' in j or 'pre_since_confirmed' in j:
            flag = 7
        elif 'pre_pterm' in j or 'pre_fterm' in j:
            flag = 8
        elif 'pre_till_pclose' in j or 'pre_till_fclose' in j:
            flag = 9
    if flag == 1:
        return 'enc_paym'
    elif flag == 2:
        return 'is_zero'
    elif flag == 3:
        return 'enc_limit'
    elif flag == 4:
        return 'enc_loans'
    elif flag == 5:
        return 'pre_loans'
    elif flag == 6:
        return 'ratio_overdue'
    elif flag == 7:
        return 'confirm_days'
    elif flag == 8:
        return 'open_days'
    elif flag == 9:
        return 'collection_days'
for i in list_of_pca:
    # Assemble each group of columns into a single vector column, then drop the originals.
    vector_enc = VectorAssembler(inputCols=i, outputCol=retname(i))
    df = vector_enc.transform(df)
    df = df.drop(*i)
list_of_pca = ['enc_paym', 'is_zero', 'enc_limit', 'enc_loans', 'pre_loans',
               'ratio_overdue', 'confirm_days', 'open_days', 'collection_days']
pca = [PCA(k=1, inputCol=pca_col, outputCol='{0}_encoded'.format(pca_col)).fit(df) for pca_col in list_of_pca]
pipeline1 = Pipeline(stages=pca)
df = pipeline1.fit(df).transform(df).drop(*list_of_pca)
list_of_lbe = ['is_zero_encoded', 'ratio_overdue_encoded', 'enc_limit_encoded', 'enc_loans_encoded']
pca = [StringIndexer(inputCol=lbl_col, outputCol=f"{lbl_col.split('_')[0]}_{lbl_col.split('_')[1]}").fit(df) for lbl_col in list_of_lbe]
pipeline6 = Pipeline(stages=pca)
df = pipeline6.fit(df).transform(df).drop(*list_of_lbe)
1 Answer

Answered by lhcgjxsq1:
Applying a groupBy and a join back onto the original DataFrame for every column is an expensive operation: each iteration adds another shuffle and grows the query plan, which is why later actions such as df.show() become slow. Instead, you can try something like the sample code below (adapt the query to your own logic).