java—在另一个数据流中创建新的数据流

mspsb9vt  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(481)

我有两种数据类型。

type1 and type2

我有一个 type1 .

DataStream<type1> stream1 =...

内部 stream1 我想创建 type2 我想收集这两样东西 type1 以及 type2 .
一个数据流可以有一个输入类型和两个输出类型吗?或者可以创建一个新的数据流( DataStream<type2> stream2 )内部 stream1 ?
或者有没有其他方法可以从一种类型收集两种不同类型的数据?

yduiuuwa

yduiuuwa1#

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

case class Type1(){}
case class Type2(){}

object MultipleOutputJob {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stream of Type1
    val stream1 = env.addSource((sc: SourceFunction.SourceContext[Type1]) => {
      while(true){
        Thread.sleep(1000)
        sc.collect(Type1())
      }
    })

    // Mapping from Type1 to Type2
    val stream2 = stream1.map(t1 => Type2())

    // Collect both the original and the derived data
    stream1.print
    stream2.print

    env.execute()
  }
}
tsm1rwdh

tsm1rwdh2#

您需要先创建一个 Package 器类型,然后拆分并选择流。对于 Package 器,只有一个成员不为null;

class TypeWrapper {
    // keeping this short for brevity
    public TypeA firstType;
    public TypeB secondType;
}

拆分并选择:

DataStream<TypeWrapper> stream1 = ...

DataStream<TypeA> streamA = stream1.filter(new FilterFunction<TypeWrapper>() {
    @Override
    public boolean filter(TypeWrapper value) throws Exception {
        return value.firstType != null;
    }
})
.map(new MapFunction<TypeWrapper, TypeA>() {
    @Override
    public TypeA map(TypeWrapper value) throws Exception {
        return value.firstType;
    }
});

DataStream<TypeB> streamB = stream1.filter(new FilterFunction<TypeWrapper>() {
    @Override
    public boolean filter(TypeWrapper value) throws Exception {
        return value.secondType != null;
    }
})
.map(new MapFunction<TypeWrapper, TypeB>() {
    @Override
    public TypeB map(TypeWrapper value) throws Exception {
        return value.secondType;
    }
});

因为 filter() 以及 map() 将被锁在 stream1 两者都在同一个线程上执行,而且操作成本很低。

相关问题