无法在docker上的spark cluster上提交spark作业

mbyulnm0  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(446)

正如标题所预期的,我在向运行在docker上的spark集群提交spark作业时遇到了一些问题。
我在scala中编写了一个非常简单的spark作业,订阅kafka服务器,整理一些数据并将其存储在elastichsearch数据库中。Kafka和elasticsearch已经在docker中运行了。
如果我在我的开发环境(windows/intellij)中从ide运行spark作业,一切都会很好地工作。
然后(我根本不是java爱好者),我按照以下说明添加了一个spark集群:https://github.com/big-data-europe/docker-spark
当查看 Jmeter 板时,集群看起来很健康。我创建了一个由一个主进程和一个工作进程组成的集群。
这是我用scala写的工作:

import java.io.Serializable

import org.apache.commons.codec.StringDecoder
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark
import org.apache.spark.SparkConf
import org.elasticsearch.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.parsing.json.JSON

object KafkaConsumer {
  def main(args: Array[String]): Unit = {

    val sc = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Elastic Search Indexer App")

    sc.set("es.index.auto.create", "true")

    val elasticResource = "iot/demo"
    val ssc = new StreamingContext(sc, Seconds(10))

    //ssc.checkpoint("./checkpoint")

    val kafkaParams = Map(
      "bootstrap.servers" -> "kafka:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      "group.id" -> "group0"
    )

    val topics = List("test")
    val stream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

    case class message(key: String, timestamp: Long, payload: Object)
    val rdds = stream.map(record => message(record.key, record.timestamp, record.value))

    val es_config: scala.collection.mutable.Map[String, String] =
      scala.collection.mutable.Map(
        "pushdown" -> "true",
        "es.nodes" -> "http://docker-host",
        "es.nodes.wan.only" -> "true",
        "es.resource" -> elasticResource,
        "es.ingest.pipeline" -> "iot-test-pipeline"
      )

    rdds.foreachRDD { rdd =>
      rdd.saveToEs(es_config)
      rdd.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

要提交给集群,我做了:
使用“sbt assembly”插件,我创建了一个包含所有依赖项的胖jar文件。
在build.sbt中定义程序集策略,以避免合并时出现重复数据消除错误。。。
然后提交:
./spark-submit.cmd--kafkaconsumer类--masterspark://docker-host:7077/c/users/shams/documents/appunti/iot demo app/spark streaming/target/scala-2.11/spark-streaming-assembly-1.0.jar
但我有个错误:
19/02/27 11:18:12警告nativecodeloader:无法为您的平台加载本机hadoop库。。。在线程“main”java.io.ioexception:no filesystem for scheme:c at org.apache.hadoop.fs.filesystem.getfilesystemclass(filesystem)中使用内置java类(如果适用)。java:2660)在org.apache.hadoop.fs.filesystem.createfilesystem(filesystem。java:2667)在org.apache.hadoop.fs.filesystem.access$200(文件系统)。java:94)在org.apache.hadoop.fs.filesystem$cache.getinternal(文件系统)。java:2703)在org.apache.hadoop.fs.filesystem$cache.get(filesystem。java:2685)在org.apache.hadoop.fs.filesystem.get(filesystem。java:373)在org.apache.spark.util.utils$.gethadoopfilesystem(utils。scala:1897)在org.apache.spark.util.utils$.dofetchfile(utils。scala:694)在org.apache.spark.deploy.dependencyutils$.downloadfile(dependencyutils。scala:135)在org.apache.spark.deploy.sparksubmit$$anonfun$dopreparesubmitenvironment$7.apply(sparksubmit。scala:416)在org.apache.spark.deploy.sparksubmit$$anonfun$dopreparesubmitenvironment$7.apply(sparksubmit。scala:416)在scala.option.map(option。scala:146)在org.apache.spark.deploy.sparksubmit$.doprepareResubmitenvironment(sparksubmit。scala:415)在org.apache.spark.deploy.sparksubmit$.preparesubmitenvironment(sparksubmit。scala:250)在org.apache.spark.deploy.sparksubmit$.submit(sparksubmit。scala:171)位于org.apache.spark.deploy.sparksubmit$.main(sparksubmit.com)。scala:137)在org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala)
经过一天的努力,我还没有解决,我无法理解在我的工作中,我想访问某一卷似乎是说的错误
是否与警告消息相关?那么,我应该如何编辑我的脚本来避免这个问题呢?
提前谢谢。
更新:
这个问题似乎与我的代码无关,因为我试图提交一个简单的helloworld应用程序,以同样的方式编译,但我有同样的问题。

bnlyeluc

bnlyeluc1#

经过多次尝试和研究,我得出的结论是,问题可能是我正在使用windows版本的spark submit从我的pc提交作业。
我不能完全理解,但是现在,将文件直接移动到主节点和工作节点,我可以从那里提交它。
容器上的第一份副本:

docker cp spark-streaming-assembly-1.0.jar 21b43cb2e698:/spark/bin

然后我执行(在/spark/bin文件夹中):

./spark-submit --class KafkaConsumer --deploy-mode cluster --master spark://spark-master:7077 spark-streaming-assembly-1.0.jar

这就是我目前找到的解决方法。

相关问题