我想在apache flink(flink 1.3)中使用deltatrigger,但我在这段代码中遇到了一些问题:
.trigger(DeltaTrigger.of(100, new DeltaFunction[uniqStruct] {
override def getDelta(oldFp: uniqStruct, newFp: uniqStruct): Double = newFp.time - oldFp.time
}, TypeInformation[uniqStruct]))
我有个错误:
error: object org.apache.flink.api.common.typeinfo.TypeInformation is not a value [ERROR] }, TypeInformation[uniqStruct]))
我不明白deltatrigger为什么需要 TypeSerializer[T]
我不知道该怎么做才能消除这个错误。
谢谢大家。
2条答案
按热度按时间ep6jt1vc1#
DeltaTrigger
需要一个TypeSerializer
因为它使用flink的托管状态机制来存储每个元素,以便以后与下一个元素进行比较(它只保留一个元素,即最后一个元素,随着新元素的到来而更新)。您将在这里找到一个示例(java)。
但是如果你需要的只是一个每100毫秒触发一次的窗口,那么只使用
TimeWindow
,例如更新时间:
要使每100毫秒触发一小时的窗口,可以使用滑动窗口。但是,您将有106060个窗口,并且每个事件都将放置在这36000个窗口中的每个窗口中。所以这不是个好主意。
如果你使用
GlobalWindow
用一个DeltaTrigger
,则只有当事件间隔超过100毫秒时才会触发窗口,而这不是您所说的您想要的。我建议你看看
ProcessFunction
. 这样得到你想要的东西应该很简单。e0bqpujr2#
我会读一读这个https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html 听起来你可以用
typeInfo.createSerializer(config)
你的类型信息。请注意,当前传入的是类型本身,而不是类型信息,这就是为什么会出现错误的原因。你需要做一些更像
要引用上面有关配置参数的页面,您需要传递以创建序列化程序
config参数的类型为executionconfig,它保存有关程序已注册的自定义序列化程序的信息。在可能的情况下,尝试将正确的executionconfig传递给程序。通常可以通过调用getexecutionconfig()从datastream或dataset获取。在函数(如mapfunction)内部,可以通过将函数设置为富函数并调用getruntimecontext().getexecutionconfig()来获得它。