我有一个拓扑结构,从2个sqs队列和5个螺栓读取1个喷口。处理后,当我试图确认从第二个螺栓它是没有得到确认。
我在可靠模式下运行它,并试图在最后一个螺栓确认。我收到这封信的时候好像收到了回信。但它不会从队列中被删除,也不会被覆盖 ack()
没有调用方法。它似乎调用了中的默认ack方法 backtype.storm.task.OutputCollector
而不是我喷口中重写的方法。
8240 [Thread-24-conversionBolt] INFO backtype.storm.daemon.task - Emitting: conversionBolt__ack_ack [-7578372739434961741 -8189877254603774958]
我已经将消息id锚定到sqs队列喷口中的元组,并发送到第一个bolt。
collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());
我在队列中重写了ack()和fail()方法 spout.Default Visibility Timeout
已设置为30秒
我的拓扑中的代码段:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("firstQueueSpout",
new SqsQueueSpout(StormConfigurations.getQueueURL()
+ StormConfigurations.getFirstQueueName(), true),
StormConfigurations.getAwsQueueSpoutThreads());
builder.setSpout("secondQueueSpout",
new SqsQueueSpout(StormConfigurations.getQueueURL()
+ StormConfigurations.getSecondQueueName(),
true), StormConfigurations.getAwsQueueSpoutThreads());
builder.setBolt("transformerBolt", new TransformerBolt(),
StormConfigurations.getTranformerBoltThreads())
.shuffleGrouping("firstQueueSpout")
.shuffleGrouping("secondQueueSpout");
builder.setBolt("conversionBolt", new ConversionBolt(),
StormConfigurations.getTranformerBoltThreads())
.shuffleGrouping("transformerBolt");
// To dispatch it to the corresponding bolts based on packet type
builder.setBolt("dispatchBolt", new DispatcherBolt(),
StormConfigurations.getDispatcherBoltThreads())
.shuffleGrouping("conversionBolt");
来自sqsqueuespout(扩展baserichsout)的代码段:
@Override
public void nextTuple()
{
if (queue.isEmpty()) {
ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10));
queue.addAll(receiveMessageResult.getMessages());
}
Message message = queue.poll();
if (message != null)
{
try
{
JSONParser parser = new JSONParser();
JSONObject jsonObj = (JSONObject) parser.parse(message.getBody());
// ack(message.getReceiptHandle());
if (reliable) {
collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());
} else {
// Delete it right away
sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
collector.emit(getStreamId(message), new Values(jsonObj.toString()));
}
}
catch (ParseException e)
{
LOG.error("SqsQueueSpout SQLException in SqsQueueSpout.nextTuple(): ", e);
}
} else {
// Still empty, go to sleep.
Utils.sleep(sleepTime);
}
}
public String getStreamId(Message message) {
return Utils.DEFAULT_STREAM_ID;
}
public int getSleepTime() {
return sleepTime;
}
public void setSleepTime(int sleepTime)
{
this.sleepTime = sleepTime;
}
@Override
public void ack(Object msgId) {
System.out.println("......Inside ack in sqsQueueSpout..............."+msgId);
// Only called in reliable mode.
try {
sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, (String) msgId));
} catch (AmazonClientException ace) { }
}
@Override
public void fail(Object msgId) {
// Only called in reliable mode.
try {
sqs.changeMessageVisibilityAsync(
new ChangeMessageVisibilityRequest(queueUrl, (String) msgId, 0));
} catch (AmazonClientException ace) { }
}
@Override
public void close() {
sqs.shutdown();
((AmazonSQSAsyncClient) sqs).getExecutorService().shutdownNow();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
从我的第一个螺栓(扩展baserichbolt)剪下的代码:
public class TransformerBolt extends BaseRichBolt
{
private static final long serialVersionUID = 1L;
public static final Logger LOG = LoggerFactory.getLogger(TransformerBolt.class);
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String eventStr = input.getString(0);
//some code here to convert the json string to map
//Map datamap, long packetId being sent to next bolt
this.collector.emit(input, new Values(dataMap,packetId));
}
catch (Exception e) {
LOG.warn("Exception while converting AWS SQS to HashMap :{}", e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("dataMap", "packetId"));
}
}
来自第二个螺栓的代码段:
public class ConversionBolt extends BaseRichBolt
{
private static final long serialVersionUID = 1L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input)
{
try{
Map dataMap = (Map)input.getValue(0);
Long packetId = (Long)input.getValue(1);
//this ack is not working
this.collector.ack(input);
}catch(Exception e){
this.collector.fail(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
如果你需要更多的信息,请告诉我。有人解释了为什么在我的喷口被覆盖的ack没有被调用(从我的第二个螺栓)。。。
1条答案
按热度按时间cunj1qz11#
你必须
ack
所有螺栓中的所有传入元组,即collector.ack(input)
至TransformerBolt.execute(Tuple input)
.您看到的日志消息是正确的:您的代码调用
collector.ack(...)
这个电话会被记录下来。打电话给ack
在您的拓扑中不是调用Spout.ack(...)
:每次喷口发出具有消息id的元组时,拓扑的运行acker都会注册该id。这些acker将在bolt的每个ack上获得一条消息,收集这些消息,并在收到元组的所有ack时通知spout。如果一个喷口从一个acker接收到这个消息,它就调用它自己的ack(Object messageID)
方法。有关更多详细信息,请参见此处:https://storm.apache.org/documentation/guaranteeing-message-processing.html