I am trying to connect Kafka and Spark by reading data from a topic and printing that topic's contents as a DataFrame, but when I connect the two I get the following error:
Error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at readFromKafka$.main(readFromKafka.scala:17)
at readFromKafka.main(readFromKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
My code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
object readFromKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("spark://192.168.1.51:7077")
      .appName("kafka_spark")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest") // start from the earliest offset
      .load()
    df.printSchema()
  }
}
build.sbt:
name := "readFromSpark"
version := "1.0"
scalaVersion := "2.12.8"
val sparkVersion = "2.4.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.4" % Test
)
I ran sbt clean package, and it completed without errors, printing the following output:
[info] welcome to sbt 1.4.7 (Private Build Java 1.8.0_282)
[info] loading project definition from /home/hamza/spark-project/spark-learning/project
[info] loading settings for project spark-learning from build.sbt ...
[info] set current project to spark-learning (in build file:/home/hamza/spark-project/spark-learning/)
[success] Total time: 0 s, completed Mar 12, 2021 1:13:50 PM
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] compiling 7 Scala sources to /home/hamza/spark-project/spark-learning/target/scala-2.12/classes ...
[warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
[success] Total time: 18 s, completed Mar 12, 2021 1:14:08 PM
Running the Spark job:
spark-submit --master spark://192.168.1.51:7077 --class readFromKafka target/scala-2.12/spark-learning_2.12-1.0.jar
Any help, please?
1 Answer
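You need to remove
% Test
from the Kafka dependency if you want to run the code via spark-submit. With the Test scope, the connector is only on the test classpath, so it is never available when the application runs. You also need to set up sbt assembly to make sure the Kafka dependency is actually included in the jar, because sbt package only packages your own classes and not their dependencies.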
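A minimal sketch of what the build could look like after that change. It assumes the sbt-assembly plugin is used (the version 0.15.0 below is an assumption; pick a release that matches your sbt), and it assumes the Spark installation already supplies spark-core and spark-sql at runtime, which is why they are marked provided. The merge strategy shown is one common choice, not the only valid one.

```scala
// build.sbt
// Assumes project/plugins.sbt (a separate file) contains:
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")  // version is an assumption

name := "readFromSpark"
version := "1.0"
scalaVersion := "2.12.8"

val sparkVersion = "2.4.4"

libraryDependencies ++= Seq(
  // Assumed to be supplied by the Spark installation, so kept out of the fat jar.
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  // No "% Test" here: the Kafka source must be on the runtime classpath.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

// Keep META-INF/services entries: the "kafka" format is registered through a
// service file, and discarding it would bring back "Failed to find data source: kafka".
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case _                                         => MergeStrategy.first
}
```

With this in place, sbt clean assembly builds a fat jar under target/scala-2.12/ (by default its name ends in -assembly-1.0.jar), and that is the jar to pass to spark-submit instead of the one produced by sbt package. An alternative that avoids a fat jar is to keep sbt package and let spark-submit download the connector with --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4, as described in the deployment section of the integration guide that the error message points to.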