在flink中保持hashmap进入状态的最佳方法是什么

iecba09b  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(581)

我有一个flink作业,其中一个statefull操作符需要在state中保留一个包含hashmap作为属性的类,因为这个hasmap为用户保留不同的亲和力,例如:

public class Affinity {
public String id;
public String colorTriggered;
public Map<String,Integer> affinities;
/*this object keeps the affinity for a user to a different colors for example: 
affinities.put(green, 5);
affinities.put(blue, 9);
affinities.put(white, 2);

to calculate then what is the color's affinity of this user, in this case the answer will be blue

* /

}

这个hashmap用于跟踪这些相似性,并在某个时刻请求用户的颜色相似性,并获得最高相似性值的键,该值为蓝色,即值9。
因为hashmaps不是flink序列化的一部分,所以我需要包括 implement Serializable 去我的班。
这是一个坏主意还是有更好的方法来做到这一点,并保持在美国的对象?
在一个完整的示例中,我或多或少需要做些什么,但不确定将hashmap用于flink操作符和状态是否是个好主意:

public class AffinityFlatMapFunction extends RichFlatMapFunction<Event, Affinity> implements MapOperations {

  @Override
  public void flatMap(Event event, Collector<Affinity> collector) throws Exception {
   Affinity previous = state.value();
    if(previous.hashMap.contains(event.color)){
        previous.hashMap.replace(event.color, value + 1);
    }else previous.hashMap.put(event.color, 1);
   /*something like this*/
  String match = previous.hashMap.stream.filter(x -> 
              x.getKey().contains(event.color)).max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey).orElse("empty");
   if(!match.equals(previous.colorTriggered){
       previous.colorTriggered = match;
       state.update(previous);
       collector.collect(previous);
   }
 }
}

谨致问候!

hrysbysz

hrysbysz1#

根据文档,有一个状态构造称为 MapState<UK, UV> ,它具有以下功能:
mapstate<uk,uv>:保存Map列表。您可以将键值对放入状态并检索所有当前存储Map的iterable。使用put(uk,uv)或putall(map<uk,uv>)添加Map。可以使用get(uk)检索与用户密钥相关联的值。可以分别使用entries()、keys()和values()检索Map、键和值的iterable视图。还可以使用isempty()检查此Map是否包含任何键值Map。
几天前我在Flink的一篇文章中读到 StateDescriptors 是优化的,几乎总是首选,而不是实现自己的机制。
如果您不以不需要的方式切分流(使用 keyBy(color) 应该很好),您应该始终拥有Map的最新状态。我不知道您对rocksdb延迟的担心是否有效,因为flink状态保持在heap上,并且只检查到rocksdb,所以所有当前值都可以动态使用;但我可能误解了。回想起来,我甚至怀疑你需要一张Map,但是一个简单的 ValueState 保存整数,因为Map的“键”部分由 keyBy() 那样的话。

相关问题