使用python flink时,作业执行失败

amrnrhlw  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(537)

作为第一次尝试,我想从文件中读取json数据并将其传递给flink。我定义了一个源代码(逐行读取json字符串)和一个占位符过滤器。参见代码:

from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys

class Json_reader(SourceFunction):
    def readjason(self, ctx):
        sys.stdin = open('capture.json', 'r')
        for line in sys.stdin:
            ctx.collect(json.loads(line))

class Dummy_Filter(FilterFunction):
    def filter(self, value):
        return True

# 

# The pipeline definition.

# 

def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Json_reader()) \
        .filter(Dummy_Filter()) \
        .output()
    env.execute()

当我构建作业并将其移动到启动的flink群集时,会收到以下错误消息:
virtualbox:/media/sf\u python$./flink-1.7.2/bin/pyflink-stream.sh./json\u parser\u flink.py启动程序执行计划失败:空回溯(最近一次调用):文件“”,第1行,在文件“/tmp/flink\u streaming\u plan\u fbe13c4c-6918-46d4-a4bc-36908a2bea24/json\u parser\u flink.py”第25行,在main的org.apache.flink.client.program.rest.restclusterclient.submitjob(restclusterclient。java:268)在org.apache.flink.client.program.clusterclient.run(clusterclient。java:487)位于org.apache.flink.streaming.api.environment.streamcontextenvironment.execute(streamcontextenvironment)。java:66)在org.apache.flink.streaming.api.environment.streamexecutionenvironment.execute(streamexecutionenvironment。java:1510)位于org.apache.flink.streaming.python.api.environment.pythonstreamexecutionenvironment.execute(pythonstreamexecutionenvironment)。java:245)位于sun.reflect.nativemethodaccessorimpl.invoke0(本机方法)sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl。java:62)在sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl。java:43)在java.lang.reflect.method.invoke(方法。java:498)org.apache.flink.client.program.programinvocationexception:org.apache.flink.client.program.programinvocationexception:作业失败(工号:31615948194c951be03d46576929aa23)
程序中没有flink作业。也许您忘记在执行环境中调用execute()。
我没有忘记调用execute()。

xv8emn3q

xv8emn3q1#

我发现了问题。fast需要sourcefunction中的run()函数。

相关问题