SQS message not getting acked in Storm

2skhul33  posted 2021-06-21 in Storm
Follow (0) | Answers (1) | Views (357)

I have a topology with one spout reading from two SQS queues and five bolts. After processing, when I try to ack from the second bolt, the tuple is not getting acked.
I am running in reliable mode and trying to ack in the last bolt. The ack appears to be received, but the message is never deleted from the queue, and my overridden ack() method is never invoked. It seems to call the default ack method in backtype.storm.task.OutputCollector instead of the one overridden in my spout. I see this log entry:

8240 [Thread-24-conversionBolt] INFO  backtype.storm.daemon.task - Emitting: conversionBolt__ack_ack [-7578372739434961741 -8189877254603774958]

I anchor the message ID to the tuple in the SQS queue spout and emit it to the first bolt:

collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());

I have overridden the ack() and fail() methods in my queue spout. The default visibility timeout is set to 30 seconds.
Code snippet from my topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("firstQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getFirstQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setSpout("secondQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getSecondQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setBolt("transformerBolt", new TransformerBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("firstQueueSpout")
        .shuffleGrouping("secondQueueSpout");

builder.setBolt("conversionBolt", new ConversionBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("transformerBolt");

// To dispatch it to the corresponding bolts based on packet type
builder.setBolt("dispatchBolt", new DispatcherBolt(),
        StormConfigurations.getDispatcherBoltThreads())
        .shuffleGrouping("conversionBolt");

Code snippet from SqsQueueSpout (extends BaseRichSpout):

    @Override
    public void nextTuple() {
        if (queue.isEmpty()) {
            ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10));
            queue.addAll(receiveMessageResult.getMessages());
        }
        Message message = queue.poll();
        if (message != null) {
            try {
                JSONParser parser = new JSONParser();
                JSONObject jsonObj = (JSONObject) parser.parse(message.getBody());
                //      ack(message.getReceiptHandle());
                if (reliable) {
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());
                } else {
                    // Delete it right away
                    sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()));
                }
            } catch (ParseException e) {
                LOG.error("SqsQueueSpout ParseException in SqsQueueSpout.nextTuple(): ", e);
            }
        } else {
            // Still empty, go to sleep.
            Utils.sleep(sleepTime);
        }
    }

    public String getStreamId(Message message) {
        return Utils.DEFAULT_STREAM_ID;
    }

    public int getSleepTime() {
        return sleepTime;
    }

    public void setSleepTime(int sleepTime) 
    {
        this.sleepTime = sleepTime;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("......Inside ack in sqsQueueSpout..............."+msgId);
        // Only called in reliable mode.
        try {
            sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, (String) msgId));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void fail(Object msgId) {
        // Only called in reliable mode.
        try {
            sqs.changeMessageVisibilityAsync(
                    new ChangeMessageVisibilityRequest(queueUrl, (String) msgId, 0));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void close() {
        sqs.shutdown();
        ((AmazonSQSAsyncClient) sqs).getExecutorService().shutdownNow();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }

Code snippet from my first bolt (extends BaseRichBolt):

public class TransformerBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    public static final Logger LOG = LoggerFactory.getLogger(TransformerBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String eventStr = input.getString(0);
            // some code here to convert the JSON string to a map;
            // Map dataMap and long packetId are sent to the next bolt
            this.collector.emit(input, new Values(dataMap, packetId));
        }
        catch (Exception e) {
            LOG.warn("Exception while converting AWS SQS to HashMap :{}", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("dataMap", "packetId"));
    }
}

Code snippet from the second bolt:

public class ConversionBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) 
    {
        try{
            Map dataMap = (Map)input.getValue(0);
            Long packetId = (Long)input.getValue(1);

                //this ack is not working
                this.collector.ack(input);
        }catch(Exception e){
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

Let me know if you need any more information. Can someone explain why the overridden ack() in my spout is not being invoked (when I ack from my second bolt)?

cunj1qz1

You have to ack all incoming tuples in all bolts, i.e., also call collector.ack(input) in TransformerBolt.execute(Tuple input).
The log message you see is correct: your code calls collector.ack(...), and that call gets logged. However, calling ack in a bolt does not directly invoke Spout.ack(...). Each time the spout emits a tuple with a message ID, the topology's acker tasks register that ID. The ackers then receive a message for every ack performed in a bolt, track them, and notify the spout once acks for the entire tuple tree have arrived. Only when the spout receives this notification from an acker does it call its own ack(Object messageId) method. Because your TransformerBolt never acks its input, the tuple tree is never completed, and your spout's ack() is never called.
See https://storm.apache.org/documentation/guaranteeing-message-processing.html for more details.
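The acker bookkeeping can be illustrated with a minimal, self-contained sketch of Storm's documented XOR mechanism (the `AckerDemo` class and its API below are illustrative simplifications, not Storm's actual acker code): every anchor ID is XORed into a per-root-tuple ledger when it is emitted and again when it is acked, and the spout is notified only once the ledger returns to zero.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a Storm acker task: one XOR ledger per root tuple.
public class AckerDemo {
    // Per root-tuple ID: XOR of all anchor IDs emitted but not yet acked.
    private final Map<Long, Long> ledger = new HashMap<>();
    // Root tuples whose tree completed, i.e., where spout.ack() would fire.
    private final Map<Long, Boolean> spoutAcked = new HashMap<>();

    // Spout emits a root tuple: register its initial anchor ID.
    void emitFromSpout(long rootId, long tupleId) {
        ledger.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // Bolt acks an input tuple and may anchor new output tuples:
    // XOR in the acked ID plus every newly emitted anchor ID.
    void ackFromBolt(long rootId, long ackedTupleId, long... emittedIds) {
        long v = ackedTupleId;
        for (long e : emittedIds) {
            v ^= e;
        }
        long remaining = ledger.merge(rootId, v, (a, b) -> a ^ b);
        if (remaining == 0) {
            // Ledger back to zero: tree fully acked, notify the spout.
            spoutAcked.put(rootId, true);
            ledger.remove(rootId);
        }
    }

    boolean isAcked(long rootId) {
        return spoutAcked.getOrDefault(rootId, false);
    }
}
```

In this model the bug is easy to see: TransformerBolt anchors a new tuple to its input but never XORs the input back out by acking it, so the ledger never returns to zero and SqsQueueSpout.ack() never fires, no matter what ConversionBolt does.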
