spark streaming mqtt-apply schema on dataset

py49o6xq  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(396)

我正在开发databricks(spark2.0.1-db1(scala2.11))并尝试使用spark流函数。我正在使用这些库:
-spark-sql-streaming-mqttèu 2.11-2.1.0-snapshot.jar(参见此处:http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/)
以下命令提供了一个数据集:

  1. val lines = spark.readStream
  2. .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  3. .option("clientId", "sparkTest")
  4. .option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx")
  5. .option("topic", "/Name/data")
  6. .option("localStorage", "dbfs:/models/mqttPersist")
  7. .option("cleanSession", "true")
  8. .load().as[(String, Timestamp)]

使用此printschema:

  1. root
  2. |-- value : string (nullable : true)
  3. |-- timestamp : timestamp (nullable : true)

我想在我的数据集的“value”列上应用一个模式。您可以看到我的json模式如下。

  1. root
  2. |-- id : string (nullable = true)
  3. |-- DateTime : timestamp (nullable = true)
  4. |-- label : double (nullable = true)

是否可以在流中直接解析我的json以获得如下结果:

  1. root
  2. |-- value : struct (nullable : true)
  3. |-- id : string (nullable = true)
  4. |-- DateTime : timestamp (nullable = true)
  5. |-- label : double (nullable = true)
  6. |-- timestamp : timestamp (nullable : true)

目前,我还没有看到任何从mqtt解析json的方法,任何帮助都是非常好的。
提前谢谢。

ssm49v7z

ssm49v7z1#

我今天也有同样的问题!我使用json4s和jackson来解析json。
如何获取流式数据集(与您的数据集大致相同):

  1. val lines = spark.readStream
  2. .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  3. .option("topic", topic)
  4. .option("brokerUrl",brokerUrl)
  5. .load().as[(String,Timestamp)]

我使用case类定义了模式:

  1. case class DeviceData(devicename: String, time: Long, metric: String, value: Long, unit: String)

使用org.json4s.jackson.jsonmethods.parse解析json列:

  1. val ds = lines.map {
  2. row =>
  3. implicit val format = DefaultFormats
  4. parse(row._1).extract[DeviceData]
  5. }

输出结果:

  1. val query = ds.writeStream
  2. .format("console")
  3. .option("truncate", false)
  4. .start()

结果是:

  1. +----------+-------------+-----------+-----+----+
  2. |devicename|time |metric |value|unit|
  3. +----------+-------------+-----------+-----+----+
  4. |dht11_4 |1486656575772|temperature|9 |C |
  5. |dht11_4 |1486656575772|humidity |36 |% |
  6. +----------+-------------+-----------+-----+----+

我有点失望,我不能想出一个解决方案,使用原生json解析Spark。相反,我们必须依靠Jackson。如果将文件作为流读取,则可以使用spark原生json解析。因此:

  1. val lines = spark.readStream
  2. .....
  3. .json("./path/to/file").as[(String,Timestamp)]

但对于mqtt,我们不能这样做。

展开查看全部

相关问题