How do I run a Spark application assembled with Spark 2.1 on a Spark 1.6 cluster?

eni9jsuy, posted 2021-06-08 in Kafka

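I have been told that I can build a Spark application against one version of Spark and, as long as I build it with sbt assembly, run it with spark-submit on any Spark cluster.
So I built a simple application with Spark 2.1.1. You can see my build.sbt file below. Then I start it on my cluster with: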

cd spark-1.6.0-bin-hadoop2.6/bin/    
spark-submit --class  App --master local[*] /home/oracle/spark_test/db-synchronizer.jar

As you can see, I am executing it with Spark 1.6.0.
I get this error:

17/06/08 06:59:20 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]
java.lang.NoSuchMethodError: org.apache.spark.SparkConf.getTimeAsMs(Ljava/lang/String;Ljava/lang/String;)J
        at org.apache.spark.streaming.kafka010.KafkaRDD.<init>(KafkaRDD.scala:70)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:219)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/06/08 06:59:20 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:23 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:26 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:29 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        ... 1 more
17/06/08 06:59:39 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:42 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:45 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:48 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        ... 1 more

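From some reading, I see that java.lang.NoSuchMethodError is typically caused by mixing different versions of Spark. That may well be the case here, because I am using different ones. But shouldn't sbt assembly take care of that? Please see my build.sbt and assembly.sbt files below.
build.sbt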

name := "spark-db-synchronizator"

//Versions
version := "1.0.0"
scalaVersion := "2.10.6"
val sparkVersion = "2.1.1"
val sl4jVersion = "1.7.10"
val log4jVersion = "1.2.17"
val scalaTestVersion = "2.2.6"
val scalaLoggingVersion = "3.5.0"
val sparkTestingBaseVersion = "1.6.1_0.3.3"
val jodaTimeVersion = "2.9.6"
val jodaConvertVersion = "1.8.1"
val jsonAssertVersion = "1.2.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.slf4j" % "slf4j-api" % sl4jVersion,
  "org.slf4j" % "slf4j-log4j12" % sl4jVersion exclude("log4j", "log4j"),
  "log4j" % "log4j" % log4jVersion % "provided",
  "org.joda" % "joda-convert" % jodaConvertVersion,
  "joda-time" % "joda-time" % jodaTimeVersion,
  "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
  "com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test",
  "org.skyscreamer" % "jsonassert" % jsonAssertVersion % "test"
)

assemblyJarName in assembly := "db-synchronizer.jar"

run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

// Spark does not support parallel tests and requires JVM fork
parallelExecution in Test := false

fork in Test := true
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")

assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

5m1hhzi41#

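You are correct: it is possible to run a Spark application with the Spark 2.1.1 libraries bundled on some Spark 1.6 environments, such as Hadoop YARN (in CDH or HDP).
This trick is used fairly often in larger companies, where the infrastructure team forces development teams to use an older Spark version only because CDH (YARN) or HDP (YARN) does not support a newer one yet.
You should use spark-submit from the newer Spark installation (I suggest the latest and greatest, 2.1.1 as of this writing) and bundle all Spark jars as part of your Spark application.
Just sbt assembly your Spark application with Spark 2.1.1 (as you specified in build.sbt) and spark-submit the uber-jar to the older Spark environment using the very same Spark 2.1.1 version.
In fact, Hadoop YARN does not treat Spark any better than any other application library or framework. It is rather reluctant to pay special attention to Spark.
That, however, requires a cluster environment (I just checked: it does not work with Spark Standalone 1.6 when your Spark application uses Spark 2.1.1).
In your case, where you start the Spark application with the local[*] master URL, it was not supposed to work.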

cd spark-1.6.0-bin-hadoop2.6/bin/    
spark-submit --class  App --master local[*] /home/oracle/spark_test/db-synchronizer.jar

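The reason is two-fold. First, local[*] is fairly constrained by the CLASSPATH, and convincing Spark 1.6.0 to run Spark 2.1.1 on the same JVM might take you a long time (if it is possible at all).
Second, you are using the older version to run the newer 2.1.1. The opposite could work.
Use Hadoop YARN instead. As I said, it pays no special attention to Spark, and I have tested this setup several times in my projects.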
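The classpath constraint can be made visible from inside the JVM. The following is a minimal sketch, not part of the original answer: `MethodProbe` and `hasMethod` are hypothetical names, and the code uses only standard Java reflection. It reports whether the class that the running JVM resolves actually exposes a given public method, which is the lookup that fails with the NoSuchMethodError shown in the question.

```scala
import scala.util.Try

// Hypothetical helper (not from the original post): uses plain JVM reflection
// to ask whether the class loaded at runtime has a given public method.
object MethodProbe {
  // Returns false when the class cannot be found or the method is absent.
  def hasMethod(className: String, method: String, paramTypes: Class[_]*): Boolean =
    Try(Class.forName(className).getMethod(method, paramTypes: _*)).isSuccess

  def main(args: Array[String]): Unit = {
    // The method named in the NoSuchMethodError from the question.
    val found = hasMethod(
      "org.apache.spark.SparkConf", "getTimeAsMs",
      classOf[String], classOf[String])
    println(s"SparkConf.getTimeAsMs(String, String) available: $found")
  }
}
```

Running this check inside the application under both spark-submit installations should show whether the `SparkConf` on the classpath comes from the older or the newer Spark.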
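"I was wondering how I can know which version of, e.g., spark-core is actually used at runtime."
Use the web UI; you should see the version in the top-left corner.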

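You should also consult the web UI's Environment tab, where you can find the configuration of the runtime environment. It is the most authoritative source of information about the environment hosting your Spark application.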

Near the bottom you should see the Classpath Entries, which give you the CLASSPATH with its jars, files, and classes.

Use it to track down any CLASSPATH-related issues.
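The same kind of information can also be read programmatically. This is a minimal sketch under stated assumptions: `ClasspathProbe` and `codeSourceOf` are hypothetical names introduced here, and the code relies only on standard JVM APIs to report which jar or directory a class was loaded from. Inside a running application, `SparkContext.version` additionally returns the version of the Spark runtime.

```scala
import scala.util.Try

// Hypothetical helper (not from the original post): reports where the JVM
// loaded a class from, using only standard reflection APIs.
object ClasspathProbe {
  // Returns the jar or directory a class was loaded from, or None when the
  // class is missing or has no code source (e.g. JDK bootstrap classes).
  def codeSourceOf(className: String): Option[String] =
    for {
      cls    <- Try(Class.forName(className)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    // Class names taken from the stack trace in the question.
    Seq(
      "org.apache.spark.SparkConf",
      "org.apache.spark.streaming.kafka010.KafkaRDD"
    ).foreach { name =>
      val location = codeSourceOf(name).getOrElse("not found or no code source")
      println(s"$name -> $location")
    }
  }
}
```

If the two classes resolve to different locations (for example, one from the Spark installation and one from the uber-jar), that points to the kind of version mix the Classpath Entries section would also reveal.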
