我有一个flink作业,其中一个statefull操作符需要在state中保留一个包含hashmap作为属性的类,因为这个hasmap为用户保留不同的亲和力,例如:
public class Affinity {
public String id;
public String colorTriggered;
public Map<String,Integer> affinities;
/*this object keeps the affinity for a user to a different colors for example:
affinities.put(green, 5);
affinities.put(blue, 9);
affinities.put(white, 2);
to calculate then what is the color's affinity of this user, in this case the answer will be blue
* /
}
这个hashmap用于跟踪这些相似性,并在某个时刻请求用户的颜色相似性,并获得最高相似性值的键,该值为蓝色,即值9。
因为hashmaps不是flink序列化的一部分,所以我需要包括 implement Serializable
去我的班。
这是一个坏主意还是有更好的方法来做到这一点,并保持在美国的对象?
在一个完整的示例中,我或多或少需要做些什么,但不确定将hashmap用于flink操作符和状态是否是个好主意:
public class AffinityFlatMapFunction extends RichFlatMapFunction<Event, Affinity> implements MapOperations {
@Override
public void flatMap(Event event, Collector<Affinity> collector) throws Exception {
Affinity previous = state.value();
if(previous.hashMap.contains(event.color)){
previous.hashMap.replace(event.color, value + 1);
}else previous.hashMap.put(event.color, 1);
/*something like this*/
String match = previous.hashMap.stream.filter(x ->
x.getKey().contains(event.color)).max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey).orElse("empty");
if(!match.equals(previous.colorTriggered){
previous.colorTriggered = match;
state.update(previous);
collector.collect(previous);
}
}
}
谨致问候!
1条答案
按热度按时间hrysbysz1#
根据文档,有一个状态构造称为
MapState<UK, UV>
,它具有以下功能:mapstate<uk,uv>:保存Map列表。您可以将键值对放入状态并检索所有当前存储Map的iterable。使用put(uk,uv)或putall(map<uk,uv>)添加Map。可以使用get(uk)检索与用户密钥相关联的值。可以分别使用entries()、keys()和values()检索Map、键和值的iterable视图。还可以使用isempty()检查此Map是否包含任何键值Map。
几天前我在Flink的一篇文章中读到
StateDescriptors
是优化的,几乎总是首选,而不是实现自己的机制。如果您不以不需要的方式切分流(使用
keyBy(color)
应该很好),您应该始终拥有Map的最新状态。我不知道您对rocksdb延迟的担心是否有效,因为flink状态保持在heap上,并且只检查到rocksdb,所以所有当前值都可以动态使用;但我可能误解了。回想起来,我甚至怀疑你需要一张Map,但是一个简单的ValueState
保存整数,因为Map的“键”部分由keyBy()
那样的话。