过滤apache flink中的独特事件

jyztefdp  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(359)

我在一个java类中定义某些变量,并用另一个类访问它,以便过滤流中的唯一元素。请参考代码以更好地理解问题。
我面临的问题是这个过滤函数不能很好地工作,不能过滤唯一的事件。我怀疑这个变量在不同的线程之间是共享的,这就是原因!?如果这不是正确的方法,请建议另一种方法。提前谢谢。


**ClassWithVariables.java**

public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();

**FilterClass.java**

public boolean filter(String val) throws Exception {

       if(ClassWithVariables.uniqueMap.containsKey(key)) {

                Arraylist<String> al = uniqueMap.get(key);

                if(al.contains(val) {
                    return false;
                } else {
                    //Update the hashmap list(uniqueMap)                    
                    return true;    
                }

       } else {

               //Add to hashmap list(uniqueMap)
               return true;
       }

}
wi3ka0sx

wi3ka0sx1#

消除流重复的正确方法包括按密钥对流进行分区,以便包含相同密钥的所有元素将由同一个worker处理,并使用flink的托管键控状态机制,以便状态具有容错性和可重新扩展性。下面是一个示例实现:

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.addSource(new EventSource())
    .keyBy(e -> e.key)
    .flatMap(new Deduplicate())
    .print();

  env.execute();
}

public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
  ValueState<Boolean> seen;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    seen = getRuntimeContext().getState(desc);
  }

  @Override
  public void flatMap(Event event, Collector<Event> out) throws Exception {
    if (seen.value() == null) {
      out.collect(event);
      seen.update(true);
    }
  }
}

顺便说一句,这也可以实现为richfilterfunction。但是请注意,如果您有一个无界的密钥空间,所使用的状态将无限增长,直到堆或磁盘上的空间用完为止,这取决于您选择的flink的状态后端。如果这是一个问题,您可能需要通过状态生存时间设置状态保留策略。
还要注意,在flink管道的不同部分之间共享状态是不可能的。与正常情况相比,您需要将事情从内到外进行处理,并使事件流进入状态,而不是获取它。

相关问题