apache flink python表api udf依赖问题

dwthyt8l  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(509)

通过将包含用户定义函数(udf)的python表api作业提交到本地集群来启动该作业后,该作业会因以下原因导致py4j.protocol.py4jjavaerror崩溃:
java.util.serviceconfigurationerror:org.apache.beam.sdk.options.pipelineoptionsregistrar:org.apache.beam.sdk.options.defaultpipelineoptionsregistrar不是子类型。
我知道这是一个关于lib路径/类加载依赖关系的bug。我已尝试按照以下链接中的所有说明进行操作:https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html
我尝试了大量不同的配置 classloader.parent-first-patterns-additional 配置选项。不同的条目 org.apache.beam.sdk.[...] 导致了不同的、额外的错误消息。
以下引用apache beam的依赖项位于lib路径上:
beam-model-fn-execution-2.20.jar文件
beam-model-job-management-2.20.jar
beam-model-pipeline-2.20.jar
beam-runners-core-construction-java-2.20.jar
beam-runners-java-fn-execution-2.20.jar
beam-sdks-java-core-2.20.jar
beam-sdks-java-fn-execution-2.20.jar
beam-vendor-grpc-1_21_0-0.1.jar
beam-vendor-grpc-1_26_0.0.3.jar
beam-vendor-guava-26_0-jre-0.1.jar
beam-vendor-sdks-java-extensions-protobuf-2.20.jar
我也可以排除这是由于我的代码,因为我已经测试了项目网站的以下示例代码:https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")

执行此代码时,会显示相同的错误消息。
有人描述过可以用udf运行python表api作业的flink集群的配置吗?非常感谢所有的提示提前!

7kqas0il

7kqas0il1#

ApacheFlink的新版本1.10.1解决了这个问题。现在可以通过带有命令的二进制文件来执行问题中显示的示例脚本 run -py path/to/script 没有任何问题。
至于依赖项,它们已经包含在已经交付的 flink_table_x.xx-1.10.1.jar . 因此不需要向lib路径添加更多的依赖项,这是通过调试/配置尝试完成的。

相关问题