我试图从kafka服务器创建一个数据流,然后对该流进行一些转换。我已经包括了一个捕获如果流是空的( if(!rdd.partitions.isEmpty)
); 然而,即使没有任何事件被发表到Kafka的主题中 else
永远达不到语句。
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
if(!rdd.partitions.isEmpty) {
val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)
val val = message(0)
} else println("empty stream...")
ssc.start()
ssc.awaitTermination()
}
在使用时,是否应该使用另一种语句来检查流是否为空 KafkaUtils.createDirectStream
而不是 createStream
?
1条答案
按热度按时间ih99xse11#
使用
RDD.isEmpty
而不是RDD.partitions.isEmpty
它添加了一个检查,以查看底层分区是否实际包含元素:原因是什么
RDD.partitions.isEmpty
不起作用的原因是内存中存在一个分区RDD
,但分区本身是空的。但是从partitions
这是一个Array[Partition]
,它不是空的。