flink docker compose-自定义库

9q78igpj  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(522)

我正在尝试使用docker compose设置flink会话集群。我想有一个自定义库加载在flink,因为这个库包含的代码是我所有的工作使用。我通过如下方式创建自定义docker映像:

FROM flink:1.10.0
WORKDIR /opt/flink/lib

RUN mkdir /opt/flink/usrlib
RUN chown flink:flink /opt/flink/usrlib

ADD --chown=flink:flink ./myLibrary.jar /opt/flink/lib/myLibary.jar

作业/任务管理器启动成功。当我使用web ui提交作业时,我的作业运行正常,但有一个例外:
在我的库中,我有一个flinkMap操作符(称为DeserializationMapper),它使用来自kafka的json消息,并基于消息中的标记创建自定义java对象。例如,如果消息

{"objectType": "Address", "street": "Street 1"}

我的DeserializationMapper生成一个JavaPOJO,类地址的示例,其名为“street”的字段设置为“street1”。我使用java反射来实现。pojo的定制java类仅在作业本身(而不是库)中可用。当我在eclipse中执行程序时(我的自定义库作为maven依赖项提供),一切都正常。DeserializationMapper能够定位作业项目中的自定义java类。当我为作业导出一个“fat”jar时,这个jar包含了作业的所有依赖项(例如mylibrary.jar),并将其部署到flink集群中,它也可以正常工作。但是,当我尝试将我的库放入flink集群(使用上面显示的自定义图像)并将其从作业jar中排除时,我会得到一个classnotfoundexception,提示无法定位特定的类(例如address),尽管指向该类的路径看起来是正确的(例如。,org.eclipse.myjob.datatypes.address)-我确认类位于jobjar中的正确位置。注意:我的作业实际上可以访问mylibrary.jar中的方法,例如,kafka使用者是在mylibrary.jar中由我的作业调用的方法中创建的。
为什么会这样?mylibrary.jar难道不能定位jobjar中包含的类吗?我应该做任何具体的配置,否则这根本不可能?

gzjq41n4

gzjq41n41#

我自己找到了解决办法。根据这一点,会话集群中用户jar中的类是动态加载的,因此不能被flink类路径中加载的lib访问。一种解决方案是将用户jar放在lib文件夹中,这对我不起作用,因为我希望我的用户能够通过ui提交他们的作业。本小节描述了另一个对我有效的解决方案。基本上,当需要来自用户jar的类时,您的flink操作符应该使用 getRuntimeContext().getUserCodeClassLoader() . 为了做到这一点,它们应该是富函数(例如richflatmapfunction)。然后,使用这个类加载器,您可以调用 loadClass(className) 方法指向用户类所在的路径。

相关问题