我使用flink stream处理3g网络(gprs隧道协议)中的数据流量日志。我在用户会话的信息合成方面遇到了问题。
例如:如何Map一个会话的开始和结束。我不知道flink流媒体是否适合处理这样复杂的协议?
零件编号:
我们捕获了3g网络中sgsn和ggsn之间的数据交换(使用gtp协议和gtp-c/u消息)。当sgsn发送createreq(teid,seq,imsi,teid\u dl,teid\u data\u dl)消息和ggsn响应creatersp(teid\u dl,seq,teid\u ul,teid\u data\u ul)消息时,会话启动。会话建立后,从sgsn发送到ggsn的其他gtp-c消息(例如updatereq,deletereq)使用teid_ul,响应消息使用teid_dl,gtp-u消息使用teid_data_ul(sgsn->ggsn)和teid_data_dl(ggsn->sgsn)。gtp-u消息包含appid(facebook、twitter、web)、url等信息,。。。
最后,我要处理连续的日志数据流,并Map同一个用户(imsi)的gtp-c消息和gtp-u以生成报告。
我试过这个:
val sessions = createReqs.connect(createRsps).flatMap(new CoFlatMapFunction[CreateReq, CreateRsp, Session] {
// holds CreateReqs indexed by (tedid_dl,seq)
private val createReqs = mutable.HashMap.empty[(String, String), CreateReq]
// holds CreateRsps indexed by (tedid,seq)
private val createRsps = mutable.HashMap.empty[(String, String), CreateRsp]
override def flatMap1(req: CreateReq, out: Collector[Session]): Unit = {
val key = (req.teid_dl, req.header.seqNum)
val oRsp = createRsps.get(key)
if (!oRsp.isEmpty) {
val rsp = oRsp.get
println("OK")
out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
createRsps.remove(key)
} else {
createReqs.put(key, req)
}
}
override def flatMap2(rsp: CreateRsp, out: Collector[Session]): Unit = {
val key = (rsp.header.teid, rsp.header.seqNum)
val oReq = createReqs.get(key)
if (!oReq.isEmpty) {
val req = oReq.get
out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
createReqs.remove(key)
} else {
createRsps.put(key, rsp)
}
}
}).print()
此代码始终返回空结果。输入流包含同一会话的creatersp和createreq消息的事实。它们看起来非常接近(在1秒之内)。每次调试时,oreq.isempty==true。我做错了什么?
1条答案
按热度按时间hmmo2u0o1#
老实说,要看透这里的电信细节有点困难,但如果我理解正确的话,您至少有3个流,前两个是createreq和creatersp流。
为了检测会话的建立,我将使用connecteddatastream抽象在前面提到的两个流之间共享状态。查看这个示例了解用法或相关的flink文档。
这就是你想要达到的目标吗?