scala—只处理kafka流中的第一条消息

acruukt9  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(395)

在spark中,我从kafka创建了一个批处理时间为5秒的流。在这段时间内可以收到许多消息,我想分别处理其中的每一条消息,但在我当前的逻辑中,似乎只处理每一批的第一条消息。

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topics)

val messages = stream.map((x$2) => x$2._2)

messages.foreachRDD { rdd =>
    if(!rdd.isEmpty) {
        val message = rdd.map(parse)
        println(message.collect())
    }
}

这个 parse 函数只是将json消息中的相关字段提取到一个元组中。
我可以深入到分区并以这种方式分别处理每条消息:

messages.foreachRDD { rdd =>
    if(!rdd.isEmpty) {
        rdd.foreachPartition { partition =>
            partition.foreach{msg =>
                val message = parse(msg)
                println(message)
            }
        }
    }
}

但我肯定有办法保持rdd水平。在第一个例子中,我做错了什么?
我正在使用spark 2.0.0、scala 2.11.8和spark流kafka 0.8。

zd287kbt

zd287kbt1#

下面是一个示例流应用程序,它将批处理中的每条消息转换为每个循环中的大写字母,并打印出来。尝试此示例应用程序,然后重新检查您的应用程序。希望这有帮助。

object SparkKafkaStreaming {

def main(args: Array[String]) {

//Broker and topic
val brokers = "localhost:9092"
val topic = "myTopic"

//Create context with 5 second batch interval
val sparkConf = new SparkConf().setAppName("SparkKafkaStreaming").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))

//Create direct kafka stream with brokers and topics
val topicsSet = Set[String](topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val msgStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

//Message
val msg = msgStream.map(_._2)    
msg.print()

//For each
msg.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
     println("-----Convert Message to UpperCase-----")
    //convert messages to upper case
    rdd.map { x => x.toUpperCase() }.collect().foreach(println)
  } else {
    println("No Message Received")
  }
}

//Start the computation
ssc.start()
ssc.awaitTermination()
  }
}

相关问题