我在一个java类中定义某些变量,并用另一个类访问它,以便过滤流中的唯一元素。请参考代码以更好地理解问题。
我面临的问题是这个过滤函数不能很好地工作,不能过滤唯一的事件。我怀疑这个变量在不同的线程之间是共享的,这就是原因!?如果这不是正确的方法,请建议另一种方法。提前谢谢。
**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();
**FilterClass.java**
public boolean filter(String val) throws Exception {
if(ClassWithVariables.uniqueMap.containsKey(key)) {
Arraylist<String> al = uniqueMap.get(key);
if(al.contains(val) {
return false;
} else {
//Update the hashmap list(uniqueMap)
return true;
}
} else {
//Add to hashmap list(uniqueMap)
return true;
}
}
1条答案
按热度按时间wi3ka0sx1#
消除流重复的正确方法包括按密钥对流进行分区,以便包含相同密钥的所有元素将由同一个worker处理,并使用flink的托管键控状态机制,以便状态具有容错性和可重新扩展性。下面是一个示例实现:
顺便说一句,这也可以实现为richfilterfunction。但是请注意,如果您有一个无界的密钥空间,所使用的状态将无限增长,直到堆或磁盘上的空间用完为止,这取决于您选择的flink的状态后端。如果这是一个问题,您可能需要通过状态生存时间设置状态保留策略。
还要注意,在flink管道的不同部分之间共享状态是不可能的。与正常情况相比,您需要将事情从内到外进行处理,并使事件流进入状态,而不是获取它。