我在没有群集的情况下在本地运行以下代码:
val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1)))
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
println("Found: " + (patternName, eventMap))
count.incrementAndGet()
})
env.execute()
println(count)
我的数据是以下格式的csv文件(user,val):
1,1
1,2
1,3
2,1
2,2
2,3
...
我正试图检测出 event(val=1) -> event(val=2) -> event(val=3)
. 当我在一个大的输入流上运行这个程序时,我知道流中存在一定数量的事件,我检测到的事件数不一致,几乎总是小于系统中的事件数。如果我这样做了 env.setParallelism(1)
(就像我在代码第3行中所做的那样),所有事件都被检测到。
我假设问题是当并行度大于1时,多个线程正在处理流中的事件,这意味着当一个线程 event(val=1) -> event(val=2)
, event(val=3)
可能被发送到另一个线程,而整个模式可能不会被检测到。
有什么我不知道的吗?我不能丢失流中的任何模式,但将parallelism设置为1似乎无法实现像flink这样的系统检测事件的目的。
更新:
我尝试使用以下方法设置流的键:
val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))
尽管这样可以防止不同用户的事件相互干扰:
1,1
2,2
1,3
这并不能阻止flink将事件无序地发送到节点,这意味着非确定性仍然存在。
2条答案
按热度按时间svdrlsy41#
您是否考虑过使用userid(您的第一个值)为流设置密钥?flink保证一个键的所有事件都到达同一个处理节点。当然,这只会有帮助,如果您想检测每个用户的val=1->val=2->val=3的模式。
dz6r00yl2#
最可能的问题在于在map操作符之后应用keyby操作符。
所以,不是:
应该有:
我知道这是个老问题,但也许能帮上忙。