I am using Structured Streaming (Spark 2.0.2) to consume Kafka messages. The messages are protobuf, generated with ScalaPB. I am getting the following error. Please help.
Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
	at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
	at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
	at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
	at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
	at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
	at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
	at scala.collection.immutable.List.flatMap(List.scala:344)
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
	at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
	at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
	at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
	at PersonConsumer$.main(PersonConsumer.scala:33)
	at PersonConsumer.main(PersonConsumer.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Here is my code:
object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args: Array[String]) {
    def parseLine(s: String): Person =
      Person.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "person")
      .load()

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
    val ds4 = spark.sqlContext.sql("select name from persons")

    val query = ds4.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
2 Answers
Answer 1
In the Person class, gender is an enum, and that is what caused this problem. After removing this field it works fine. Below is the answer I got from Shixiong (Ryan) at Databricks:

The issue is "optional Gender gender = 3;". The generated class "Gender" is a trait, and Spark doesn't know how to create a trait, so it is not supported. You can define a class that the SQL Encoder does support, and convert the generated class to the new class in parseLine.
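A minimal sketch of that suggestion (the PersonRecord name and the accessors on the ScalaPB-generated Person and Gender types are assumptions here, not part of the original answer):

```scala
// Hypothetical: a plain case class holding only encoder-friendly fields.
// It mirrors the generated Person, but replaces the Gender trait with a String,
// which Spark's product encoder can handle.
case class PersonRecord(name: String, age: Int, gender: String)

def parseLine(s: String): PersonRecord = {
  val p = Person.parseFrom(
    org.apache.commons.codec.binary.Base64.decodeBase64(s))
  // Convert the generated class into the encoder-supported one,
  // flattening the Gender enum to its string name (accessor names assumed).
  PersonRecord(p.getName, p.getAge, p.getGender.name)
}
```

With this in place, `ds2.map(parseLine)` produces a `Dataset[PersonRecord]`, which Spark can derive an encoder for.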
Answer 2
The line defining val ds3 should be changed: the RDD needs to be converted to a DataFrame before being saved as a temporary table.
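A hedged sketch of that change against the question's code (note that in the original, createOrReplaceTempView returns Unit, so ds3 was not bound to any data; the rest of main is assumed unchanged):

```scala
// Parse into a typed Dataset, convert it to a DataFrame, and only then
// register the temporary view. Keep the view registration as a separate
// statement instead of binding its Unit result to a val.
val ds3 = ds2.map(str => parseLine(str)).toDF()
ds3.createOrReplaceTempView("persons")

val ds4 = spark.sql("select name from persons")
```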