如何将多个文件加载到多个rdd中?

bkhjykvo  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(503)

我有多个文件,这些是独立的,需要由Spark处理。如何将它们并行加载到单独的RDD中?谢谢!
编码语言是scala。

eimct9ow

eimct9ow1#

如果您希望并发读取/处理RDD,可以利用scala.concurrent.future(或zio、cats等中的效果)。
加载函数的示例代码如下:

def load(paths: Seq[String], spark: SparkSession)
        (implicit ec: ExecutionContext): Seq[Future[RDD[String]]] = {
  def loadSinglePath(path: String): Future[RDD[String]] = Future {
    spark.sparkContext.textFile(path)
  }

  paths map loadSinglePath
}

使用此函数的示例代码:

import scala.concurrent.duration.{Duration, DurationInt}

val sc = SparkSession.builder.master("local[*]").getOrCreate()
implicit val ec = ExecutionContext.global
val result = load(Seq("t1.txt", "t2.txt", "t3.txt"), sc).zipWithIndex
      .map { case (rddFuture, idx) =>
        rddFuture.map( rdd =>
          println(s"Rdd with index $idx has ${rdd.count()}")
        )
      }

Await.result(Future.sequence(result), 1 hour)

例如,提供了默认的全局executioncontext,但是可以配置为在自定义的executioncontext中运行代码(只需用自己的executioncontext替换这个隐式的valec)

相关问题