我无法拟合spark中的fp增长模型

icnyk63a  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(433)

拜托,你能帮我吗?我有一个80个csv文件数据集和一个主机和4个从机集群。我想读取Dataframe中的csv文件,并在四个从机上并行化它。之后,我想用groupby过滤Dataframe。在我的spark查询中,结果包含按(“code\uccam”,“dossier”)分组的“code\uccam”和“dossier”列。我想使用fp-growth算法来检测由“folder”重复的“code\u ccam”序列。但是,当我使用fpgrowth.fit()命令时,出现以下错误:

  1. "error: type mismatch;
  2. found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
  3. required: org.apache.spark.sql.Dataset[_]"

以下是我的spark命令:

  1. val df = spark.read.option("header", "true").csv("file:///home/ia/Projet-Spark-ace/Donnees/Fichiers CSV/*.csv")
  2. import org.apache.spark.sql.functions.{concat, lit}
  3. val df2 = df.withColumn("dossier", concat(col("num_immatriculation"), lit(""), col("date_acte"), lit(""), col("rang_naissance"), lit(""), col("date_naissance")))
  4. val df3 = df2.drop("num_immatriculation").drop("date_acte").drop("rang_naissance").drop("date_naissance")
  5. val df4 = df3.select("dossier","code_ccam").groupBy("dossier","code_ccam").count()
  6. val transactions = df4.agg(collect_list("code_ccam").alias("codes_ccam")).rdd.map(x => x)
  7. import org.apache.spark.ml.fpm.FPGrowth
  8. val fpgrowth = new FPGrowth().setItemsCol("code_ccam").setMinSupport(0.5).setMinConfidence(0.6)
  9. val model = fpgrowth.fit(transactions)
piv4azn7

piv4azn71#

非常感谢你。成功了。我用collect\u set替换collect\u list。

相关问题