keyed进程函数中的java类成员

mwyxok5s  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(401)

我有下面的flink-keyedprocessfunction。我基本上是在尝试实现状态设计模式。

  1. public AlertProcessor extends KeyedProcessFunction<Tuple2<String, String>, Event1, Event2> {
  2. private transient AlertState currentState;
  3. private transient AlertState activeAlertState;
  4. private transient AlertState noActiveAlertState;
  5. private transient AlertState resolvedAlertState;
  6. @Override
  7. public void open(Configuration parameters) {
  8. activeAlertState = new ActiveAlertState();
  9. noActiveAlertState = new NoActiveAlertState();
  10. resolvedAlertState = new ResolvedAlertState();
  11. }
  12. @Override
  13. public processElement(Event1 event1, Context ctx, Collector<Event2> out) throws Exception {
  14. // Would the below if condition work for multiple keys?
  15. if (currentAlertState == null) {
  16. currentAlertState = noActiveAlertState;
  17. }
  18. currentAlertState.handle(event1, out);
  19. }
  20. private interface AlertState {
  21. void handle(Event1 event1, Collector<Event2> out);
  22. }
  23. private class ActiveAlertState implements AlertState {
  24. void handle(Event1 event1, Collector<Event2> out) {
  25. logger.debug("Moving to no alertState");
  26. // Do something and push some Event2 to out
  27. currentAlertState = resolvedActiveAlertState;
  28. }
  29. }
  30. private class NoActiveAlertState implements AlertState {
  31. void handle(Event1 event1, Collector<Event2> out) {
  32. logger.debug("Moving to no alertState");
  33. // Do something and push some Event2 to out
  34. currentAlertState = activeAlertState;
  35. }
  36. }
  37. private class ResolvedAlertState implements AlertState {
  38. void handle(Event1 event1, Collector<Event2> out) {
  39. logger.debug("Moving to no alertState");
  40. // Do something and push some Event2 to out
  41. currentAlertState = noActiveAlertState;
  42. }
  43. }
  44. }

我的问题是-
流中的每个键是否有一个alertprocessor示例(或对象)?换句话说,currentalertstate对象是否每个键都是唯一的?或者该alertprocessor操作符的每个示例将有一个currentalertstate?
如果currentalertstate是每个操作符示例的值,那么这个代码将不会真正起作用,因为currentalertstate将被不同的键覆盖。我的理解正确吗?
我可以将currentalertstate存储在keyed状态,并为每个processelement()调用初始化它。如果这样做,我就不需要在handle()实现中将currentalertstate分配或设置为下一个状态,因为currentalertstate将根据flink状态进行初始化。
有没有更好的方法在flink中实现状态设计模式并减少创建的状态对象的数量?

fdbelqdn

fdbelqdn1#

单身汉 AlertProcessor 示例将在管道的每个并行示例(每个任务槽)中创建,并将在该槽处理的所有密钥上进行多路复用。
如果currentalertstate是每个操作符示例的值,那么这个代码将不会真正起作用,因为currentalertstate将被不同的键覆盖。我的理解正确吗?
对的。您应该使用键控状态 currentAlertState ,这将在状态后端为每个不同的键生成一个条目。

相关问题