在storm中,如何发出原语列表

yfwxisqw  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(402)

我使用的是Storm0.10,我有一个列表,在上面迭代,然后发出元组

for (CustomObject o: List<CustomObject>) {
  collector.emit(STREAM_NAME, new Values(o.getFirst, o.getName, o.getAddress));
}

我不想发出多个元组,我只想发出一个元组,它是一个嵌套列表,类似这样,
我的主要问题是关于序列化。阅读storm文档表明java序列化非常昂贵,storm使用kryo序列化。此外,如果没有kryo,常规的javapojo类不应该通过网络发送。所以我想发一封信 List<List<Objects>> 具体如下:,

List<List<Object>> valueList = new ArrayList<List<Object>>();
for (CustomObject o: List<CustomObject>) {
  v.add(new ArrayList<Object>{ 
    {
     add(o.getFirst);
     add( o.getName);
     add(o.getAddress);
    }
  });
}
collector.emit(STREAM_NAME, new Values(valueList));

所以问题是-这是用kryo通过storm完成的吗?

bkhjykvo

bkhjykvo1#

使用kryoserialization。
实施 CustomObjectSerializer :

class CustomObjectSerializer extends Serializer<CustomObject> {  
  @Override
  public void write(final Kryo kryo,
                    final Output output,
                    final CustomObject cObject) {
    // do serialization
  }

  @Override
  public CustomObject read(final Kryo kryo,
                           final Input input,
                           final Class<CustomObject> aClass) {
    // do deserialization
  }
}

将customobjectserializer注册到storm配置: conf.registerSerialization(CustomObject.class,CustomObjectSerializer.class);

9q78igpj

9q78igpj2#

你可以简单地发出你的声音 List ,

collector.emit(new Values(listCustomObject));

当你在报纸上读到的时候 bolt 这样地,

List<CustomObject> listCustomObject = (List<CustomObject>) tuple.getValue(0);

希望有帮助。
编辑:事实上我忘了提到对象必须是 Serializable ! 为了处理这个列表,我会这样做:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class CustomObjectList implements Serializable {

    private static final long serialVersionUID = 6877020084704724252L;

    public class CustomObject {
        ...
    }

    private final List<CustomObject> list = new ArrayList<>();

    ...
}

edit2:因为我背了太多的东西,所以我通过修改 ExclamationBolt 在storm starter exaclamationtopology中
看起来像这样。

package storm.starter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ExclamationBolt extends BaseRichBolt{

    public static Logger LOG = LoggerFactory.getLogger(ExclamationBolt.class);    

    public class CustomObject {
        private String word;

        public CustomObject(String word) {
            setWord(word);
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

    }
    public class CustomObjectList implements Serializable {

        private static final long serialVersionUID = 6877020084704724252L;

        private final List<CustomObject> list = new ArrayList<>();

        public CustomObjectList() {
        }

        public List<CustomObject> getObjects() {
            return list;
        }

        public void addObject(CustomObject o) {
            list.add(o);
        }
    }

    OutputCollector _collector;

    public ExclamationBolt() {
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String id = tuple.getSourceComponent();
        String word;
        if ("word".equals(id)) {
            word = tuple.getString(0);
        } else {
            CustomObjectList list = (CustomObjectList) tuple.getValue(0);
            word = list.getObjects().get(0).getWord();
        }
        word = word + "!!!";
        LOG.debug(word);
        CustomObjectList list = new CustomObjectList();
        list.addObject(new CustomObject(word));
        _collector.emit(tuple, new Values(list));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

希望有帮助。

相关问题