Scala Spark RDD error: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD

wvyml7n5 · posted 2021-07-14 in Spark

Spark version: 3.0.1, Scala version: 2.12.10
English is not my first language, so please bear with any awkward phrasing.
I am a beginner with Scala and Spark.
I want to draw a histogram of the distribution of the data.

So I want to build a dataset that splits the data into 10 buckets and, for each bucket, records the start of the range, the end of the range, and the count.
However, the following code throws an error and the job cannot complete.
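To make the goal concrete, here is a small standalone sketch (with made-up numbers, run with local[*]) of the kind of bucketing I expect RDD.histogram(thresholds) to produce; the object and variable names are just for illustration. The actual code, and where it fails on the cluster, is in section (2) below.

import org.apache.spark.sql.SparkSession

object HistogramSketch extends App {

    val spark = SparkSession.builder()
        .master("local[*]")
        .appName("histogram-sketch")
        .getOrCreate()

    // Made-up "open" prices, only to illustrate the expected result
    val open = spark.sparkContext.parallelize(Seq(1.66, 10.0, 55.5, 120.0, 800.0, 1500.0))

    // 11 boundaries -> 10 buckets between min and max
    val minV = open.min()
    val maxV = open.max()
    val thresholds = (0 to 10).map(i => minV + i * (maxV - minV) / 10).toArray

    // histogram(buckets) returns Array[Long]: one count per bucket
    val counts = open.histogram(thresholds)

    // Print from / to / count, like the table I want to build
    (thresholds, thresholds.tail, counts).zipped.foreach { (from, to, n) =>
        println(s"$from .. $to -> $n")
    }

    spark.stop()
}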
(1) SparkSession connection code

import org.apache.spark.sql.SparkSession

object SparkResource {

    implicit val oSparkOrigin = SparkSession.builder()
        .master("spark://centos-master:7077")
        // .master("local[*]")
        .appName("spark-api")
        .getOrCreate()

    implicit val oSpark = SparkSession.active
}

(2) Code that uses the RDD function (where the error occurs)

def handleDataFromHDFS(
      pSpark: SparkSession, 
      pFilename: String, 
      pFiletype: String, 
      pFileseq: String, 
      pUserno: String,
      pGroupId: String, 
      pJobId: String)(implicit mc: MarkerContext): Future[ResponseResource] = {

    require(pFiletype != null, "filetype must be not null.")
    require(pFileseq != null, "pFileseq must be not null.")

    apiService.serviceDataFromHDFS(pSpark, pFilename, pFiletype, pFileseq, pUserno).map { sData =>

      // pSpark == SparkResource.oSpark (SparkSession)
      // sData == Dataset[Row]

      // table name
      val sName = pUserno + "_" + pGroupId + "_" + pJobId
      // Temp View Create
      sData.createOrReplaceTempView(sName)
      // cache
      pSpark.sqlContext.cacheTable(sName) 

      sData.show()

      // +----------+------+----------+----------+----------+----------+---------+
      // |      date|symbol|      open|     close|       low|      high|   volume|
      // +----------+------+----------+----------+----------+----------+---------+
      // |2016-01-05|  WLTW|    123.43|125.839996|122.309998|    126.25|2163600.0|
      // |2016-01-06|  WLTW|125.239998|119.980003|119.940002|125.540001|2386400.0|
      // |2016-01-07|  WLTW|116.379997|114.949997|    114.93|119.739998|2489500.0|
      // |2016-01-08|  WLTW|115.480003|116.620003|     113.5|117.440002|2006300.0|
      // |2016-01-11|  WLTW|117.010002|114.970001|114.089996|117.330002|1408600.0|
      // |2016-01-12|  WLTW|115.510002|115.550003|     114.5|116.059998|1098000.0|
      // |2016-01-13|  WLTW|116.459999|112.849998|112.589996|    117.07| 949600.0|
      // |2016-01-14|  WLTW|113.510002|114.379997|110.050003|115.029999| 785300.0|
      // |2016-01-15|  WLTW|113.330002|112.529999|111.919998|114.879997|1093700.0|
      // |2016-01-19|  WLTW|113.660004|110.379997|109.870003|115.870003|1523500.0|
      // |2016-01-20|  WLTW|109.059998|109.300003|    108.32|111.599998|1653900.0|
      // |2016-01-21|  WLTW|109.730003|     110.0|    108.32|110.580002| 944300.0|
      // |2016-01-22|  WLTW|111.879997|111.949997|110.190002|112.949997| 744900.0|
      // |2016-01-25|  WLTW|    111.32|110.120003|     110.0|114.629997| 703800.0|
      // |2016-01-26|  WLTW|110.419998|     111.0|107.300003|111.400002| 563100.0|
      // |2016-01-27|  WLTW|110.769997|110.709999|109.019997|    112.57| 896100.0|
      // |2016-01-28|  WLTW|110.900002|112.580002|109.900002|112.970001| 680400.0|
      // |2016-01-29|  WLTW|113.349998|114.470001|111.669998|114.589996| 749900.0|
      // |2016-02-01|  WLTW|     114.0|     114.5|112.900002|114.849998| 574200.0|
      // |2016-02-02|  WLTW|    113.25|110.559998|    109.75|113.860001| 694800.0|
      // +----------+------+----------+----------+----------+----------+---------+

      import org.apache.spark.sql.functions.{col, column, expr, max, min}
      import pSpark.implicits._

      var sMinMax_df = sData.agg(max($"open"), min($"open")).head()
      var sMaxValue = sMinMax_df(0).toString
      var sMinValue = sMinMax_df(1).toString

      println("=sMaxValue=")
      println(sMaxValue)
      println("=sMinValue=")
      println(sMinValue)

      val thresholds: Array[Double] =
        (sMinValue.toDouble until sMaxValue.toDouble by (sMaxValue.toDouble - sMinValue.toDouble) / 10).toArray ++
          Array(sMaxValue.toDouble)
      thresholds.foreach(x => println(x))

      // 1.66
      // 159.9379941
      // 318.2159882
      // 476.4939823
      // 634.7719764
      // 793.0499705
      // 951.3279646
      // 1109.6059587000002
      // 1267.8839528
      // 1426.1619469
      // 1584.439941

      // Convert DataFrame to RDD and calculate histogram values
      // error-occurring code here
      val _tmpHist = sData
          .select($"open" cast "double")
          .rdd.map(r => r.getDouble(0))
          .histogram(thresholds)

      // Result DataFrame contains the `from`, `to` range and the `value`.
      val histogram = pSpark.sparkContext.parallelize((thresholds, thresholds.tail, _tmpHist).zipped.toList).toDF("from", "to", "value")

      // histogram.show()
      // +------------------+------------------+------+
      // |              from|                to| value|
      // +------------------+------------------+------+
      // |              1.66|       159.9379941|811608|
      // |       159.9379941|       318.2159882| 28881|
      // |       318.2159882|       476.4939823|  4959|
      // |       476.4939823|       634.7719764|  2883|
      // |       634.7719764|       793.0499705|  1834|
      // |       793.0499705|       951.3279646|   257|
      // |       951.3279646|1109.6059587000002|   120|
      // |1109.6059587000002|      1267.8839528|   396|
      // |      1267.8839528|      1426.1619469|   237|
      // |      1426.1619469|       1584.439941|    89|
      // +------------------+------------------+------+

      ResponseResource("select", Json.toJson(sData.limit(20).toJSON.collect()),  Json.parse(sData.schema.json)("fields"), sName, sData.count(), 0)
    }
  }

(3) build.sbt

name := """spark-api-test"""
organization := "com.baeldung"

version := "1.0-SNAPSHOT"

lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.12.10"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies += guice

libraryDependencies += "org.scalatestplus.play" %% "scalatestplus-play" % "5.0.0" % Test
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.41"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.0.1"
libraryDependencies += "org.joda" % "joda-convert" % "2.2.1"
libraryDependencies += "net.logstash.logback" % "logstash-logback-encoder" % "6.2"
libraryDependencies += "io.lemonlabs" %% "scala-uri" % "1.5.1"
libraryDependencies += "net.codingwell" %% "scala-guice" % "4.2.6"
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.13.6"
libraryDependencies += "com.github.shin285" % "KOMORAN" % "3.3.4"

// hdfs save lib
libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "1.0.0"

// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.10"

addCompilerPlugin("com.github.aoiroaoino" %% "totuple" % "0.1.2")

enablePlugins(JavaAppPackaging)

(4) The code where the error occurs: the error appears as soon as .histogram(...) is added.

val _tmpHist = sData
          .select($"open" cast "double")
          .rdd.map(r => r.getDouble(0))
          .histogram(thresholds)

(5) Exception

Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 12, 192.168.0.220, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2410)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2295)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:

If I set the master to local[*] in the SparkSession, it works fine.

implicit val oSparkOrigin = SparkSession.builder()
        .master("local[*]")
        .appName("spark-api")
        .getOrCreate()

But I have to use the Spark master URL.
I have been struggling with this for days. Please tell me how to fix it.
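One difference between local[*] and the standalone master that I am not sure about is whether the executors can see my application classes. Below is a sketch of passing the application jar explicitly when building the session; the jar path is only a guess based on the sbt settings above, and I have not confirmed that this is the cause.

import org.apache.spark.sql.SparkSession

object SparkResourceWithJars {

    // Hypothetical path: roughly what `sbt package` would produce for this name/version.
    val appJar = "target/scala-2.12/spark-api-test_2.12-1.0-SNAPSHOT.jar"

    implicit val oSparkOrigin = SparkSession.builder()
        .master("spark://centos-master:7077")
        .appName("spark-api")
        // spark.jars: comma-separated jars shipped to the driver and executors,
        // so closures defined in the application can be deserialized on the workers.
        .config("spark.jars", appJar)
        .getOrCreate()
}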

No answers yet.
