这是我的输入口的代码snippt,用于将元组发送到处理节点,以便在集群上进行流处理。问题是blockingqueue正在抛出interruptedexception。
private SpoutOutputCollector collector;
public BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
public boolean isDistributed() {
return true;
}
public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {
this.collector=collector;
}
@Override
public void nextTuple() {
try {
//Utils.sleep(100);
collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void readInputfile() throws IOException, InterruptedException{
FileInputStream file = new FileInputStream("/home/529076/Desktop/Temperature");
DataInputStream readDate=new DataInputStream(file);
BufferedReader readText=new BufferedReader(new InputStreamReader(readDate));
String line;
String singleReading = null;
while((line=readText.readLine())!=null){
singleReading=line;
blockingQueue.add(singleReading);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("Single Temperature Reading"));
}
异常描述如下:---
java.lang.interruptedexception10930[thread-20]info backtype.storm.util-异步循环中断!
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at com.tcs.storm.test.InputStreamSpout.nextTuple(InputStreamSpout.java:65)
at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:413)
以及下一组(inputstreamspout。java:65 is ------>
collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));
谢谢
2条答案
按热度按时间0tdrvxhp1#
在storm中,建议在prepare()/open()方法中初始化螺栓/喷口字段。
原因是螺栓/喷口是在上传拓扑的节点上创建的:blockingqueue将在那里构建。然后,螺栓/喷口被序列化并分发到工作节点,在那里它们被反序列化。这种序列化/反序列化过程可能无法保留在构建bolt/spout时设置的字段的所有属性。在prepare()或open()中初始化的字段没有此问题。
vdgimpew2#
错误的原因是:blockingqueue没有在输出收集器中初始化;