flink sourcefunction< >?

5q4ezhmt  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(326)

我在尝试创建自定义事件源时遇到了这个问题。它包含一个允许我的其他进程向其中添加项的队列。然后期望我的cep模式在匹配时打印一些调试消息。
但无论我在队列中添加什么,都没有匹配项。然后我注意到mysource.run()中的队列总是空的。这意味着我用来创建mysource示例的队列与其中的队列不同 StreamExecutionEnvironment . 如果我将队列更改为static,强制所有示例共享同一个队列,则一切都按预期工作。
dummysource.java文件

public class DummySource implements SourceFunction<String> {

    private static final long serialVersionUID = 3978123556403297086L;
//  private static Queue<String> queue = new LinkedBlockingQueue<String>();
    private Queue<String> queue;
    private boolean cancel = false;

    public void setQueue(Queue<String> q){
        queue = q;
    }   

    @Override
    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
            throws Exception {
        System.out.println("run");
        synchronized (queue) {          
            while (!cancel) {
                if (queue.peek() != null) {
                    String e = queue.poll();
                    if (e.equals("exit")) {
                        cancel();
                    }
                    System.out.println("collect "+e);
                    ctx.collectWithTimestamp(e, System.currentTimeMillis());
                }
            }
        }
    }

    @Override
    public void cancel() {
        System.out.println("canceled");
        cancel = true;
    }
}

所以我深入研究了 StreamExecutionEnvironment . 在addsource()方法中。有一个clean()方法,它似乎将示例替换为新示例。
返回给定函数的“closure cleaned”版本。
为什么?为什么需要序列化?我还尝试使用getconfig()关闭clean闭包。结果还是一样。我的队列示例与env使用的队列示例不同。
如何解决这个问题?

pgx2nnw8

pgx2nnw81#

这个 clean() 在flink中用于函数的方法主要是确保 Function (如sourcefunction、mapfunction)可序列化。flink将序列化这些函数,并将它们分发到任务节点以执行它们。
对于flink主代码中的简单变量,比如int,可以在函数中引用它们。但对于大的或不可串行的,最好使用广播和丰富的源函数。请参阅https://cwiki.apache.org/confluence/display/flink/variables+closures+vs.+broadcast+variables

相关问题