找不到数据集中存储的类型的编码器在spark结构化流媒体中

zpqajqem  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(362)

我正在尝试spark网站上给出的spark结构化流媒体示例,但它抛出了错误
1找不到数据集中存储的类型的编码器。通过导入spark.implicits支持基元类型(int、string等)和产品类型(case类)。在将来的版本中将添加对序列化其他类型的支持。
2方法as的参数不足:(隐式证据$2:org.apache.spark.sql.encoder[data])org.apache.spark.sql.dataset[data]。未指定值参数证据$2。val ds:数据集[数据]=df.as[数据]
这是我的密码

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
          .builder()
          .appName("kafka-consumer")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        spark.sparkContext.setLogLevel("WARN")

    case class data(name: String, id: String)

    val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "172.21.0.187:9093")
          .option("subscribe", "test")
          .load()
    println(df.isStreaming)

    val ds: Dataset[data] = df.as[data]
    val value = ds.select("name").where("id > 10")

    value.writeStream
          .outputMode("append")
          .format("console")
          .start()
          .awaitTermination()

  }
}

有什么能帮到你的吗。?我想要这样的最终输出我想要这样的输出

+-----+--------+
| name|    id
+-----+--------+
|Jacek|     1
+-----+--------+
thtygnil

thtygnil1#

此错误是由于dataframe中的列数与case类不匹配造成的。
你有 [topic, timestamp, value, key, offset, timestampType, partition] Dataframe中的列
而您的case类只有两列

case class data(name: String, id: String)

您可以将dataframe的内容显示为

val display = df.writeStream.format("console").start()

睡几秒钟然后

display.stop()

也可以使用 option("startingOffsets", "earliest") 如前所述
然后根据您的数据创建一个case类。
希望这有帮助!

nuypyhwy

nuypyhwy2#

错误的原因是你正在处理 Array[Byte] 因为来自Kafka,没有领域可以匹配 data 案例类。

scala> println(schema.treeString)
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

换条线 df.as[data] 以下内容:

df.
  select($"value" cast "string").
  map(value => ...parse the value to get name and id here...).
  as[data]

我强烈建议使用 select 以及 functions 对象来处理传入的数据。

相关问题