在spark中,我从kafka创建了一个批处理时间为5秒的流。在这段时间内可以收到许多消息,我想分别处理其中的每一条消息,但在我当前的逻辑中,似乎只处理每一批的第一条消息。
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topics)
val messages = stream.map((x$2) => x$2._2)
messages.foreachRDD { rdd =>
if(!rdd.isEmpty) {
val message = rdd.map(parse)
println(message.collect())
}
}
这个 parse
函数只是将json消息中的相关字段提取到一个元组中。
我可以深入到分区并以这种方式分别处理每条消息:
messages.foreachRDD { rdd =>
if(!rdd.isEmpty) {
rdd.foreachPartition { partition =>
partition.foreach{msg =>
val message = parse(msg)
println(message)
}
}
}
}
但我肯定有办法保持rdd水平。在第一个例子中,我做错了什么?
我正在使用spark 2.0.0、scala 2.11.8和spark流kafka 0.8。
1条答案
按热度按时间zd287kbt1#
下面是一个示例流应用程序,它将批处理中的每条消息转换为每个循环中的大写字母,并打印出来。尝试此示例应用程序,然后重新检查您的应用程序。希望这有帮助。