Spark Streaming / Kafka classes not found

xmd2e60i asked on 2021-06-07 in Kafka

I want to create a Spark StreamingContext that streams messages from a Kafka topic, so I added the following dependency to my build:

"org.apache.spark:spark-streaming-kafka_2.10:1.6.2"

Then I created the following class:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamingApp {
    def main(args: Array[String]): Unit = {
        def messageConsumer(): StreamingContext = {
            val topicName : String = "my-topic"
            val brokerHostAndPort : String = "mykafka.example.com:9092"

            val ssc = new StreamingContext(SparkContext.getOrCreate(), Seconds(10))

            createKafkaStream(ssc, topicName, brokerHostAndPort).foreachRDD(rdd => {
                rdd.foreach { msg =>
                    // TODO: Implement message processing here.
                }
            })

            ssc
        }

        StreamingContext.getActive.foreach {
            _.stop(stopSparkContext = false)
        }

        val ssc = StreamingContext.getActiveOrCreate(messageConsumer)
        ssc.start()
        ssc.awaitTermination()
    }

    def createKafkaStream(ssc: StreamingContext,
            kafkaTopics: String, brokers: String): DStream[(String, String)] = {
        val kafkaParams = Map[String, String](
            "bootstrap.servers" -> brokers,
            "key.deserializer" -> "StringDeserializer",
            "value.deserializer" -> "StringDeserializer",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> "false"
        )        

        KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
            ssc, kafkaParams, Set(kafkaTopics))
    }
}

When I compile this (via Ant, though that shouldn't matter), I get the following scalac compiler errors:

[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:11: error: not found: object kafka
[scalac] import kafka.serializer.StringDecoder
[scalac]        ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:12: error: object kafka is not a member of package org.apache.spark.streaming
[scalac] import org.apache.spark.streaming.kafka.KafkaUtils
[scalac]                                   ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:63: error: not found: value KafkaUtils
[scalac]         KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, Set(kafkaTopics))
[scalac]         ^
[scalac] three errors found

Am I missing a dependency? Or using the wrong one? Or is my code incorrect?

Update:

Interestingly, when I change the dependency to:

"org.apache.spark:spark-streaming-kafka_2.10:1.6.1"

the compiler errors go away...


jdgnovmf1#

The artifact ID of the Kafka dependency should look like this:

spark-streaming-kafka-0-8_2.11
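For example, with Spark 2.x and Scala 2.11 the full coordinate would be the following (version chosen for illustration; it must match your Spark installation):

"org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0"

Note that the -0-8 artifacts only exist for Spark 2.x; on Spark 1.6.x the artifact keeps the original spark-streaming-kafka name, so there the thing to double-check is that the Scala suffix (_2.10 vs. _2.11) matches your compiler.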

Hope this helps.
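A side note on the code in the question: the 0.8 direct stream reads the broker list from the "metadata.broker.list" key (it also accepts "bootstrap.servers"), and the key.deserializer/value.deserializer entries are new-consumer settings that this API ignores, since decoding is done by the StringDecoder type parameters. A minimal parameter map for the 0.8 direct-stream API would look like this (a sketch; broker address taken from the question):

val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "mykafka.example.com:9092",
    // the 0.8 consumer expects "smallest"/"largest" here, not "earliest"/"latest"
    "auto.offset.reset" -> "largest"
)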
