java—如何将kafka的json转换为spark的机器学习算法

wb1gzix0  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(229)

我正在尝试学习Spark和Spark流使用java。开发物联网应用。我有一个kafka服务器,它接受json数据,并且我能够使用sqlcontext和foreach函数解析它。
数据格式如下:,

  1. [{"t":1481368346000,"sensors":[{"s":"s1","d":"+149.625"},{"s":"s2","d":"+23.062"},{"s":"s3","d":"+16.375"},{"s":"s4","d":"+235.937"},{"s":"s5","d":"+271.437"},{"s":"s6","d":"+265.937"},{"s":"s7","d":"+295.562"},{"s":"s8","d":"+301.687"}]}]

其中,t是每个数据流的时间戳,传感器是传感器数据的数组,s是每个传感器的名称,d包含数据。
我现在所做的是,

  1. JavaPairInputDStream<String, String> directKafkaStream =
  2. KafkaUtils.createDirectStream(ssc,
  3. String.class,
  4. String.class,
  5. StringDecoder.class,
  6. StringDecoder.class,
  7. kafkaParams,
  8. topics);
  9. SQLContext sqlContext = spark.sqlContext();
  10. StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(2));
  11. JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
  12. public String call(Tuple2<String,String> message) throws Exception {
  13. return message._2();
  14. };
  15. });
  16. json.print();
  17. json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
  18. @Override
  19. public void call(JavaRDD<String> jsonRecord) throws Exception {
  20. System.out.println("JSON Record ---- "+jsonRecord);
  21. if(!jsonRecord.isEmpty()){
  22. Dataset<Row> timestamp = sqlContext.read().json(jsonRecord).select("t");
  23. timestamp.printSchema();
  24. timestamp.show(false);
  25. Dataset<Row> data = sqlContext.read().json(jsonRecord).select("sensors");
  26. data.printSchema();
  27. data.show(false);
  28. //DF in table
  29. Dataset<Row> df = data.select(org.apache.spark.sql.functions.explode(org.apache.spark.sql.functions.col("sensors")))
  30. .toDF("sensors").select("sensors.s","sensors.d").where("sensors.s = 's1'");
  31. Row firstRow = df.head();
  32. String valueOfFirstSensor = firstRow.getString(1);
  33. System.out.println("---------valueOfFirstSensor --------"+ valueOfFirstSensor);
  34. double[] values = new double[1];
  35. values[0] = firstRow.getDouble(0);
  36. new LabeledPoint(timestamp.head().getDouble(0), Vectors.dense(values));
  37. df.show(false);
  38. }
  39. }
  40. });
  41. ssc.start();
  42. ssc.awaitTermination();

我想做的是,将javadstream中的json转换为StreamingLinearGressionWithGD模型接受的数据结构。
当我尝试使用sparks的map函数将json流Map到javadstream时,如下所示,

  1. JavaDStream<LabeledPoint> forML = json.map(new Function<String, LabeledPoint>() {
  2. @Override
  3. public LabeledPoint call(String jsonRecord) throws Exception {
  4. // TODO Auto-generated method stub
  5. System.out.println("\n\n\n here is JSON in"+ jsonRecord);
  6. LabeledPoint returnObj = null;
  7. if(!jsonRecord.isEmpty()){
  8. Dataset<Row> timestamp = sqlContext.read().json(jsonRecord).select("t");
  9. timestamp.printSchema();
  10. timestamp.show(false);
  11. Dataset<Row> data = sqlContext.read().json(jsonRecord).select("sensors");
  12. data.printSchema();
  13. data.show(false);
  14. //DF in table
  15. Dataset<Row> df = data.select(org.apache.spark.sql.functions.explode(org.apache.spark.sql.functions.col("sensors")))
  16. .toDF("sensors").select("sensors.s","sensors.d").where("sensors.s = 's1'");
  17. Row firstRow = df.head();
  18. String valueOfFirstSensor = firstRow.getString(1);
  19. System.out.println("---------valueOfFirstSensor --------"+ valueOfFirstSensor);
  20. double[] values = new double[1];
  21. values[0] = firstRow.getDouble(0);
  22. returnObj = new LabeledPoint(timestamp.head().getDouble(0), Vectors.dense(values));
  23. df.show(false);
  24. }
  25. return returnObj;
  26. }
  27. }).cache();
  28. model.trainOn(forML);

并调用model.train,但失败时出现nullpointerexception

  1. Dataset<Row> timestamp = sqlContext.read().json(jsonRecord).select("t");

现在我的问题是,
我做得对吗?
我将如何预测值,为什么以及如何需要创建一个不同的流来将其传递给模型的predicton函数?
我将接收多个传感器,但每个传感器只有一个值,而且可能有成千上万个这样的流,我如何才能为这几千个传感器中的每一个创建不同的模型,并有效地预测如此大量的数据?
有没有其他好的机器学习算法或方法可以用于这种类型的传感器数据?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题