Simple window job issue - java.lang.RuntimeException: segment has been freed - mini cluster only

ht4b089n posted on 2021-06-21 in Flink

Hello,
I'm new to Flink. In my job I'm trying to use a window simply to aggregate elements, so that their processing is delayed:

    src = src.timeWindowAll(Time.milliseconds(1000)).process(new BaseDelayingProcessAllWindowFunctionImpl());

The process window function just re-emits the input elements:

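    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Pass-through window function: re-emits every element of the window unchanged,
    // so its only effect is to hold elements back until the window fires.
    public class BaseDelayingProcessAllWindowFunction<IN> extends ProcessAllWindowFunction<IN, IN, TimeWindow> {
        private static final long serialVersionUID = 1L;
        protected Logger logger;

        public BaseDelayingProcessAllWindowFunction() {
            logger = LoggerFactory.getLogger(getClass());
        }

        @Override
        public void process(ProcessAllWindowFunction<IN, IN, TimeWindow>.Context context, Iterable<IN> elements, Collector<IN> out) throws Exception {
            for (IN in : elements) {
                out.collect(in);
            }
        }
    }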
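As a rough mental model of what `timeWindowAll(Time.milliseconds(1000)).process(...)` does with this pass-through function, here is a plain-Java sketch with no Flink dependency. It buckets timestamped elements into 1000 ms tumbling windows, then re-emits each window's elements unchanged. The class, the `Event` type and the method names are hypothetical. The window-start arithmetic follows the formula Flink uses for tumbling windows, `timestamp - (timestamp - offset + windowSize) % windowSize`, here with offset 0. Everything else is a simplification that ignores watermarks, parallelism and state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of timeWindowAll(size).process(passThrough).
// Ignores watermarks, parallelism and state; it only shows window assignment.
class TumblingWindowSketch {

    // A timestamped element (hypothetical type for this sketch).
    static final class Event {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Start of the tumbling window containing `timestamp`, with offset 0.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp + windowSize) % windowSize;
    }

    // Buckets events by window start, then re-emits each window's elements
    // unchanged and in window order, like the pass-through process() above.
    static List<String> process(List<Event> events, long windowSize) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Event e : events) {
            windows.computeIfAbsent(windowStart(e.timestamp, windowSize), start -> new ArrayList<>())
                   .add(e.value);
        }
        List<String> out = new ArrayList<>();
        for (List<String> elements : windows.values()) {
            out.addAll(elements); // stands in for out.collect(in) per element
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_500, "b"), new Event(200, "a"),
                new Event(999, "a2"), new Event(2_000, "c"));
        // 1000 ms windows: [0,1000) -> a, a2; [1000,2000) -> b; [2000,3000) -> c
        System.out.println(process(events, 1_000)); // prints [a, a2, b, c]
    }
}
```

In the real job the window fires when time passes the window end, not at the end of a finite list. The output order within a window is therefore the only part this sketch models faithfully.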

The problem is that I get the following error when debugging locally (launching the job from Eclipse):

    [2019-01-18 14:38:18,753] INFO Running job on local embedded Flink mini cluster (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)
    [2019-01-18 14:38:30,825] INFO Source: dataSource -> Flat Map (1/1) (3677b50300c3c432e862af413796ee5d) switched from RUNNING to FAILED. (org.apache.flink.runtime.taskmanager.Task:940)
    TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:691)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$AutomaticWatermarkContext$WatermarkEmittingTask.onProcessingTime(StreamSourceContexts.java:264)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
        ... 7 more
    Caused by: java.lang.RuntimeException: segment has been freed
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:691)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:759)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
        ... 10 more
    Caused by: java.lang.IllegalStateException: segment has been freed
        at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:228)
        at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:381)
        at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:85)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:97)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:117)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:87)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
        ... 13 more
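The innermost `IllegalStateException: segment has been freed` is easier to read with a toy model. The plain-Java `ToySegment` class below is hypothetical and far simpler than Flink's `HybridMemorySegment`: it models a pooled buffer that rejects writes once it has been freed. In the trace, the failing write comes from the processing-time timer thread emitting a watermark (`WatermarkEmittingTask.onProcessingTime` → `RecordWriter.broadcastEmit` → `BufferBuilder.append`). One possible reading, not confirmed by the trace alone, is that the task's network buffers had already been released when that timer fired.

```java
import java.nio.ByteBuffer;

// Hypothetical toy model: a buffer that refuses writes after free(),
// loosely analogous to the "segment has been freed" check in the trace.
class ToySegment {
    private ByteBuffer buffer; // null once the segment has been freed

    ToySegment(int size) {
        buffer = ByteBuffer.allocate(size);
    }

    void put(int index, byte value) {
        if (buffer == null) {
            throw new IllegalStateException("segment has been freed");
        }
        buffer.put(index, value); // absolute put, position unchanged
    }

    void free() {
        buffer = null; // every later put() now fails
    }

    public static void main(String[] args) {
        ToySegment segment = new ToySegment(16);
        segment.put(0, (byte) 1);     // normal write succeeds
        segment.free();               // e.g. the task releases its buffers
        try {
            segment.put(1, (byte) 2); // e.g. a late timer writes a watermark
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints: segment has been freed
        }
    }
}
```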

A Google search led me to think this error is related to OOM errors, so I tried the following (all of which failed):
I tried changing defaultLocalParallelism from 8 to 1 via a hack:

    private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

    public static LocalStreamEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalParallelism);
    }

I also tried increasing the memory (-Xms4096m -Xmx4096m -Xmn512m) and reducing the window size to 10 ms, but none of this helped.
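Memory flags like these only help if they reach the JVM that Eclipse launches. In an Eclipse run configuration they have to go under "VM arguments", not "Program arguments". A minimal stdlib-only check can print what the JVM actually sees: `availableProcessors()` is the value the snippet above uses as the default local parallelism (8 in this case), and `maxMemory()` should come out at roughly 4096 MiB, often slightly less, if `-Xmx4096m` took effect. The class name is hypothetical.

```java
// Hypothetical diagnostic: prints values a local environment would inherit
// from the current JVM.
class JvmCheck {

    // Processors visible to the JVM; used as the default local parallelism
    // by the createLocalEnvironment() snippet above.
    static int defaultLocalParallelism() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Maximum heap in MiB; roughly 4096 (often slightly less) if -Xmx4096m applied.
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors = " + defaultLocalParallelism());
        System.out.println("maxMemory (MiB)     = " + maxHeapMiB());
    }
}
```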
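Please advise.
Update
Following the comments, and to narrow the problem down, I reduced the complex job to a single print statement, as shown below, but I still get the same error: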

    DataStream<String> dataStream = getSource(KAFKA_DATA_SOURCE_NAME).getDataStream();
    SingleOutputStreamOperator<String> out2 = dataStream.timeWindowAll(Time.milliseconds(10)).process(new StringDelayingProcessAllWindowFunction());
    out2.print();

The ProcessAllWindowFunction subclass adds no implementation of its own:

    public class StringDelayingProcessAllWindowFunction extends BaseDelayingProcessAllWindowFunction<String> {
        private static final long serialVersionUID = 1L;
    }

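Are there any special settings needed for the mini cluster, or any other settings for windows?
Update 2
I have confirmed that this nasty problem only occurs in the mini cluster environment: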

    Running job on local embedded Flink mini cluster (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)

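When I submit the same job to a test cluster, this simple job does not get the error. So the question is how to run windows in the mini cluster. Trying a 32-bit JDK didn't help either.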
