我在尝试创建自定义事件源时遇到了这个问题。它包含一个允许我的其他进程向其中添加项的队列。然后期望我的cep模式在匹配时打印一些调试消息。
但无论我在队列中添加什么,都没有匹配项。然后我注意到mysource.run()中的队列总是空的。这意味着我用来创建mysource示例的队列与其中的队列不同 StreamExecutionEnvironment
. 如果我将队列更改为static,强制所有示例共享同一个队列,则一切都按预期工作。
dummysource.java文件
public class DummySource implements SourceFunction<String> {
private static final long serialVersionUID = 3978123556403297086L;
// private static Queue<String> queue = new LinkedBlockingQueue<String>();
private Queue<String> queue;
private boolean cancel = false;
public void setQueue(Queue<String> q){
queue = q;
}
@Override
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
throws Exception {
System.out.println("run");
synchronized (queue) {
while (!cancel) {
if (queue.peek() != null) {
String e = queue.poll();
if (e.equals("exit")) {
cancel();
}
System.out.println("collect "+e);
ctx.collectWithTimestamp(e, System.currentTimeMillis());
}
}
}
}
@Override
public void cancel() {
System.out.println("canceled");
cancel = true;
}
}
所以我深入研究了 StreamExecutionEnvironment
. 在addsource()方法中。有一个clean()方法,它似乎将示例替换为新示例。
返回给定函数的“closure cleaned”版本。
为什么?为什么需要序列化?我还尝试使用getconfig()关闭clean闭包。结果还是一样。我的队列示例与env使用的队列示例不同。
如何解决这个问题?
1条答案
按热度按时间pgx2nnw81#
这个
clean()
在flink中用于函数的方法主要是确保Function
(如sourcefunction、mapfunction)可序列化。flink将序列化这些函数,并将它们分发到任务节点以执行它们。对于flink主代码中的简单变量,比如int,可以在函数中引用它们。但对于大的或不可串行的,最好使用广播和丰富的源函数。请参阅https://cwiki.apache.org/confluence/display/flink/variables+closures+vs.+broadcast+variables