Flink Table API: Failed to fetch next result

Posted by neskvpey on 2021-07-15 in Flink

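I am currently experimenting with the Flink Table API. I created a table and want to see what is in it. I thought that if I ran a SELECT statement and called .collect(), I would get an iterator that I could use to walk through the table's contents. Instead, it throws an error.
Code: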

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public class QuickStartWithTableAPI {
        public static void main(final String[] args) throws Exception {
            // Initialize the table environment
            StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
            // A table representing some machines, backed by a Kafka topic
            tableEnv.executeSql("CREATE TEMPORARY TABLE machines(id INT) WITH ( 'connector' = 'kafka', " +
                    "'topic' = 'quickstart-events', 'properties.bootstrap.servers' = 'localhost:9092', 'value.format' = 'json')");
            // Add four machines
            tableEnv.executeSql("INSERT INTO machines values(1)");
            tableEnv.executeSql("INSERT INTO machines values(3)");
            tableEnv.executeSql("INSERT INTO machines values(5)");
            tableEnv.executeSql("INSERT INTO machines values(8)");
            // Try to read the rows back through an iterator
            final CloseableIterator<Row> fetch = tableEnv.executeSql("SELECT id FROM machines").collect();
            while (fetch.hasNext()) {
                System.out.println(fetch.next());
            }
        }
    }

The error I get:

Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
    at com.vmware.dcm.examples.QuickStartWithTableAPI.main(QuickStartWithTableAPI.java:58)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2093)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
    ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
    at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2165)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
    ... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at jdk.internal.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

How can I fix this?
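
A note on the trace: the innermost cause is Kafka's TimeoutException ("Timeout expired while fetching topic metadata"), which often means the client could not reach a broker at the configured 'properties.bootstrap.servers' address. That is an assumption about the cause here, not a confirmed diagnosis. The sketch below is a plain TCP check that could help rule it out before debugging the Flink job itself. The class BrokerReachability and its methods parse and isReachable are hypothetical names made up for this illustration. The check only shows that something accepts connections on the port; it does not verify that it is Kafka, that the topic exists, or that the broker's advertised listeners are set correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink or Kafka.
public class BrokerReachability {

    // Splits a "host:port" bootstrap address into host and port without resolving DNS.
    public static InetSocketAddress parse(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection to the address succeeds within timeoutMillis.
    public static boolean isReachable(String hostPort, int timeoutMillis) {
        InetSocketAddress target = parse(hostPort);
        try (Socket socket = new Socket()) {
            // Resolve the host name here; a failed lookup surfaces as an IOException.
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the address used in the CREATE TABLE statement of the question.
        String bootstrap = args.length > 0 ? args[0] : "localhost:9092";
        System.out.println(bootstrap + " reachable: " + isReachable(bootstrap, 3000));
    }
}
```

If this reports that the address is not reachable, the broker is probably not running or is listening elsewhere. If it is reachable and the timeout persists, the topic name and the broker's listener configuration would be the next things to check.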
