Scala / Apache Spark - a generic method for loading CSV data into a Dataset

insrf1ej · asked 2022-11-09 · Scala

I want to write a generic method with three input parameters:
1. file path - String
2. schema - ?
3. case class - ?
So my idea is to write a method like this:

def load_sms_ds(filePath: String, schemaInfo: ?, cc: ?) = {
  val ds = spark.read
    .format("csv")
    .option("header", "true")
    .schema(?)
    .option("delimiter",",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
    .load(filePath)
    .as[?]

   ds
}

and have it return a Dataset based on the input parameters. But I'm not sure what types the parameters schemaInfo and cc should be?

Answer from ryevplcw:

To begin with, I would recommend reading through the Spark SQL programming guide; it contains material that will generally be helpful as you learn Spark.
Let's work through reading in a CSV file, using a case class to define the schema.
First, add the various imports needed for this example:

import java.io.{File, PrintWriter} // for reading / writing the example data

import org.apache.spark.sql.types.StructType // to define the schema

import org.apache.spark.sql.catalyst.ScalaReflection // used to generate the schema from a case class

import scala.reflect.runtime.universe.TypeTag // used to provide type information of the case class at runtime
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.Encoder // used by Spark to convert the rows into instances of the case class

Define your case class (the different column types that Spark supports can be found in the Spark SQL data types documentation):

case class Example(
    stringField: String,
    intField: Int,
    doubleField: Double
)

Add a method that extracts the schema (a StructType) for the case class type given as its type parameter:

// T : TypeTag means that an implicit value of type TypeTag[T] must be available at the method call site. Scala will automatically generate this for you; see the Scala documentation on TypeTags for further details.
def schemaOf[T: TypeTag]: StructType = { 
    ScalaReflection
        .schemaFor[T] // this method requires a TypeTag for T
        .dataType
        .asInstanceOf[StructType] // cast it to a StructType, what spark requires as its Schema
}
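
As a quick sanity check (an illustrative snippet, not part of the original answer), you can print the schema derived from the case class; for Example it produces a StructType roughly like the one shown in the comment:

// Illustrative only: inspect the schema Spark derives from the case class.
// For Example this prints something like:
// StructType(StructField(stringField,StringType,true), StructField(intField,IntegerType,false), StructField(doubleField,DoubleType,false))
println(schemaOf[Example])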

Define a method that reads a CSV file from a path, using the schema defined by the case class:

// The implicit Encoder is needed by the `.as` method in order to create the Dataset[T]. The TypeTag is required by the schemaOf[T] call.
def readCSV[T : Encoder : TypeTag](
    filePath: String
)(implicit spark : SparkSession) : Dataset[T]= {
    spark.read
        .option("header", "true")
        .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
        .schema(schemaOf[T])
        .csv(filePath) // spark provides this more explicit call for reading a CSV file; by default it uses comma as the separator, but this can be changed
        .as[T]
}
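
If your files use a separator other than a comma, the same pattern works with a delimiter option added before the csv call. A minimal sketch (the readDelimited name and the delimiter parameter are my own additions for illustration):

// Sketch: the same reader with a configurable delimiter (illustrative helper).
def readDelimited[T : Encoder : TypeTag](
    filePath: String,
    delimiter: String = ","
)(implicit spark: SparkSession): Dataset[T] = {
    spark.read
        .option("header", "true")
        .option("delimiter", delimiter) // e.g. ";" for semicolon-separated files
        .schema(schemaOf[T])
        .csv(filePath)
        .as[T]
}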

Create the SparkSession:

implicit val spark = SparkSession.builder().master("local").getOrCreate()

Write some sample data to a temporary file:

val data =
    s"""|stringField,intField,doubleField
        |hello,1,1.0
        |world,2,2.0
    |""".stripMargin
val file = File.createTempFile("test",".csv")
val pw = new PrintWriter(file)
pw.write(data)
pw.close()

An example of calling this method:

import spark.implicits._ // so that an implicit Encoder gets pulled in for the case class
val df = readCSV[Example](file.getPath)
df.show()
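
With the sample data above, the df.show() call should print something like:

+-----------+--------+-----------+
|stringField|intField|doubleField|
+-----------+--------+-----------+
|      hello|       1|        1.0|
|      world|       2|        2.0|
+-----------+--------+-----------+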
