使用的Kafka生产者命令:
# -> kafka-console-producer --broker-list brokerhost:9093 --topic testtopic --producer.config client.properties
Hello
How are you
Bye
Where is my message?
消费者spark流媒体使用的代码片段,打包为jar-
val sparkConf = new SparkConf().setAppName("kk-KafkaSparktest")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = KafkaUtils.createStream(ssc, "brokerhost:9093",
"spark-streaming-consumer-group", Map("testtopic" -> 5))
lines.print()
ssc.start()
ssc.awaitTermination()
用于运行打包jar的命令-
spark-submit --conf 'spark.executor.extraJavaOptions=-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/KK/kafka/jaas.conf' --conf 'spark.driver.extraJavaOptions=-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/KK/kafka/jaas.conf' --class main.scala.sparkkafka --master yarn --deploy-mode cluster kafkaproj2_2.10-1.0.jar
日志上显示的输出
-------------------------------------------
Time: 1502275406000 ms
-------------------------------------------
-------------------------------------------
Time: 1502275408000 ms
-------------------------------------------
-------------------------------------------
Time: 1502275410000 ms
-------------------------------------------
-------------------------------------------
Time: 1502275412000 ms
-------------------------------------------
-------------------------------------------
// no key value pairs here
我错过了什么?我需要对代码进行任何更改吗?我认为这种期望是存在的key:value pairs 但我们什么也没看到。
2条答案
按热度按时间erhoui1w1#
也许是因为你想推进左舷
9093
读进去9092
?首先,检查你的信息是否在Kafka:
kafka-console-consumer --bootstrap-server brokerhost:9093 --topic testtopic
然后,我认为您会对spark 2.2中添加的名为“结构化流”的新API感兴趣:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html1yjd4xko2#
以下是一些步骤:
首先,检查Kafka主题是否有消息
kafka-console-consumer.sh--zookeeper--topic--from beging--property print.key=true--property key.separator=,--property print.key=true--property key.separator=,
如果它正在打印消息,那么你有Kafka的消息,如果没有,那么你的生产者是不工作的
尝试下面的代码,如果Kafka有消息
下面是生产者的一个简单命令