无法通过spark流使用kafka消息

aydmsdu9  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(277)

我试图消费Kafka生产者通过Spark流节目的信息。
这是我的程序

val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
     // lines.print()
lines.foreachRDD(rdd=>{
            rdd.foreach(message=>
      println(message))
    })

以上程序正在成功运行。但我看不到任何信息被打印出来。

w6lpcovy

w6lpcovy1#

使用设置主url "local[*]" ```
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")

您还可以尝试调用collect()并查看是否收到消息。

lines.foreachRDD { rdd =>
rdd.collect().foreach(println)
}

相关问题