如何使用state实现自定义触发器的保存点?

wbgh16ku  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(415)

我已经实现了自定义 WindowAssigher :

  1. public class SessionWindowAssigner extends WindowAssigner<LogItem, SessionWindow> {
  2. @Override
  3. public Collection<SessionWindow> assignWindows(LogItem element, long timestamp) {
  4. return Collections.singletonList(new SessionWindow(element.getSessionUid()));
  5. }
  6. @Override
  7. public Trigger<LogItem, SessionWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
  8. return new SessionTrigger(60_000L);
  9. }
  10. @Override
  11. public TypeSerializer<SessionWindow> getWindowSerializer(ExecutionConfig executionConfig) {
  12. return new SessionWindow.Serializer();
  13. }
  14. }

, Window :

  1. public class SessionWindow extends Window {
  2. private final String sessionUid;
  3. public SessionWindow(String sessionUid) {
  4. this.sessionUid = sessionUid;
  5. }
  6. public String getSessionUid() {
  7. return sessionUid;
  8. }
  9. @Override
  10. public long maxTimestamp() {
  11. return Long.MAX_VALUE;
  12. }
  13. @Override
  14. public boolean equals(Object o) {
  15. if (this == o) return true;
  16. if (o == null || getClass() != o.getClass()) return false;
  17. SessionWindow that = (SessionWindow) o;
  18. return sessionUid.equals(that.sessionUid);
  19. }
  20. @Override
  21. public int hashCode() {
  22. return sessionUid.hashCode();
  23. }
  24. public static class Serializer extends TypeSerializer<SessionWindow> {
  25. private static final long serialVersionUID = 1L;
  26. @Override
  27. public boolean isImmutableType() {
  28. return true;
  29. }
  30. @Override
  31. public TypeSerializer<SessionWindow> duplicate() {
  32. return this;
  33. }
  34. @Override
  35. public SessionWindow createInstance() {
  36. return null;
  37. }
  38. @Override
  39. public SessionWindow copy(SessionWindow from) {
  40. return from;
  41. }
  42. @Override
  43. public SessionWindow copy(SessionWindow from, SessionWindow reuse) {
  44. return from;
  45. }
  46. @Override
  47. public int getLength() {
  48. return 0;
  49. }
  50. @Override
  51. public void serialize(SessionWindow record, DataOutputView target) throws IOException {
  52. target.writeUTF(record.sessionUid);
  53. }
  54. @Override
  55. public SessionWindow deserialize(DataInputView source) throws IOException {
  56. return new SessionWindow(source.readUTF());
  57. }
  58. @Override
  59. public SessionWindow deserialize(SessionWindow reuse, DataInputView source) throws IOException {
  60. return new SessionWindow(source.readUTF());
  61. }
  62. @Override
  63. public void copy(DataInputView source, DataOutputView target) throws IOException {
  64. target.writeUTF(source.readUTF());
  65. }
  66. @Override
  67. public boolean equals(Object obj) {
  68. return obj instanceof Serializer;
  69. }
  70. @Override
  71. public boolean canEqual(Object obj) {
  72. return obj instanceof Serializer;
  73. }
  74. @Override
  75. public int hashCode() {
  76. return 0;
  77. }
  78. }
  79. }

以及 Trigger :

  1. public class SessionTrigger extends Trigger<LogItem, SessionWindow> {
  2. private final long sessionTimeout;
  3. private final ValueStateDescriptor<Long> previousFinishTimestampDesc = new ValueStateDescriptor<>("SessionTrigger.timestamp", LongSerializer.INSTANCE, null);
  4. public SessionTrigger(long sessionTimeout) {
  5. this.sessionTimeout = sessionTimeout;
  6. }
  7. @Override
  8. public TriggerResult onElement(LogItem element, long timestamp, SessionWindow window, TriggerContext ctx) throws Exception {
  9. ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
  10. Long previousFinishTimestamp = previousFinishTimestampState.value();
  11. Long newFinisTimestamp = timestamp + sessionTimeout;
  12. if (previousFinishTimestamp != null) {
  13. ctx.deleteEventTimeTimer(previousFinishTimestamp);
  14. }
  15. ctx.registerEventTimeTimer(newFinisTimestamp);
  16. previousFinishTimestampState.update(newFinisTimestamp);
  17. return TriggerResult.CONTINUE;
  18. }
  19. @Override
  20. public TriggerResult onEventTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
  21. return TriggerResult.FIRE_AND_PURGE;
  22. }
  23. @Override
  24. public TriggerResult onProcessingTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
  25. throw new UnsupportedOperationException("This is not processing time trigger");
  26. }
  27. @Override
  28. public void clear(SessionWindow window, TriggerContext ctx) throws Exception {
  29. ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
  30. Long previousFinishTimestamp = previousFinishTimestampState.value();
  31. ctx.deleteEventTimeTimer(previousFinishTimestamp);
  32. previousFinishTimestampState.clear();
  33. }
  34. }

对于按超时检测会话结束,即,如果最后一个事件是n秒前,则评估窗口函数。如您所见,我将最后一个事件时间戳保存在valuestate中,因为我希望在失败后恢复它。
看来我应该实施 Checkpointed 用于在此触发器中保存/还原保存点(和检查点)快照的接口,因为我不希望在重新部署流期间释放触发器状态。
那么,有人能告诉我如何保存 SessionTrigger 在部署期间是否正确触发(可能还有相关的窗口)?
据我所知,我应该 Checkpointed 的接口 SessionTrigger 因为只有它才有状态。正确的?怎么样 SessionWindow -s和 SessionWindowAssigner ? 是在部署后自动恢复还是手动恢复?

hmtdttj4

hmtdttj41#

取自会话窗口

  1. private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
  2. private static final long serialVersionUID = 1L;
  3. private final Long sessionTimeout;
  4. private final ValueStateDescriptor<Long> stateDesc =
  5. new ValueStateDescriptor<>("last-seen", Long.class, -1L);
  6. public SessionTrigger(Long sessionTimeout) {
  7. this.sessionTimeout = sessionTimeout;
  8. }
  9. @Override
  10. public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
  11. ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
  12. Long lastSeen = lastSeenState.value();
  13. Long timeSinceLastEvent = timestamp - lastSeen;
  14. ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
  15. // Update the last seen event time
  16. lastSeenState.update(timestamp);
  17. ctx.registerEventTimeTimer(timestamp + sessionTimeout);
  18. if (lastSeen != -1 && timeSinceLastEvent > sessionTimeout) {
  19. System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen);
  20. return TriggerResult.FIRE_AND_PURGE;
  21. } else {
  22. return TriggerResult.CONTINUE;
  23. }
  24. }
  25. @Override
  26. public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
  27. ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
  28. Long lastSeen = lastSeenState.value();
  29. if (time - lastSeen >= sessionTimeout) {
  30. System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
  31. return TriggerResult.FIRE_AND_PURGE;
  32. }
  33. return TriggerResult.CONTINUE;
  34. }
  35. @Override
  36. public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
  37. return TriggerResult.CONTINUE;
  38. }
  39. @Override
  40. public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
  41. ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
  42. if (lastSeenState.value() != -1) {
  43. ctx.deleteEventTimeTimer(lastSeenState.value() + sessionTimeout);
  44. }
  45. lastSeenState.clear();
  46. }
  47. }
展开查看全部

相关问题