Cassandra: multiple applications submitted to a Spark cluster keep waiting, then exit with an error

Posted by enxuqcxy on 2022-11-05 in Cassandra

I am submitting a Spark application to the cluster with the following command:

/root/spark/bin/spark-submit --conf spark.driver.memory=10g --class com.knoldus.SampleApp /pathToJar/Application.jar

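The problem is that multiple applications get submitted: one of them runs while all the others wait, and after a while the code exits with an exception.
The Spark UI looks like this: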

After that, the code exits with the following error:

8.149.243): java.io.IOException: Failed to write statements to keyspace.tableName.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:167)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

16/12/14 06:26:28 WARN TaskSetManager: Lost task 0.1 in stage 2.0 (TID 561, 10.178.149.243): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:644)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

16/12/14 06:26:28 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 563, 10.178.149.225): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:644)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
    at com.knoldus.xml.RNF2Driver$.main(RNFIngestPipeline.scala:56)
    at com.knoldus.xml.RNF2Driver.main(RNFIngestPipeline.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:644)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

My SparkConf is:

private val conf = new SparkConf()
    .setAppName("SampleApp")
    .setMaster(sparkClusterIP)
    .set("spark.sql.shuffle.partitions", "8")
    .set("spark.cassandra.connection.host", cassandraIP)
    .set("spark.sql.crossJoin.enabled", "true")
    .set("spark.kryoserializer.buffer.max", "640m")
    .set("spark.executor.memory", "10g")
    .set("spark.executor.cores", "3")
    .set("spark.cassandra.output.batch.size.rows", "10")
    .set("spark.cassandra.output.batch.size.bytes", "20480")

Here is my sample code. Can anyone tell me what the problem is?

val cassandraIDs = sc.cassandraTable[A](keySpace, tableName).map(_.filename.split("/").last.split("\\.")(0).toLong).collect()
val broadCastList = sc.broadcast(cassandraIDs)
val files = sc.wholeTextFiles(hdFSIP).map(_._1).filter { path =>
  val listOfCassandraID = broadCastList.value
  !listOfCassandraID.contains(path.split("/").last.split("\\.")(0).toLong)
}.take(100)

import sqlContext.implicits._
val fileNameRDD = sc.parallelize(files)

val cassandraRdd = fileNameRDD.map { path =>
  ...

  //do some task

}.toDF(columnNames)

cassandraRdd.saveToCassandra(keySpace, tableName)
println(s"Completed Processing of $numOfDocs in ${System.currentTimeMillis() - start} milliseconds")

sc.stop()
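The expression `path.split("/").last.split("\\.")(0).toLong` appears twice above, and `collect()` returns an `Array`, whose `contains` scans the whole array for every file path. The plain-Scala sketch below isolates that per-element logic: it factors the ID extraction into a helper and checks membership against a `Set`. It assumes file names of the form `<numericId>.<ext>` (the question does not show real paths), and unlike the original `.toLong`, which throws on a non-numeric name, it silently drops such paths; that is a design choice of this sketch. The names `FileIdFilter`, `fileId` and `newPaths` are hypothetical. In the Spark job the same idea would mean broadcasting `cassandraIDs.toSet` instead of the array.

```scala
import scala.util.Try

object FileIdFilter {
  // Hypothetical helper: "hdfs://host/dir/12345.xml" -> Some(12345L).
  // Returns None instead of throwing when the file stem is not a number.
  def fileId(path: String): Option[Long] =
    Try(path.split("/").last.split("\\.")(0).toLong).toOption

  // Keeps only paths whose ID is not already known. `known` is a Set,
  // so each lookup is a hash lookup rather than a linear Array scan.
  def newPaths(paths: Seq[String], known: Set[Long]): Seq[String] =
    paths.filter(p => fileId(p).exists(id => !known.contains(id)))

  def main(args: Array[String]): Unit = {
    val known = Set(1L, 2L)
    val paths = Seq("hdfs://nn/in/1.xml", "hdfs://nn/in/3.xml", "hdfs://nn/in/readme.txt")
    println(newPaths(paths, known)) // List(hdfs://nn/in/3.xml)
  }
}
```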

Answer #1 (yeotifhr):

Why are multiple applications being submitted?

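Because your driver code launches several Spark jobs. Each of the following actions triggers a new job in the current SparkContext:
sc.cassandraTable[A](keySpace, tableName).map(...).collect()
sc.wholeTextFiles(hdFSIP).map(_._1).filter { ... }.take(100)
cassandraRdd.saveToCassandra(keySpace, tableName)
The other calls, sc.parallelize(files), .toDF(columnNames) and sc.broadcast, are not actions, so they do not start a job on their own.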
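I have not used Spark and Cassandra together. However, the code above does not take advantage of what Spark can do. You should pipeline your tasks as much as possible so that Spark can schedule and run them in a distributed way.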

Example:

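In your code, you create files by calling take(100), and then create an RDD named fileNameRDD with sc.parallelize(files).
take(100) runs one job and pulls the 100 paths back to the driver; parallelize then ships them back out to the cluster as a new RDD, which is computed in a second job when saveToCassandra runs.
Instead, combine the two steps so that Spark can pipeline them: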

sc.wholeTextFiles(hdFSIP).map(_._1).filter { path =>
  val listOfCassandraID = broadCastList.value
  !listOfCassandraID.contains(path.split("/").last.split("\\.")(0).toLong)

}.map { path =>    /*<---Combine the two tasks like this*/
 ...

//do some task

}.toDF(columnNames)
Note: I have skipped the take(100) part, but you should be able to achieve the same thing with the filter.
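Within a stage, Spark pipelines narrow transformations such as filter and map, so each record flows through the whole chain without an intermediate collection being built on the driver. Plain Scala iterators behave analogously, which makes the idea easy to see outside a cluster. The sketch below is only that analogy, not Spark code: it chains filter, take(limit) and map lazily on an iterator and counts how many inputs the filter actually inspected. `process` is a hypothetical stand-in for the elided `//do some task` step, and `PipelineSketch` and `pipeline` are likewise hypothetical names.

```scala
object PipelineSketch {
  // Hypothetical stand-in for the elided "//do some task" step.
  def process(path: String): String = path.toUpperCase

  // Runs filter -> take -> map as one lazy pass over the input and returns
  // the results together with how many inputs the filter predicate inspected.
  def pipeline(input: Iterator[String], known: Set[String], limit: Int): (List[String], Int) = {
    var inspected = 0
    val out = input
      .filter { p => inspected += 1; !known.contains(p) }
      .take(limit) // stops pulling from upstream once `limit` results exist
      .map(process)
      .toList      // forces evaluation before `inspected` is read below
    (out, inspected)
  }

  def main(args: Array[String]): Unit = {
    val (out, inspected) = pipeline(Iterator("a", "b", "c", "d", "e"), Set("b"), 2)
    println(out)       // List(A, C)
    println(inspected) // 3: "d" and "e" were never read
  }
}
```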
Why does the NoSuchElementException: None.get error occur?

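This is more likely due to a Scala version mismatch: the Scala version your code is compiled with does not match the Scala version used to build Spark.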
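One way to check the mismatch hypothesis is to compare the Scala binary version (major.minor) actually loaded at runtime with the one your build targets, i.e. the `_2.10`/`_2.11` suffix of the Spark and connector artifacts. The sketch below uses `scala.util.Properties.versionNumberString` from the standard library; `expectedBinary` and its default `"2.11"` are hypothetical values you would replace with what your build definition declares. Running the same check at the start of the submitted job's `main` would report the Scala library on the driver's classpath.

```scala
import scala.util.Properties

object ScalaVersionCheck {
  // "2.11.8" -> "2.11": Scala libraries are published per binary (major.minor) version.
  def binaryVersion(full: String): String =
    full.split("\\.").take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    // Hypothetical: the Scala binary version your build compiles against.
    val expectedBinary = if (args.nonEmpty) args(0) else "2.11"
    val runtime = Properties.versionNumberString // Scala library actually loaded
    val ok = binaryVersion(runtime) == expectedBinary
    println(s"runtime Scala $runtime, build targets $expectedBinary, match = $ok")
  }
}
```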
