spark streaming+kafka:sparkexception:找不到集合的引线偏移

yacmzcpb  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(425)

我正在尝试设置spark流以从kafka队列获取消息。我得到以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)

下面是我正在执行的代码(pyspark):

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})

ssc.start()
ssc.awaitTermination()

有几个类似的帖子有同样的错误。在所有情况下,原因是空洞的Kafka主题。我的“测试主题”中有消息。我可以带他们出去

kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100

有人知道问题出在哪里吗?
我正在使用:
spark 1.5.2(apache)
Kafka0.8.2.0+Kafka1.3.0(cdh 5.4.7)

xyhw6mcr

xyhw6mcr1#

1) 你必须确保你已经创建了主题 test-topic 运行以下命令检查主题列表 kafka-topics.sh --list --zookeeper [host or ip of zookeeper]:[port] 2) 检查完主题后,必须在中配置kafka配置 Socket Server Settings 部分 listeners=PLAINTEXT://[host or ip of Kafka]:[port]

70gysomp

70gysomp2#

这种类型的错误(无法找到指定主题的引线)的原因之一是kafka服务器配置有问题。
打开kafka服务器配置:

vim ./kafka/kafka-<your-version>/config/server.properties

在“socket server settings”(套接字服务器设置)部分,如果主机缺少ip,请为其提供ip:

listeners=PLAINTEXT://{host-ip}:{host-port}

我使用mapr沙盒提供的kafka设置,并试图通过spark代码访问kafka。我在访问Kafka时遇到了相同的错误,因为我的配置缺少ip。

whitzsjs

whitzsjs3#

你需要检查两件事:
检查这个主题和分区是否存在,在您的例子中是topic is test-topic 分区为0。
根据您的代码,您正在尝试使用来自偏移量0的消息,可能来自偏移量0的消息不可用,请检查最早的偏移量是多少,然后尝试从那里使用。
下面是检查最早偏移量的命令:

sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "your broker list" --topic "topic name" --time -1
wn9m85ua

wn9m85ua4#

如果在/etc/hosts中定义短主机名并在kafka服务器的配置中使用它们,则应将这些名称更改为ip。或者在本地pc或客户端的/etc/hosts中注册相同的短主机名。
发生错误,因为spark streaming lib无法解析pc或客户端中的短主机名。

zujrkrfu

zujrkrfu5#

如果主题不存在,则强制创建该主题的另一个选项。您可以通过在kafkaparamsMap中将属性“auto.create.topics.enable”设置为“true”来实现。

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> kafkaHost,
  "group.id" -> kafkaGroup,
  "auto.create.topics.enable" -> "true")

使用Scala2.11和Kafka0.10版本。

相关问题