Flink multiple keyBy()

Posted by yuvru6vn on 2021-06-24 in Flink

I have a SingleOutputStreamOperator on which I do some processing, and for this I need to call keyBy() several times.
Here is some sample code:

    public SingleOutputStreamOperator<Map<String, Object>> process(DataStreamSource<Map<String, Object>> stream) {
        BroadcastStream<Map<String, Object>> broadcastedStream = ...;
        return stream
            .assignTimestampsAndWatermarks(...)
            .keyBy(new MyKeySelector("fieldAAA"))
            .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
            .aggregate(new MyAggregationFunction1())
            .keyBy(new MapKeySelector("fieldAAA"))
            .connect(broadcastedStream)
            .process(new MyEvaluator()) // <-- 'fieldBBB' is built here
            .keyBy(new MyKeySelector("fieldBBB"))
            .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
            .aggregate(new MyAggregationFunction2());
    }
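The question does not show MyKeySelector. A hypothetical version that keys a Map record by one field could fail fast when the field is missing, so a null key surfaces as an error naming the field and the offending record, instead of as a bare NullPointerException deep inside the partitioner. To keep the sketch runnable without Flink, it declares a local KeySelector interface with the same getKey shape as Flink's org.apache.flink.api.java.functions.KeySelector (an assumption for illustration; in a real job you would implement Flink's interface instead).

```java
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;

// Local stand-in with the same shape as Flink's KeySelector<IN, KEY>,
// declared here only so the sketch compiles without Flink on the classpath.
interface KeySelector<IN, KEY> extends Serializable {
    KEY getKey(IN value) throws Exception;
}

// Hypothetical MyKeySelector: extracts one field of a Map record as the key.
class MyKeySelector implements KeySelector<Map<String, Object>, String> {
    private final String field;

    MyKeySelector(String field) {
        this.field = Objects.requireNonNull(field, "field");
    }

    @Override
    public String getKey(Map<String, Object> value) {
        Object key = value.get(field);
        if (key == null) {
            // Fail here with a descriptive message rather than letting the
            // partitioner throw a bare NullPointerException on key.hashCode().
            throw new IllegalStateException(
                "Record has no value for key field '" + field + "': " + value);
        }
        // Simplification: keys are normalized to String, so 1 and "1" collide.
        return key.toString();
    }
}
```

Returning the value's toString() is only a choice of this sketch; keeping the original key type works as well, as long as its hashCode() is deterministic across JVMs.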

But I get the following error:

    2019-10-10 10:40:07.664 INFO org.apache.flink.runtime.taskmanager.Task - Co-Process-Broadcast-Keyed (6/12) (50ed3ab36b8f4078d865b7026cab08e5) switched from RUNNING to FAILED.
    java.lang.RuntimeException: null
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at my.package.processElement(MyEvaluator.java:54)
        at my.package.processElement(MyEvaluator.java:22)
        at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.NullPointerException: null
        at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
        at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:58)
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        ... 13 common frames omitted
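The "Caused by" frames show the NullPointerException being raised while KeyGroupStreamPartitioner picks the downstream subtask for a record, i.e. in the keyBy that follows MyEvaluator. The simplified model below, written for illustration only, shows why a null key fails at exactly this step: the key's hashCode() is computed first, then mapped to a key group, then to a subtask. Assumptions: real Flink passes the hash through MathUtils.murmurHash before the modulo, which this sketch skips, and the maxParallelism of 128 used in the usage note is an assumed value; KeyRoutingModel is a hypothetical name.

```java
// Simplified model of keyed routing: key -> key group -> parallel subtask.
// Not Flink's code: the murmur hash scrambling step is deliberately omitted.
final class KeyRoutingModel {
    private KeyRoutingModel() {}

    static int keyGroupFor(Object key, int maxParallelism) {
        // A null key throws NullPointerException right here, on hashCode().
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = keyGroupFor(key, maxParallelism);
        // Each subtask owns a contiguous range of key groups.
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With 12 subtasks, matching the "(6/12)" in the log line, subtaskFor("a", 128, 12) returns 9 in this model, while subtaskFor(null, 128, 12) throws a NullPointerException with no message, similar to the "null" in the trace.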

From the stack trace, the error seems to come from the processElement function of MyEvaluator (a KeyedBroadcastProcessFunction):

    @Override
    public void processElement(Map<String, Object> value, ReadOnlyContext ctx, Collector<Map<String, Object>> out) throws Exception {
        List<Map<String, Object>> newFieldsList = ... ; // Retrieve a list of new fields based on 'value' and elements received using the broadcastedStream
        for (Map<String, Object> newFields : newFieldsList) {
            value.putAll(newFields); // Add all newFields to current value
            out.collect(value); // <-- NPE occurs here
        }
    }
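One hypothesis consistent with the trace (an assumption, not a confirmed diagnosis) is that some records emitted by MyEvaluator carry no fieldBBB, or a null value for it, so the following keyBy receives a null key; fieldAAA, by contrast, is already present on every record. Note also that the loop mutates the incoming value and emits the same Map object once per iteration, so fields put in one iteration stay in the later ones. A defensive variant of the loop body could copy the record per output and skip records without a key. In this sketch, EnrichAndEmit.emit is a hypothetical helper and java.util.function.Consumer stands in for Flink's Collector so that it runs without Flink.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper mirroring the loop in MyEvaluator.processElement.
final class EnrichAndEmit {
    private EnrichAndEmit() {}

    // Emits one enriched copy per entry of newFieldsList; returns how many were dropped.
    static int emit(Map<String, Object> value,
                    List<Map<String, Object>> newFieldsList,
                    String keyField,
                    Consumer<Map<String, Object>> out) {
        int dropped = 0;
        for (Map<String, Object> newFields : newFieldsList) {
            // Copy instead of mutating 'value': each emitted record is independent,
            // and fields from one iteration cannot leak into the next.
            Map<String, Object> enriched = new HashMap<>(value);
            enriched.putAll(newFields);
            if (enriched.get(keyField) == null) {
                // The downstream keyBy would get a null key; skip the record here.
                dropped++;
                continue;
            }
            out.accept(enriched);
        }
        return dropped;
    }
}
```

Instead of silently dropping, a real job could send such records to a side output through the context's output(...) method so they can be inspected. Copying costs one allocation per emitted record, but it keeps outputs independent of the input map, which can matter when object reuse is enabled or operators are chained.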

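But in my processing, if I remove keyBy("fieldBBB"), the code runs. More interestingly, if I replace keyBy("fieldBBB") with keyBy("fieldAAA"), the code also runs.
How do you explain this behavior? How can I do what I want?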
