java—集群中的拓扑是否可以写入本地文件系统上的txt文件(群集(在同一系统上运行)

9udxz4iz  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(280)

目前,我正在从事一个项目,从ti sensortag cc2650获取其他传感器值,通过http(通过apache tomcat servlet)用python脚本将这些值发送到apache kafka,并将kafka与apache storm连接以处理数据。
这些数据将通过拓扑中的一个螺栓写入本地系统(apachestorm集群文件夹的目录)上的一个.txt文件。
几周前我刚开始讲《暴风雪》和《Kafka》,我对以下几点感到困惑:
如果我在本地集群上运行拓扑,一切正常。但是如果我把它提交到一个“正常”的集群,运行在localhost:8888,那么它就什么也不做了。
storm ui确实显示了拓扑结构,但似乎对来自kafka的传入消息没有任何React。
本地集群上的测试和实际集群上的功能不应该相似吗?或者集群只是没有权限在本地系统上写入/修改文件?
其他信息:
概述(“系统”之间的连接):

它应该如何工作?
当我在kafka中向一个主题写入一条消息时,拓扑的kafka喷口应该获取该消息并将其写入本地文件系统上的一个.txt文件中。
我的代码(带依赖项的jar)位于:
“/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/stormkafkatopology/target/”
我正在尝试写入output.txt文件,该文件位于:
“/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/stromkafcatopology”/tmp/”
拓扑代码:

public class StormKafkaTopology {

public static void main(String[] args) throws Exception {

    Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    String zkConnString = "localhost:2181";
    String topic = "mytopic";
    BrokerHosts hosts = new ZkHosts(zkConnString);

    SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" +topic, UUID.randomUUID().toString());
    kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
    //kafkaSpoutConfig.forceFromStart = true;
    kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
    builder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("kafka-spout");

    if (args != null && args.length >0) {
        config.setNumWorkers(6);
        config.setNumAckers(6);
        //config.setMaxSpoutPending(100);
        //config.setMessageTimeoutSecs(20);
        StormSubmitter.submitTopology("StormKafkaTopology", config, builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormKafkaTopology", config, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("StormKafkaTopology");
        cluster.shutdown();
    }
}}

打印机螺栓代码:

public class PrinterBolt extends BaseBasicBolt {
/*

* execute-method will be opened if tuples are processed
* /

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String msg = tuple.getString(0);
    System.out.println("======before write file======");
    try {
        // set file directory:
        File file = new File("/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/StormKafkaTopology/tmp/output.txt");
        if(!file.exists()) {
            file.createNewFile();
        }
    //create a FileWriter
    FileWriter fw = new FileWriter(file.getAbsoluteFile(), true);
    //create a BufferedWriter
    BufferedWriter bw = new BufferedWriter(fw);
    //write into the file
    bw.write(msg + "\n");
    //close the BufferedWriter (IMPORTANT)
    bw.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    System.out.println("======after write file======");
    //you could emit some Date here for further processing:
    //collector.emit(new Values(msg));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("output"));
}}

如果有人能指出我的错误,给我一些建议,我会很高兴的。

hgb9j2n6

hgb9j2n61#

如果您能够在“本地”运行它,那么这是一个很好的第一步。另外,听起来你可以进入风暴用户界面,这是很好的。提交拓扑后,它应该显示在storm ui中,然后您可以单击它以查看拓扑中的喷口和螺栓。单击每个喷口/螺栓,然后单击端口(每个工人一个)以在ui中查看日志。
我猜某处有个错误。是时候开始挖掘风暴/Kafka的原木来找出它是什么了。
问:如何确定哪个工人创建了哪个日志?每个工人都分配了一个端口。每个工作日志是拓扑名称+端口的组合。对于您来说,只需找到最新的日志并查看其中的内容。
有几件事:
从一个工人开始,这更简单
更新printerbolt中的日志以使用slf4j,这样您就可以在storm的日志和ui中看到消息
添加try/catch并在发生异常时使用 collector.reportError(e); 报告错误。然后它会在storm ui中显示为红色!
你的图表上的小澄清,Kafka代理端口是9092不是2181。。。2181只供Zookeeper使用

相关问题