kafka生产者消息未显示在消费者中(通过kafka spark流读取)

lp0sw83n  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(301)

使用的Kafka生产者命令:


# -> kafka-console-producer --broker-list  brokerhost:9093 --topic testtopic --producer.config client.properties

Hello 

How are you

Bye

Where is my message?

消费者spark流媒体使用的代码片段,打包为jar-

val sparkConf = new SparkConf().setAppName("kk-KafkaSparktest")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        val lines = KafkaUtils.createStream(ssc, "brokerhost:9093", 
        "spark-streaming-consumer-group", Map("testtopic" -> 5))
        lines.print()
        ssc.start()
        ssc.awaitTermination()

用于运行打包jar的命令-

spark-submit --conf 'spark.executor.extraJavaOptions=-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/KK/kafka/jaas.conf' --conf 'spark.driver.extraJavaOptions=-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/KK/kafka/jaas.conf' --class main.scala.sparkkafka --master yarn --deploy-mode cluster kafkaproj2_2.10-1.0.jar

日志上显示的输出

-------------------------------------------
Time: 1502275406000 ms
-------------------------------------------

-------------------------------------------
Time: 1502275408000 ms
-------------------------------------------

-------------------------------------------
Time: 1502275410000 ms
-------------------------------------------

-------------------------------------------
Time: 1502275412000 ms
-------------------------------------------

-------------------------------------------
// no key value pairs here

我错过了什么?我需要对代码进行任何更改吗?我认为这种期望是存在的key:value pairs 但我们什么也没看到。

erhoui1w

erhoui1w1#

也许是因为你想推进左舷 9093 读进去 9092 ?
首先,检查你的信息是否在Kafka: kafka-console-consumer --bootstrap-server brokerhost:9093 --topic testtopic 然后,我认为您会对spark 2.2中添加的名为“结构化流”的新API感兴趣:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

1yjd4xko

1yjd4xko2#

以下是一些步骤:
首先,检查Kafka主题是否有消息
kafka-console-consumer.sh--zookeeper--topic--from beging--property print.key=true--property key.separator=,--property print.key=true--property key.separator=,
如果它正在打印消息,那么你有Kafka的消息,如果没有,那么你的生产者是不工作的
尝试下面的代码,如果Kafka有消息

lines.foreachRDD { rdd =>
// Get the offset ranges in the RDD
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
  rdd.map(x=>x.value().mkString(",")).foreach(println)
  println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to 
${o.untilOffset}")
}
}
lines.map(record => record.value().mkString(",")).count()

下面是生产者的一个简单命令

echo '00157,{"name":"xyz", "title":"dev"}' | 
kafka-console-producer.sh \
        --broker-list <brokeer list> \
        --topic <topic name> \
        --property parse.key=true \
        --property key.separator=, \
        --new-producer

相关问题