ApacheFlink:作业引发堆栈溢出错误

3phpmpom  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(355)

我正试图在apache flink中执行这个简单的任务。

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties inputProperties = new Properties();
        ObjectMapper mapper = new ObjectMapper();
        DataStream<String> eventStream = env
                .addSource(new FileSourceFunction("/path/to/file"));
        DataStream<ObjectNode> eventStreamObject = eventStream
                .map(x -> mapper.readValue(x, ObjectNode.class));
        DataStream<ObjectNode> eventStreamWithTime = eventStreamObject
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ObjectNode>() {
                    @Override
                    public long extractAscendingTimestamp(ObjectNode element) {
                        String data = element.get("ts").asText();
                        if(data.endsWith("Z")) {
                            data = data.substring(0, data.length() -1);
                        }
                        return LocalDateTime.parse(data).toEpochSecond(ZoneOffset.UTC);
                    }

                });
        eventStreamObject.print();
        env.execute("Local job");
    }

}
``` `FileSourceFunction` 是一种习俗 `SourceFunction` ```
public class FileSourceFunction implements SourceFunction<String> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private String fileName;
    private volatile boolean isRunning = true;

    public FileSourceFunction(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // TODO Auto-generated method stub
        try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
        try (Stream<String> stream = br.lines()) {
            Iterator<String> it = stream.iterator();
            while (isRunning && it.hasNext()) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(it.next());
                }
            }
        }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

}

当我运行作业时,它抛出一个 StackOverFlowError . 我使用的是ApacheFlink1.8.1。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题