Kafka消费者无法订阅Kafka主题(通过spark流媒体运行)

slsn1g29  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(496)

创建props对象后设置消费者的代码

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))

代码导入如下

package main.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import java.io.IOException

我通过sbt创建了一个组装jar

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0-kafka-2.1.1"

我错过了什么?
错误消息:
用户类引发异常:java.lang.nosuchmethoderror:org.apache.kafka.clients.consumer.kafkaconsumer.subscribe(ljava/util/collection;)v

a6b3iqyw

a6b3iqyw1#

我在spark 2.2.0和kafka 0.10.0上也遇到了同样的问题,问题是因为spark2 submit(也是spark2 shell)中的kafka默认版本不同
我在这里找到了决定

1. Before spark2-submit you have to export kafka version
$ export SPARK_KAFKA_VERSION=0.10
$ spark2-submit ...
ktca8awb

ktca8awb2#

subscribe 接受类型的输入 java.util.Collections 而不是 java.util.Arrays.asList .
尝试

consumer.subscribe(java.util.Arrays.asList("topic"))

它应该有用。。。

相关问题