在spark中对rdd进行排序,然后再将其发布给kafka?

ttcibm8c  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(213)

在我的代码中,我首先订阅一个kafka流,处理每个rdd以创建类的一个示例 People 然后,我想发布结果集( Dataset[People] )Kafka的一个特定主题。需要注意的是,并不是从kafka接收到的所有传入消息都Map到 People . 此外,人的示例应该按照从Kafka收到的顺序发送给Kafka。
但是,我不确定排序是否真的有必要,或者 People 在执行器上运行相应的代码时保持相同的顺序(我可以直接将数据集发布到kafka)。据我所知,排序是必要的,因为里面的代码 foreachRDD 可以在群集中的不同节点上执行。是这样吗?
这是我的密码:

val myStream = KafkaUtils.createDirectStream[K, V](streamingContext, PreferConsistent, Subscribe[K, V](topics, consumerConfig))

def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
case (rdd, time) if !rdd.isEmpty =>
    // More Code...
    // In the end, I have: Dataset[People]
case _ =>
}

myStream.foreachRDD((x, y) => process((x, y))) // Do I have to replace this call with map, sort the RDD and then publish it to Kafka?
svmlkihl

svmlkihl1#

此外,人的示例应该按照从Kafka收到的顺序发送给Kafka。
除非您有一个单独的分区(然后您就不会使用spark了,是吗?),否则接收数据的顺序是不确定的,同样地,发送数据的顺序也不会确定。分类在这里没有任何区别。
如果您需要一个非常特定的处理顺序(这通常是一个设计错误,如果您使用的是数据密集型应用程序),那么您需要一个顺序应用程序,或者一个比spark具有更细粒度控制的系统。

相关问题