我正在开发databricks(spark2.0.1-db1(scala2.11))并尝试使用spark流函数。我正在使用这些库:
-spark-sql-streaming-mqttèu 2.11-2.1.0-snapshot.jar(参见此处:http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/)
以下命令提供了一个数据集:
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx")
.option("topic", "/Name/data")
.option("localStorage", "dbfs:/models/mqttPersist")
.option("cleanSession", "true")
.load().as[(String, Timestamp)]
使用此printschema:
root
|-- value : string (nullable : true)
|-- timestamp : timestamp (nullable : true)
我想在我的数据集的“value”列上应用一个模式。您可以看到我的json模式如下。
root
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
是否可以在流中直接解析我的json以获得如下结果:
root
|-- value : struct (nullable : true)
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
|-- timestamp : timestamp (nullable : true)
目前,我还没有看到任何从mqtt解析json的方法,任何帮助都是非常好的。
提前谢谢。
1条答案
按热度按时间ssm49v7z1#
我今天也有同样的问题!我使用json4s和jackson来解析json。
如何获取流式数据集(与您的数据集大致相同):
我使用case类定义了模式:
使用org.json4s.jackson.jsonmethods.parse解析json列:
输出结果:
结果是:
我有点失望,我不能想出一个解决方案,使用原生json解析Spark。相反,我们必须依靠Jackson。如果将文件作为流读取,则可以使用spark原生json解析。因此:
但对于mqtt,我们不能这样做。