(与如何在flink中创建动态度量有关)
我有一条小溪 events(someid:String, name:String)
出于监控的原因,我需要每个事件id有一个计数器。在所有的flink文档和示例中,我可以看到,例如,计数器在 open
一个Map函数。
但在我的情况下,我不能初始化计数器,因为我需要每个eventid一个,我不知道提前值。而且,我知道每次偶数通过时创建一个新的计数器是多么昂贵 map()
Map函数的方法。最后,我不能保留计数器的“缓存”,因为它太大了。
理想情况下,我会喜欢这样的东西:
class Event(id: String, name: String)
class ExampleMapFunction extends RichMapFunction[Event, Event] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = new Counter()
}
override def map(event: Event): Event = {
counter.inc(event.id)
event
}
}
或者基本上我可以实现我自己的计数器,让我通过一个维度?如果是,怎么做?
对于这种用例有什么建议或最佳实践吗?
1条答案
按热度按时间ioekq8ef1#
如果保留计数器的缓存太大,那么我认为使用度量不会以满足您需求的方式进行扩展。
一些替代方案:
使用边输出在一些外部的、可查询的/可视化的数据存储中收集有意义的事件——例如,influxdb。
将信息保持在键控状态,并根据需要使用广播消息触发其相关部分的输出(再次使用侧输出)。
将信息保持在键控状态,并定期获取保存点,然后使用状态处理器api通过查询进行分析。