spark流是否并行运行多个foreach

3phpmpom  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(821)

在这种情况下

val dStream : Stream[_] = 
dStream.foreachRDD(a => ... )
dStream.foreachRDD(b => ... )

执行foreach方法:
并行运行?
按顺序运行但没有特定顺序?
在foreachrdd(b=>)之前的foreachrdd(a=>)?
我想知道,因为我想在数据库插入后提交kafka偏移量db连接器只提供一个“foreach”插入)

val dStream : Stream[_] = ...().cache()
dStream.toDb // consume the stream
dStream.foreachRDD(b => //commit offset ) //consume the stream but after the db insert

在spark用户界面上看起来有一个订单,但我不确定它是否可靠。
编辑:如果foreachrdd(a=>)失败,foreachrdd(b=>)是否仍在执行?

wtlkbnrh

wtlkbnrh1#

在spark-streaming-2.4.0之前,不能保证多个dstream.foreachrdd按顺序执行。请看jobscheduler类中的以下代码:

class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
  // https://gist.github.com/AlainODea/1375759b8720a3f9f094
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

jobexecutor是一个线程池,如果将“spark.streaming.concurrentjobs”设置为大于1的数字,则如果有足够的spark执行器可用,则可以并行执行。因此,确保你的设置是正确的,以引出你需要的行为。

qvsjd97n

qvsjd97n2#

DStream.foreach 自spark 0.9.0以来已弃用。你想要同等的 DStream.foreachRDD 首先。
spark dag中的阶段是按顺序执行的,因为一个变换的输出通常也是图形中下一个变换的输入,但在您的示例中不是这样的。
所发生的事情是rdd在内部被划分为多个分区。每个分区都在集群管理器可用的不同工作进程上运行。在你的例子中, DStream.foreach(a => ...) 将在之前执行 DStream.foreach(b => ...) ,但在 foreach 将与内部 RDD 正在迭代。
我想知道,因为我想在数据库插入之后提交kafka偏移量。
这个 DStream.foreachRDD 是一个输出转换,意味着它将导致spark具体化图形并开始执行。您可以放心地假设,对数据库的插入将在执行第二个命令之前结束 foreach ,但请记住 foreach 将在中的每个分区中并行更新数据库 RDD .

相关问题