我正在尝试迁移我当前的流媒体应用程序,它基于使用RDD(从他们的文档中)到使用结构化流媒体的新数据集api,我被告知这是目前使用spark进行实时流媒体的首选方法。
目前,我有一个应用程序设置来使用1个名为“satellite”的主题,其中包含一个键时间戳的消息和一个包含 Satellite
波乔。但是我在弄清楚如何实现反序列化程序时遇到了问题。在我目前的应用程序很容易,你只要添加一行到你喜欢Kafka属性Map kafkaParams.put("value.deserializer", SatelliteMessageDeserializer.class);
我是用java来做这件事的,这是最大的挑战,因为所有的解决方案似乎都是用scala来做的,我不太了解scala,我也不容易将scala代码转换成java代码。
我遵循了这个问题中概述的一个json示例,这个示例目前很有效,但对于我需要做的事情来说似乎过于复杂。考虑到我已经有了用于此目的的自定义反序列化程序,我不明白为什么我必须首先将其转换为字符串,而只是将其转换为json,然后将其转换为所需的类类型。我也试着用我在这里找到的一些例子,但到目前为止我没有运气。
目前我的应用程序看起来是这样的(使用json方法):
import common.model.Satellite;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class SparkStructuredStreaming implements Runnable{
private String bootstrapServers;
private SparkSession session;
public SparkStructuredStreaming(final String bootstrapServers, final SparkSession session) {
this.bootstrapServers = bootstrapServers;
this.session = session;
}
@Override
public void run() {
Dataset<Row> df = session
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", "SATELLITE")
.load();
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("gms", DataTypes.StringType, true),
DataTypes.createStructField("satelliteId", DataTypes.StringType, true),
DataTypes.createStructField("signalId", DataTypes.StringType, true),
DataTypes.createStructField("cnr", DataTypes.DoubleType, true),
DataTypes.createStructField("constellation", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("mountPoint", DataTypes.StringType, true),
DataTypes.createStructField("pseudorange", DataTypes.DoubleType, true),
DataTypes.createStructField("epochTime", DataTypes.IntegerType, true)
});
Dataset<Satellite> df1 = df.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(Satellite.class));
try {
df1.writeStream()
.format("console")
.option("truncate", "false")
.start()
.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}
}
}
我有一个自定义反序列化程序
import common.model.Satellite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class SatelliteMessageDeserializer implements Deserializer<Satellite> {
private static Logger logger = LoggerFactory.getLogger(SatelliteMessageDeserializer.class);
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public void close() {
}
@Override
public Satellite deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(new String(data, "UTF-8"), getMessageClass());
} catch (Exception e) {
logger.error("Unable to deserialize message {}", data, e);
return null;
}
}
protected Class<Satellite> getMessageClass() {
return Satellite.class;
}
}
如何从中使用自定义反序列化程序 SparkStructuredStreaming
上课?我使用的是spark 2.4、openjdk 10和kafka 2.0
编辑:我试过创建我自己的自定义项,我认为这是应该怎么做的,但我不知道如何让它返回一个特定的类型,因为它似乎只允许我在 Datatypes
同学们!
UserDefinedFunction mode = udf(
(byte[] bytes) -> deserializer.deserialize("", bytes), DataTypes.BinaryType //Needs to be type Satellite, but only allows ones of type DataTypes
);
Dataset df1 = df.select(mode.apply(col("value")));
1条答案
按热度按时间mpbci0fu1#
from_json
只能处理字符串类型的列。结构化流总是以字节的形式使用kafka值
值总是使用bytearraydeserializer反序列化为字节数组。使用Dataframe操作显式反序列化值
因此,您至少应该首先反序列化为一个字符串,但我认为您并不需要这样做。
也许可以这样做
如果这不起作用,您可以尝试定义自己的udf/解码器,这样您就可以
SATELLITE_DECODE(value)
在斯卡拉看到这篇文章的灵感,也提到在databricks博客