Spark version: 3.0.1, Scala version: 2.12.10
I do not live in an English-speaking country, so please bear with me if my grammar is off.
I am a beginner with Scala and Spark.
I want to draw a histogram of the data distribution, like the image shown below.
To do that, I want to build a dataset that splits the data into 10 buckets and computes the start of each range, the end of each range, and the count per bucket.
However, the code below throws an error and the operation cannot be completed.
(1) SparkSession connection code
object SparkResource {
implicit val oSparkOrigin = SparkSession.builder()
.master("spark://centos-master:7077")
// .master("local[*]")
.appName("spark-api")
.getOrCreate()
implicit val oSpark = SparkSession.active
}
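For reference, here is a minimal sketch of the same builder for a standalone master with the application jar listed via the standard spark.jars setting, so the executors can load the classes behind serialized lambdas. The jar path is hypothetical and only illustrates the idea; it is not taken from the project above.
import org.apache.spark.sql.SparkSession

object SparkResourceSketch {
  // Sketch only: the jar path is hypothetical; it would normally point at the
  // packaged application jar so executors can deserialize the lambdas used in
  // RDD transformations such as .map and .histogram.
  implicit val spark: SparkSession = SparkSession.builder()
    .master("spark://centos-master:7077")
    .appName("spark-api")
    .config("spark.jars", "/path/to/spark-api-test.jar") // hypothetical path
    .getOrCreate()
}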
(2) Code that uses the RDD API (the code that fails)
def handleDataFromHDFS(
pSpark: SparkSession,
pFilename: String,
pFiletype: String,
pFileseq: String,
pUserno: String,
pGroupId: String,
pJobId: String)(implicit mc: MarkerContext): Future[ResponseResource] = {
require(pFiletype != null, "filetype must be not null.")
require(pFileseq != null, "pFileseq must be not null.")
apiService.serviceDataFromHDFS(pSpark, pFilename, pFiletype, pFileseq, pUserno).map { sData =>
// pSpark == SparkResource.oSpark (SparkSession)
// sData == Dataset[Row]
// table name
val sName = pUserno + "_" + pGroupId + "_" + pJobId
// Temp View Create
sData.createOrReplaceTempView(sName)
// cache
pSpark.sqlContext.cacheTable(sName)
sData.show()
// +----------+------+----------+----------+----------+----------+---------+
// | date|symbol| open| close| low| high| volume|
// +----------+------+----------+----------+----------+----------+---------+
// |2016-01-05| WLTW| 123.43|125.839996|122.309998| 126.25|2163600.0|
// |2016-01-06| WLTW|125.239998|119.980003|119.940002|125.540001|2386400.0|
// |2016-01-07| WLTW|116.379997|114.949997| 114.93|119.739998|2489500.0|
// |2016-01-08| WLTW|115.480003|116.620003| 113.5|117.440002|2006300.0|
// |2016-01-11| WLTW|117.010002|114.970001|114.089996|117.330002|1408600.0|
// |2016-01-12| WLTW|115.510002|115.550003| 114.5|116.059998|1098000.0|
// |2016-01-13| WLTW|116.459999|112.849998|112.589996| 117.07| 949600.0|
// |2016-01-14| WLTW|113.510002|114.379997|110.050003|115.029999| 785300.0|
// |2016-01-15| WLTW|113.330002|112.529999|111.919998|114.879997|1093700.0|
// |2016-01-19| WLTW|113.660004|110.379997|109.870003|115.870003|1523500.0|
// |2016-01-20| WLTW|109.059998|109.300003| 108.32|111.599998|1653900.0|
// |2016-01-21| WLTW|109.730003| 110.0| 108.32|110.580002| 944300.0|
// |2016-01-22| WLTW|111.879997|111.949997|110.190002|112.949997| 744900.0|
// |2016-01-25| WLTW| 111.32|110.120003| 110.0|114.629997| 703800.0|
// |2016-01-26| WLTW|110.419998| 111.0|107.300003|111.400002| 563100.0|
// |2016-01-27| WLTW|110.769997|110.709999|109.019997| 112.57| 896100.0|
// |2016-01-28| WLTW|110.900002|112.580002|109.900002|112.970001| 680400.0|
// |2016-01-29| WLTW|113.349998|114.470001|111.669998|114.589996| 749900.0|
// |2016-02-01| WLTW| 114.0| 114.5|112.900002|114.849998| 574200.0|
// |2016-02-02| WLTW| 113.25|110.559998| 109.75|113.860001| 694800.0|
// +----------+------+----------+----------+----------+----------+---------+
import org.apache.spark.sql.functions.{col, column, expr, max, min}
import pSpark.implicits._
// head() returns a Row: column 0 is max(open), column 1 is min(open)
val sMinMaxRow = sData.agg(max($"open"), min($"open")).head()
val sMaxValue = sMinMaxRow(0).toString
val sMinValue = sMinMaxRow(1).toString
println("=sMaxValue=")
println(sMaxValue)
println("=sMinValue=")
println(sMinValue)
// Build 11 bucket boundaries: 10 equal-width steps starting at the min, plus the max itself
val thresholds: Array[Double] =
  (sMinValue.toDouble until sMaxValue.toDouble by (sMaxValue.toDouble - sMinValue.toDouble) / 10).toArray ++
    Array(sMaxValue.toDouble)
thresholds.foreach(x => println(x))
// 1.66
// 159.9379941
// 318.2159882
// 476.4939823
// 634.7719764
// 793.0499705
// 951.3279646
// 1109.6059587000002
// 1267.8839528
// 1426.1619469
// 1584.439941
// Convert the DataFrame to an RDD and calculate the histogram values
// The error occurs in the following call:
val _tmpHist = sData
.select($"open" cast "double")
.rdd.map(r => r.getDouble(0))
.histogram(thresholds)
// The resulting DataFrame contains the `from`/`to` range and the `value` count for each bucket
val histogram = pSpark.sparkContext
  .parallelize((thresholds, thresholds.tail, _tmpHist).zipped.toList)
  .toDF("from", "to", "value")
// histogram.show()
// +------------------+------------------+------+
// | from| to| value|
// +------------------+------------------+------+
// | 1.66| 159.9379941|811608|
// | 159.9379941| 318.2159882| 28881|
// | 318.2159882| 476.4939823| 4959|
// | 476.4939823| 634.7719764| 2883|
// | 634.7719764| 793.0499705| 1834|
// | 793.0499705| 951.3279646| 257|
// | 951.3279646|1109.6059587000002| 120|
// |1109.6059587000002| 1267.8839528| 396|
// | 1267.8839528| 1426.1619469| 237|
// | 1426.1619469| 1584.439941| 89|
// +------------------+------------------+------+
ResponseResource("select", Json.toJson(sData.limit(20).toJSON.collect()), Json.parse(sData.schema.json)("fields"), sName, sData.count(), 0)
}
}
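For comparison, here is a hedged sketch of how the same from/to/value table could be built entirely with the DataFrame/MLlib API (an org.apache.spark.ml.feature.Bucketizer over the same boundaries), which keeps the work in Catalyst expressions instead of an RDD lambda. It assumes the sData, thresholds, and pSpark values from the handler above and is only an illustrative alternative, not the code from the question.
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.functions.count
import pSpark.implicits._

// 11 strictly increasing boundaries -> 10 buckets; the last bucket also
// includes its upper bound, matching RDD.histogram's behaviour.
val bucketizer = new Bucketizer()
  .setInputCol("open")
  .setOutputCol("bucket")
  .setSplits(thresholds)
  .setHandleInvalid("keep") // keep rows outside the splits instead of failing

val counts = bucketizer
  .transform(sData.select($"open".cast("double").as("open")))
  .groupBy($"bucket")
  .agg(count("*").as("value"))

// Attach the from/to boundaries for each bucket index, then restore the order.
val ranges = thresholds.zip(thresholds.tail).zipWithIndex
  .map { case ((from, to), i) => (i.toDouble, from, to) }
  .toSeq
  .toDF("bucket", "from", "to")

val histogramDf = ranges
  .join(counts, Seq("bucket"), "left")
  .na.fill(0L, Seq("value"))
  .orderBy("bucket")
  .select("from", "to", "value")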
(3) build.sbt
name := """spark-api-test"""
organization := "com.baeldung"
version := "1.0-SNAPSHOT"
lazy val root = (project in file(".")).enablePlugins(PlayScala)
scalaVersion := "2.12.10"
resolvers += "jitpack" at "https://jitpack.io"
libraryDependencies += guice
libraryDependencies += "org.scalatestplus.play" %% "scalatestplus-play" % "5.0.0" % Test
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.41"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.0.1"
libraryDependencies += "org.joda" % "joda-convert" % "2.2.1"
libraryDependencies += "net.logstash.logback" % "logstash-logback-encoder" % "6.2"
libraryDependencies += "io.lemonlabs" %% "scala-uri" % "1.5.1"
libraryDependencies += "net.codingwell" %% "scala-guice" % "4.2.6"
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.13.6"
libraryDependencies += "com.github.shin285" % "KOMORAN" % "3.3.4"
// hdfs save lib
libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "1.0.0"
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.10"
addCompilerPlugin("com.github.aoiroaoino" %% "totuple" % "0.1.2")
enablePlugins(JavaAppPackaging)
(4) The code where the error occurs: the error appears as soon as .histogram is added
val _tmpHist = sData
.select($"open" cast "double")
.rdd.map(r => r.getDouble(0))
.histogram(thresholds)
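For clarity, here is the same call annotated step by step (behavior unchanged; sData and thresholds are the values from section (2), and histogram is Spark's DoubleRDDFunctions.histogram):
// Given n+1 sorted boundaries, histogram returns an Array[Long] with one count
// per bucket; every bucket is half-open [from, to) except the last, which also
// includes its upper bound.
val _tmpHist: Array[Long] = sData
  .select($"open".cast("double"))   // keep only the column being bucketed
  .rdd                              // switch to the RDD API
  .map(r => r.getDouble(0))         // this lambda is serialized and executed on the executors
  .histogram(thresholds)            // count the rows that fall into each bucket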
(5) Exception
Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 12, 192.168.0.220, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2410)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2295)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
If I set the master to local[*] in the SparkSession, it works fine.
implicit val oSparkOrigin = SparkSession.builder()
.master("local[*]")
.appName("spark-api")
.getOrCreate()
However, I have to use the Spark master URL.
I have been struggling with this for several days. Please let me know how to solve it.