org.apache.spark.SparkException: Task not serializable (caused by org.apache.hadoop.conf.Configuration)

92vpleto, posted on 2021-06-02 in Hadoop
Follow (0) | Answers (1) | Views (446)

I want to write a transformed stream to an Elasticsearch index, like this:

transformed.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val messages = rdd.map(prepare)
    messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
  }
})

The line val messages = rdd.map(prepare) throws the error shown below. I have been trying different ways to work around it (for example, adding @transient next to val conf), but nothing seems to help.
6/06/28 19:23:00 ERROR JobScheduler: Error running job streaming job 146713480000 ms.0
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.RDD.map(RDD.scala:323)
  at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:77)
  at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:75)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
  - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
  - field (class: de.kp.spark.elastic.stream.EsStream, name: de$kp$spark$elastic$stream$EsStream$$conf, type: class org.apache.hadoop.conf.Configuration)
  - object (class de.kp.spark.elastic.stream.EsStream, de.kp.spark.elastic.stream.EsStream@6b156e9a)
  - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, name: $outer, type: class de.kp.spark.elastic.stream.EsStream)
  - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, <function1>)
  - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, name: $outer, type: class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1)
  - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
  ... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
  (followed by the same stack trace and the same "Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration" serialization stack as above)
Does this have something to do with Hadoop's Configuration? (I am referring to this part of the message: class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml )
Update:

class EsStream(name:String,conf:HConf) extends SparkBase with Serializable {

  /* Elasticsearch configuration */ 
  val ec = getEsConf(conf)               

  /* Kafka configuration */
  val (kc,topics) = getKafkaConf(conf)

  def run() {

    val ssc = createSSCLocal(name,conf)

    /*
     * The KafkaInputDStream returns a Tuple where only the second component
     * holds the respective message; we therefore reduce to a DStream[String]
     */
    val stream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kc,topics,StorageLevel.MEMORY_AND_DISK).map(_._2)
    /*
     * Inline transformation of the incoming stream by any function that maps 
     * a DStream[String] onto a DStream[String]
     */
    val transformed = transform(stream)
    /*
     * Write transformed stream to Elasticsearch index
     */
    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()    

  }

  def transform(stream:DStream[String]) = stream

  private def getEsConf(config:HConf):HConf = {

    val _conf = new HConf()

    _conf.set("es.nodes", conf.get("es.nodes"))
    _conf.set("es.port", conf.get("es.port"))

    _conf.set("es.resource", conf.get("es.resource"))

    _conf

  }

  private def getKafkaConf(config:HConf):(Map[String,String],Map[String,Int]) = {

    val cfg = Map(
      "group.id" -> conf.get("kafka.group"),

      "zookeeper.connect" -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout")

    )

    val topics = conf.get("kafka.topics").split(",").map((_,conf.get("kafka.threads").toInt)).toMap   

    (cfg,topics)

  }

  private def prepare(message:String):(Object,Object) = {

    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String,String]]
      case None => Map.empty[String,String]
    }

    val kw = NullWritable.get

    val vw = new MapWritable
    for ((k, v) <- m) vw.put(new Text(k), new Text(v))

    (kw, vw)

  }

}

fcg9iug3 1#

Get rid of conf:HConf in the class constructor of EsStream and declare the class as class EsStream(name:String).
Next, create a method with a signature along the lines of def init(conf:HConf): in it, read the configuration values you need and set up ec and (kc,topics) there.
After that, call the run method, for example as sketched below.
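
Putting the suggestion together, a minimal sketch of the reworked class could look as follows. This is only an illustration, not the original project's code: SparkBase, createSSCLocal, transform and prepare are taken from the question, HConf is assumed to be an alias for org.apache.hadoop.conf.Configuration, and the names esSettings, the decision to keep only plain String settings on the instance, and passing the Hadoop Configuration into run as a parameter are assumptions of this sketch. The essential point is that the instance serialized with rdd.map(prepare) no longer carries any org.apache.hadoop.conf.Configuration field.

/* Assumed imports; HConf is taken to be an alias for Hadoop's Configuration */
import kafka.serializer.StringDecoder
import org.apache.hadoop.conf.{Configuration => HConf}
import org.apache.hadoop.io.{MapWritable, NullWritable}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.hadoop.mr.EsOutputFormat

class EsStream(name:String) extends SparkBase with Serializable {

  /* Settings extracted in init(); only serializable Strings are kept on the
   * instance, no Hadoop Configuration field (layout assumed, not from the answer) */
  private var esSettings:Map[String,String] = Map.empty
  private var kc:Map[String,String] = Map.empty
  private var topics:Map[String,Int] = Map.empty

  /* Read the required configuration once, before run() is called */
  def init(conf:HConf) {

    esSettings = Map(
      "es.nodes"    -> conf.get("es.nodes"),
      "es.port"     -> conf.get("es.port"),
      "es.resource" -> conf.get("es.resource")
    )

    kc = Map(
      "group.id" -> conf.get("kafka.group"),
      "zookeeper.connect" -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout")
    )

    topics = conf.get("kafka.topics").split(",").map((_,conf.get("kafka.threads").toInt)).toMap

  }

  /* The Hadoop Configuration is passed as a method parameter here (an assumption,
   * since createSSCLocal still needs it); unlike a constructor field, it does not
   * become part of the serialized EsStream instance */
  def run(conf:HConf) {

    val ssc = createSSCLocal(name,conf)

    val stream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kc,topics,StorageLevel.MEMORY_AND_DISK).map(_._2)
    val transformed = transform(stream)

    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        /* Rebuild the Elasticsearch output configuration locally, on the driver,
         * from the plain String settings */
        val ec = new HConf()
        for ((k,v) <- esSettings) ec.set(k,v)

        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }

  def transform(stream:DStream[String]) = stream

  /* getEsConf/getKafkaConf are no longer needed;
   * prepare(message:String):(Object,Object) stays exactly as in the question */

}

Usage would then be roughly: val es = new EsStream("EsStream"); es.init(conf); es.run(conf). Whether the answer intended run to keep its original no-argument signature is not stated; the key change is that EsStream holds no Configuration field when the map closure is serialized.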
