嗨,怎么样?我目前正在遍历所有的Dataframe,并在它们上运行基本相同的查询/过滤器。有没有一种方法能更有效地并行运行呢?下面是示例代码。。。
for (db <- list_of_dbs)
{
var df1 = spark.read
.format("csv")
.option("sep",",")
.option("inferSchema","true")
.option("header","true")
.load(path+db+".csv")
.withColumn("name_of_data",lit(db))
if (db!="rules") {
val conversion = mappingDF
.filter(col("col1").isNotNull and col("name") === db)
}
等等。。。
有没有一种方法可以让它一次在所有Dataframe上运行,基本上摆脱for循环?
1条答案
按热度按时间von4xj4u1#
您可以合并所有Dataframe,然后对其应用筛选器/查询:
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/dataset.html#unionbyname(other:org.apache.spark.sql.dataset[t] ):org.apache.spark.sql.dataset[t]
您也可以使用datasetreader的csv方法同时加载多个csv(推荐):
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/dataframereader.html#csv(paths:string*):org.apache.spark.sql.dataframe