How do I set RocksDBStateBackend options in Flink?

d7v8vwbk posted on 2021-06-25 in Flink

I use the code below to set up RocksDBStateBackend and its options. It runs correctly locally, but the job cannot be submitted to the cluster.

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    RocksDBStateBackend rocksDBBackEnd = new RocksDBStateBackend("file:///Users/zsh/tmp/rocksdb");
    rocksDBBackEnd.setOptions(new OptionsFactory() {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            return currentOptions;
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            final long blockCacheSize = 8 * 1024 * 1024;
            final long blockSize = 4 * 1024;
            final long targetFileSize = 2 * 1024 * 1024;
            final long writeBufferSize = 64 * 1024 * 1024;
            final int writeBufferNum = 1; // default 2
            final int minBufferToMerge = 1; // default 2
            return currentOptions
                .setCompactionStyle(CompactionStyle.LEVEL)
                .setTargetFileSizeBase(targetFileSize)
                .setWriteBufferSize(writeBufferSize)
                .setMaxWriteBufferNumber(writeBufferNum)
                .setMinWriteBufferNumberToMerge(minBufferToMerge)
                .setTableFormatConfig(
                    new BlockBasedTableConfig()
                        .setBlockCacheSize(blockCacheSize)
                        .setBlockSize(blockSize)
                );
        }
    });
    env.setStateBackend(rocksDBBackEnd);
    ....
    env.execute();

When I submit the job like this:

    flink run -d -c gerryzhou.metricTest target/gerryzhou.flink-1.0-SNAPSHOT.jar

it throws the following exception:

    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
        at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:505)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
        at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1129)

The jobmanager.log shows the following:

    2017-06-29 15:37:16,651 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.242.98.255:51891] has failed, address is now gated for [5000] ms. Reason: [gerryzhou.metricTest$1]
    2017-06-29 15:37:16,651 ERROR Remoting - gerryzhou.metricTest$1
    java.lang.ClassNotFoundException: gerryzhou.metricTest$1
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)

I then changed the code to implement OptionsFactory in a standalone class file, MRocksDBFactory, and used it like this: rocksDBBackEnd.setOptions(new MRocksDBFactory()); The error in jobmanager.log is now:

    2017-06-29 16:29:27,162 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.242.98.255:52638] has failed, address is now gated for [5000] ms. Reason: [gerryzhou.MRocksDBFactory]
    2017-06-29 16:29:27,163 ERROR Remoting - gerryzhou.MRocksDBFactory
    java.lang.ClassNotFoundException: gerryzhou.MRocksDBFactory
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
        at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:192)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
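
Both traces fail in ObjectInputStream.resolveClass, that is, while the receiving JVM deserializes an object whose class it cannot load by name. The name gerryzhou.metricTest$1 is how javac names the first anonymous class declared inside metricTest, which here would be the anonymous OptionsFactory. The following is a minimal sketch in plain Java, not Flink code, of why moving the factory into a named class changes only the name in the error. Factory, NamedFactory, and ReceiverStream are hypothetical stand-ins, and the "class missing on the receiver" condition is simulated rather than taken from the cluster setup.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

class SerializedFactoryDemo {

    // Hypothetical stand-in for a serializable options factory.
    interface Factory extends Serializable {
        String describe();
    }

    // Named variant, analogous to moving the factory into its own class.
    static final class NamedFactory implements Factory {
        @Override
        public String describe() {
            return "named";
        }
    }

    // Anonymous variant; javac compiles it to SerializedFactoryDemo$1.
    static Factory anonymousFactory() {
        return new Factory() {
            @Override
            public String describe() {
                return "anonymous";
            }
        };
    }

    // Simulates a receiver whose classpath lacks the sender's classes.
    static final class ReceiverStream extends ObjectInputStream {
        ReceiverStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (desc.getName().contains("SerializedFactoryDemo")) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserializeWithoutUserClasses(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ReceiverStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws IOException {
        for (Factory f : new Factory[] {anonymousFactory(), new NamedFactory()}) {
            System.out.println("serializing " + f.getClass().getName());
            try {
                deserializeWithoutUserClasses(serialize(f));
            } catch (ClassNotFoundException e) {
                System.out.println("receiver failed: " + e);
            }
        }
    }
}
```

In this sketch both variants fail the same way, which matches the two logs above: the serialized stream carries only the class name, so what matters is whether the receiving side can load that class, not whether the class is anonymous or named.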

Can anyone help me?
