import backtype.storm.Config;`enter code here`
import backtype.storm.`StormSubmitter`;`enter code here
import backtype.storm.topology.TopologyBuilder;`enter code here`
import consumer.bolt.FirebaseBolt;`enter code here`
import consumer.bolt.WordCountBolt;
import consumer.bolt.WordCountDumpBolt;
import nl.minvenj.nfi.storm.kafka.KafkaSpout;
/**
* User: tonymeng
* Date: 3/31/14
*/
public class FirebaseTopologyCluster {
public static void main(String[]args) throws Exception {
if (args == null || args.length != 3) {
throw new IllegalArgumentException("localhost:2181 ,testTopic, https://chem-9b445.firebaseio.com/");
}
String zkConnect = args[0];
String topic = args[1];
String firebaseNamespace = args[2];
Config config = new Config();
config.setNumWorkers(1);
config.put("kafka.spout.topic", topic);
config.put("kafka.spout.consumer.group", "test-consumer-group");
config.put("kafka.zookeeper.connect", zkConnect);
config.put("kafka.consumer.timeout.ms", 4000);
KafkaSpout spout = new KafkaSpout();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout", spout);
builder.setBolt("countbolt", new WordCountBolt()).shuffleGrouping("kafkaspout");
builder.setBolt("countfilebolt", new WordCountDumpBolt("/tmp/stats")).shuffleGrouping("countbolt");
// using '`' as a delimiter
builder.setBolt("firebasebolt", new FirebaseBolt(firebaseNamespace, "`")).shuffleGrouping("countbolt");
StormSubmitter.submitTopology("statstopology", config, builder.createTopology());
}
}
这是我的密码。在此运行之后,显示此类型的消息。我在上面写的。如何解决此问题:
线程“main”java.lang.illegalargumentexception中出现异常:localhost:2181 ,测试主题,化学-9b445
1条答案
按热度按时间t5fffqht1#
看起来你在抛出那个异常。
运行时,应该传递3个参数才能正确运行此代码。
这些参数是:;