How to analyze PV, UV, and IP with Spark every 5 minutes

ercv8c1e · asked on 2021-06-08 · in Kafka

How can I compute UV, PV, and distinct IP counts in 5-minute intervals throughout the day and store the results in MySQL? The data comes from Kafka in the following format:

Message sent: {"cookie":"a95f22eabc4fd4b580c011a3161a9d9d","ip":"125.119.144.252","event_time":"2017-08-07 10:50:16"}
Message sent: {"cookie":"6b67c8c700427dee7552f81f3228c927","ip":"202.109.201.181","event_time":"2017-08-07 10:50:26"}
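The JSON payload above can be parsed into typed columns with `from_json`. A minimal sketch, assuming Spark 2.x+ with the `spark-sql-kafka` connector; the broker address and topic name (`page_views`) are placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("pv-uv-ip").getOrCreate()

// Schema matching the JSON messages shown above
val schema = new StructType()
  .add("cookie", StringType)
  .add("ip", StringType)
  .add("event_time", StringType)

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "page_views")                   // placeholder topic
  .load()
  // Kafka values arrive as bytes; cast to string, then parse the JSON
  .select(from_json(col("value").cast("string"), schema).as("e"))
  .select(
    col("e.cookie"),
    col("e.ip"),
    // cast the string timestamp so it can drive event-time windows
    col("e.event_time").cast("timestamp").as("event_time")
  )
```

Casting `event_time` to a timestamp is the key step: it lets the aggregation be keyed on event time rather than on when the batch happens to run.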

The windows should be aligned like 00:00-00:05, 00:05-00:10, and so on. I used:

val write = new JDBCSink()
val query = counts.writeStream
  .foreach(write)
  .outputMode("complete")
  .trigger(ProcessingTime("5 minutes"))
  .start()

But if I submit the job at 00:01, or if it crashes and is restarted, how can I be sure it won't compute windows like 00:01-00:06 instead?


Answer 1, by bq3bfh9z:

Use the window function:

import org.apache.spark.sql.functions.{count, window}

// Group by event time, not processing time; "5 minutes" matches the requirement
val counts = events.groupBy(window(col("event_time"), "5 minutes"))
  .agg(count("*").as("pv")) // add the UV/IP aggregations you need
counts.writeStream.start()
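This works because Spark's tumbling windows are aligned to fixed clock boundaries (computed from the Unix epoch), so 5-minute windows always fall on 00:00, 00:05, 00:10, ... regardless of when the query starts or restarts. A fuller sketch covering all three metrics and the MySQL write, assuming Spark 2.4+ (for `foreachBatch`) and a parsed streaming DataFrame `events` with `cookie`, `ip`, and a timestamp column `event_time`; the watermark, JDBC URL, credentials, and table name are placeholder assumptions:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{approx_count_distinct, col, count, window}
import org.apache.spark.sql.streaming.Trigger

val counts = events
  // Assumed lateness bound: drop events more than 10 minutes late
  .withWatermark("event_time", "10 minutes")
  // Tumbling 5-minute windows, epoch-aligned: 00:00-00:05, 00:05-00:10, ...
  .groupBy(window(col("event_time"), "5 minutes"))
  .agg(
    count("*").as("pv"),
    approx_count_distinct("cookie").as("uv"),
    approx_count_distinct("ip").as("ip_cnt")
  )

// foreachBatch exposes each micro-batch as a plain DataFrame, so the
// built-in JDBC writer can replace a hand-rolled JDBCSink
val query = counts.writeStream
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/stats") // placeholder URL
      .option("dbtable", "pv_uv_ip")                      // placeholder table
      .option("user", "root")                             // placeholder creds
      .option("password", "secret")
      .mode("append")
      .save()
  }
  .start()
```

Note that `approx_count_distinct` trades exactness for bounded state, which matters in a long-running stream; an exact distinct count over unbounded cookies/IPs would grow state without limit. The restart concern is handled by the watermark plus checkpointing: window boundaries are derived from the data's event time, never from the job's start time.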
