背景:我正在与Kafka一起使用scala中的spark流媒体程序。我的意图是将一个文件读给kafka,并将这些消息发布到spark流应用程序中进行一些分析。
但是,当我将文件传输到kafka并启动流应用程序来收听特定主题时,我在kafka producer控制台上看到这些错误消息。
用于读取文件的命令:
C:\Kafka\bin\windows>kafka-console-producer --broker-list localhost:9092 --topic mytopic2 < C:\somefile.csv
错误:
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
我在我的windows机器上本地运行这个应用程序,kafka服务器也在我的机器上本地运行。
spark应用程序类似于:
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("mytopic2").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
lines.foreachRDD((rdd, time) => {
// do something
}
我不知道Kafka/Spark的错误到底意味着什么。
如有任何指示,我们将不胜感激。
2条答案
按热度按时间gr8qqesn1#
这毕竟是Kafka的问题。我怀疑这与我下载的用于spark流媒体的Kafka版本有更多的关系,而与Kafka设置本身没有太大关系。
我已经为spark streaming 1.6.2下载了kafka 0.10.0.x->这是我得到超时错误的时候。我找到了这个链接:https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#linking,其中指出:“kafka:spark streaming 1.6.2与kafka 0.8.2.1兼容。”。
所以,当我下载0.8.2.1时,它运行得很好-我再也没有“超时错误”了。
hi3rlvi22#
这个错误与spark/spark流无关。看起来你的Kafka装置有问题。
当zookeeper设置出现问题时,通常会出现超时错误。你的zookeeper配置正确了吗?确保设置正确。另外,试着先运行Kafka附带的简单的Kafka生产者和消费者脚本。