I want to create a Spark StreamingContext that streams messages from a Kafka topic, so I added the following dependency to my build:
"org.apache.spark:spark-streaming-kafka_2.10:1.6.2"
Then I created the following class:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamingApp {
  def main(args: Array[String]): Unit = {
    def messageConsumer(): StreamingContext = {
      val topicName: String = "my-topic"
      val brokerHostAndPort: String = "mykafka.example.com:9092"
      val ssc = new StreamingContext(SparkContext.getOrCreate(), Seconds(10))
      createKafkaStream(ssc, topicName, brokerHostAndPort).foreachRDD { rdd =>
        rdd.foreach { msg =>
          // TODO: Implement message processing here.
        }
      }
      ssc
    }

    // Stop any active streaming context (keeping the SparkContext) before creating a new one.
    StreamingContext.getActive.foreach {
      _.stop(stopSparkContext = false)
    }
    val ssc = StreamingContext.getActiveOrCreate(messageConsumer)
    ssc.start()
    ssc.awaitTermination()
  }

  def createKafkaStream(ssc: StreamingContext,
                        kafkaTopics: String,
                        brokers: String): DStream[(String, String)] = {
    // The 0.8 direct-stream API takes broker addresses via "metadata.broker.list"
    // and deserializes through the StringDecoder type parameters below, so no
    // "key.deserializer"/"value.deserializer" entries are needed; its offset-reset
    // values are "smallest"/"largest" rather than the new consumer's "earliest"/"latest".
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "largest"
    )
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(kafkaTopics))
  }
}
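As an aside, the TODO inside the foreachRDD body could eventually be filled in with a small standalone helper like the sketch below (MessageProcessor and its output format are my own hypothetical names, not part of the original code). Keeping the per-message logic in a plain object makes it unit-testable without a running cluster; the direct stream yields (key, value) tuples, so the second element is the payload:

```scala
// Hypothetical helper for the TODO above: the direct Kafka stream yields
// (key, value) tuples, so this takes one tuple and returns a log-friendly line.
object MessageProcessor {
  def process(msg: (String, String)): String = {
    val (key, value) = msg
    s"key=$key payload=${value.trim}"
  }
}
```

Inside the stream this would be called as rdd.foreach { msg => println(MessageProcessor.process(msg)) } or similar.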
When I compile this (via Ant, though that shouldn't matter), I get the following scalac
compiler errors:
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:11: error: not found: object kafka
[scalac] import kafka.serializer.StringDecoder
[scalac] ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:12: error: object kafka is not a member of package org.apache.spark.streaming
[scalac] import org.apache.spark.streaming.kafka.KafkaUtils
[scalac] ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:63: error: not found: value KafkaUtils
[scalac] KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, Set(kafkaTopics))
[scalac] ^
[scalac] three errors found
Am I missing a dependency? Using the wrong one? Or is my code incorrect?
Update:
Interestingly, when I change the dependency to:
"org.apache.spark:spark-streaming-kafka_2.10:1.6.1"
the compiler errors go away...
1 Answer
The artifact id of the Kafka dependency should look like this:
Hope this helps.
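(For background on artifact naming, which may be what the answer above is getting at: starting with Spark 2.0, the Kafka integration was split into Kafka-version-specific artifacts, so on Spark 2.x the dependency would be declared roughly as one of the following, shown here in sbt form with illustrative version numbers:)

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"
// or, for the Kafka 0.10+ consumer API:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"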