CouchbasePOJOSetOperator causes startup problems when running an Apache Apex application

5t7ly7z5 posted on 2021-06-07 in Kafka

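I have a basic Apex flow: a Kafka input operator feeding a Couchbase set operator. The Kafka side works fine, which I confirmed by removing the Couchbase operator. I have tried switching versions of the Malhar library to see whether this was broken in the latest release.
I am also using the DataTorrent flavor of Apex.
When I add the Couchbase operator, I get the following exception: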

    java.lang.RuntimeException: Error creating local cluster
        at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:124)
        at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:46)
        at com.usaa.data.streams.apex.ApplicationTest.testApplication(ApplicationTest.java:30)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.codehaus.jackson.map.DeserializationConfig
    Serialization trace:
    _deserializationConfig (org.codehaus.jackson.map.ObjectMapper)
    mapper (com.datatorrent.contrib.couchbase.CouchBaseJSONSerializer)
    serializer (com.datatorrent.contrib.couchbase.CouchbasePOJOSetOperator)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:192)
        at com.datatorrent.stram.plan.logical.LogicalPlan$OperatorMeta.readObject(LogicalPlan.java:898)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at java.util.ArrayList.readObject(ArrayList.java:791)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at com.datatorrent.stram.plan.logical.LogicalPlan.read(LogicalPlan.java:2326)
        at com.datatorrent.stram.StramLocalCluster.cloneLogicalPlan(StramLocalCluster.java:323)
        at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:285)
        at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:122)
        ... 24 more
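
The `Caused by` line and the serialization trace point at the operator's `serializer` field: while the logical plan is being cloned, Kryo reaches the `ObjectMapper` inside `CouchBaseJSONSerializer` and cannot instantiate `DeserializationConfig` because it has no no-argument constructor. As a rough illustration of that kind of reflective check (this is not Kryo's actual code, and `WithNoArg` and `WithoutNoArg` are hypothetical stand-in classes), a stdlib-only sketch might look like this:

```java
import java.lang.reflect.Constructor;

public class NoArgConstructorCheck {

    // Hypothetical stand-in for a class that can be created reflectively.
    static class WithNoArg {
        WithNoArg() {
        }
    }

    // Hypothetical stand-in for a class whose only constructor takes arguments,
    // which is the situation the exception reports for DeserializationConfig.
    static class WithoutNoArg {
        final String setting;

        WithoutNoArg(String setting) {
            this.setting = setting;
        }
    }

    // Returns true if the class declares a constructor with no parameters.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            return ctor != null;
        } catch (NoSuchMethodException e) {
            // No zero-parameter constructor is declared on this class.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("WithNoArg: " + hasNoArgConstructor(WithNoArg.class));
        System.out.println("WithoutNoArg: " + hasNoArgConstructor(WithoutNoArg.class));
    }
}
```

Any object reachable from a non-transient operator field is subject to this kind of instantiation when the plan is read back, which is why the failure appears only once the Couchbase operator is added.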

Here is the relevant application code:

    // Kafka input operator reading from the "testing2" topic
    KafkaSinglePortInputOperator kafkaInput = dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
    kafkaInput.setTopics("testing2");
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "true");
    kafkaInput.setConsumerProps(props);
    kafkaInput.setClusters("localhost:9092");

    // Couchbase output operator and its store
    CouchbasePOJOSetOperator couchOutput = dag.addOperator("couchOutput", CouchbasePOJOSetOperator.class);
    CouchBaseWindowStore store = new CouchBaseWindowStore();
    store.setBucket("default");
    store.setUriString("localhost:8091,localhost:8091");
    store.setUserConfig("");
    store.setPasswordConfig("");
    store.setPassword("");
    couchOutput.setStore(store);
    try {
        store.connect();
    } catch (Exception e) {
        e.printStackTrace();
    }

    // JSON serializer set on the operator; this is the "serializer" field named in the serialization trace
    CouchBaseJSONSerializer serializer = new CouchBaseJSONSerializer();
    couchOutput.setSerializer(serializer);
    ArrayList<String> expressions = new ArrayList<String>();
    expressions.add("getValue()");
    couchOutput.setExpressions(expressions);

    // Connect the Kafka output port to the Couchbase input port
    dag.addStream("kafkaInput", kafkaInput.outputPort, couchOutput.input).setLocality(Locality.CONTAINER_LOCAL);
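
A pattern often used for helper objects that a serialization framework cannot recreate is to keep them in a `transient` field and build them lazily on first use; Kryo's default `FieldSerializer` skips transient fields. Whether that is practical here depends on how `CouchBaseJSONSerializer` is written, which the question does not show. The sketch below only illustrates the pattern: `Mapper` and `LazyJsonSerializer` are hypothetical classes, and plain Java serialization is used as a stand-in for the plan cloning step, so this is an assumption-laden illustration rather than a confirmed fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyTransientSketch {

    // Hypothetical helper: not serializable and without a no-arg constructor.
    static class Mapper {
        private final String name;

        Mapper(String name) {
            this.name = name;
        }

        String write(Object value) {
            return name + ":" + value;
        }
    }

    // Hypothetical serializer that keeps the helper out of its serialized state.
    static class LazyJsonSerializer implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped when the object is written, null after it is read back
        private transient Mapper mapper;

        private Mapper mapper() {
            if (mapper == null) {
                mapper = new Mapper("json"); // rebuilt on first use
            }
            return mapper;
        }

        String serialize(Object value) {
            return mapper().write(value);
        }

        boolean isMapperInitialized() {
            return mapper != null;
        }
    }

    // Writes the object to bytes and reads it back, as a stand-in for cloning.
    static LazyJsonSerializer roundTrip(LazyJsonSerializer in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream input =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyJsonSerializer) input.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazyJsonSerializer original = new LazyJsonSerializer();
        original.serialize("warm-up"); // forces the helper to be created
        LazyJsonSerializer copy = roundTrip(original);
        System.out.println(copy.isMapperInitialized());
        System.out.println(copy.serialize("value"));
    }
}
```

The round trip succeeds even though `Mapper` cannot be serialized, because the transient field is never written and is recreated only when `serialize` is next called.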
