java 带背压的React式SQS轮询器

az31mfrm  于 2023-04-04  发布在  Java
关注(0)|答案(2)|浏览(119)

正在尝试创建SQS轮询器,该轮询器:

  • 执行指数轮询(如果队列中没有消息,则减少请求数)
  • 如果队列中有大量消息,则更频繁地查询SQS
  • 有背压如果收到一定数量的消息,它停止轮询
  • 不受AWS API速率限制

作为一个例子,我使用thisJavaRx实现,它很容易转换为Project Reactor,并通过背压来丰富它。

private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;

public Flux<Message> pollMessages(GetQueueUrlResult q)
{
    return Flux.create(sink -> {
        long backoff = DEFAULT_BACKOFF;

        while (!stopRequested)
        {
            if (sink.isCancelled())
            {
                sink.error(new RuntimeException("Stop requested"));
                break;
            }

            Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
                    ? amazonSQS.receiveMessageAsync(createRequest(q))
                    : completedFuture(new ReceiveMessageResult());

            try
            {
                ReceiveMessageResult result = future.get();

                if (result != null && !result.getMessages().isEmpty())
                {
                    backoff = DEFAULT_BACKOFF;

                    LOGGER.info("New messages found in queue size={}", result.getMessages().size());

                    result.getMessages().forEach(m -> {
                        if (sink.requestedFromDownstream() > 0L)
                        {
                            sink.next(m);
                        }
                    });
                }
                else
                {
                    if (backoff < MAX_BACKOFF)
                    {
                        backoff = backoff * 2;
                    }

                    LOGGER.debug("No messages found on queue.  Sleeping for {} ms.", backoff);

                    // This is to prevent rate limiting by the AWS api
                    Thread.sleep(backoff);
                }
            }
            catch (InterruptedException e)
            {
                stopRequested = true;
            }
            catch (ExecutionException e)
            {
                sink.error(e);
            }
        }
    });
}

实施似乎有效,但有几个问题:

  • 看起来在循环中查询Future结果可以使用Reactor Primitives完成,尝试使用Flux.generate,但无法控制对SqlClient的异步调用次数
  • 如果使用Flux.interval方法,则不了解如何正确实施退避策略
  • 不喜欢Thread.sleep打电话有什么想法如何取代它?
  • 如果取消信号,如何正确停止循环?现在使用sink.error来解决这种情况。
tp5buhyn

tp5buhyn1#

您对以下解决方案有何看法:

private static final Integer batchSize = 1;
    private static final Integer intervalRequest = 3000;
    private static final Integer waitTimeout = 10;
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private static final SqsAsyncClient sqsAsync =
       SqsAsyncClient
         .builder()
         .endpointOverride(URI.create(queueUrl))
         .build();

    public static Flux<Message> sqsPublisher =
        Flux.create(sink -> {
                if (sink.isCancelled()) {
                    sink.error(new RuntimeException("Stop requested"));
                }

            scheduler.scheduleWithFixedDelay(() -> {
                long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
                if (numberOfRequests > 0) {
                    ReceiveMessageRequest request = ReceiveMessageRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .maxNumberOfMessages((int) numberOfRequests)
                            .waitTimeSeconds(waitTimeout).build();

                    CompletableFuture<ReceiveMessageResponse> response = sqsAsync.receiveMessage(request);

                    response.thenApply(responseValue -> {
                        if (responseValue != null && responseValue.messages() != null && !responseValue.messages().isEmpty()) {
                            responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
                        }
                        return responseValue;
                    });

                }
            }, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);
        });
frebpwbc

frebpwbc2#

以前的回答似乎没有抓住要点:
间隔轮询不符合标准,因此不应采用这种方法,因为无论是否有消息,它都会向SQS请求相同数量的查询。我们可以将其委托给SQS长轮询机制。
Future.get()阻塞线程,导致饥饿并导致性能低下,您不应该使用future.get().wait()或任何阻塞操作,而是使用异步回调,或者在这种情况下,Flux.fromFuture应该做到这一点。
您可以使用Flux.generate,它将为来自下游的每个请求项请求(N)触发,因此您真正需要控制的是下游如何向上游请求。
您的Flux.generate可以只是一个Flux<Mono<ReceiveMessageResponse>>,这样generate就可以创建一个SQS的Mono(异步)查询。
然后,您可以使用.flatMap async行为并设置并发,因此您可以在任何给定时间限制对SQS的并行查询量,防止您的应用向SQS发出太多请求。
因此,本质上,这样的东西应该起作用:

var receiveRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .waitTimeSeconds(20) // Long polling when there is no messages (non-blocking io)
                .maxNumberOfMessages(10) // Batching 
                .visibilityTimeout(1) // Sample
                .messageAttributeNames("All") // Sample
                .build();

Flux.generate((SynchronousSink<Mono<ReceiveMessageResponse>> sink) ->  
                sink.next(Mono.fromFuture(this.sqsClient.receiveMessage(receiveRequest))))
                .flatMap(asyncSqsQuery -> asyncSqsQuery, 4) // Defaults to 256
                .flatMapIterable(ReceiveMessageResponse::messages);

请记住.flatMap本质上是异步的,因此将并发设置为(4)将允许向上游(生成器)发送最多4个并发请求,本质上它是在订阅时转换(或执行)Mono。
如果你在中间放置任何东西,它可能会改变这种行为,有效地向生成器请求更多的元素,并打破sqs查询的并发限制。
其他注意事项:Thread.sleep获取线程并阻止它,相反,您可以返回Mono.delay以防止有人请求更多项目,这可能很方便,例如SQS关闭并且查询失败,因为没有20秒的长轮询,而是会更快地失败。您可以将生成器Mono设置为来自AWS的响应或错误时的空响应,延迟一定时间,像1秒。
希望能有所帮助。

相关问题