我正在使用spark的结构化流媒体(2.2.1),使用kafka每60秒接收一次来自传感器的数据。我一直在思考如何 Package 这些Kafka数据,以便能够正确处理它。
我需要能够做一些计算,因为数据与Kafka进来。
我的问题是将来自kafka的json数据解包到我可以使用的数据集中
数据
简化的数据如下所示:
{
id: 1,
timestamp: "timestamp"
pump: {
current: 1.0,
flow: 20.0
torque: 5.0
},
reactors: [
{
id: 1,
status: 200,
},
{
id: 2,
status: 300,
}
],
settings: {
pumpTimer: 20.0,
reactorStatusTimer: 200.0
}
}
为了能够使用这个is spark,我为其中的每一个创建了一些case类结构:
// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)
以及使用以下方法生成模式:
val rawDataSchema = Encoders.product[RawData].schema
原始数据到spark架构
首先,我将Kafka的“值”字段放入我的通用模式中:
val rawDataSet = df.select($"value" cast "string" as "json")
.select(from_json($"json", rawDataSchema))
.select("data.*").as[RawData]
使用这个rawdataset,我可以将每个单独的对象打包成数据集。
val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
.select("pumpData.*").as[Pump]
val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
.select("settingsData.*").as[Settings]
这为每个json对象提供了漂亮干净的数据集。
使用数据
这里是我的问题,例如,如果我想比较或计算设置和泵的两个数据集之间的一些值,join不能使用结构化流。
val joinedData = pump.join(settings)
错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;
我对这一切的态度都错了吗?或者有没有其他处理方法的建议?
谢谢
1条答案
按热度按时间uoifb46i1#
我会用我现在的解决方案来回答我自己的问题
不用为json中的每个对象创建case类,我可以将它们作为一个case类与嵌套对象连接在一起:
为了使它成为一个可用的数据集,我可以简单地调用
在处理完json并将其放入我的define模式之后,我可以像这样选择每个特定的传感器:
谢谢你给我指出了正确的方向