PySpark: editing the filter step on a Spark DataFrame

oknwwptz · posted 2023-10-15 in Spark

This is my code for filtering out of a Spark DataFrame all rows where certain columns contain values that occur only rarely (I need this for OneHotEncoder). The code works and the row count does go down, but after it runs the DataFrame itself becomes strange: an ordinary df.show() takes 2-3 minutes, and I did not even wait for model training to finish. Why? The problem starts right after this code; everything before it executed quickly.

from pyspark.sql.functions import col, count
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA, StringIndexer, VectorAssembler

# drop rows whose value in any of these columns occurs fewer than 20 times
columns_to_check = ['is_zero', 'enc_loans', 'enc_limit', 'ratio_overdue']
for column in columns_to_check:
    count_column = df.groupBy(column).agg(count("*").alias("count"))
    df = df.join(count_column, on=column, how="left")
    df = df.filter(col("count") >= 20).drop("count")

enc_paym = [i for i in df.columns if 'enc_paym' in i]
enc_zero_loans = [i for i in df.columns if 'is_zero_loans' in i]
enc_zero = [i for i in df.columns if 'is_zero' in i and 'loans' not in i]
enc_limit = [i for i in df.columns if 'pre_over' in i or 'pre_maxover' in i]
enc_loans = [i for i in df.columns if 'enc_loans' in i]
pre_loans = [i for i in df.columns if 'pre_loans' in i]
pre_since = ['pre_since_opened', 'pre_since_confirmed']
pre_tern = ['pre_pterm', 'pre_fterm']
pre_close = ['pre_till_pclose', 'pre_till_fclose']

list_of_pca = [enc_paym, enc_zero, enc_limit, enc_loans, pre_loans, enc_zero_loans, 
               pre_since, pre_tern, pre_close]

# map a group of column names to the name for its assembled vector column
def retname(i):
  flag = 0
  for j in i:
    if 'enc_paym' in j:
      flag = 1
    elif 'is_zero_loans' in j:
      flag = 2
    elif 'pre_over' in j or 'pre_maxover' in j:
      flag = 3
    elif 'enc_loans' in j:
      flag = 4
    elif 'pre_loans' in j:
      flag = 5
    elif 'is_zero' in j and 'loans' not in j:
      flag = 6
    elif 'pre_since_opened' in j or 'pre_since_confirmed' in j:
      flag = 7
    elif 'pre_pterm' in j or 'pre_fterm' in j:
      flag = 8
    elif 'pre_till_pclose' in j or 'pre_till_fclose' in j:
      flag = 9
  if flag == 1:
    return 'enc_paym'
  elif flag == 2:
    return 'is_zero'
  elif flag == 3:
    return 'enc_limit'
  elif flag == 4:
    return 'enc_loans'
  elif flag == 5:
    return 'pre_loans'
  elif flag == 6:
    return 'ratio_overdue'
  elif flag == 7:
    return 'confirm_days'
  elif flag == 8:
    return 'open_days'
  elif flag == 9:
    return 'collection_days'

# assemble each group of related columns into one vector column named by retname()
for i in list_of_pca:
  vector_enc = VectorAssembler(inputCols=i, outputCol=retname(i))
  df = vector_enc.transform(df)
  df = df.drop(*i)

list_of_pca = ['enc_paym', 'is_zero', 'enc_limit', 'enc_loans', 'pre_loans',
               'ratio_overdue', 'confirm_days', 'open_days', 'collection_days']

# reduce each assembled vector column to a single PCA component
pca = [PCA(k=1, inputCol=pca_col, outputCol='{0}_encoded'.format(pca_col)).fit(df) for pca_col in list_of_pca]
pipeline1 = Pipeline(stages=pca)
df = pipeline1.fit(df).transform(df).drop(*list_of_pca)

list_of_lbe = ['is_zero_encoded', 'ratio_overdue_encoded', 'enc_limit_encoded', 'enc_loans_encoded']

# index the encoded columns back into labelled categorical columns
indexers = [StringIndexer(inputCol=lbl_col, outputCol='_'.join(lbl_col.split('_')[:2])).fit(df) for lbl_col in list_of_lbe]
pipeline6 = Pipeline(stages=indexers)
df = pipeline6.fit(df).transform(df).drop(*list_of_lbe)

lhcgjxsq 1#

Applying a groupBy followed by a join back onto the original DataFrame once per column is an expensive operation and will cost a lot of time. Instead, you can try something like the sample query below (adjust the SQL to match your logic).

WITH input AS (
    SELECT
        is_zero,
        enc_loans,
        enc_limit,
        ratio_overdue,
        (
            COUNT(*) OVER (PARTITION BY is_zero) >= 20
            AND COUNT(*) OVER (PARTITION BY enc_loans) >= 20
            AND COUNT(*) OVER (PARTITION BY enc_limit) >= 20
            AND COUNT(*) OVER (PARTITION BY ratio_overdue) >= 20
        ) AS is_valid_count
    FROM raw_data
)
SELECT
    is_zero,
    enc_loans,
    enc_limit,
    ratio_overdue
FROM input
WHERE is_valid_count
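
If you would rather stay in the DataFrame API, the same idea can be written with window functions. A minimal sketch, assuming the thresholds for all four columns should be checked in one pass (note your loop re-counts after each filter, so results can differ slightly if that sequential behaviour matters to you):

from pyspark.sql import Window
from pyspark.sql.functions import count

columns_to_check = ['is_zero', 'enc_loans', 'enc_limit', 'ratio_overdue']

# one window per column; all frequency counts are computed in a single pass,
# avoiding a groupBy + join for every column
cond = None
for c in columns_to_check:
    c_ok = count("*").over(Window.partitionBy(c)) >= 20
    cond = c_ok if cond is None else (cond & c_ok)

# window expressions are not allowed directly in a filter,
# so materialize the boolean first, then filter and drop it
df = (df
      .withColumn("is_valid_count", cond)
      .filter("is_valid_count")
      .drop("is_valid_count"))

Separately, after a long chain of joins and transformations the DataFrame's lineage gets deep, which is a common reason a later df.show() suddenly turns slow; persisting the result (for example df.cache()) after the heavy steps may help.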
