Apache Flink + Spring Boot

dced5bon posted on 2021-06-25 in Flink

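I am testing the integration between Apache Flink and Spring Boot. Running the application from the IDE works fine, but when I try to run it on an Apache Flink cluster I get an exception related to the ClassLoader.
The classes are very simple:
BootFlinkApplication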

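    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;

    @SpringBootApplication
    @ComponentScan("com.example.demo")
    public class BootFlinkApplication {
        public static void main(String[] args) {
            System.out.println("some test");
            SpringApplication.run(BootFlinkApplication.class, args);
        }
    }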

FlinkTest

    import javax.annotation.PostConstruct;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.springframework.stereotype.Service;

    @Service
    public class FlinkTest {
        // Builds and runs the Flink job as soon as Spring has created this bean.
        @PostConstruct
        public void init() {
            StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
            see.fromElements(1, 2, 3, 4)
                .filter(new RemoveNumber3Filter()).print();
            try {
                see.execute();
            } catch (Exception e) {
                System.out.println("Error executing flink job: " + e.getMessage());
            }
        }
    }

RemoveNumber3Filter

    import org.apache.flink.api.common.functions.FilterFunction;

    // Keeps every element except the value 3.
    public class RemoveNumber3Filter implements FilterFunction<Integer> {
        @Override
        public boolean filter(Integer i) throws Exception {
            return i != 3;
        }
    }

Exception:

    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.demo.RemoveNumber3Filter
    ClassLoader info: URL ClassLoader:
        file: '/tmp/blobStore-850f3189-807e-4f8d-a8a6-3bd3c1bd76b4/job_eb93b239080b4d4e09f10f1e3605744d/blob_p-5fd56f3348976c0d333d680fde4a79573c21cd40-48ac0995eee11f38ce3ff4f890102af8' (valid JAR)
    Class not resolvable through given classloader.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

xxe27gdn #1

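You are probably using the spring-boot-maven-plugin (https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html). To repackage the jar into an executable jar, however, it uses a custom boot layout that Apache Flink's internal class loader does not support. Next to the jar file you are trying to deploy there should be an original jar file (.jar.original), and you can use that one to deploy on the Flink cluster.
Alternatively, you can build a jar that contains all dependencies in a different way, for example with the maven-shade-plugin (https://maven.apache.org/plugins/maven-shade-plugin/).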
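
To check which of the two layouts a given jar has, it can help to look up where the class from the stack trace actually sits inside the archive. The sketch below assumes the repackaged Spring Boot layout, in which application classes are stored under `BOOT-INF/classes/` rather than at the jar root, where a plain URL class loader looks for them. `JarLayoutCheck`, `entryPath`, and `locate` are hypothetical names made up for this illustration; they are not part of Flink or Spring Boot.

```java
import java.io.IOException;
import java.util.function.Predicate;
import java.util.jar.JarFile;

// Hypothetical helper for illustration only; not part of Flink or Spring Boot.
class JarLayoutCheck {

    /** Where a class file was found inside a jar. */
    enum Location { ROOT, BOOT_INF, MISSING }

    /** Converts a fully qualified class name to its jar entry path. */
    static String entryPath(String className) {
        return className.replace('.', '/') + ".class";
    }

    /**
     * Classifies a class by looking it up at the jar root first and then
     * under BOOT-INF/classes/ (assumed repackaged Spring Boot layout).
     * hasEntry answers whether the jar contains a given entry path.
     */
    static Location locate(Predicate<String> hasEntry, String className) {
        String path = entryPath(className);
        if (hasEntry.test(path)) {
            return Location.ROOT;
        }
        if (hasEntry.test("BOOT-INF/classes/" + path)) {
            return Location.BOOT_INF;
        }
        return Location.MISSING;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the jar, args[1]: fully qualified class name
        try (JarFile jar = new JarFile(args[0])) {
            Location where = locate(name -> jar.getEntry(name) != null, args[1]);
            System.out.println(args[1] + " -> " + where);
        }
    }
}
```

Run it with the jar path and `com.example.demo.RemoveNumber3Filter` as arguments. If the class is reported under `BOOT_INF`, the jar is the repackaged one, and the `.jar.original` file or a shaded jar is the better candidate for deployment on the cluster.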
