storm kafka拓扑终止,没有输出

ulmd4ohb  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(289)

这是stbolt.java类。

package com.storm.cassandra;

import java.util.Map;

import net.sf.json.JSONObject;
import net.sf.json.JSONSerializer;

import org.apache.log4j.Logger;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class StBolt implements IBasicBolt {

private static final long serialVersionUID = 1L;

private static final Logger logger = Logger
        .getLogger(StBolt.class);

private static Session session = null;
private Cluster cluster = null;
String cassandraURL;
JSONObject eventJson = null;
String topicname = null;
String ip = null;
String menu = null;
String product = null;

Row row = null;

com.datastax.driver.core.ResultSet viewcount = null;
com.datastax.driver.core.ResultSet segmentlistResult = null;
com.datastax.driver.core.ResultSet newCountUpdatedResult = null;

public StBolt(String topicname) {
    this.topicname = topicname;
}

public void prepare(Map stormConf, TopologyContext topologyContext) {

    cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    System.out.println("load cassandra ip");
    session = cluster.connect();
    System.out.println("CassandraCounterBolt prepare method ended");

}

public void execute(Tuple input, BasicOutputCollector collector) {

    System.out.println("Execute");
    Fields fields = input.getFields();

    try {
        eventJson = (JSONObject) JSONSerializer.toJSON((String) input
                .getValueByField(fields.get(0)));
        topicname = (String) eventJson.get("topicName");
        ip = (String) eventJson.get("ip");
        menu = (String) eventJson.get("menu");
        product = (String) eventJson.get("product");

        String ievent = "ievent";
        String install = "install";

        viewcount = session
                .execute("update webapp.viewcount set count=count+1 where topicname='"+topicname+ 
                        "'and ip= '"+ip+"'and menu='"+menu+"'and product='"+product+"'" );

    } catch (Exception e) {
        e.printStackTrace();
    }

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

public Map<String, Object> getComponentConfiguration() {
    return null;
}

public void cleanup() {

}

}

下面是sttopology.java类

package com.storm.cassandra;

 import org.apache.storm.kafka.BrokerHosts;
 import org.apache.storm.kafka.KafkaSpout;
 import org.apache.storm.kafka.SpoutConfig;
 import org.apache.storm.kafka.StringScheme;
 import org.apache.storm.kafka.ZkHosts;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.spout.SchemeAsMultiScheme;
 import org.apache.storm.topology.TopologyBuilder;

  public class StTopology {

  public static void main(String[] args) throws Exception {
    if (args.length == 4) {

        BrokerHosts hosts = new ZkHosts("localhost:2181");
        //System.out
        //.println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");

        SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2],
                args[3]);
        //System.out
        //.println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");

        //kafkaConf1.forceFromStart = false;
        kafkaConf1.zkRoot = args[2];
        kafkaConf1.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);

        StBolt countbolt = new StBolt(args[1]);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaspout", kafkaSpout1, 1);
        builder.setBolt("counterbolt", countbolt, 1).shuffleGrouping(
                "kafkaspout");
        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
        config.setNumWorkers(1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(args[0], config, builder.createTopology());

        // StormSubmitter.submitTopology(args[0], config,
        // builder.createTopology());

    } else {
        System.out
                .println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");
    }
}
 }

我试图从kafka控制台生产者那里获取json数据,在storm中处理它并将其存储到cassandra中。
由于某些原因,当我使用参数运行代码时,bolt没有响应 viewcount usercount /kafkastorm webapp1 .
我让Kafka从游戏机制作人那里获取数据作为主题 usercount ,以及正确的Cassandra表格。
代码编译和运行时没有任何错误,但控制台显示终止。
尽管多次向kafka控制台生产者提供正确的json输入,但我在任何地方都没有活动 {"topicname":"usercount","ip":"127.0.0.1","menu":"dress","product":"tshirt"} .
storm ui的拓扑摘要中也没有显示正在创建的拓扑。
我相信我有Kafka,风暴和Cassandra的所有地方。
请给我指出这个问题的正确方向。谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题