如何在flink中使用delta触发器?

e7arh2l6  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(468)

我想在apache flink(flink 1.3)中使用deltatrigger,但我在这段代码中遇到了一些问题:

.trigger(DeltaTrigger.of(100, new DeltaFunction[uniqStruct] {
    override def getDelta(oldFp: uniqStruct, newFp: uniqStruct): Double = newFp.time - oldFp.time
  }, TypeInformation[uniqStruct]))

我有个错误:

error: object org.apache.flink.api.common.typeinfo.TypeInformation is not a value [ERROR] }, TypeInformation[uniqStruct]))

我不明白deltatrigger为什么需要 TypeSerializer[T] 我不知道该怎么做才能消除这个错误。
谢谢大家。

ep6jt1vc

ep6jt1vc1#

DeltaTrigger 需要一个 TypeSerializer 因为它使用flink的托管状态机制来存储每个元素,以便以后与下一个元素进行比较(它只保留一个元素,即最后一个元素,随着新元素的到来而更新)。
您将在这里找到一个示例(java)。
但是如果你需要的只是一个每100毫秒触发一次的窗口,那么只使用 TimeWindow ,例如

input
  .keyBy(<key selector>)
  .timeWindow(Time.milliseconds(100)))
  .apply(<window function>)

更新时间:
要使每100毫秒触发一小时的窗口,可以使用滑动窗口。但是,您将有106060个窗口,并且每个事件都将放置在这36000个窗口中的每个窗口中。所以这不是个好主意。
如果你使用 GlobalWindow 用一个 DeltaTrigger ,则只有当事件间隔超过100毫秒时才会触发窗口,而这不是您所说的您想要的。
我建议你看看 ProcessFunction . 这样得到你想要的东西应该很简单。

e0bqpujr

e0bqpujr2#

我会读一读这个https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html 听起来你可以用 typeInfo.createSerializer(config) 你的类型信息。请注意,当前传入的是类型本身,而不是类型信息,这就是为什么会出现错误的原因。
你需要做一些更像

val uniqStructTypeInfo: TypeInformation[uniqStruct] = createTypeInformation[uniqStruct]
val uniqStrictTypeSerializer = typeInfo.createSerializer(config)

要引用上面有关配置参数的页面,您需要传递以创建序列化程序
config参数的类型为executionconfig,它保存有关程序已注册的自定义序列化程序的信息。在可能的情况下,尝试将正确的executionconfig传递给程序。通常可以通过调用getexecutionconfig()从datastream或dataset获取。在函数(如mapfunction)内部,可以通过将函数设置为富函数并调用getruntimecontext().getexecutionconfig()来获得它。

相关问题