【Flink】Flink实验特性--reinterpretAsKeyedStream 将DataStream重新解释为KeyedStream

x33g5p2x  于2022-03-28 转载在 Flink  
字(8.2k)|赞(0)|评价(0)|浏览(887)

1.概述

1.1背景

这个实验特性应该是在Flink 1.5版本已经引进,但是直到现在(1.11)仍然是实验特性。官网对于它的描述 :这个特性仍然在不断的优化,目前是可能是不稳定、不兼容的,并且在以后的版本甚至发生大的改变。

1.2 作用

将DataStream重新解释为KeyedStream,这种方式可以避免shuffle

那么自然它的使用也会受到相应的约束,这个只能去重新解释那些已经预分区的DataStream。

1.3 官网例子

在源码中找到了这样一个测试代码,结果是:Tests passed

  1. public class ReinterpretAsKeyedStreamDemo {
  2. public void reinterpretAsKeyedStream() throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3);
  6. KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {
  7. @Override
  8. public Integer getKey(Integer value) throws Exception {
  9. return value;
  10. }
  11. });
  12. SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
  13. .reduce(new ReduceFunction<Integer>() {
  14. @Override
  15. public Integer reduce(Integer value1, Integer value2) throws Exception {
  16. return value1 + value2;
  17. }
  18. });
  19. reducer.addSink(new PrintSinkFunction<>());
  20. env.execute("xx");
  21. }
  22. }

上面结果我们可以看到 输出了2 4 6 其实就是

但是我们其实有9条数据,1,2,3分别是3组数据,为什么少输出呢?

因为前面两组1,2,3已经结束了一个窗口,满足同一个key下有两个数据,然后最后一组的1,2,3,并不满足有两个数据,无法触发窗口。

为了方便理解我们再次修改如下代码:

  1. public void reinterpretAsKeyedStream() throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.setParallelism(1);
  4. DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3,2);
  5. KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {
  6. @Override
  7. public Integer getKey(Integer value) throws Exception {
  8. return value;
  9. }
  10. });
  11. SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
  12. .reduce(new ReduceFunction<Integer>() {
  13. @Override
  14. public Integer reduce(Integer value1, Integer value2) throws Exception {
  15. return value1 + value2;
  16. }
  17. });
  18. reducer.addSink(new PrintSinkFunction<>());
  19. env.execute("xx");
  20. }

运行结果如下

  1. 2
  2. 4
  3. 6
  4. 4

我们数据源数据里面最后加入了一个2,然后最后输出多了一个4。当然这个是countwindow的使用,因为官网例子给的不明确,这里只是简单给大家补充一下,便于理解,避免初次使用产生太多疑问。

1.3.1 实战Demo分析

代码功能:

  1. 从文件中读取数据然后构建ds1:DataStream[Event]流,然后输出文件数据;
  2. 接着ds1流会根据Event的字段 key 进行keyby操作,使用一个窗口大小为2的CountWindow,然后保留这两条数据中 partition 字段值最大的一条数据,构建数据流ds2:DataStreamp[Event]
  • 最后我们继续使用一个窗口大小为2的CountWindow,然后对窗口内两条数据处理:

  • 如果两条数据的event_type字段值不等,那么我们使用第一条数据的值去创建一个Event对象,然后新数据Event对象的event_type字段设置为3,并且把字段 v 设置为两个数据的字段 v 的字符串拼接;

  • 如果event_type字段值相等,那么我们保留time_字段值大的一条数据。

第三步中,正常情况我们会对ds2进行keyby然后继续按照key 字段值hash,这样会产生相应的Shuffle,但是通过使用本文的实验特性reinterpretAsKeyedStream,可以避免Shuffle。

  1. // scala
  2. object SessionwindowingOriginal {
  3. // 主函数
  4. def main(args: Array[String]): Unit = {
  5. Logger.getRootLogger.setLevel(Level.WARN)
  6. val params = ParameterTool.fromArgs(args)
  7. val env = StreamExecutionEnvironment.createLocalEnvironment(2)
  8. env.getConfig.setGlobalJobParameters(params)
  9. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  10. env.setParallelism(2)
  11. env.setMaxParallelism(2)
  12. // 从文件读取数据 ds1:DataStream[Event]类型
  13. val ds1 = env.readTextFile("/Users/hehuiyuan/gitwarehouse/flinksql/src/main/resources/f1").map(e => {
  14. val l = e.split(",")
  15. val (key, time_, event_type, v, partition) = (l(0).trim, l(1).trim.toLong, l(2).trim.toInt, l(3).trim, l(4).trim)
  16. Event(key, time_, event_type, v, partition)
  17. }).name("f1_source")
  18. //输出原始数据
  19. ds1.addSink(new SinkFunction[Event] {
  20. override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("原始数据:"+value.toString)
  21. }).name("origin_data_sink")
  22. //按照event对象的key字段分组
  23. // 相同key下 窗口大小数据量是2,然后取partition字段取最大的数据
  24. val ds2 = ds1.keyBy(_.key).countWindow(2).max("partition")
  25. // 输出ds2:DataStream[Event]
  26. ds2.addSink(new SinkFunction[Event] {
  27. override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("ds2:"+value.toString)
  28. }).name("ds2")
  29. //ds2是DataSteam,ds1按照字段key分区处理后得到的流
  30. //此时还想继续使用KededStream的一些操作,需要把ds2进行keyby
  31. // 但是会存在shuffle,key不变情况下,我们可以直接把DataStream变为KeyedStream
  32. val aggregated = new DataStreamUtils(ds2)
  33. .reinterpretAsKeyedStream((event) => event.key)
  34. .countWindow(2)
  35. .reduce((e1, e2) =>
  36. if(e1.event_type != e2.event_type)
  37. Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
  38. else if(e2.time_ > e1.time_) e2
  39. else e1
  40. )
  41. .addSink(new SinkFunction[Event] {
  42. override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
  43. System.out.println(value.toString)
  44. }).name("result")
  45. env.execute()
  46. }
  47. }

我们看一下读取的文件中的数据样式:

每一行都会被封装到一个Event对象中,然后构成DataStream。

  1. //创建一个Pojo类,4个字段
  2. case class Event(
  3. key: String,
  4. time_ : Long,
  5. event_type: Int,
  6. v: String,
  7. partition: String
  8. )

Event对象有五个字段,会使用逗号分割

输出结果:

  1. 原始数据:Event(a,1,1,banana,0)
  2. 原始数据:Event(b,21,2,tomato,0)
  3. 原始数据:Event(c,12,1,apple,0)
  4. 原始数据:Event(d,10,2,orange,0)
  5. 原始数据:Event(e,101,1,watermeleon,0)
  6. 原始数据:Event(a,3,1,ba,1)
  7. 原始数据:Event(b,11,2,to,0)
  8. 原始数据:Event(c,42,1,ap,0)
  9. 原始数据:Event(d,20,2,or,0)
  10. 原始数据:Event(e,111,2,wa,0)
  11. ds2Event(a,1,1,banana,1)
  12. ds2Event(b,21,2,tomato,0)
  13. ds2Event(c,12,1,apple,0)
  14. ds2Event(d,10,2,orange,0)
  15. ds2Event(e,101,1,watermeleon,0)
  16. 原始数据:Event(a,2,1,ba,0)
  17. 原始数据:Event(b,2,2,to,0)
  18. 原始数据:Event(c,88,1,ap,0)
  19. 原始数据:Event(d,44,2,or,0)
  20. 原始数据:Event(e,11,2,wa,0)
  21. 原始数据:Event(a,33,1,banana,0)
  22. 原始数据:Event(b,21,2,tomato,1)
  23. 原始数据:Event(c,55,2,apple,0)
  24. 原始数据:Event(d,66,1,orange,0)
  25. 原始数据:Event(e,101,1,watermeleon,0)
  26. ds2Event(a,2,1,ba,0)
  27. ds2Event(b,2,2,to,1)
  28. Event(a,2,1,ba,0)
  29. Event(b,21,2,tomato,0)
  30. ds2Event(c,88,1,ap,0)
  31. Event(c,88,1,ap,0)
  32. ds2Event(d,44,2,or,0)
  33. Event(d,44,2,or,0)
  34. ds2Event(e,11,2,wa,0)
  35. Event(e,101,3,wa_watermeleon,0)

我们拿其中一个输出结果的数据简单分析一下:(上面最后一行)

  1. Event(e,101,3,wa_watermeleon,0)

那么这个数据是如何输出的呢?

我们会发现ds1经过keyby以及counwindow后的max处理以后,留下了两条数据:

  1. ds2Event(e,101,1,watermeleon,0)
  2. ds2Event(e,11,2,wa,0)

紧接着,把数据流ds2转为keyedStream,然后又做了一次CountWindow操作,窗口大小是2,具体实现的代码我们下面分析,这里先把结果分析完:

因为上面对于key = e 下,满足了两条数据,也就是满足了countwindow的触发计算,这个时候会对这两个数据处理,根据我们第三步功能描述可知处理如下:

  1. event_type = 1
  2. event_type = 2

这两条数据的该字段不等,根据(3.1)可知,会创建一个新的Event对象,该对象的 event_type = 3, v = wa_watermeleon(两条数据的该字段的字符串拼接构成)

最终得到如下结果:

  1. Event(e,101,3,wa_watermeleon,0)

最后,我们对ds2使用了本文的主要介绍的特性reinterpretAsKeyedStream进行分析,这个方法在DataStreamUtils中。

使用reinterpretAsKeyedStream的代码:

  1. val aggregated = new DataStreamUtils(ds2)
  2. .reinterpretAsKeyedStream((event) => event.key)
  3. .countWindow(2)
  4. .reduce((e1, e2) =>
  5. if(e1.event_type != e2.event_type)
  6. Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
  7. else if(e2.time_ > e1.time_) e2
  8. else e1
  9. )
  10. .addSink(new SinkFunction[Event] {
  11. override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
  12. System.out.println(value.toString)
  13. }).name("result")

不用reinterpretAsKeyedStream的代码:

  1. val aggregated = ds2
  2. .keyBy(_.key)
  3. .countWindow(2)
  4. .reduce((e1, e2) =>
  5. if(e1.event_type != e2.event_type)
  6. Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
  7. else if(e2.time_ > e1.time_) e2
  8. else e1
  9. )
  10. .addSink(new SinkFunction[Event] {
  11. override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
  12. System.out.println(value.toString)
  13. }).name("result")

在这里就涉及到使用reinterpretAsKeyedStream的优势了,可能代码你无法更好的体会,我们通过StreamGraph来了解这两者的区别:

图片可能有点小,我们把关键地方放大查看:

在这里我们可以发现,同样是Window Operator,但是第一个Window Operator 的数据是通过上游HASH过来的,第二个是通过FORWARD方式过来

两个Operator之间的边展示的关键词,其实展示了两个算子之间数据是如何传输的,在之前的文章提到过关于partition的概念以及Flink已经提供的实现,此处阅读 。

2.重点源码分析

  1. public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
  2. DataStream<T> stream,
  3. KeySelector<T, K> keySelector,
  4. TypeInformation<K> typeInfo) {
  5. PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
  6. stream.getTransformation(),
  7. new ForwardPartitioner<>());
  8. return new KeyedStream<>(
  9. stream,
  10. partitionTransformation,
  11. keySelector,
  12. typeInfo);
  13. }

上面代码是 方法reinterpretAsKeyedStream的具体实现,最后我们可以看到return了一个KeyedStream流,创建这个流的时候首先创建了PartitionTransformation对象,其中使用了ForwardPartitioner分区器,那么FORWARD其实也是来源于此。

我们看一下keyby操作如何生产KeyedStream的:

  1. public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
  2. Preconditions.checkNotNull(key);
  3. Preconditions.checkNotNull(keyType);
  4. return new KeyedStream<>(this, clean(key), keyType);
  5. }
  6. public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
  7. this(
  8. dataStream,
  9. new PartitionTransformation<>(
  10. dataStream.getTransformation(),
  11. new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
  12. keySelector,
  13. keyType);
  14. }

上面代码我们可以看出,keyby同样构建keyedStream流,但是使用的分区器是KeyGroupStreamPartitioner。

M.概述

转载:Flink实验特性–reinterpretAsKeyedStream

相关文章

最新文章

更多