The goal:
Spark Streaming collects JSON data from Kafka, then computes over that data with time windows (1 minute, 1 hour).
What I tried:
Use SQLContext to convert the JSON data and aggregate it, returning an RDD for the DStream.
Use reduceByKeyAndWindow to do the time-window computation on the resulting stream.
The problems:
I don't know how to return the DataFrame as an RDD of the DStream.
Is there another way to do cumulative computation over time windows?
The code is below:
val sqlContext = SQLContext.getOrCreate(ssc.sparkContext)
import sqlContext.implicits._
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
/* kafkaStream data is a json array like below
[{"metric":"access","host":"10.1.2.3","Num1":80.0,"Num2":90.0},
 {"metric":"access","host":"10.1.2.3","Num1":90.0,"Num2":200.0}]
*/
/* convert the kafkaStream json data to a DataFrame and apply some transformations */
var aggdata = kafkaStream.transform( rdd => {
  if (rdd.count > 0) {
    val dataframe = sqlContext.read.json(rdd)
    val returndf = dataframe.groupBy($"host", $"metric")
      .agg(sum($"Num1"), sum($"Num2"), sum($"Num1") / sum($"Num2"))
    /* returndf.show()
    +--------+------+---------+---------+-----------------------+
    |    host|metric|sum(Num1)|sum(Num2)|(sum(Num1) / sum(Num2))|
    +--------+------+---------+---------+-----------------------+
    |10.1.2.3|access|    170.0|    290.0|     0.5862068965517241|
    +--------+------+---------+---------+-----------------------+
    */
    returndf // should return an RDD for the DStream (Problem 1)
  } else {
    rdd.map(l => ("" -> "")) // should return an empty RDD for the DStream (Problem 1)
  }
})
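For Problem 1, a sketch of one possible approach (not necessarily the intended one), assuming Spark 1.x where DataFrame.rdd returns an RDD[Row]: key each row by (host, metric) and carry the two sums as the value, so both branches of the if return the same RDD type and the stream becomes windowable:

```scala
// Sketch only: both branches return RDD[((String, String), (Double, Double))],
// giving the DStream a single pair element type that reduceByKeyAndWindow accepts.
val aggdata = kafkaStream.transform { rdd =>
  if (!rdd.isEmpty) {
    sqlContext.read.json(rdd)
      .groupBy($"host", $"metric")
      .agg(sum($"Num1"), sum($"Num2"))
      .rdd // DataFrame -> RDD[Row]
      .map(r => ((r.getString(0), r.getString(1)),   // key: (host, metric)
                 (r.getDouble(2), r.getDouble(3))))  // value: (sum(Num1), sum(Num2))
  } else {
    // a typed empty RDD keeps the DStream element type consistent
    rdd.sparkContext.emptyRDD[((String, String), (Double, Double))]
  }
}
```

The ratio sum(Num1)/sum(Num2) is deliberately left out here: partial sums are additive across window batches, while ratios are not, so the ratio is better recomputed after windowing.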
/* reduce by time window
      +--------+------+---------+---------+-----------------------+
      |    host|metric|sum(Num1)|sum(Num2)|(sum(Num1) / sum(Num2))|
      +--------+------+---------+---------+-----------------------+
time1 |10.1.2.3|access|    170.0|    290.0|     0.5862068965517241|
time2 |10.1.2.3|access|    200.0|    300.0|     0.6666666666666666|
      +--------+------+---------+---------+-----------------------+
*/
val window1min = aggdata.reduceByKeyAndWindow(xxxx)
/* window1min data should be time1 data + time2 data + ... + timeN data
+--------+------+---------+---------+-----------------------+
|    host|metric|sum(Num1)|sum(Num2)|(sum(Num1) / sum(Num2))|
+--------+------+---------+---------+-----------------------+
|10.1.2.3|access|    370.0|    590.0|     0.6271186440677966|
+--------+------+---------+---------+-----------------------+
*/
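The reduceByKeyAndWindow arguments are left as xxxx above. As a sketch of what that call might look like, assuming aggdata were reshaped into a pair DStream keyed by (host, metric) with (sum(Num1), sum(Num2)) as the value (the window length and slide interval below are illustrative placeholders):

```scala
// Sketch only: assumes aggdata is a DStream[((String, String), (Double, Double))].
// The two partial sums are added pairwise across all batches in the window;
// the ratio is recomputed afterwards so the sums stay additive.
val window1min = aggdata.reduceByKeyAndWindow(
  (a: (Double, Double), b: (Double, Double)) => (a._1 + b._1, a._2 + b._2),
  Seconds(60),  // window length: 1 minute
  Seconds(10)   // slide interval (assumed to be a multiple of the batch interval)
)
val withRatio = window1min.mapValues { case (n1, n2) => (n1, n2, n1 / n2) }
```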
Here is update 1 (I think I am close to the goal).
Define a case class in the class:
case class Status(avg: Double = 0.0, count: Double = 0)
so the DataFrame can be returned as an RDD of the right type.
var aggdata = kafkaStream.transform( rdd => {
  val returndata: RDD[(String, Status)] =
    if (!rdd.isEmpty) {
      val dataframe = sqlContext.read.json(rdd).filter("tags.type in ('avgms','gt20ms')")
      val report = dataframe.groupBy($"host", $"metric").agg(sum($"Num1"), sum($"Num2"))
      report.rdd.map(r => (r(0).toString, Status(r(1).toString.toDouble, r(2).toString.toDouble)))
    } else {
      rdd.map(l => ("" -> Status()))
    }
  returndata
})
aggdata.print()
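Once aggdata is a DStream[(String, Status)], one way to window it might look like the sketch below. Note that averages are not additive across batches, so this sketch (my assumption, not from the question) merges two Status values with a count-weighted average; the mergeStatus helper and the window/slide durations are illustrative:

```scala
// Sketch only: count-weighted merge of two Status values, then a 1-minute window.
// Assumes the slide interval is a multiple of the batch interval.
def mergeStatus(a: Status, b: Status): Status = {
  val total = a.count + b.count
  if (total == 0) Status()
  else Status((a.avg * a.count + b.avg * b.count) / total, total)
}

val window1min = aggdata.reduceByKeyAndWindow(mergeStatus _, Seconds(60), Seconds(10))
window1min.print()
```

If the values carried were plain sums instead of averages, the reduce function could simply add them, which is usually simpler and less error-prone.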
PS: Sorry for my poor English. If anything here is unclear, feel free to ask; Chinese is welcome too.