scala—如何在ApacheFlink中的flatmapfunction中派生不重要的类型?

ffscu2ro  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(443)

在scala中使用flatmapfunction类时,如何隐式指定类型?

val test = env.fromElements((0,1111),(1,2222))

test.flatMap(new FlatMapFunction[A,B]() { 
      override def flatMap(implicit  x:A,  out:Collector[B]):Unit = {
        x => x match {
          case (k,v) =>
            if (moveToP(v))
              out.write(v)
        }
      }
    })

错误:trait flatmapfunction接受类型参数。

c9qzyr3d

c9qzyr3d1#

不知道你在问什么,但无论如何都会尝试回答;)。
不能用泛型类型示例化对象。因为你流的类型是 DataStream[(Int, Int)] 你需要申请 new FlatMapFunction[(Int, Int), Int] .
在scala中,可以对flatmap使用lambda表达式:

test.flatMap((x, out: Collector[Int]) => {
  x match {
    case (k,v) =>
      if (moveToP(v))
        out.collect(v)
  }
}).print()

或者没有收集器:

test.flatMap(x => {
  if (moveToP(x._2)) List(x._2) else List.empty
})

相关问题