我在spark scala中对多个Dataframe应用相同的方法,如何并行化?

jobtbby3  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(768)

嗨,怎么样?我目前正在遍历所有的Dataframe,并在它们上运行基本相同的查询/过滤器。有没有一种方法能更有效地并行运行呢?下面是示例代码。。。

  1. for (db <- list_of_dbs)
  2. {
  3. var df1 = spark.read
  4. .format("csv")
  5. .option("sep",",")
  6. .option("inferSchema","true")
  7. .option("header","true")
  8. .load(path+db+".csv")
  9. .withColumn("name_of_data",lit(db))
  10. if (db!="rules") {
  11. val conversion = mappingDF
  12. .filter(col("col1").isNotNull and col("name") === db)
  13. }

等等。。。
有没有一种方法可以让它一次在所有Dataframe上运行,基本上摆脱for循环?

von4xj4u

von4xj4u1#

您可以合并所有Dataframe,然后对其应用筛选器/查询:
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/dataset.html#unionbyname(other:org.apache.spark.sql.dataset[t] ):org.apache.spark.sql.dataset[t]
您也可以使用datasetreader的csv方法同时加载多个csv(推荐):
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/dataframereader.html#csv(paths:string*):org.apache.spark.sql.dataframe

相关问题