我有一个spout类,它有几个integer和string属性,这些属性按预期进行序列化/反序列化。该类还有1个linkedlist,其中包含字节数组。反序列化对象时,此linkedlist始终为空。
我在所有的spout方法中添加了log语句,可以看到spout的'activate'方法被调用,之后linkedlist是空的。当“deactivate”方法发生这种情况时,我看不到任何日志。
似乎奇怪的是,在没有调用“deactivate”方法的情况下调用了spout“activate”方法。调用“activate”方法时,没有重新提交拓扑。
我在spoutt构造函数中还有一个log语句,在清空linkedlist之前不会调用它。
我还反复验证了在spoutt类中没有对任何方法的调用会完全清空linkedlist。有一个spot使用poll方法,紧接着一个log语句来记录新的linkedlist大小。
我找到了这个引用,它指向kryo被用于序列化,但它可能只是用于序列化元组数据。http://storm.apache.org/documentation/serialization.html
storm使用kryo进行序列化。kryo是一个灵活而快速的序列化库,它可以生成小的序列化。
默认情况下,storm可以序列化基元类型、字符串、字节数组、arraylist、hashmap、hashset和clojure集合类型。如果要在元组中使用其他类型,则需要注册自定义序列化程序。
这篇文章听起来好像kryo可能只是用于序列化和传递元组,但是如果它也用于spout对象,那么我就不知道如何使用linkedlist,因为arraylists和hashmaps并不是fifo队列的好选择。我需要自己的linkedlist吗?
public class MySpout extends BaseRichSpout
{
private SpoutOutputCollector _collector;
private LinkedList<byte[]> messages = new LinkedList<byte[]>();
public MyObject()
{
queue = new LinkedList<ObjectType>();
}
public void add(byte[] message)
{
messages.add(message);
}
@Override
public void open( Map conf, TopologyContext context,
SpoutOutputCollector collector )
{
_collector = collector;
try
{
Logger.getInstance().addMessage("Opening Spout");
// ####### Open client connection here to read messages
}
catch (MqttException e)
{
e.printStackTrace();
}
}
@Override
public void close()
{
Logger.getInstance().addMessage("Close Method Called!!!!!!!!!!!!!!!!!");
}
@Override
public void activate()
{
Logger.getInstance().addMessage("Activate Method Called!!!!!!!!!!!!!!!!!");
}
@Override
public void nextTuple()
{
if (!messages.isEmpty())
{
System.out.println("Tuple emitted from spout");
_collector.emit(new Values(messages.poll()));
Logger.getInstance().addMessage("Tuple emitted from spout. Remaining in queue: " + messages.size());
try
{
Thread.sleep(1);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
Logger.getInstance().addMessage("Sleep thread interrupted in nextTuple(). " + Logger.convertStacktraceToString(e));
e.printStackTrace();
}
}
}
}
编辑:
引用对象的java序列化正在“丢失值”?http://www.javaspecialists.eu/archive/issue088.html
上面的so链接和javaspecialists文章给出了与我看到的类似的具体示例,问题是如何进行序列化/反序列化缓存。但由于斯托姆正在做这项工作,我不知道该如何处理这个问题。
归根结底,更大的问题似乎是storm突然开始序列化/反序列化数据。
编辑:
就在喷口被激活之前,在不到一秒钟的时间内,大量的日志信息会显示:
执行人mytopology-1-1447093098:[x y]未激活
在这些消息之后,有一个日志:
正在为拓扑id mytopology-1-1447093098设置新分配:#backtype.storm.daemon.common.assignment{:主代码目录。。。
1条答案
按热度按时间5vf7fwbs1#
如果我正确理解您的问题,您可以在客户端示例化您的喷口,通过
addMessage()
,将喷口通过addSpout()
,然后将拓扑提交到集群?当拓扑启动时,您希望喷口消息列表包含您添加的消息吗?如果这是正确的,你的使用模式是很奇怪的。。。我猜这个问题与用于向集群提交拓扑的节俭有关。java序列化没有被使用,我假设thrift代码没有序列化实际的对象。就我所理解的代码而言,拓扑jar是二进制的,拓扑结构是通过thrift来实现的。在执行拓扑的worker上,通过
new
. 因此,不会发生java序列化/反序列化,并且linkedlist为空。由于new
当然不是null
任何一个。顺便说一句:关于kryo你是对的,它只用于传送数据(即元组)。
作为解决方法,您可以添加
LinkedList
到Map
那是给StormSubmitter.submitTopology(...)
. 在Spout.open(...)
你应该从网上得到一份正确的邮件副本Map
参数。然而,正如我已经提到的,您的使用模式是非常奇怪的——您可能需要重新思考这个问题。一般来说,一个喷口应该以一种方式实现,也就是说可以从中获取数据nextTuple()
从外部数据源。