Spark LSH approxSimilarityJoin takes too long:
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, MinHashLSH, NGram}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.DataTypes

val column = "name"
val new_df = df.select("id", "name", "duns_number", "country_id")      // 1.7 million records
val new_df_1 = df.select("index", "name", "duns_number", "country_id") // 0.7 million records

// Character 4-grams. Note: NGram expects its input column to be an array of
// strings, so "_" + column ("_name") is presumably a pre-tokenized version of name.
val n_gram = new NGram()
  .setInputCol("_" + column)
  .setN(4)
  .setOutputCol("n_gram_column")
val n_gram_df = n_gram.transform(new_df)
val n_gram_df_1 = n_gram.transform(new_df_1)

// MinHashLSH requires every input vector to have at least one non-zero entry.
val validateEmptyVector = udf({ v: Vector => v.numNonzeros > 0 }, DataTypes.BooleanType)

// Vocabulary size 456976 = 26^4, i.e. every possible 4-gram over a 26-letter alphabet.
val vectorModeler: CountVectorizerModel = new CountVectorizer()
  .setInputCol("n_gram_column")
  .setOutputCol("tokenize")
  .setVocabSize(456976)
  .setMinDF(1)
  .fit(n_gram_df)

val vectorizedProductsDF = vectorModeler.transform(n_gram_df)
  .filter(validateEmptyVector(col("tokenize")))
  .select(col("id"), col(column), col("tokenize"), col("duns_number"), col("country_id"))
val vectorizedProductsDF_1 = vectorModeler.transform(n_gram_df_1)
  .filter(validateEmptyVector(col("tokenize")))
  .select(col("tokenize"), col(column), col("duns_number"), col("country_id"), col("index"))

val minLshConfig = new MinHashLSH()
  .setNumHashTables(3)
  .setInputCol("tokenize")
  .setOutputCol("hash")
val lshModel = minLshConfig.fit(vectorizedProductsDF)
val transform_1 = lshModel.transform(vectorizedProductsDF)
val transform_2 = lshModel.transform(vectorizedProductsDF_1)

val result = lshModel.approxSimilarityJoin(transform_1, transform_2, 0.42).toDF
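
For context, approxSimilarityJoin returns the matched rows as struct columns datasetA and datasetB plus a distCol distance column, so a typical follow-up projection (a sketch, using the id columns from the snippet above) looks like:

// Sketch: extract the matched identifiers and the Jaccard distance
// from the struct columns produced by approxSimilarityJoin.
val matches = result.select(
  col("datasetA.id"),
  col("datasetB.index"),
  col("distCol")
)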
The last line of code (the approxSimilarityJoin) takes too long, and the last few tasks of its stage get stuck. I have tried 13 executors with 4 cores each and spark.sql.shuffle.partitions=600.
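
For reference, a minimal sketch of that resource configuration, assuming the settings are applied through the SparkSession builder rather than spark-submit flags (the post does not show how they were set):

import org.apache.spark.sql.SparkSession

// Sketch only: the 13-executor / 4-core / 600-shuffle-partition setup described above.
val spark = SparkSession.builder()
  .appName("lsh-similarity-join")
  .config("spark.executor.instances", "13")
  .config("spark.executor.cores", "4")
  .config("spark.sql.shuffle.partitions", "600")
  .getOrCreate()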
1 Answer
In approxSimilarityJoin, increase the distance threshold from 0.42 to 1.0 or 2.0; this will greatly improve performance.
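
Applied to the snippet in the question, the suggestion amounts to changing only the threshold argument (1.0 shown here, the first of the two values the answer offers):

// Same join as in the question, with the relaxed distance threshold
// suggested by the answer (was 0.42).
val result = lshModel.approxSimilarityJoin(transform_1, transform_2, 1.0).toDF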