Storm with Redis as the data source

nzk0hqpo  ·  posted 2021-06-21  in Storm
Follow (0) | Answers (1) | Views (301)

I have a Storm topology that needs to stream data from a Redis instance. I tried running the topology reading from a single Redis instance, but it does not seem to read anything from Redis; when I check the returned queue, it is empty. I am using Storm version 0.9.3.
Here is my RedisQueueSpout. It is a Storm spout that plugs the topology into Redis using a given pattern (a.k.a. key); every time Storm polls it, it looks for input data under that pattern. The spout emits a single field, with a message id, to whichever bolt sits behind it.

package storm.starter.spout;

import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RedisQueueSpout extends BaseRichSpout {
  static final long            serialVersionUID = 737015318988609460L;
  private SpoutOutputCollector _collector;
  private final String         host;
  private final int            port;
  private final String         pattern;
  private transient JedisQueue jq;

  public RedisQueueSpout(String host, int port, String pattern) {
    this.host = host;
    this.port = port;
    this.pattern = pattern;
  }

  @SuppressWarnings("rawtypes")
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    Jedis newJedis = new Jedis(host, port);
    newJedis.connect();
    this.jq = new JedisQueue(newJedis, pattern);
  }

  public void close() {}

  public void nextTuple() {
    List<String> ret = this.jq.dequeue();
    if (ret == null) {
      Utils.sleep(5L);
    }
    else {
      System.out.println(ret);
      _collector.emit(new Values(ret));
    }
  }

  @Override
  public void ack(Object msgId) {}

  @Override
  public void fail(Object msgId) {}

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("name"));
  }
}

Here is my JedisQueue. It is an implementation of a standard queue data structure backed by Redis. Note that the dequeue method somewhat unconventionally returns a List<String>, because that is what the underlying Jedis implementation returns: this is due to Redis being able to store many values under one key.

package storm.starter;

import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;

public class JedisQueue {
  private transient Jedis jedis;
  private final String pattern;

  public JedisQueue(Jedis jedis, String pattern) {
    this.jedis = jedis;
    this.pattern = pattern;
  }

  public void clear() {
    this.jedis.del(this.pattern);
  }

  public boolean isEmpty() { 
    return (this.size() == 0);
  }

  public int size() {
    return this.jedis.llen(this.pattern).intValue();
  }

  public List<String> toArray() {
    return this.jedis.lrange(this.pattern, 0, -1);
  }

  public void enqueue(String... elems) {
    this.jedis.rpush(this.pattern, elems);
  }

  public List<String> dequeue() {
    List<String> out = null;
    try {
      out = this.jedis.blpop(0, this.pattern);
    }
    catch (JedisDataException e) {
      // It wasn't a list of strings
    }

    return out;
  }
}
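One detail worth noting about the dequeue above: per the documented BLPOP reply, Jedis's blpop returns a two-element list of the form [key, value], not a list of queued values, so emitting `new Values(ret)` puts that whole two-element list into the single "name" field. A plain-Java sketch of extracting just the popped value (the BLPOP reply is simulated here; no Redis needed):

```java
import java.util.Arrays;
import java.util.List;

public class BlpopReply {
    // BLPOP replies with [keyName, poppedValue]; the payload is element 1.
    // A spout would emit this value rather than the raw reply list.
    static String poppedValue(List<String> blpopReply) {
        if (blpopReply == null || blpopReply.size() < 2) {
            return null; // timeout or malformed reply
        }
        return blpopReply.get(1);
    }

    public static void main(String[] args) {
        // Simulated reply for: BLPOP names 0  ->  ["names", "Mary"]
        List<String> reply = Arrays.asList("names", "Mary");
        System.out.println(poppedValue(reply)); // prints "Mary"
    }
}
```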

The code comes from storm-jedis; check the link for more information.
Here is my topology:

package storm.starter;

import org.tomdz.storm.esper.EsperBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.starter.spout.RedisQueueSpout;

public class NameCountTopology {
  public static void main (String[] args) throws Exception {
    String host = "10.0.0.251";
    int port = 6379;
    String pattern = "Name:*";
    TopologyBuilder builder = new TopologyBuilder();

    EsperBolt bolt = new EsperBolt.Builder().inputs().aliasComponent("spout").toEventType("names").outputs()
            .onDefaultStream().emit("nps").statements()
            .add("select count(*) as nps from names.win:time_batch(1 sec)").build();

    builder.setSpout("spout", new RedisQueueSpout(host,port,pattern),1);
    builder.setBolt("count-bolt", bolt, 1).fieldsGrouping("spout", new Fields("name"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        conf.setNumWorkers(1);

        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

    } else {

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("name-count-topology", conf, builder.createTopology());
        Utils.sleep(300000);
        cluster.killTopology("name-count-topology");
        cluster.shutdown();

    }
}

}
My Redis key-values were stored with HMSET, in the following format:

HMSET Name:1 NAME Mary YEAR 1880 GENDER F COUNT 7065
HMSET Name:2 NAME Anna YEAR 1880 GENDER F COUNT 2604
...
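For reference, HMSET stores those fields as a Redis hash: under a key like Name:1 there is a field-to-value map, which is what HGETALL Name:1 would return. A minimal plain-Java model of that reply (field names taken from the HMSET lines above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HashShape {
    // Models the reply of HGETALL Name:1 for the first HMSET line above:
    // a field->value map stored under a single key.
    static Map<String, String> name1() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("NAME", "Mary");
        m.put("YEAR", "1880");
        m.put("GENDER", "F");
        m.put("COUNT", "7065");
        return m;
    }

    public static void main(String[] args) {
        System.out.println(name1().get("NAME")); // prints "Mary"
    }
}
```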

Here is the log from my supervisor node:

2016-05-04 07:37:56 b.s.d.executor [INFO] Opened spout spout:(3) 
2016-05-04 07:37:56 b.s.d.executor [INFO] Activating spout spout:(3)
2016-05-04 07:37:56 STDIO [INFO] Queue is empty... 
2016-05-04 07:37:56 c.e.e.c.EPServiceProviderImpl [INFO] Initializing engine URI 'org.tomdz.storm.esper.EsperBolt@44d83ea0' version 4.3.0 
2016-05-04 07:37:58 b.s.d.executor [INFO] Prepared bolt count-bolt:(2)
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@70f9b3ee> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=9418640, usedBytes=14710896, maxBytes=259522560, initBytes=8035520, virtualFreeBytes=244811664, committedBytes=24129536}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=1218808, usedBytes=36529928, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=187865336, committedBytes=37748736}]> #<DataPoint [uptimeSecs = 77.358]> #<DataPoint [__transfer = {write_pos=0, read_pos=0, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.462347457159E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]] 
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@19940834> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]

The log keeps repeating like this. Here is the UI after I ran the topology: storm ui
Now my question is: why isn't the spout working? Nothing is being emitted, and nothing seems to be picked up from Redis.
PS: I have already checked the host and port, and I can fetch data from Redis, so I assume the connection to Redis is not the problem.

ercv8c1e


HMSET creates a hash; BLPOP works on lists. The two are not compatible.
BLPOP does not accept a pattern; it expects exact key names. See http://redis.io/commands/blpop for details.
Also, since a spout executes its nextTuple(), ack(), and fail() methods on a single thread, a BLPOP with a long (or infinite) timeout will block the spout until a message is available to pop.
Hope this helps.
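Building on that answer: if the data stays in hashes like Name:1, Name:2, ..., one possible (untested) direction is to have the spout SCAN for keys matching Name:* and HGETALL each one, then emit only the NAME field. The Jedis calls themselves need a live Redis, but the per-key extraction step can be sketched in plain Java, with HGETALL's reply simulated as a Map:

```java
import java.util.HashMap;
import java.util.Map;

public class HashToTuple {
    // Given the field->value map that HGETALL returns for a key like
    // "Name:1", pull out the value the topology declares as its "name" field.
    static String nameField(Map<String, String> hash) {
        return hash == null ? null : hash.get("NAME");
    }

    public static void main(String[] args) {
        // Simulated HGETALL Name:1 reply from the HMSET example in the question.
        Map<String, String> hash = new HashMap<>();
        hash.put("NAME", "Mary");
        hash.put("YEAR", "1880");
        System.out.println(nameField(hash)); // prints "Mary"
        // In the spout, this value would be emitted as:
        //   _collector.emit(new Values(nameField(hash)));
    }
}
```

Alternatively, the producers could RPUSH plain values onto one list under a literal key (for example a key named "names"), so that the existing BLPOP-based dequeue works with an exact key name instead of the Name:* pattern.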
