java apache storm spout空的反序列化linkedlist对象属性

vyu0f0g1  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(313)

我有一个spout类,它有几个integer和string属性,这些属性按预期进行序列化/反序列化。该类还有1个linkedlist,其中包含字节数组。反序列化对象时,此linkedlist始终为空。
我在所有的spout方法中添加了log语句,可以看到spout的'activate'方法被调用,之后linkedlist是空的。当“deactivate”方法发生这种情况时,我看不到任何日志。
似乎奇怪的是,在没有调用“deactivate”方法的情况下调用了spout“activate”方法。调用“activate”方法时,没有重新提交拓扑。
我在spoutt构造函数中还有一个log语句,在清空linkedlist之前不会调用它。
我还反复验证了在spoutt类中没有对任何方法的调用会完全清空linkedlist。有一个spot使用poll方法,紧接着一个log语句来记录新的linkedlist大小。
我找到了这个引用,它指向kryo被用于序列化,但它可能只是用于序列化元组数据。http://storm.apache.org/documentation/serialization.html
storm使用kryo进行序列化。kryo是一个灵活而快速的序列化库,它可以生成小的序列化。
默认情况下,storm可以序列化基元类型、字符串、字节数组、arraylist、hashmap、hashset和clojure集合类型。如果要在元组中使用其他类型,则需要注册自定义序列化程序。
这篇文章听起来好像kryo可能只是用于序列化和传递元组,但是如果它也用于spout对象,那么我就不知道如何使用linkedlist,因为arraylists和hashmaps并不是fifo队列的好选择。我需要自己的linkedlist吗?

public class MySpout extends BaseRichSpout
{

    private SpoutOutputCollector _collector;
    private LinkedList<byte[]> messages = new LinkedList<byte[]>();

    public MyObject()
    {
        queue = new LinkedList<ObjectType>();
    }

    public void add(byte[] message)
    {
        messages.add(message);
    }

    @Override
    public void open( Map conf, TopologyContext context,
            SpoutOutputCollector collector )
    {
        _collector = collector;

        try
        {           
            Logger.getInstance().addMessage("Opening Spout");
            // ####### Open client connection here to read messages
        }
        catch (MqttException e)
        {
            e.printStackTrace();
        }
    }

    @Override
    public void close()
    {
        Logger.getInstance().addMessage("Close Method Called!!!!!!!!!!!!!!!!!");
    }

    @Override
    public void activate()
    {
        Logger.getInstance().addMessage("Activate Method Called!!!!!!!!!!!!!!!!!");
    }

    @Override
    public void nextTuple()
    {

        if (!messages.isEmpty())
        {
            System.out.println("Tuple emitted from spout");            
            _collector.emit(new Values(messages.poll()));
            Logger.getInstance().addMessage("Tuple emitted from spout. Remaining in queue: " + messages.size());
            try
            {
                Thread.sleep(1);
            }
            catch (InterruptedException e)
            {
                // TODO Auto-generated catch block
                Logger.getInstance().addMessage("Sleep thread interrupted in nextTuple(). " + Logger.convertStacktraceToString(e));
                e.printStackTrace();
            }
        }
    }
}

编辑:
引用对象的java序列化正在“丢失值”?http://www.javaspecialists.eu/archive/issue088.html
上面的so链接和javaspecialists文章给出了与我看到的类似的具体示例,问题是如何进行序列化/反序列化缓存。但由于斯托姆正在做这项工作,我不知道该如何处理这个问题。
归根结底,更大的问题似乎是storm突然开始序列化/反序列化数据。
编辑:
就在喷口被激活之前,在不到一秒钟的时间内,大量的日志信息会显示:
执行人mytopology-1-1447093098:[x y]未激活
在这些消息之后,有一个日志:
正在为拓扑id mytopology-1-1447093098设置新分配:#backtype.storm.daemon.common.assignment{:主代码目录。。。

5vf7fwbs

5vf7fwbs1#

如果我正确理解您的问题,您可以在客户端示例化您的喷口,通过 addMessage() ,将喷口通过 addSpout() ,然后将拓扑提交到集群?当拓扑启动时,您希望喷口消息列表包含您添加的消息吗?如果这是正确的,你的使用模式是很奇怪的。。。
我猜这个问题与用于向集群提交拓扑的节俭有关。java序列化没有被使用,我假设thrift代码没有序列化实际的对象。就我所理解的代码而言,拓扑jar是二进制的,拓扑结构是通过thrift来实现的。在执行拓扑的worker上,通过 new . 因此,不会发生java序列化/反序列化,并且linkedlist为空。由于 new 当然不是 null 任何一个。
顺便说一句:关于kryo你是对的,它只用于传送数据(即元组)。
作为解决方法,您可以添加 LinkedListMap 那是给 StormSubmitter.submitTopology(...) . 在 Spout.open(...) 你应该从网上得到一份正确的邮件副本 Map 参数。然而,正如我已经提到的,您的使用模式是非常奇怪的——您可能需要重新思考这个问题。一般来说,一个喷口应该以一种方式实现,也就是说可以从中获取数据 nextTuple() 从外部数据源。

相关问题