Storm spout does not wait for new Kafka messages

xmjla07d · posted 2023-03-11 in Apache

I am moving Kafka messages into JDBC. For now, to exercise my code, I only print the messages to the console. My current Kafka queue is printed in the console log, but the program does not wait for new Kafka messages: it stops without any error. I want my Storm code to run forever and print every new Kafka message.
My Kafka spout topology code is:

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.topology.TopologyBuilder;
import java.util.Properties;

public class MainKSprout1 {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        Properties prop = new Properties();
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.
            builder("PLAINTEXT://10.20.73.69:9092","testtopic")
            .setProp(prop)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setOffsetCommitPeriodMs(100)
            .setProp("session.timeout.ms", 20000)
            .setProp("heartbeat.interval.ms", 15000)
            .build();
        builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));
        builder.setBolt("MultiplierBolt", new KBolt()).shuffleGrouping("stations");
        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        while (args != null && args.length == 0){
            cluster.submitTopology("HelloTopology", config, builder.createTopology());
        }
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e){
            e.printStackTrace();
        }
        finally {
            cluster.shutdown();
        }
    }
}

My Kafka bolt code is:

package org.example;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class KBolt extends BaseBasicBolt {
    private OutputCollector collector;
    private BasicOutputCollector basicOutputCollector;
    private Connection connection;
    private PreparedStatement statement;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        this.basicOutputCollector = basicOutputCollector;
        System.out.println("Vikas " + tuple.getValueByField("value"));//.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("id"));
    }
}

I have not seen anyone else with this problem. Please tell me how I can achieve this. I think I need to put my topology in an infinite loop so it waits for new messages to arrive. Would this be the proper way?

while (args != null && args.length == 0){
            cluster.submitTopology("HelloTopology", config, builder.createTopology());
        }
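For what it's worth, the usual local-mode pattern is to call `submitTopology` once and then block the main thread until shutdown, rather than resubmitting in a loop. A minimal sketch of that keep-alive idiom (the Storm calls are replaced by prints here, since only the blocking structure matters; the `CountDownLatch` and the 100 ms helper thread are illustrative, not part of any Storm API):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class KeepAlive {
    public static void main(String[] args) throws InterruptedException {
        // In real code: cluster.submitTopology(...) is called exactly ONCE here.
        // The spout then polls Kafka in background threads; main just has to stay alive.
        CountDownLatch shutdown = new CountDownLatch(1);

        // In a real deployment the latch would be released by a JVM shutdown
        // hook (e.g. on Ctrl+C); here a helper thread releases it after 100 ms
        // so the sketch terminates on its own.
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException ignored) {
            }
            shutdown.countDown();
        }).start();

        System.out.println("topology running; waiting for shutdown signal");
        shutdown.await(); // blocks indefinitely instead of looping or sleeping
        System.out.println("shutting down cluster"); // in real code: cluster.shutdown()
    }
}
```

The key point is that `await()` blocks without burning CPU, so the topology keeps consuming new messages until a shutdown is explicitly signalled.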

tvokkenx1#

Since this was my first time using Storm, I made the mistake of using a very old version of storm-core, which is why I ran into this problem. After updating storm-core to the latest version, my problem was solved.
Thanks to @moosehead42 for giving me a fair idea that helped me solve this.
