I am new to Spark streaming and am trying to connect ActiveMQ with Spark. With spark 2.3.0 and bahir 2.3.2 everything works fine, but as soon as I switch to spark 2.4.0 with bahir 2.4.0, the same code stops working and throws an error about a missing hadoop class. I got past that by upgrading my hadoop dependency, but now I am getting a new error:
[error] (stream execution thread for [id = 93f322f9-de32-46df-a896-765a46eb5cdf, runId = 9d10d47d-1543-405d-adf9-d57f8b8b35e0]) java.lang.ExceptionInInitializerError
[error] (run-main-0) org.apache.spark.sql.streaming.StreamingQueryException: null
[error] === Streaming Query ===
[error] Identifier: [id = 93f322f9-de32-46df-a896-765a46eb5cdf, runId = 9d10d47d-1543-405d-adf9-d57f8b8b35e0]
[error] Current Committed Offsets: {}
[error] Current Available Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: sample_topic2 clientId: paho1210429831111]: -1}
[error]
[error] Current State: ACTIVE
[error] Thread State: RUNNABLE
[error]
[error] Logical Plan:
[error] MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: sample_topic2 clientId: paho1210429831111]
[error] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
[error] at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
[error] Caused by: java.lang.ExceptionInInitializerError
My build.sbt is:
//For spark
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-mllib" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "org.apache.spark" %% "spark-hive" % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.spark" %% "spark-graphx" % "2.4.0"
  // "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
  // "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",
)
//Bahir
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.4.0"
//Hadoop
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.0.2"
libraryDependencies += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7.1"
My program is:
import org.apache.spark.sql.SparkSession

// Enclosing object name is illustrative; the original snippet shows only the main method
object MqttToConsole {
  def main(args: Array[String]): Unit = {
    // The original snippet references `spark` without defining it; a local SparkSession is assumed here
    val spark = SparkSession.builder()
      .appName("mqtt-to-console")
      .master("local[*]")
      .getOrCreate()

    // Read from the Bahir MQTT structured-streaming source
    val df = spark
      .readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("brokerUrl", "tcp://localhost:1883")
      .option("topic", "sample_topic2")
      .option("persistence", "memory")
      .option("cleanSession", "true")
      .load()

    df.printSchema()

    // Write every micro-batch to the console and block until the query terminates
    df.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()

    spark.stop()
  }
}
The program executes correctly up to df.printSchema(). Is there any other way to connect ActiveMQ through spark-structured-streaming?
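While debugging, one way to check the broker side independently of Spark is to publish a test message to the same topic with the Eclipse Paho MQTT client (the same client library the Bahir source uses). This is a minimal sketch; the Paho dependency, object name, and payload are assumptions, not part of the question:

// build.sbt (assumed): libraryDependencies += "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.2.0"
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

object PublishTestMessage {
  def main(args: Array[String]): Unit = {
    // Connect to the same broker URL the streaming query reads from
    val client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId(), new MemoryPersistence())
    client.connect()
    // Publish one message to the topic the MQTT source subscribes to
    client.publish("sample_topic2", new MqttMessage("hello from paho".getBytes("UTF-8")))
    client.disconnect()
  }
}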