Simple producer-consumer with Kafka and Spark Streaming

uujelgoq · published 2021-06-07 in Kafka

I don't know why the data sent by the producer never reaches the consumer. I am working on a Cloudera VM. I am trying to write a simple producer-consumer pair, where the producer uses Kafka and the consumer uses Spark Streaming.

Producer code in Scala:

import java.util.Properties
import org.apache.kafka.clients.producer._

object kafkaProducer {

  def main(args: Array[String]) {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val TOPIC = "test"

    for (i <- 1 to 50) {
      Thread.sleep(1000) //every 1 second
      // NOTE: `generator` is a helper object from the asker's project (not shown here)
      val record = new ProducerRecord(TOPIC, generator.getID().toString(), generator.getRandomValue().toString())
      producer.send(record)
    }

    producer.close()
  }
}

Consumer code in Scala:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object kafkaConsumer {
      def main(args: Array[String]) {

        var totalCount = 0L
        val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AnyName").set("spark.driver.host", "localhost")
        val ssc =  new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
        val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1))

        stream.foreachRDD((rdd: RDD[_], time: Time) => {
          val count = rdd.count()
          println("\n-------------------")
          println("Time: " + time)
          println("-------------------")
          println("Received " + count + " events\n")
          totalCount += count
        })
        ssc.start()
        Thread.sleep(20 * 1000)
        ssc.stop()

        if (totalCount > 0) {
          println("PASSED")
        } else {
          println("FAILED")
        }
      }
}
tv6aics1 · answer #1

The problem is solved by changing the following line in the consumer code:

val stream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("test" -> 1))

The second argument to createStream must be the ZooKeeper address, not the Kafka broker port 9092. ZooKeeper listens on port 2181 by default, and it connects to Kafka on port 9092 automatically.
Note: Kafka should be started from the terminal before running the producer and the consumer.
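
Before running the producer and the consumer, ZooKeeper and the Kafka broker both have to be up. On a standard Kafka distribution they can be started from the terminal like this (the paths assume the stock Kafka layout and may differ on the Cloudera VM; the `--zookeeper` flags match the older Kafka versions this createStream API belongs to):

```shell
# Start ZooKeeper (listens on port 2181 by default)
bin/zookeeper-server-start.sh config/zookeeper.properties &

# Start the Kafka broker (listens on port 9092 by default)
bin/kafka-server-start.sh config/server.properties &

# Create the "test" topic used by the producer and the consumer
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test

# Optionally, verify that messages arrive with the console consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```

If the console consumer prints the producer's records, the broker side is healthy and any remaining problem is in the Spark Streaming consumer itself.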
