数据流上apache-flink随机离群点选择

xtupzzrd  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(460)

我尝试使用apache flink ml包的随机选择模型。
我不知道如何使用它与Kafka作为数据源,我知道它需要一个数据集,而不是一个数据流,但我似乎不能窗口我的Kafka数据流成为一个数据集。
有没有一种方法可以把我的流当作一系列的小数据集。例如,有没有一种方法可以说流中每10个元素匹配一个模式(按元素唯一id滑动窗口),就将它们视为固定大小的数据集,并在这个固定大小的数据集中检测任何异常值?
我要创建的场景是:
数据源->Kafka主题1->flink预处理->Kafka主题2->flink groups by id->组上的异常值检测
我已经有了一个工作实现到预处理,并希望Flink将能够满足我的要求?

dbf7pr2w

dbf7pr2w1#

我想您可以创建一个基于计数的全局窗口,并使用executionenvironment获取数据集。类似以下的操作可能会起作用(getresult将返回一个数据集):

stream.
      keyBy(...).
      window(GlobalWindows.create).
      trigger(CountTrigger.of(10)).
      aggregate(new MyAggregator()).
      ...

    class MyAggregator extends AggregateFunction[..., ..., ...] {  

      var valueList: List[LabeledVector] = List[LabeledVector]()    

      override def createAccumulator(): MyAggregator = new MyAggregator()
      override def add(value: .., accumulator: MyAggregator): ... = ...
      override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
      override def getResult(accumulator: MyAggregator): ... = {
        ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
      }
    }

相关问题