如何在flink中使用多个计数器

gt0wga4j  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(428)

(与如何在flink中创建动态度量有关)
我有一条小溪 events(someid:String, name:String) 出于监控的原因,我需要每个事件id有一个计数器。在所有的flink文档和示例中,我可以看到,例如,计数器在 open 一个Map函数。
但在我的情况下,我不能初始化计数器,因为我需要每个eventid一个,我不知道提前值。而且,我知道每次偶数通过时创建一个新的计数器是多么昂贵 map() Map函数的方法。最后,我不能保留计数器的“缓存”,因为它太大了。
理想情况下,我会喜欢这样的东西:

  1. class Event(id: String, name: String)
  2. class ExampleMapFunction extends RichMapFunction[Event, Event] {
  3. @transient private var counter: Counter = _
  4. override def open(parameters: Configuration): Unit = {
  5. counter = new Counter()
  6. }
  7. override def map(event: Event): Event = {
  8. counter.inc(event.id)
  9. event
  10. }
  11. }

或者基本上我可以实现我自己的计数器,让我通过一个维度?如果是,怎么做?
对于这种用例有什么建议或最佳实践吗?

ioekq8ef

ioekq8ef1#

如果保留计数器的缓存太大,那么我认为使用度量不会以满足您需求的方式进行扩展。
一些替代方案:
使用边输出在一些外部的、可查询的/可视化的数据存储中收集有意义的事件——例如,influxdb。
将信息保持在键控状态,并根据需要使用广播消息触发其相关部分的输出(再次使用侧输出)。
将信息保持在键控状态,并定期获取保存点,然后使用状态处理器api通过查询进行分析。

相关问题