我正在尝试spark网站上给出的spark结构化流媒体示例,但它抛出了错误
1找不到数据集中存储的类型的编码器。通过导入spark.implicits支持基元类型(int、string等)和产品类型(case类)。在将来的版本中将添加对序列化其他类型的支持。
2方法as的参数不足:(隐式证据$2:org.apache.spark.sql.encoder[data])org.apache.spark.sql.dataset[data]。未指定值参数证据$2。val ds:数据集[数据]=df.as[数据]
这是我的密码
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
case class data(name: String, id: String)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.load()
println(df.isStreaming)
val ds: Dataset[data] = df.as[data]
val value = ds.select("name").where("id > 10")
value.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
有什么能帮到你的吗。?我想要这样的最终输出我想要这样的输出
+-----+--------+
| name| id
+-----+--------+
|Jacek| 1
+-----+--------+
2条答案
按热度按时间thtygnil1#
此错误是由于dataframe中的列数与case类不匹配造成的。
你有
[topic, timestamp, value, key, offset, timestampType, partition]
Dataframe中的列而您的case类只有两列
您可以将dataframe的内容显示为
睡几秒钟然后
也可以使用
option("startingOffsets", "earliest")
如前所述然后根据您的数据创建一个case类。
希望这有帮助!
nuypyhwy2#
错误的原因是你正在处理
Array[Byte]
因为来自Kafka,没有领域可以匹配data
案例类。换条线
df.as[data]
以下内容:我强烈建议使用
select
以及functions
对象来处理传入的数据。