这是 StBolt.java 类:
package com.storm.cassandra;
import java.util.Map;
import net.sf.json.JSONObject;
import net.sf.json.JSONSerializer;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
public class StBolt implements IBasicBolt {
private static final long serialVersionUID = 1L;
private static final Logger logger = Logger
.getLogger(StBolt.class);
private static Session session = null;
private Cluster cluster = null;
String cassandraURL;
JSONObject eventJson = null;
String topicname = null;
String ip = null;
String menu = null;
String product = null;
Row row = null;
com.datastax.driver.core.ResultSet viewcount = null;
com.datastax.driver.core.ResultSet segmentlistResult = null;
com.datastax.driver.core.ResultSet newCountUpdatedResult = null;
public StBolt(String topicname) {
this.topicname = topicname;
}
public void prepare(Map stormConf, TopologyContext topologyContext) {
cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
System.out.println("load cassandra ip");
session = cluster.connect();
System.out.println("CassandraCounterBolt prepare method ended");
}
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println("Execute");
Fields fields = input.getFields();
try {
eventJson = (JSONObject) JSONSerializer.toJSON((String) input
.getValueByField(fields.get(0)));
topicname = (String) eventJson.get("topicName");
ip = (String) eventJson.get("ip");
menu = (String) eventJson.get("menu");
product = (String) eventJson.get("product");
String ievent = "ievent";
String install = "install";
viewcount = session
.execute("update webapp.viewcount set count=count+1 where topicname='"+topicname+
"'and ip= '"+ip+"'and menu='"+menu+"'and product='"+product+"'" );
} catch (Exception e) {
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
public void cleanup() {
}
}
下面是 StTopology.java 类:
package com.storm.cassandra;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Entry point that wires a Kafka spout to a {@link StBolt} and runs the
 * resulting topology.
 *
 * <p>Arguments: {@code topologyName kafkaTopic zkRoot id [local|remote]}.
 * With four arguments, or with {@code local} as the fifth, the topology runs
 * in an in-process {@link LocalCluster}; such a topology is not registered
 * with a Storm cluster and therefore is not listed in the Storm UI. With
 * {@code remote} as the fifth argument the topology is submitted through
 * {@link StormSubmitter} instead.
 */
public class StTopology {
    private static final String USAGE =
            "Usage: StTopology <topologyName> <kafkaTopic> <zkRoot> <id> [local|remote]";

    /**
     * Builds the topology and runs it locally or submits it to a cluster.
     * Prints the usage line and returns if the arguments are invalid.
     *
     * @param args {@code topologyName kafkaTopic zkRoot id [local|remote]}
     * @throws Exception if the topology cannot be started or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 4 || args.length > 5) {
            System.err.println(USAGE);
            return;
        }

        boolean remote = false;
        if (args.length == 5) {
            if ("remote".equalsIgnoreCase(args[4])) {
                remote = true;
            } else if (!"local".equalsIgnoreCase(args[4])) {
                System.err.println(USAGE);
                return;
            }
        }

        String topologyName = args[0];
        TopologyBuilder builder = buildTopology(args[1], args[2], args[3]);

        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
        config.setNumWorkers(1);

        if (remote) {
            StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, builder.createTopology());
        }
    }

    /**
     * Creates a builder holding one Kafka spout that reads {@code kafkaTopic}
     * as strings and one {@link StBolt} subscribed to it via shuffle grouping.
     */
    private static TopologyBuilder buildTopology(String kafkaTopic, String zkRoot, String id) {
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        // The SpoutConfig constructor already stores zkRoot and id.
        SpoutConfig kafkaConf = new SpoutConfig(hosts, kafkaTopic, zkRoot, id);
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaspout", new KafkaSpout(kafkaConf), 1);
        builder.setBolt("counterbolt", new StBolt(kafkaTopic), 1)
                .shuffleGrouping("kafkaspout");
        return builder;
    }
}
我试图从kafka控制台生产者那里获取json数据,在storm中处理它并将其存储到cassandra中。
由于某些原因,当我使用参数 viewcount usercount /kafkastorm webapp1 运行代码时,bolt 没有任何响应。
我让 Kafka 通过控制台生产者(console producer)接收数据,主题为 usercount,并且 Cassandra 中的表也是正确的。
代码编译和运行时没有任何错误,但控制台显示终止。
尽管多次向 Kafka 控制台生产者提供了正确的 JSON 输入 {"topicname":"usercount","ip":"127.0.0.1","menu":"dress","product":"tshirt"},我在任何地方都没有看到任何活动。
storm ui的拓扑摘要中也没有显示正在创建的拓扑。
我确信 Kafka、Storm 和 Cassandra 都已正确安装并在运行。
请给我指出这个问题的正确方向。谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!