kinesis-java

flvtvl50  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(360)

在spark streaming中,每批记录只能处理或触发一个方法/操作一次吗?
我的用例是每个数据流批处理调用loadconfigurations()一次,即使有1到n条记录。加载的配置应在驱动程序处可用,以便进一步处理。
前任:
批处理1:kinesis流中有0条记录-没有loadconfiguration()的触发器
batch-2:kinesis stream-loadconfiguration()中的1条记录被调用一次,变量在驱动程序级别更新
batch-3:kinesis stream-loadconfiguration()中的100条记录被调用一次,变量在驱动程序级别更新
提前谢谢。

omhiaaxx

omhiaaxx1#

我不太清楚我是否明白确切的要求。但是,根据问题描述和您在评论中的解释,这可能是可行的:

dstream.foreachRDD { rdd =>
  val config = loadConfiguration() //  executed at the driver
  rdd.foreach { record =>
   // do stuff here. e.g. config.get(). This code is executed at the worker.
  }
}

这里需要注意的一点是 Config 类必须是可序列化的,因为它将从驱动程序发送给工人。
另外,请注意,这可能是一个反模式,具体取决于您的用例。e、 g.对于每个批处理,config对象将被序列化并发送给worker,worker将根据config对象的大小增加网络开销。
我强烈建议您检查 forEachRDD 明智地构建和选择你的方法。以下是相同的链接:https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-foreachrdd的使用模式

相关问题