Scala: pattern matching on a typed Dataset

nnsrf1az asked on 2021-05-29 in Spark

I am trying to apply different logic depending on the type of a Spark Dataset. Depending on the type of the case class passed to doWork (Customer or Worker), I have to apply a different kind of aggregation. How can I do that?

import org.apache.spark.sql.{Dataset, SparkSession}

object SparkSql extends App {

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  import spark.implicits._

  sealed trait Person {
    def name: String
  }

  final case class Customer(override val name: String, email: String)                extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  def doWork(persons: Dataset[Person]): Unit = {
    persons match {
      case ... // Dataset[Customer] ... do something
      case ... // Dataset[Worker] ... do something else
    }
  }

}

wooyq4lh1#

Pattern matching works via case classes. Case classes are Scala's way of allowing pattern matching on objects without requiring large amounts of boilerplate: in the common case, all you need to do is add the case keyword to each class you want to be able to match on.
For example:

abstract class Expr
case class Var(name: String) extends Expr
case class Number(num: Double) extends Expr
case class UnOp(operator: String, arg: Expr) extends Expr
case class BinOp(operator: String, left: Expr, right: Expr) extends Expr

def simplifyTop(expr: Expr): Expr = expr match {
  case UnOp("-", UnOp("-", e))  => e // Double negation
  case BinOp("+", e, Number(0)) => e // Adding zero
  case BinOp("*", e, Number(1)) => e // Multiplying by one
  case _ => expr
}
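For instance, a quick check of the rules above (my examples, not from the original answer):

simplifyTop(UnOp("-", UnOp("-", Var("x"))))  // Var("x"): double negation removed
simplifyTop(BinOp("+", Var("x"), Number(0))) // Var("x"): adding zero simplified away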

Applied to your example, I would try this:

def doWork(person: Person): Unit =
  person match {
    case c: Customer => ... // do something with the customer
    case w: Worker   => ... // do something else with the worker
  }

dataset.foreach(doWork _) // foreach, since map would additionally need an Encoder for the result
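One caveat with this approach (my note, not part of the original answer): to call foreach you first need a Dataset[Person], and Spark does not derive an Encoder for a sealed trait automatically. A minimal sketch, assuming the question's workers dataset and accepting a generic Kryo encoder (which trades the columnar representation for the ability to encode an arbitrary trait):

import org.apache.spark.sql.{Encoder, Encoders}

implicit val personEncoder: Encoder[Person] = Encoders.kryo[Person]

val persons: Dataset[Person] = workers.map(w => w: Person) // upcast each element
persons.foreach(doWork _)                                  // runs the match on every row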

mbskvtky2#

Change the method to accept [T <: parent] and then extract the bean's class name from the Dataset's JavaRDD (Dataset.toJavaRDD), as shown below:

import org.apache.spark.sql.Dataset

object InheritDataframe {

  // Accepts any Dataset whose element type is a subtype of parent and
  // dispatches on the element's runtime class, read from the ClassTag
  // of the underlying JavaRDD.
  private def matcherDef[T <: parent](dfb: Dataset[T]): Unit = {
    dfb.toJavaRDD.classTag.toString() match {
      case "child1" => println("child1")
      case "child2" => println("child2")
      case _        => println("Unknown")
    }
  }

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess // the author's helper that builds a SparkSession

    import spark.implicits._

    val dfB = List(child1(1)).toDS()
    val dfC = List(child2(1)).toDS()

    matcherDef(dfB)
    matcherDef(dfC)
  }
}

case class child1(i: Int) extends parent(i)

case class child2(i: Int) extends parent(i)

class parent(j: Int)
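One caution (my note, not from the answer): ClassTag.toString() prints the fully qualified class name, so matching on "child1" only works while these classes live in the default package. A sketch of a slightly more robust variant that matches on the simple name of the runtime class instead:

private def matcherDefBySimpleName[T <: parent](dfb: Dataset[T]): Unit =
  dfb.toJavaRDD.classTag.runtimeClass.getSimpleName match {
    case "child1" => println("child1")
    case "child2" => println("child2")
    case _        => println("Unknown")
  }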

llew8vvj3#

Try this:

sealed trait Person {
  def name: String
}

final case class Customer(override val name: String, email: String)                extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

Test case:

@Test
def test62262873(): Unit = {

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 2, skills = Array("self-motivation"))
  ).toDS

  import scala.reflect.runtime.universe._

  // Dispatches on the compile-time type T, captured via its TypeTag.
  // Note: there is no default case, so any other T throws a MatchError.
  def doWork[T: TypeTag](persons: Dataset[T]): Unit = {
    typeOf[T] match {
      case t if t =:= typeOf[Worker] =>
        println("I'm worker")
        persons.as[Worker].filter(_.id == 2).show(false)
      case t if t =:= typeOf[Customer] =>
        println("I'm Customer")
        persons.as[Customer].filter(_.name.contains("B")).show(false)
    }
  }
  doWork(workers)

  /**
    * I'm worker
    * +----+---+-----------------+
    * |name|id |skills           |
    * +----+---+-----------------+
    * |Sam |2  |[self-motivation]|
    * +----+---+-----------------+
    */
}
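For completeness, the Customer branch can be exercised the same way (my addition, mirroring the question's data; not part of the original test):

val customers: Dataset[Customer] = Seq(
  Customer("Bob", "bob@email"),
  Customer("Jack", "jack@email")
).toDS

doWork(customers) // prints "I'm Customer", then shows the rows whose name contains "B"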

cngwdvgl4#

I found a solution to my own problem, but I want to give credit to Someshwar Kale's answer, since it does what was asked. In this version I use implicits to create a converter that I can extend as needed.

import org.apache.spark.sql.{Dataset, SparkSession}

object TempProject extends App {

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  import spark.implicits._

  sealed trait Person {
    def name: String
  }
  final case class Customer(override val name: String, email: String)                extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  // Type class: one processor per concrete Person type.
  trait CustomDataProcessor[T] {
    def doSomethingCool(dataset: Dataset[T]): Dataset[T]
  }

  implicit object CustomerDataProcessor extends CustomDataProcessor[Customer] {

    override def doSomethingCool(dataset: Dataset[Customer]): Dataset[Customer] =
      dataset.filter(_.name.contains("B"))
  }

  implicit object WorkerDataProcessor extends CustomDataProcessor[Worker] {

    override def doSomethingCool(dataset: Dataset[Worker]): Dataset[Worker] =
      dataset.filter(_.id == 2)
  }

  // The compiler picks the right processor from the implicit scope.
  def doWork[T](person: Dataset[T])(implicit processor: CustomDataProcessor[T]): Unit = {
    // show() forces an action; the filter alone is lazy and would never run
    processor.doSomethingCool(person).show(false)
  }

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  val customers: Dataset[Customer] = Seq(
    Customer("Bob", "bob@email"),
    Customer("Jack", "jack@email")
  ).toDS

  doWork(workers)
  doWork(customers)
}
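A note on the design choice: with the type class, calling doWork on an unsupported Dataset type fails at compile time instead of falling through a runtime match, and supporting a new Person subtype is just one more implicit instance. A sketch, using a hypothetical Contractor case class of my own (it must live in the same file, since Person is sealed):

final case class Contractor(override val name: String, rate: Double) extends Person

implicit object ContractorDataProcessor extends CustomDataProcessor[Contractor] {
  override def doSomethingCool(dataset: Dataset[Contractor]): Dataset[Contractor] =
    dataset.filter(_.rate > 100.0) // illustrative threshold, my assumption
}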
