我使用spark 2.1。
我试图读取Kafka记录使用Spark结构化流,反序列化他们和应用聚合之后。
我有以下代码:
SparkSession spark = SparkSession
.builder()
.appName("Statistics")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load();
df.selectExpr("CAST(value AS STRING)")
我想要的是反序列化 value
在我的对象中输入字段而不是作为 String
.
我有一个自定义反序列化程序。
public StatisticsRecord deserialize(String s, byte[] bytes)
如何在java中实现这一点?
我找到的唯一相关链接是https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html,但这是给scala的。
2条答案
按热度按时间bnl4lu3b1#
为json消息定义模式。
现在阅读下面的信息。messagedata是json消息的javabean。
bnl4lu3b2#
如果您有一个用于数据的java自定义反序列化程序,请在从kafka获得的字节上使用它
load
.那条线给你
Dataset<Row>
只有一列value
.我专门使用spark api for scala,因此我将在scala中执行以下操作来处理“反序列化”情况:
那应该会给你想要的…在斯卡拉。将其转换为java是您的家庭练习:)
请注意,自定义对象必须有可用的编码器,否则sparksql将拒绝将其对象放入数据集中。