我有我的拓扑运行,每一个螺栓工程,除了redis螺栓。我正在尝试将信息写入redis数据库,我在网上找到了一个例子。但是,当db Bolt执行时,拓扑工作,它显示以下错误:
3594 [Thread-18-print] INFO b.s.d.executor - Processing received message FOR 22 TUPLE: source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)]
source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)]
13595 [Thread-18-print] INFO b.s.d.executor - BOLT ack TASK: 22 TIME: TUPLE: source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)]
13595 [Thread-18-print] INFO b.s.d.executor - Execute done TUPLE source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)] TASK: 22 DELTA:
13595 [Thread-38-bd] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[Thread-38-bd,5,main] died
java.lang.RuntimeException: org.apache.storm.shade.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/test-1-1454011533/bd-last-error
at backtype.storm.util$wrap_in_runtime.invoke(util.clj:49) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:92) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.cluster$mk_distributed_cluster_state$reify__4580.set_data(cluster.clj:106) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.cluster$mk_storm_cluster_state$reify__5120.report_error(cluster.clj:465) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5469.invoke(executor.clj:193) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$mk_executor_data$fn__5523$fn__5524.invoke(executor.clj:256) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:489) ~[storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91]
Caused by: org.apache.storm.shade.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/test-1-1454011533/bd-last-error
at org.apache.storm.shade.org.apache.zookeeper.KeeperException.create(KeeperException.java:119) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.zookeeper.KeeperException.create(KeeperException.java:51) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:676) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:660) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:656) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:441) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:431) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:239) ~[storm-core-0.10.0.jar:0.10.0]
at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:193) ~[storm-core-0.10.0.jar:0.10.0]
at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_91]
at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_91]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.6.0.jar:?]
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.6.0.jar:?]
at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:91) ~[storm-core-0.10.0.jar:0.10.0]
... 7 more
或者这另一个(s'ha refusat la connexi)ó = 连接被拒绝):
13375 [Thread-48-bd2] ERROR b.s.util - Async loop died!
java.lang.RuntimeException: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91]
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:50) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86) ~[jedis-2.7.0.jar:?]
at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.publish(ProvaTopology.java:175) ~[classes/:?]
at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.execute(ProvaTopology.java:157) ~[classes/:?]
at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0]
... 6 more
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: S’ha refusat la connexió
at redis.clients.jedis.Connection.connect(Connection.java:154) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:83) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1643) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:85) ~[jedis-2.7.0.jar:?]
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) ~[commons-pool2-2.3.jar:2.3]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) ~[commons-pool2-2.3.jar:2.3]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) ~[commons-pool2-2.3.jar:2.3]
at redis.clients.util.Pool.getResource(Pool.java:48) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86) ~[jedis-2.7.0.jar:?]
at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.publish(ProvaTopology.java:175) ~[classes/:?]
at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.execute(ProvaTopology.java:157) ~[classes/:?]
at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0]
... 6 more
Caused by: java.net.ConnectException: S’ha refusat la connexió
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.7.0_91]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) ~[?:1.7.0_91]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) ~[?:1.7.0_91]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) ~[?:1.7.0_91]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.7.0_91]
at java.net.Socket.connect(Socket.java:579) ~[?:1.7.0_91]
at redis.clients.jedis.Connection.connect(Connection.java:148) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:83) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1643) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:85) ~[jedis-2.7.0.jar:?]
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) ~[commons-pool2-2.3.jar:2.3]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) ~[commons-pool2-2.3.jar:2.3]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) ~[commons-pool2-2.3.jar:2.3]
at redis.clients.util.Pool.getResource(Pool.java:48) ~[jedis-2.7.0.jar:?]
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86) ~[jedis-2.7.0.jar:?]
at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.publish(ProvaTopology.java:175) ~[classes/:?]
at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.execute(ProvaTopology.java:157) ~[classes/:?]
at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0]
... 6 more
我不知道为什么它不能拥有这些资源。我不确定这个螺栓是否能写出确认的元组。我希望有人知道如何修复它(有很多未使用的导入,但这是因为我正在发疯,试图让它写入redis数据库,我不知道如何完成它,这只是一个测试,我正在从事一个大数据项目,我需要了解这个数据库连接):
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
//import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
/**
* This is a basic example of a Storm topology.
*/
public class ProvaTopology implements Serializable {
public static class ProvaBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + " :-)"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("Morts"));
}
}
public class ProvaSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
//Random _rand;
private String fileName;
//private SpoutOutputCollector _collector;
private BufferedReader reader;
private AtomicLong linesRead;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
try {
fileName= (String)"prova.tsv";
reader = new BufferedReader(new FileReader(fileName));
// read and ignore the header if one exists
} catch (Exception e) {
throw new RuntimeException(e);
}
// _rand = new Random();
}
public void nextTuple() {
Utils.sleep(100);
try {
String line = reader.readLine();
if (line != null) {
//long id = linesRead.incrementAndGet();
System.out.println("Finished reading line, " + line);
_collector.emit(new Values((String)line));// id));
} else {
System.out.println("Finished reading file, " + linesRead.get() + " lines read");
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void ack(Object id) {
}
public void fail(Object id) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("Morts"));
}
}
public class RedisBolt implements IRichBolt {
protected String channel = "Somriures";
// protected String configChannel;
protected OutputCollector collector;
// protected Tuple currentTuple;
// protected Logger log;
protected JedisPool pool;
// protected ConfigListenerThread configListenerThread;
public RedisBolt(){}
public RedisBolt(String channel) {
// log = Logger.getLogger(getClass().getName());
// setupNonSerializableAttributes();
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
pool = new JedisPool("localhost");
}
public void execute(Tuple tuple) {
String current = tuple.getString(0);
if(current != null) {
// for(Object obj: result) {
publish(current);
collector.emit(tuple, new Values(current));
// }
collector.ack(tuple);
}
}
public void cleanup() {
if(pool != null) {
pool.destroy();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(channel));
}
public void publish(String msg) {
Jedis jedis = pool.getResource();
jedis.publish(channel, msg);
pool.returnResource(jedis);
}
protected void setupNonSerializableAttributes() {
}
public Map getComponentConfiguration() {
return null;
}
}
public class PrinterBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
ProvaTopology Pt = new ProvaTopology();
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost("127.0.0.1").setPort(666).build();
builder.setSpout("Morts", Pt.new ProvaSpout(), 10);//emisorTestWordSpout
builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("Morts");// de on llig?
builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?
builder.setBolt("bd", Pt.new RedisBolt(), 2).shuffleGrouping("meal");// de on llig?
builder.setBolt("print", Pt.new PrinterBolt(), 2).shuffleGrouping("meal");
builder.setBolt("bd2", Pt.new RedisBolt(), 2).shuffleGrouping("happy");
// builder.setBolt("StoreM", (storeMapperS));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(5);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
//WithProgressBar
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
提前谢谢!
1条答案
按热度按时间6vl6ewon1#
昨天我得到了答案,我需要的是把localhost改成127.0.0.1,然后我在一个终端上启动redis数据库,在另一个终端上启动monitor,我的发布方法开始工作了。