使用spark 2.0.2、kafka源和scalapb的结构化流媒体

xwbd5t1u  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(356)

我正在使用结构化流媒体(spark 2.0.2)来使用kafka消息。使用scalapb,在protobuf中生成消息。我得到以下错误。请帮忙。。
线程“main”scala.scalareflectionexception中的异常:不是scala.reflect.api.symbols$symbolapi$class.asterm(symbols)中的术语。scala:199)在scala.reflect.internal.symbols$symbolcontextapiimpl.asterm(symbols。scala:84)位于org.apache.spark.sql.catalyst.scalareflection$class.constructparams(scalareflection.com)。scala:811)在org.apache.spark.sql.catalyst.scalareflection$.constructparams(scalareflection。scala:39)在org.apache.spark.sql.catalyst.scalareflection$class.getconstructorparameters(scalareflection。scala:800)在org.apache.spark.sql.catalyst.scalareflection$.getconstructorparameters(scalareflection。scala:39)在org.apache.spark.sql.catalyst.scalareflection$.org$apache$spark$sql$catalyst$scalareflection$$serializer用于(scalareflection)。scala:582)位于org.apache.spark.sql.catalyst.scalareflection$.org$apache$spark$sql$catalyst$scalareflection$$serializer(scalareflection)。scala:460)在org.apache.spark.sql.catalyst.scalareflection$$anonfun$9.apply(scalareflection)。scala:592)在org.apache.spark.sql.catalyst.scalareflection$$anonfun$9.apply(scalareflection。scala:583)在scala.collection.traversablelike$$anonfun$flatmap$1.apply(traversablelike。scala:252)在scala.collection.traversablelike$$anonfun$flatmap$1.apply(traversablelike。scala:252)在scala.collection.immutable.list.foreach(list。scala:381)在scala.collection.traversablelike$class.flatmap(traversablelike。scala:252)在scala.collection.immutable.list.flatmap(list。scala:344)在org.apache.spark.sql.catalyst.scalareflection$.org$apache$spark$sql$catalyst$scalareflection$$serializer用于(scalareflection)。scala:583)位于org.apache.spark.sql.catalyst.scalareflection$.serializer(scalareflection.com)。scala:425)在org.apache.spark.sql.catalyst.encoders.expressionencoder$.apply(expressionencoder。scala:61)在org.apache.spark.sql.encoders$.product(编码器。scala:274)位于org.apache.spark.sql.sqlimplicits.newproductencoder(sqlimplicits。scala:47)在personconsumer$.main(personconsumer。scala:33)位于sun.reflect.nativemethodaccessorimpl.invoke0(本机方法)的personconsumer.main(personconsumer.scala)sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl。java:62)在sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl。java:43)在java.lang.reflect.method.invoke(方法。java:498)在com.intellij.rt.execution.application.appmain.main(appmain。java:147)
以下是我的代码。。。

object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

    def parseLine(s: String): Person =
      Person.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]

    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")

    val ds4 = spark.sqlContext.sql("select name from persons")

    val query = ds4.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
nlejzf6q

nlejzf6q1#

在person类中,gender是一个枚举,这就是导致这个问题的原因。删除此字段后,它可以正常工作。下面是我从databricks的石雄(ryan)那里得到的答案。
问题是“可选性别=3;”。生成的类“gender”是一个trait,spark不知道如何创建trait,因此不支持它。您可以定义sql编码器支持的类,并将生成的类转换为中的新类 parseLine .

kmb7vmvb

kmb7vmvb2#

与…的连线 val ds3 应该是:

val ds3 = ds2.map(str => parseLine(str))

sqlContext.protoToDataFrame(ds3).registerTempTable("persons")

rdd在保存为临时表之前需要转换为Dataframe。

相关问题