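I have a Storm topology that needs to stream data from a Redis instance. I tried running the topology against a single Redis instance, but it doesn't seem to read anything from Redis: when I check the returned queue, it is empty. I'm using Storm 0.9.3.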
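Here is my RedisQueueSpout:
This is a Storm spout that hooks your topology into Redis using the specified pattern (aka key). Each time Storm polls it, it looks for input data under that pattern. The spout emits a single field carrying the message to whichever bolt comes after it.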
package storm.starter.spout;

import java.util.List;
import java.util.Map;

import redis.clients.jedis.Jedis;
import storm.starter.JedisQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RedisQueueSpout extends BaseRichSpout {
    static final long serialVersionUID = 737015318988609460L;
    private SpoutOutputCollector _collector;
    private final String host;
    private final int port;
    private final String pattern;
    // Not serializable, so it is created in open() on the worker rather than in the constructor.
    private transient JedisQueue jq;

    public RedisQueueSpout(String host, int port, String pattern) {
        this.host = host;
        this.port = port;
        this.pattern = pattern;
    }

    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        Jedis newJedis = new Jedis(host, port);
        newJedis.connect();
        this.jq = new JedisQueue(newJedis, pattern);
    }

    public void close() {}

    public void nextTuple() {
        // Pop the next item from Redis; back off briefly if nothing came back.
        List<String> ret = this.jq.dequeue();
        if (ret == null) {
            Utils.sleep(5L);
        } else {
            System.out.println(ret);
            _collector.emit(new Values(ret));
        }
    }

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name"));
    }
}
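Here is my JedisQueue:
It is an implementation of a standard queue data structure backed by Redis. Note that the dequeue method somewhat unconventionally returns a List<String>,
because that is what the underlying Jedis call returns: BLPOP replies with the name of the key the element was popped from, followed by the element itself.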
package storm.starter;

import java.util.List;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;

public class JedisQueue {
    private transient Jedis jedis;
    private final String pattern;

    public JedisQueue(Jedis jedis, String pattern) {
        this.jedis = jedis;
        this.pattern = pattern;
    }

    public void clear() {
        this.jedis.del(this.pattern);
    }

    public boolean isEmpty() {
        return this.size() == 0;
    }

    public int size() {
        return this.jedis.llen(this.pattern).intValue();
    }

    public List<String> toArray() {
        return this.jedis.lrange(this.pattern, 0, -1);
    }

    public void enqueue(String... elems) {
        this.jedis.rpush(this.pattern, elems);
    }

    public List<String> dequeue() {
        List<String> out = null;
        try {
            // Timeout 0: block until an element can be popped from the list named by pattern.
            out = this.jedis.blpop(0, this.pattern);
        } catch (JedisDataException e) {
            // It wasn't a list of strings
        }
        return out;
    }
}
The code comes from storm-jedis; see the link for more information.
Here is my topology:
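To make the List<String> return value of dequeue() concrete, here is a minimal plain-Java sketch with no Redis involved. The class BlpopReply and its method element are hypothetical names, and the reply is hand-built to mimic a BLPOP result of the form [key, element]. It shows that the payload is the second entry. A spout that emits the whole list, as RedisQueueSpout does with new Values(ret), therefore sends the key along with the element. Depending on the Jedis version, a timed-out BLPOP may come back as null or as an empty list, so the helper treats both as "nothing popped".

```java
import java.util.Arrays;
import java.util.List;

public class BlpopReply {
    // A BLPOP reply is [key, element]; return the element, or null if nothing was popped.
    static String element(List<String> reply) {
        if (reply == null || reply.size() < 2) {
            return null; // timed out, or not a BLPOP-shaped reply
        }
        return reply.get(1);
    }

    public static void main(String[] args) {
        // Hypothetical reply for a list named "names" whose head element is "Mary".
        List<String> reply = Arrays.asList("names", "Mary");
        System.out.println(element(reply)); // prints Mary
        System.out.println(element(null));  // prints null
    }
}
```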
package storm.starter;

import org.tomdz.storm.esper.EsperBolt;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.starter.spout.RedisQueueSpout;

public class NameCountTopology {
    public static void main(String[] args) throws Exception {
        String host = "10.0.0.251";
        int port = 6379;
        String pattern = "Name:*";

        TopologyBuilder builder = new TopologyBuilder();
        EsperBolt bolt = new EsperBolt.Builder()
                .inputs().aliasComponent("spout").toEventType("names")
                .outputs().onDefaultStream().emit("nps")
                .statements().add("select count(*) as nps from names.win:time_batch(1 sec)")
                .build();
        builder.setSpout("spout", new RedisQueueSpout(host, port, pattern), 1);
        builder.setBolt("count-bolt", bolt, 1).fieldsGrouping("spout", new Fields("name"));

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("name-count-topology", conf, builder.createTopology());
            Utils.sleep(300000);
            cluster.killTopology("name-count-topology");
            cluster.shutdown();
        }
    }
}
My Redis keys and values are stored with HMSET in the following format:
HMSET Name:1 NAME Mary YEAR 1880 GENDER F COUNT 7065
HMSET Name:2 NAME Anna YEAR 1880 GENDER F COUNT 2604
...
Here is the log from my supervisor node:
2016-05-04 07:37:56 b.s.d.executor [INFO] Opened spout spout:(3)
2016-05-04 07:37:56 b.s.d.executor [INFO] Activating spout spout:(3)
2016-05-04 07:37:56 STDIO [INFO] Queue is empty...
2016-05-04 07:37:56 c.e.e.c.EPServiceProviderImpl [INFO] Initializing engine URI 'org.tomdz.storm.esper.EsperBolt@44d83ea0' version 4.3.0
2016-05-04 07:37:58 b.s.d.executor [INFO] Prepared bolt count-bolt:(2)
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@70f9b3ee> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=9418640, usedBytes=14710896, maxBytes=259522560, initBytes=8035520, virtualFreeBytes=244811664, committedBytes=24129536}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=1218808, usedBytes=36529928, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=187865336, committedBytes=37748736}]> #<DataPoint [uptimeSecs = 77.358]> #<DataPoint [__transfer = {write_pos=0, read_pos=0, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.462347457159E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@19940834> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
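The log keeps repeating like this. Here is the Storm UI after I run the topology:
[Screenshot: Storm UI]
Now my question is: why isn't the spout working? Nothing is being emitted, and it seems nothing is being picked up from Redis.
P.S. I have checked the host and port, and I can fetch data from Redis, so I don't think there is a problem with the connection to Redis.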
1 Answer
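HMSET works on hashes, while BLPOP works on lists; the two are not compatible. Running BLPOP against a key that holds a hash fails with a WRONGTYPE error, and JedisQueue.dequeue() would silently swallow that in its JedisDataException catch block and return null.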
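One way to keep the HMSET hashes and still feed the spout from a list, assuming you control how the data is loaded: push each record's key (such as Name:1) onto a separate list with RPUSH, let the spout pop a key, and then read that hash with HGETALL. The sketch below models that flow in memory, with plain Java collections standing in for Redis. The list name names:queue, the class HashQueueModel and its methods are all hypothetical. In real Jedis code, the three steps would correspond to rpush, lpop and hgetAll.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class HashQueueModel {
    // Stand-in for Redis hashes written with HMSET: key -> (field -> value).
    private final Map<String, Map<String, String>> hashes = new HashMap<>();
    // Stand-in for a Redis list (hypothetically "names:queue") holding keys of hashes to process.
    private final Deque<String> queue = new ArrayDeque<>();

    // Producer side: HMSET the record, then RPUSH its key onto the list.
    void load(String key, Map<String, String> fields) {
        hashes.put(key, new LinkedHashMap<>(fields));
        queue.addLast(key);
    }

    // Consumer side: LPOP a key (null when the list is empty), then HGETALL that hash.
    Map<String, String> next() {
        String key = queue.pollFirst();
        return key == null ? null : hashes.get(key);
    }

    public static void main(String[] args) {
        HashQueueModel redis = new HashQueueModel();
        Map<String, String> mary = new LinkedHashMap<>();
        mary.put("NAME", "Mary");
        mary.put("YEAR", "1880");
        mary.put("GENDER", "F");
        mary.put("COUNT", "7065");
        redis.load("Name:1", mary);

        System.out.println(redis.next()); // {NAME=Mary, YEAR=1880, GENDER=F, COUNT=7065}
        System.out.println(redis.next()); // null: nothing left to pop
    }
}
```

With this layout, the spout could emit the NAME value of each popped hash as the single "name" field the topology declares.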
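BLPOP does not accept a pattern; it needs the exact key name. With "Name:*" it waits on a list literally named Name:*, which does not exist. See http://redis.io/commands/blpop for details.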
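Redis only expands glob patterns in commands built for that purpose, such as KEYS and SCAN with MATCH. BLPOP treats its argument as a literal key name. The plain-Java sketch below contrasts an exact lookup with a pattern match over a hypothetical in-memory keyspace. The helper globToRegex is hypothetical and only understands the * wildcard, which is a small subset of Redis glob syntax.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class ExactKeyVsPattern {
    // Converts a glob that uses only '*' into an anchored regex; all other characters are literal.
    static Pattern globToRegex(String glob) {
        String[] parts = glob.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(regex.toString());
    }

    // What a KEYS/SCAN-style lookup does: return every key matching the pattern.
    static List<String> matchingKeys(List<String> keyspace, String glob) {
        Pattern p = globToRegex(glob);
        List<String> out = new ArrayList<>();
        for (String key : keyspace) {
            if (p.matcher(key).matches()) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keyspace = Arrays.asList("Name:1", "Name:2", "Other:1");
        String pattern = "Name:*";
        // BLPOP-style lookup: the argument is compared as-is, so nothing matches.
        System.out.println(keyspace.contains(pattern));      // false
        // KEYS/SCAN-style lookup: the glob is expanded.
        System.out.println(matchingKeys(keyspace, pattern)); // [Name:1, Name:2]
    }
}
```

If you do need pattern matching against a live server, SCAN is generally preferable to KEYS, which walks the whole keyspace in a single blocking call.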
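Because the spout calls nextTuple(), ack() and fail() from a single thread, a BLPOP with a long timeout (or an infinite one, as with the timeout of 0 here) also blocks the whole spout until there is a message to pop.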
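A common way around this is to make nextTuple() return quickly. It tries a non-blocking pop (in Jedis, lpop(key) returns null when the list is empty), emits if something came back, and otherwise sleeps briefly so Storm can call ack()/fail() in between. The sketch below models that contract in plain Java. A ConcurrentLinkedQueue stands in for the Redis list and a plain list stands in for SpoutOutputCollector. The class and its members are hypothetical, not Storm API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NonBlockingSpoutSketch {
    // Stand-in for the Redis list: poll() behaves like LPOP, returning null at once when empty.
    private final Queue<String> redisList = new ConcurrentLinkedQueue<>();
    // Stand-in for SpoutOutputCollector: records what would have been emitted.
    final List<String> emitted = new ArrayList<>();
    int idleCalls = 0;

    void push(String value) {
        redisList.add(value);
    }

    // Same shape as a spout's nextTuple(): emit at most one item and never wait indefinitely.
    void nextTuple() {
        String value = redisList.poll();
        if (value == null) {
            idleCalls++;
            try {
                Thread.sleep(5); // brief back-off, like Utils.sleep(5L) in the question's spout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        emitted.add(value);
    }

    public static void main(String[] args) {
        NonBlockingSpoutSketch spout = new NonBlockingSpoutSketch();
        spout.nextTuple(); // empty: returns after a short sleep instead of hanging
        spout.push("Mary");
        spout.push("Anna");
        spout.nextTuple();
        spout.nextTuple();
        System.out.println(spout.emitted);   // [Mary, Anna]
        System.out.println(spout.idleCalls); // 1
    }
}
```

Applied to the question's code, the analogous change would be to replace blpop(0, ...) in dequeue() with a non-blocking pop. This is a suggestion, not something the answer above prescribes.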
Hope this helps.