bahir2.4.0是否支持spark2.4.0?

f8rj6qna  于 2021-05-16  发布在  Spark
关注(0)|答案(0)|浏览(541)

我是spark流媒体的新手,正在尝试将activemq与spark连接起来,当我使用 spark 2.3.0 随着 bahir 2.3.2 一切正常,但一旦我开始使用 spark 2.4.0 随着 bahir 2.4.0 同样的代码停止工作,并给出了一些 hadoop-class 缺少错误。我通过升级我的 hadoop 但现在我又犯了一个新错误:

[error] (stream execution thread for [id = 93f322f9-de32-46df-a896-765a46eb5cdf, runId = 9d10d47d-1543-405d-adf9-d57f8b8b35e0]) java.lang.ExceptionInInitializerError
[error] (run-main-0) org.apache.spark.sql.streaming.StreamingQueryException: null
[error] === Streaming Query ===
[error] Identifier: [id = 93f322f9-de32-46df-a896-765a46eb5cdf, runId = 9d10d47d-1543-405d-adf9-d57f8b8b35e0]
[error] Current Committed Offsets: {}
[error] Current Available Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: sample_topic2 clientId: paho1210429831111]: -1}
[error] 
[error] Current State: ACTIVE
[error] Thread State: RUNNABLE
[error] 
[error] Logical Plan:
[error] MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: sample_topic2 clientId: paho1210429831111]
[error]         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
[error]         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
[error] Caused by: java.lang.ExceptionInInitializerError

我的 build.sbt 是:

//For spark
libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" %  "2.4.0",
    "org.apache.spark" %% "spark-mllib" % "2.4.0" ,
    "org.apache.spark" %% "spark-sql" % "2.4.0" ,
    "org.apache.spark" %% "spark-hive" % "2.4.0" ,
    "org.apache.spark" %% "spark-streaming" % "2.4.0" ,
    "org.apache.spark" %% "spark-graphx" % "2.4.0",
//    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
//    "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",
)

//Bahir
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.4.0"

//Hadoop
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.0.2"

libraryDependencies += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7.1"

我的计划是:

def main(args : Array[String]): Unit =
    {
        val df = spark
            .readStream
            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
            .option("brokerUrl","tcp://localhost:1883")
            .option("topic","sample_topic2")
            .option("persistence","memory")
            .option("cleanSession", "true")
            .load()

        df.printSchema()

        df
            .writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()

        spark.close()
        spark.stop()
    }

程序正确执行,直到 df.printSchema() . 有没有其他的连接方式 activemq 通过 spark-structured-streaming

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题