在我的flink代码中,我使用的是一种自定义输入格式,它引发了一个异常。看来我需要一个 RuntimeContext
,但是我怎么能得到一个呢?
我的格式类如下所示:
MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T]{
@transient var lineCounter: IntCounter = _
override def open(split: FileInputSplit): Unit = {
super.open(split)
lineCounter = new IntCounter()
getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException
我的主程序如下所示:
val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception
引发的异常:
java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)
我的班级需要一个 RuntimeContext
因为它延伸了 DelimitedInputFormat
延伸到。。。 RichInputFormat
```
public abstract class DelimitedInputFormat extends FileInputFormat
public abstract class FileInputFormat extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
private transient RuntimeContext runtimeContext;
public void setRuntimeContext(RuntimeContext t)
public RuntimeContext getRuntimeContext()
所以任何 `RichInputFormat` 希望我们 `setRuntimeContext(RuntimeContext t)` 在它被创建之后。
我想我应该做以下工作:
val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception
但是如何获取runtimecontext的示例呢?由于我的自定义输入格式没有 `RuntimeContext` . 我想定一个,但我不知道从哪儿弄到。
2条答案
按热度按时间qyswt5oh1#
我还不明白为什么,但看起来
MyInputFormat
多次示例化,包括RuntimeContext
是可用的。然而,尽管如此,这个作业仍然工作并计算它需要做什么。我已经解决了这个问题,将所有对addAccumulator(,)
在一个try
,就像这样:尽管我打电话来,我还是要这么做
addAccumulator(,)
内部open()
,这似乎是正确的生命周期方法。另外:由于并行性,几个子作业试图添加相同的累加器,这是错误的。这就是为什么我要先得到累加器。如果还没有上下文,没问题:我稍后会得到一个。如果累加器已经存在,没问题-没什么可做的。这只是一个解决办法,不是一个解决方案-但这就是我现在所拥有的。zzoitvuj2#
您应该在生命周期方法中初始化runtimecontext,如
open
```MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {
override def openInputFormat() = {
getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
}