pyspark: Spark LSH approxSimilarityJoin takes too much time

b1payxdu asked on 2024-01-06 in Spark

Spark LSH approxSimilarityJoin is taking far too long:

    import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, MinHashLSH, NGram}
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.functions.{col, udf}

    val column = "name"
    // ~1.7 million records
    val new_df = df.select("id", "name", "duns_number", "country_id")
    // ~0.7 million records
    val new_df_1 = df.select("index", "name", "duns_number", "country_id")

    // "_name" is assumed to already hold the character tokens of "name"
    // (NGram expects an array-typed input column)
    val n_gram = new NGram()
      .setInputCol("_" + column)
      .setN(4)
      .setOutputCol("n_gram_column")
    val n_gram_df = n_gram.transform(new_df)
    val n_gram_df_1 = n_gram.transform(new_df_1)

    // MinHashLSH rejects all-zero vectors, so filter them out
    val validateEmptyVector = udf((v: Vector) => v.numNonzeros > 0)

    val vectorModeler: CountVectorizerModel = new CountVectorizer()
      .setInputCol("n_gram_column")
      .setOutputCol("tokenize")
      .setVocabSize(456976) // 26^4: every possible 4-gram over a-z
      .setMinDF(1)
      .fit(n_gram_df)

    val vectorizedProductsDF = vectorModeler.transform(n_gram_df)
      .filter(validateEmptyVector(col("tokenize")))
      .select(col("id"), col(column), col("tokenize"), col("duns_number"), col("country_id"))
    val vectorizedProductsDF_1 = vectorModeler.transform(n_gram_df_1)
      .filter(validateEmptyVector(col("tokenize")))
      .select(col("tokenize"), col(column), col("duns_number"), col("country_id"), col("index"))

    val minLshConfig = new MinHashLSH()
      .setNumHashTables(3)
      .setInputCol("tokenize")
      .setOutputCol("hash")
    val lshModel = minLshConfig.fit(vectorizedProductsDF)

    val transform_1 = lshModel.transform(vectorizedProductsDF)
    val transform_2 = lshModel.transform(vectorizedProductsDF_1)

    // Jaccard-distance join with threshold 0.42 -- this is the step that stalls
    val result = lshModel.approxSimilarityJoin(transform_1, transform_2, 0.42).toDF
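For context, this is standard Spark ML LSHModel behavior rather than anything specific to this post: approxSimilarityJoin returns the matched pairs as two struct columns plus a distance column (default name "distCol"), so the result can be inspected like this:

    // Output shape of approxSimilarityJoin: datasetA, datasetB, distCol
    result.select(
      col("datasetA.id"),
      col("datasetB.index"),
      col("distCol")
    ).show(5)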


The last line (approxSimilarityJoin) takes far too long, and the last few tasks of the stage get stuck.

I tried 13 executors with 4 cores each, and spark.sql.shuffle.partitions=600.
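For reference, a sketch of how that configuration would be applied through the SparkSession builder; in practice the executor settings are usually passed to spark-submit, and the conf keys below simply mirror the numbers quoted above (the app name is hypothetical):

    import org.apache.spark.sql.SparkSession

    // Illustrative only: the configuration described above, expressed as conf keys
    val spark = SparkSession.builder()
      .appName("lsh-similarity-join")           // hypothetical app name
      .config("spark.executor.instances", "13") // 13 executors
      .config("spark.executor.cores", "4")      // 4 cores each
      .config("spark.sql.shuffle.partitions", "600")
      .getOrCreate()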


ljo96ir5 (#1)

In the approxSimilarityJoin call, raise the distance threshold from 0.42 to 1.0 or 2.0; this will improve performance considerably.
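A minimal sketch of that suggestion (the distance-column name jaccard_dist is my own choice; Spark's default is distCol): loosen the join threshold, and if the original 0.42 cutoff is still needed, filter on the distance column the join emits afterwards.

    // Loosen the join threshold as suggested, then recover the 0.42 cutoff
    // by filtering on the distance column returned by the join.
    val joined = lshModel.approxSimilarityJoin(transform_1, transform_2, 1.0, "jaccard_dist")
    val result = joined.filter(col("jaccard_dist") < 0.42)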
