我已经使用init脚本覆盖了databricks log4j日志。当我的代码被触发时,它一直运行良好,直到某个时候。当它到达下面的行时:
val ds = df.as[MySDMData]
ds.map(a => func1(a)).write.format("delta").mode("overwrite").option("header","true").save(s"${Interimpath}/sdm_outer_java")
它将失败,并显示以下堆栈跟踪:
Caused by: Job aborted due to stage failure.
Caused by: NotSerializableException: org.apache.log4j.Logger
Serialization stack:
- object not serializable (class: org.apache.log4j.Logger, value: org.apache.log4j.Logger@33280b3f)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw, name: logger, type: class org.apache.log4j.Logger)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw@78706dad)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw@259eefbe)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw@58203e28)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw@621d2f16)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw@49da2284)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw@4f8ac0c9)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read, $linef43f9ceebbd54e07ba09b7bf5984364029.$read@3fc7ede0)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $linef43f9ceebbd54e07ba09b7bf5984364029$read, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@375936db)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $outer, type: class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@750dbdc3)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $outer, type: class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@4c9f0247)
- field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $outer, type: class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
- object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@e81047)
- element of array (index: 4)
- array (class [Ljava.lang.Object;, size 7)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$6664/133918073, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$6664/133918073@36b47765)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted:(Lscala/Function2;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.rdd.RDD$$Lambda$6661/1112983908, org.apache.spark.rdd.RDD$$Lambda$6661/1112983908@63563080)
- field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4613] at execute at DeltaInvariantCheckerExec.scala:85)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (MapPartitionsRDD[4613] at execute at DeltaInvariantCheckerExec.scala:85,org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$7277/44213980@6853ff1b))
这是我的案例课
case class MyData(var case_id: String,
var mbr_facet_id: String,
var mbr_id: String,
....
var tpc_chg: String,
var icue_evi_flg: String)
我正在将我的case类与其他 Dataframe 进行Map。
val ds = df.as[MyData]
ds.map(a => func1(a)).write.mode("overwrite").option("header","true").parquet(s"${Interimpath}/cdf_cpm_interim3")
当它到达这一点时,我得到这个错误。
这个错误是因为Map功能吗?我该怎么解决这个问题?
新建编辑
我们有一个 Dataframe df,而MySDMData是一个具有一些参数的case类。
使用这个,我使数据类型在两个相同。ds.map(a => func1(a)).write.format("delta").mode("overwrite").option("header", "true").save(s"${Interimpath}/sdm_outer_java")
这里ds是数据集
然后在下面做ds.map(a => func1(a)).write.format("delta").mode("overwrite").option("header", "true").save(s"${Interimpath}/sdm_outer_java")
个
其中,func1是接受数据集(ds)作为参数的方法,执行一些逻辑运算并返回
def func1(ds: MySDMData): MySDMData = {
/*logical operation*/
val obj = MySDMData(ds.case_id,ds.mbr_facet_id,ds.mbr_id,....)
obj //return
}
1条答案
按热度按时间8ftvxx2r1#
您是否正在将
org.apache.log4j.Logger
的示例添加到您的某个类/case类中?请参见堆栈跟踪中的以下行:它试图序列化记录器,如果是这样,不要这样做,记录器不是要序列化并将其发送到其他地方的东西,记录器属于您代码的特定范围(它们只意味着特定于使用它们的地方)。