Apache Spark: saved random forest model produces different results on the same dataset

qvk1mo1f posted on 2021-05-27 in Spark

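I'm having trouble reproducing results when I predict with a random forest model saved to disk, using exactly the same dataset. In other words, I train a model on dataset A and save it on my local machine. Then I load it and use it to predict dataset B, and every time I predict dataset B I get different results.
I'm aware of the randomness involved in a random forest classifier, but as far as I understand, that randomness arises during training. Once the model has been created, its predictions should not change if the same data is used for prediction.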
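The premise above can be checked with a toy model. The sketch below is plain Python, not Spark, and the names `train_stumps` and `predict` are hypothetical: a "forest" of one-split trees draws its bootstrap samples from a seeded RNG during training only, so the fitted model is a fixed function and predicting the same rows twice gives identical output.

```python
import random
from typing import List


def train_stumps(xs: List[float], ys: List[int], n_trees: int, seed: int) -> List[float]:
    """Toy 'forest' of one-split trees. All randomness (the bootstrap samples)
    comes from a seeded RNG here, i.e. only during training."""
    rng = random.Random(seed)
    fallback = sum(xs) / len(xs)
    thresholds = []
    for _ in range(n_trees):
        # bootstrap: sample row indices with replacement
        idx = [rng.randrange(len(xs)) for _ in range(len(xs))]
        pos = [xs[i] for i in idx if ys[i] == 1]
        neg = [xs[i] for i in idx if ys[i] == 0]
        if pos and neg:
            # split halfway between the two class means of this sample
            thresholds.append((sum(pos) / len(pos) + sum(neg) / len(neg)) / 2)
        else:
            # the sample contained only one class: fall back to the overall mean
            thresholds.append(fallback)
    return thresholds


def predict(thresholds: List[float], x: float) -> int:
    """Majority vote over the stumps; no randomness at prediction time."""
    votes = sum(1 for t in thresholds if x > t)
    return 1 if 2 * votes > len(thresholds) else 0


xs = [0.1, 0.2, 0.3, 0.7, 0.8, 0.9]
ys = [0, 0, 0, 1, 1, 1]
model = train_stumps(xs, ys, n_trees=25, seed=10)

rows = [0.15, 0.45, 0.55, 0.85]
first = [predict(model, x) for x in rows]
second = [predict(model, x) for x in rows]
print(first == second)  # True
```

If predictions still change between runs, the difference has to come from somewhere other than the fitted model, for example from how the input features are built before they reach it.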
The training script has the following structure:

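from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.getOrCreate()

# Forward slashes (or a raw string) avoid Python treating "\202" as an octal escape
df_train = spark.read.format("csv") \
      .option('header', 'true') \
      .option('inferSchema', 'true') \
      .option('delimiter', ';') \
      .load("C:/2020_05.csv")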

# The problem seems to be related to the StringIndexer/One-Hot Encoding

# If I remove all categorical variables the results can be reproduced

categorical_variables = []
for variable in df_train.dtypes:
    if variable[1] == 'string':
        categorical_variables.append(variable[0])

indexers = [StringIndexer(inputCol=col, outputCol=col+"_indexed") for col in categorical_variables]

for indexer in indexers:
    df_train = indexer.fit(df_train).transform(df_train)
    df_train = df_train.drop(indexer.getInputCol())

indexed_cols = []
for variable in df_train.columns:
    if variable.endswith("_indexed"):
        indexed_cols.append(variable)

encoders = []
for variable in indexed_cols:
    inputCol = variable
    outputCol = variable.replace("_indexed", "_encoded")
    one_hot_encoder_estimator_train = OneHotEncoderEstimator(inputCols=[inputCol], outputCols=[outputCol])

    encoder_model_train = one_hot_encoder_estimator_train.fit(df_train)
    df_train = encoder_model_train.transform(df_train)
    df_train = df_train.drop(inputCol)

inputCols = [x for x in df_train.columns if x != "id" and x != "churn"]

vector_assembler_train = VectorAssembler(
      inputCols=inputCols,
      outputCol='features',
      handleInvalid='keep'
)

df_train = vector_assembler_train.transform(df_train)

df_train = df_train.select('churn', 'features', 'id')

df_train_1 = df_train.filter(df_train['churn'] == 0).sample(withReplacement=False, fraction=0.3, seed=7)
df_train_2 = df_train.filter(df_train['churn'] == 1).sample(withReplacement=True, fraction=20.0, seed=7)
df_train = df_train_1.unionAll(df_train_2) 

rf = RandomForestClassifier(labelCol="churn", featuresCol="features")
paramGrid = ParamGridBuilder() \
      .addGrid(rf.numTrees, [100]) \
      .addGrid(rf.maxDepth, [15]) \
      .addGrid(rf.maxBins, [32]) \
      .addGrid(rf.featureSubsetStrategy, ['onethird']) \
      .addGrid(rf.subsamplingRate, [1.0]) \
      .addGrid(rf.minInfoGain, [0.0]) \
      .addGrid(rf.impurity, ['gini']) \
      .addGrid(rf.minInstancesPerNode, [1]) \
      .addGrid(rf.seed, [10]) \
      .build()

evaluator = BinaryClassificationEvaluator(labelCol="churn")

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)
model = crossval.fit(df_train)
model.save("C:/myModel")
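On Windows paths in Python: in a normal string literal a backslash followed by digits is an octal escape (up to three octal digits), so a literal like "C:\2020_05.csv" does not contain the characters \2020 at all. The plain-Python snippet below (no Spark needed) shows what such a literal actually becomes, next to two safe spellings: a raw string and forward slashes, which are generally accepted on Windows.

```python
# A normal string literal: "\202" is parsed as ONE octal escape, chr(0o202).
as_written = "C:\2020_05.csv"
# Two safe spellings of the intended Windows path:
raw_path = r"C:\2020_05.csv"   # raw string keeps the backslash literally
forward = "C:/2020_05.csv"     # forward slashes avoid escapes entirely

print(repr(as_written))                 # 'C:\x820_05.csv'
print(repr(raw_path))                   # 'C:\\2020_05.csv'
print(len(as_written), len(raw_path))   # 11 14
```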

The test script is as follows:

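from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.getOrCreate()

# Forward slashes (or a raw string) avoid Python treating "\202" as an octal escape
df_test = spark.read.format("csv") \
      .option('header', 'true') \
      .option('inferSchema', 'true') \
      .option('delimiter', ';') \
      .load("C:/2020_06.csv")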

# The problem seems to be related to the StringIndexer/One-Hot Encoding

# If I remove all categorical variables the results can be reproduced

categorical_variables = []
for variable in df_test.dtypes:
    if variable[1] == 'string':
        categorical_variables.append(variable[0])

indexers = [StringIndexer(inputCol=col, outputCol=col+"_indexed") for col in categorical_variables]

for indexer in indexers:
    df_test = indexer.fit(df_test).transform(df_test)
    df_test = df_test.drop(indexer.getInputCol())

indexed_cols = []
for variable in df_test.columns:
    if variable.endswith("_indexed"):
        indexed_cols.append(variable)

encoders = []
for variable in indexed_cols:
    inputCol = variable
    outputCol = variable.replace("_indexed", "_encoded")
    one_hot_encoder_estimator_test = OneHotEncoderEstimator(inputCols=[inputCol], outputCols=[outputCol])

    encoder_model_test = one_hot_encoder_estimator_test.fit(df_test)
    df_test = encoder_model_test.transform(df_test)
    df_test = df_test.drop(inputCol)

inputCols = [x for x in df_test.columns if x != "id" and x != "churn"]

vector_assembler_test = VectorAssembler(
      inputCols=inputCols,
      outputCol='features',
      handleInvalid='keep'
)

df_test = vector_assembler_test.transform(df_test)

df_test = df_test.select('churn', 'features', 'id')

model = CrossValidatorModel.load("C:/myModel")

result = model.transform(df_test)

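# The evaluator from the training script does not exist here, so create it again
evaluator = BinaryClassificationEvaluator(labelCol="churn")
areaUnderROC = evaluator.evaluate(result)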

tp = result.filter("prediction == 1.0 AND churn == 1").count()
tn = result.filter("prediction == 0.0 AND churn == 0").count()
fp = result.filter("prediction == 1.0 AND churn == 0").count()
fn = result.filter("prediction == 0.0 AND churn == 1").count()

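Every time I run the test script, the AUC and the confusion matrix are different. I'm using Spark 2.4.5 and Python 3.7 on a Windows 10 machine. Any suggestions or ideas would be greatly appreciated.
Edit: the problem is related to the StringIndexer/one-hot encoding step. When I use only the numeric variables, I can reproduce the results. The question is still open because I can't explain why this happens.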

Answer #1 by t0ybt7op

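In my experience, the problem is that you re-fit the OneHotEncoder (and the StringIndexer before it) on the test data in your test script, instead of reusing the ones fitted during training.
Here is how one-hot encoding works, according to the documentation: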
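"A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0]."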
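The encoding described in the quoted documentation can be reproduced in a few lines of plain Python. This is only an illustrative imitation with a hypothetical function name, `one_hot`, not Spark's implementation (Spark, for example, produces sparse vectors).

```python
from typing import List


def one_hot(index: float, num_categories: int, drop_last: bool = True) -> List[float]:
    """Imitates the one-hot encoding described in the quoted docs (dense output)."""
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError(f"category index {index} out of range")
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if i < size:  # with drop_last, the last category becomes the all-zeros vector
        vec[i] = 1.0
    return vec


print(one_hot(2.0, 5))                   # [0.0, 0.0, 1.0, 0.0]
print(one_hot(4.0, 5))                   # [0.0, 0.0, 0.0, 0.0]
print(one_hot(4.0, 5, drop_last=False))  # [0.0, 0.0, 0.0, 0.0, 1.0]
```

Note that the vector length depends on the number of categories seen when the encoder was fitted.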
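So whenever the encoding stages are fitted on different data (which is naturally the case for training versus test data), the vectors they produce differ. By default the StringIndexer assigns indices by label frequency, so the same category can land at a different position in the vector, and the vector length can change with the number of categories. The features then no longer line up with what the model was trained on.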
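To make this concrete, here is a plain-Python imitation (not Spark code) of fitting a frequency-ordered index separately on training and test data, followed by a one-hot encoding with the last category dropped. The column values `prepaid`, `postpaid` and `business` are hypothetical, and breaking frequency ties alphabetically is this sketch's own choice.

```python
from collections import Counter
from typing import Dict, List


def fit_index(values: List[str]) -> Dict[str, int]:
    """Toy frequency-ordered index: most frequent value -> 0.
    Ties are broken alphabetically (this sketch's choice)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}


def encode(value: str, mapping: Dict[str, int]) -> List[float]:
    """One-hot with the last category dropped, as in the quoted docs."""
    size = len(mapping) - 1
    vec = [0.0] * size
    i = mapping[value]
    if i < size:
        vec[i] = 1.0
    return vec


train_plan = ["prepaid", "prepaid", "prepaid", "postpaid", "postpaid", "business"]
test_plan = ["postpaid", "postpaid", "postpaid", "prepaid", "business"]

train_map = fit_index(train_plan)
test_map = fit_index(test_plan)
print(train_map)  # {'prepaid': 0, 'postpaid': 1, 'business': 2}
print(test_map)   # {'postpaid': 0, 'business': 1, 'prepaid': 2}

# The same category gets a different feature vector depending on where the index was fitted
print(encode("prepaid", train_map))  # [1.0, 0.0]
print(encode("prepaid", test_map))   # [0.0, 0.0]
```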
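You should put the StringIndexer and OneHotEncoder stages into a pipeline together with the model, fit the pipeline, save it, and then load it again in the test script. That way, every time data passes through the pipeline, each category is guaranteed to be encoded exactly as it was during training.
See the documentation for more details on saving and loading pipelines.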
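In Spark this is expressed with `pyspark.ml.Pipeline`: list the indexers, encoders, assembler and classifier as its `stages`, call `fit` once on the training data, `save` the resulting `PipelineModel`, and call `PipelineModel.load(path)` in the test script. The plain-Python sketch below only imitates that fit-once/save/load idea with a JSON file; the helper names, the column values and the `plan_index.json` file are hypothetical.

```python
import json
import os
import tempfile
from collections import Counter
from typing import Dict, List


def fit_index(values: List[str]) -> Dict[str, int]:
    """Toy frequency-ordered index (most frequent -> 0, ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}


def transform(values: List[str], mapping: Dict[str, int]) -> List[int]:
    """Apply a fitted mapping; unseen values share one extra index,
    similar in spirit to StringIndexer(handleInvalid='keep')."""
    unseen = len(mapping)
    return [mapping.get(v, unseen) for v in values]


train_plan = ["prepaid", "prepaid", "prepaid", "postpaid", "postpaid", "business"]
test_plan = ["postpaid", "postpaid", "prepaid", "enterprise"]

with tempfile.TemporaryDirectory() as model_dir:
    path = os.path.join(model_dir, "plan_index.json")

    # "Training script": fit once on the training data and save the fitted state.
    with open(path, "w") as f:
        json.dump(fit_index(train_plan), f)

    # "Test script": load the saved state instead of fitting again on the test data.
    with open(path) as f:
        loaded = json.load(f)

encoded = transform(test_plan, loaded)
print(encoded)  # [1, 1, 0, 3]
```

Re-fitting on the test rows instead would give `[0, 0, 2, 1]` for the same values, which is exactly the mismatch the answer describes.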
