Spark Streaming time-window aggregation over JSON data from Kafka

f45qwnt8 · posted 2021-06-07 in Kafka

What I want to achieve:

Spark Streaming collects JSON data from Kafka, then computes over that data using time windows (e.g. 1 minute, 1 hour).

The solution I tried:

Use SQLContext to convert the JSON data and aggregate it, returning an RDD so it stays a DStream
Use reduceByKeyAndWindow on the returned DStream to compute over time windows

The problems are:

I don't know how to return a DataFrame as the RDD of a DStream (Problem 1 in the code below)
Is there another way to do cumulative computation based on time windows? (Problem 2)
The code so far:

val sqlContext = SQLContext.getOrCreate(ssc.sparkContext)
 import sqlContext.implicits._
 import org.apache.spark.sql.functions.sum

 val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
/* kafkaStream data is a JSON array like below:
  [{"metric":"access","host":"10.1.2.3","Num1":80.0,"Num2":90.0},
  {"metric":"access","host":"10.1.2.3","Num1":90.0,"Num2":200.0}]
*/

/* convert the kafkaStream JSON data to a DataFrame and do some transformations */
val aggdata = kafkaStream.transform( rdd => {
    if (rdd.count > 0) {
      val dataframe = sqlContext.read.json(rdd)
      val returndf = dataframe.groupBy($"host", $"metric").agg(sum($"Num1"), sum($"Num2"), sum($"Num1") / sum($"Num2"))
      /* returndf.show()         
        +--------+------+---------+---------+-----------------------+ 
        |    host|metric|sum(Num1)|sum(Num2)|(sum(Num1) / sum(Num2))|
        +--------+------+---------+---------+-----------------------+
        |10.1.2.3|access|    170.0|    290.0|     0.5862068965517241|
        +--------+------+---------+---------+-----------------------+
      */
      returndf  // should return an RDD for the DStream (Problem 1)
    } else {
        rdd.map(l => ("" -> ""))  // should return an empty RDD for the DStream (Problem 1)
    }
}) 
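
For Problem 1, I think the transform has to hand back a pair RDD rather than the DataFrame itself. Here is a sketch of what I mean (untested): key by (host, metric), keep only the two sums as the value, and leave the ratio out so it can be recomputed after windowing.

val pairs = kafkaStream.transform( rdd => {
  if (!rdd.isEmpty) {
    val df = sqlContext.read.json(rdd)
      .groupBy($"host", $"metric")
      .agg(sum($"Num1"), sum($"Num2"))
    // convert each aggregated Row into ((host, metric), (sumNum1, sumNum2))
    df.rdd.map(r => ((r.getString(0), r.getString(1)), (r.getDouble(2), r.getDouble(3))))
  } else {
    // keep the DStream element type stable when a batch is empty
    rdd.sparkContext.emptyRDD[((String, String), (Double, Double))]
  }
})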
/* then reduce by time window
        +--------+------+---------+---------+-----------------------+
        |    host|metric|sum(Num1)|sum(Num2)|(sum(Num1) / sum(Num2))|
        +--------+------+---------+---------+-----------------------+
time1   |10.1.2.3|access|    170.0|    290.0|     0.5862068965517241|
        +--------+------+---------+---------+-----------------------+
time2   |10.1.2.3|access|    200.0|    300.0|     0.6666666666666666|
        +--------+------+---------+---------+-----------------------+
*/

val window1min = aggdata.reduceByKeyAndWindow(xxxx)
/* window1min data should be time1 data + time2 data + ... + timeN data
        +--------+------+---------+---------+-----------------------+
        |    host|metric|sum(Num1)|sum(Num2)|(sum(Num1) / sum(Num2))|
        +--------+------+---------+---------+-----------------------+
        |10.1.2.3|access|    370.0|    590.0|     0.6271186440677966|
        +--------+------+---------+---------+-----------------------+
*/
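
Filling in the xxxx above with the pairs DStream from the previous sketch, I imagine something like this (the window and slide durations here are just examples):

import org.apache.spark.streaming.{Minutes, Seconds}

val window1min = pairs.reduceByKeyAndWindow(
  // sum the two numbers component-wise across all batches in the window
  (a: (Double, Double), b: (Double, Double)) => (a._1 + b._1, a._2 + b._2),
  Minutes(1), Seconds(10))

// the ratio is derived from the windowed sums, not summed across batches
window1min.map { case ((host, metric), (n1, n2)) =>
  (host, metric, n1, n2, n1 / n2)
}.print()

For long windows, the overload that also takes an inverse (subtracting) function plus checkpointing should be cheaper, but I haven't tried it.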

Update 1 (I think I'm getting close):

Define a case class:
case class Status(avg: Double = 0.0, count: Double = 0.0)
then make the DataFrame return an RDD of the right type.

import org.apache.spark.rdd.RDD

val aggdata = kafkaStream.transform( rdd => {
    val returndata: RDD[(String, Status)] =
      if (!rdd.isEmpty) {
        val dataframe = sqlContext.read.json(rdd).filter("tags.type in ('avgms','gt20ms')")
        val report = dataframe.groupBy($"host", $"metric").agg(sum($"Num1"), sum($"Num2"))
        report.rdd.map(r => (r(0).toString, Status(r(1).toString.toDouble, r(2).toString.toDouble)))
      } else {
        rdd.map(l => ("" -> Status()))
      }
    returndata
})

aggdata.print()
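
For Problem 2, maybe SQLContext can be skipped entirely: parse each Kafka message with a plain JSON library and stay in the pair-DStream API the whole way, so reduceByKeyAndWindow applies directly. A sketch using json4s (the field names are taken from the sample above; not tested):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

case class Record(metric: String, host: String, Num1: Double, Num2: Double)

val parsedPairs = kafkaStream.flatMap { msg =>
  // declaring formats inside the closure avoids serializing the implicit
  implicit val formats: Formats = DefaultFormats
  // each Kafka message is a JSON array of records
  parse(msg).extract[List[Record]]
}.map(r => ((r.host, r.metric), (r.Num1, r.Num2)))

parsedPairs could then go through the same reduceByKeyAndWindow as above.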
PS: Sorry for my poor English. If anything here is unclear, feel free to ask; Chinese is welcome too.
