我通过做一个近似的相似性连接来找到jaccard的距离,如这里所描述的。但我有一个问题,与性能相关的文件有超过20000行这样。
具体实施如下:
从dbfs读取文件
val f1= spark.read.option("inferSchema","true").option("header","true").csv("/FileStore/tables/file1.csv")
var f2= spark.read.option("inferSchema","true").option("header","true").csv("/FileStore/tables/file2.csv")
为简单起见,将选定列转换为通过管道运行以计算jaccard距离
val f3= f1.withColumnRenamed("column1","opsCol")
val f4= f2.withColumnRenamed("column2","opsCol")
用于拆分单词的正则表达式标记器
val regexTokenizer = new RegexTokenizer()
.setInputCol("opsCol")
.setOutputCol("tokens")
.setPattern("")
.setMinTokenLength(1)
n-克
val ngram = new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams")
转换为向量
val hashingTF = new HashingTF()
.setInputCol("ngrams").setOutputCol("vectors")
我怀疑下一步该怎么做
计算lsh
val mh = new MinHashLSH()
.setInputCol("vectors")
.setOutputCol("lsh")
使用上面的lsh计算,代码非常快速,对于大小文件都非常有效,但是我没有得到精确的匹配,即缺少一些最近的匹配。
所以我想用lsh计算setnumhashtables()方法如下:
val mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("vectors")
.setOutputCol("lsh")
但是上面的新更改对于大文件来说需要花费大量的时间,即使对于超过20000行的文件,但是我没有错过最近的匹配。
现在加入一个
val pipeline = new Pipeline()
.setStages(Array(regexTokenizer, ngram, hashingTF, mh))
val model = pipeline.fit(f3)
val s1= model.transform(f3)
val s2= model.transform(f4).cache
val computedResult = model.stages.last.asInstanceOf[MinHashLSHModel].approxSimilarityJoin(s1, s2, 0.1, "JD").
select(col("datasetA.opsCol").as("orginalCol1"), col("datasetB.opsCol").as("orginalCol2"), col("JD"))
有没有什么方法可以使这一点有效而不失去最近的比赛?
暂无答案!
目前还没有任何答案,快来回答吧!