spark流kafka createdirectstream未解析

nxagd54h  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(313)

需要帮忙吗。
我正在使用intellij和sbt来构建我的应用程序。
我正在开发一个应用程序来阅读spark streaming中的一个Kafka主题,以便对其进行一些etl工作。不幸的是,我读不懂Kafka的书。
kafkautils.createdirectstream未解析并不断给我错误(无法解析符号)。我已经做了我的研究,似乎我有正确的依赖关系。
这是我的build.sbt:

name := "ASUIStreaming"
version := "0.1"
scalacOptions += "-target:jvm-1.8"
scalaVersion := "2.11.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0"
libraryDependencies += "org.apache.kafka" %% "kafka-clients" % "0.8.2.1"
libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"

有什么建议吗?我还应该提到,我没有对笔记本电脑的管理权限,因为这是一台工作电脑,我使用的是便携式jdk和intellij安装。然而,我的同事们也处于同样的情况,这对他们来说很好。
提前谢谢!

sycxhyv7

sycxhyv71#

我能解决这个问题。在重新创建项目并再次添加所有依赖项之后,我发现在intellij中,某些代码必须在同一行上,否则它将无法编译。
在本例中,将val kafkaparams代码放在同一行(而不是放在代码块中)解决了问题!

zd287kbt

zd287kbt2#

下面是我正在使用的主要spark流代码片段。注意:我屏蔽了一些机密工作数据,如ip和主题名等。

import org.apache.kafka.clients.consumer.ConsumerRecord
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization.StringDeserializer
import scala.util.parsing.json._
import org.apache.spark.streaming.kafka._

object ASUISpeedKafka extends App

{
  // Create a new Spark Context
  val conf = new SparkConf().setAppName("ASUISpeedKafka").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(2))

  //Identify the Kafka Topic and provide the parameters and Topic details
  val kafkaTopic = "TOPIC1"
    val topicsSet = kafkaTopic.split(",").toSet
    val kafkaParams = Map[String, String]
  (

    "metadata.broker.list" -> "IP1:PORT, IP2:PORT2",
    "auto.offset.reset" -> "smallest"
  )

  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]
  (
  ssc, kafkaParams, topicsSet
  )
}

相关问题