After running the topology on a local cluster, I created a remote Storm cluster (storm deploy nathan). Before building the runnable jar with package dependencies, I removed the Storm jar from the Eclipse build path. My topology uses storm-kafka-0.9.0-wip16a-scala292.jar; before building the runnable jar I tried both keeping it on the build path and removing it (just to troubleshoot). When I run the following command:
./storm jar /home/ubuntu/Virtual/stormTopologia4.jar org.vicomtech.main.StormTopologia
it always fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: OpaqueTridentKafkaSpout
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2451)
at java.lang.Class.getMethod0(Class.java:2694)
at java.lang.Class.getMethod(Class.java:1622)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: OpaqueTridentKafkaSpout
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
Since this topology ran fine as a runnable jar on a single instance on AWS, I can't figure out what I'm missing... Here is the code in my main method:
Config conf = new Config();
try {
    OpaqueTridentKafkaSpout tridentSpout = crearSpout(kafkadir, "test");
    OpaqueTridentKafkaSpout logUpvSpout = crearSpout(kafkadir, "logsUpv");
    OpaqueTridentKafkaSpout logSnortSpout = crearSpout(kafkadir, "logsSnort");
    try {
        StormSubmitter.submitTopology(
                "hackaton",
                conf,
                buildTopology(tridentSpout, logUpvSpout, logSnortSpout));
    } catch (AlreadyAliveException | InvalidTopologyException e) {
        e.printStackTrace();
    }
} catch (IOException e) {
    e.printStackTrace();
} catch (TwitterException e) {
    e.printStackTrace();
}
private static OpaqueTridentKafkaSpout crearSpout(
        String testKafkaBrokerHost, String topic) {
    KafkaConfig.ZkHosts hosts = new ZkHosts(testKafkaBrokerHost, "/brokers");
    TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic);
    config.forceStartOffsetTime(-2); // -2: start from the earliest available offset
    config.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new OpaqueTridentKafkaSpout(config);
}
public static StormTopology buildTopology(OpaqueTridentKafkaSpout tridentSpout,
        OpaqueTridentKafkaSpout logUpvSpout,
        OpaqueTridentKafkaSpout logSnortSpout) throws IOException, TwitterException {
    TridentTopology topology = new TridentTopology();
    topology.newStream("tweets2", tridentSpout)
            .each(new Fields("str"), new OnlyEnglishSpanish())
            .each(new Fields("str"), new WholeTweetToMongo())
            .each(new Fields("str"), new TextLangExtracter(),
                    new Fields("text", "lang")).parallelismHint(6)
            .project(new Fields("text", "lang"))
            .partitionBy(new Fields("lang"))
            .each(new Fields("text", "lang"), new Analisis(),
                    new Fields("result")).parallelismHint(6)
            .each(new Fields("result"), new ResultToMongo());
    return topology.build();
}
Is there any way I can make the OpaqueTridentKafkaSpout available on the cluster? Thanks in advance.
I hope this isn't a silly question; I'm new to this field.
1 Answer
You can keep the Storm jar on the build path when generating the jar-with-dependencies; you just need to tell Maven not to bundle it (see the "provided" scope, which means the jar is supplied by the runtime environment and therefore must not be bundled):
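The answer's original snippet was lost here; a typical declaration would look like the following sketch. The groupId and version are assumptions based on the storm-kafka-0.9.0-wip16a-scala292.jar mentioned in the question, so adjust them to match your cluster's Storm version:

```xml
<!-- Storm core is provided by the cluster at runtime,
     so it must NOT be bundled into the jar-with-dependencies. -->
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0-wip16</version>  <!-- hypothetical: match your cluster -->
    <scope>provided</scope>
</dependency>
```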
However, the Kafka spout must be included in the jar-with-dependencies, so its Maven declaration looks like this:
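Again the original snippet is missing; a sketch consistent with the jar named in the question (groupId is an assumption for that era's artifacts) would be:

```xml
<!-- storm-kafka is NOT provided by the cluster: leave it at the default
     "compile" scope so it is bundled into the jar-with-dependencies. -->
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
</dependency>
```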
To verify all of this, you can always unzip the resulting jar and manually check that the necessary classes are present before deploying it to Storm.
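As a programmatic variant of that check, a small helper can attempt to load the class from the jar's classpath. The fully qualified class name below is a guess for that storm-kafka build; substitute whatever `jar tf` shows for your jar:

```java
// Checks whether a given class is loadable from the current classpath.
public class ClasspathCheck {

    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical fully qualified name; adjust to your storm-kafka version.
        String cls = args.length > 0 ? args[0]
                : "storm.kafka.trident.OpaqueTridentKafkaSpout";
        System.out.println((isLoadable(cls) ? "FOUND: " : "MISSING: ") + cls);
    }
}
```

Run it against the topology jar from the question, e.g. `java -cp /home/ubuntu/Virtual/stormTopologia4.jar ClasspathCheck`; if it prints MISSING, the spout was not bundled and the NoClassDefFoundError above is expected.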