在这种情况下
val dStream : Stream[_] =
dStream.foreachRDD(a => ... )
dStream.foreachRDD(b => ... )
执行foreach方法:
并行运行?
按顺序运行但没有特定顺序?
在foreachrdd(b=>)之前的foreachrdd(a=>)?
我想知道,因为我想在数据库插入后提交kafka偏移量db连接器只提供一个“foreach”插入)
val dStream : Stream[_] = ...().cache()
dStream.toDb // consume the stream
dStream.foreachRDD(b => //commit offset ) //consume the stream but after the db insert
在spark用户界面上看起来有一个订单,但我不确定它是否可靠。
编辑:如果foreachrdd(a=>)失败,foreachrdd(b=>)是否仍在执行?
2条答案
按热度按时间wtlkbnrh1#
在spark-streaming-2.4.0之前,不能保证多个dstream.foreachrdd按顺序执行。请看jobscheduler类中的以下代码:
jobexecutor是一个线程池,如果将“spark.streaming.concurrentjobs”设置为大于1的数字,则如果有足够的spark执行器可用,则可以并行执行。因此,确保你的设置是正确的,以引出你需要的行为。
qvsjd97n2#
DStream.foreach
自spark 0.9.0以来已弃用。你想要同等的DStream.foreachRDD
首先。spark dag中的阶段是按顺序执行的,因为一个变换的输出通常也是图形中下一个变换的输入,但在您的示例中不是这样的。
所发生的事情是rdd在内部被划分为多个分区。每个分区都在集群管理器可用的不同工作进程上运行。在你的例子中,
DStream.foreach(a => ...)
将在之前执行DStream.foreach(b => ...)
,但在foreach
将与内部RDD
正在迭代。我想知道,因为我想在数据库插入之后提交kafka偏移量。
这个
DStream.foreachRDD
是一个输出转换,意味着它将导致spark具体化图形并开始执行。您可以放心地假设,对数据库的插入将在执行第二个命令之前结束foreach
,但请记住foreach
将在中的每个分区中并行更新数据库RDD
.