我有一个不同密钥的消息流。对于每个键,我希望创建一个事件时间会话窗口,并仅在以下情况下对其进行处理: MIN_EVENTS
窗口中累积的事件数(基本上是键控状态)
对于每把钥匙, MIN_EVENTS
是不同的,在运行时可能会更改。我很难实现这一点。具体来说,我是这样实现这个逻辑的:
inputStream.keyBy(key).
window(EventTimeSessionWindow(INACTIVITY_PERIOD).
trigger(new MyCustomCountTrigger()).
apply(new MyProcessFn())
我正在尝试创建一个自定义 MyCustomCountTrigger()
应该能够从状态存储中读取,例如 MapState<String, Integer> stateStore
那是Map key
到它的 MIN_EVENTS
参数。我知道我可以使用 TriggerContext ctx
对所有触发器可用的对象。
如何从counttrigger()类外部初始化此状态存储?我还没有找到这样做的例子。
1条答案
按热度按时间ycggw6v21#
可以根据发送给触发器类的构造函数的参数初始化状态。但是你不能从这个类之外访问状态。
如果您需要更大的灵活性,我建议您使用进程函数而不是窗口。