Scala - iterate over a Spark DataFrame and store each row's values in variables of another class

7hiiyaii · posted 2021-05-29 in Spark

I want to iterate over a Spark DataFrame and store each row's values in class data members (global variables).
Code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{
  StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

object Main {
  // Holds the values of one row (name, age, gender) as mutable fields
  class Input_Class {
    var name: String = ""
    var age: String = ""
    var gender: String = ""

    // Copy the first three columns of the Row into the fields, in order
    def setter(src: Row): Unit = {
      val row = src.toSeq
      var i = 0
      name = row(i).toString
      i += 1
      age = row(i).toString
      i += 1
      gender = row(i).toString
    }
  }
  class Manager extends Serializable{
    var inputObj = new Input_Class();
    def inputSetter(src: Row) = {
        inputObj.setter(src);
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("App").config("spark.master", "local").getOrCreate()
    val df = spark.read.csv("data.csv")
    var ManagerObj = new Manager()
    // Try to copy each row's values into ManagerObj
    df.rdd.map(row => {
      ManagerObj.inputSetter(row)
    })
    spark.stop()
  }
}

I am not sure whether this code is correct. Am I using the map operator wrongly? As the error says, the task is not serializable. Please help me out here; I am new to this and do not have much experience. If there is a better or alternative way to achieve what I am doing, please recommend it.
This is the error I get:

20/06/03 17:44:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
org.apache.spark.SparkException: Task not serializable                          
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.map(RDD.scala:370)
  at Main$.main(Untitled-2.scala:57)
  ... 51 elided
Caused by: java.io.NotSerializableException: Main$Manager
Serialization stack:
        - object not serializable (class: Main$Manager, value: Main$Manager@108f206f)
        - field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
        - object (class scala.runtime.ObjectRef, Main$Manager@108f206f)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Main$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Main$.$anonfun$main$1$adapted:(Lscala/runtime/ObjectRef;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class Main$$$Lambda$2851/2090377899, Main$$$Lambda$2851/2090377899@7722c8e6)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 60 more

Thanks!

e0uiprwp1#

You are using an instance of the Manager class inside the closure. Please have Manager extend the Serializable interface:

```
class Manager extends Serializable {
  var inputObj = new Input_Class();
  def inputSetter(src: Row) = {
    inputObj.setter(src);
  }
}
```
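
Beyond the serialization error, note that df.rdd.map(...) is a lazy transformation that runs on the executors, so even after this fix the driver-side ManagerObj would not actually be populated with row values. If the goal is simply to read each row's values on the driver, a minimal sketch (assuming the data is small enough to collect, and that the CSV columns are name, age and gender in that order; the Person case class and CollectExample object are illustrative names, not part of the original code) could look like this:

```
import org.apache.spark.sql.SparkSession

// Hypothetical case class matching the assumed column order of data.csv
case class Person(name: String, age: String, gender: String)

object CollectExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("App").config("spark.master", "local").getOrCreate()
    import spark.implicits._

    val people: Array[Person] = spark.read.csv("data.csv")
      .toDF("name", "age", "gender") // assumed column names and order
      .as[Person]
      .collect()                     // brings the rows back to the driver

    // The values are now plain driver-side objects and can be stored anywhere
    people.foreach(p => println(s"${p.name} ${p.age} ${p.gender}"))

    spark.stop()
  }
}
```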
