我正在将Kafka消息移到JDBC中。目前为了运行我的代码,我只在控制台上打印消息。我当前的Kafka队列正在控制台日志中打印,但它没有等待新的Kafka消息,程序正在停止,没有任何错误。我希望我的Storm代码永远运行,并打印任何新的Kafka消息。
我的Kafka萌芽拓扑代码是:
package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.topology.TopologyBuilder;
import java.util.Properties;
public class MainKSprout1 {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
Properties prop = new Properties();
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.
builder("PLAINTEXT://10.20.73.69:9092","testtopic")
.setProp(prop)
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
.setOffsetCommitPeriodMs(100)
.setProp("session.timeout.ms", 20000)
.setProp("heartbeat.interval.ms", 15000)
.build();
builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));
builder.setBolt("MultiplierBolt", new KBolt()).shuffleGrouping("stations");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
while (args != null && args.length == 0){
cluster.submitTopology("HelloTopology", config, builder.createTopology());
}
try {
Thread.sleep(10000);
} catch (InterruptedException e){
e.printStackTrace();
}
finally {
cluster.shutdown();
}
}
}
我的Kafka博尔特密码是:
package org.example;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class KBolt extends BaseBasicBolt {
private OutputCollector collector;
private BasicOutputCollector basicOutputCollector;
private Connection connection;
private PreparedStatement statement;
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
this.basicOutputCollector = basicOutputCollector;
System.out.println("Vikas " + tuple.getValueByField("value"));//.getString(0));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("id"));
}
}
我还没有看到任何人有这种问题。请告诉我如何才能做到这一点。我想我需要把我的拓扑无限循环,将等待新的消息来。这将是适当的方式吗?
while (args != null && args.length == 0){
cluster.submitTopology("HelloTopology", config, builder.createTopology());
}
1条答案
按热度按时间tvokkenx1#
由于这是我第一次使用Storm,我犯了一个错误,使用了非常旧版本的Store-Core,因此我遇到了这个问题。在将Storm-Core更新到最新版本后,我的问题得到了解决。
感谢@moosehead42给了我一个公平的想法,帮助我解决了这个问题。