正在尝试创建SQS轮询器,该轮询器:
- 执行指数轮询(如果队列中没有消息,则减少请求数)
- 如果队列中有大量消息,则更频繁地查询SQS
- 有背压如果收到一定数量的消息,它停止轮询
- 不受AWS API速率限制
作为一个例子,我使用thisJavaRx实现,它很容易转换为Project Reactor,并通过背压来丰富它。
private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;
public Flux<Message> pollMessages(GetQueueUrlResult q)
{
return Flux.create(sink -> {
long backoff = DEFAULT_BACKOFF;
while (!stopRequested)
{
if (sink.isCancelled())
{
sink.error(new RuntimeException("Stop requested"));
break;
}
Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
? amazonSQS.receiveMessageAsync(createRequest(q))
: completedFuture(new ReceiveMessageResult());
try
{
ReceiveMessageResult result = future.get();
if (result != null && !result.getMessages().isEmpty())
{
backoff = DEFAULT_BACKOFF;
LOGGER.info("New messages found in queue size={}", result.getMessages().size());
result.getMessages().forEach(m -> {
if (sink.requestedFromDownstream() > 0L)
{
sink.next(m);
}
});
}
else
{
if (backoff < MAX_BACKOFF)
{
backoff = backoff * 2;
}
LOGGER.debug("No messages found on queue. Sleeping for {} ms.", backoff);
// This is to prevent rate limiting by the AWS api
Thread.sleep(backoff);
}
}
catch (InterruptedException e)
{
stopRequested = true;
}
catch (ExecutionException e)
{
sink.error(e);
}
}
});
}
实施似乎有效,但有几个问题:
- 看起来在循环中查询Future结果可以使用Reactor Primitives完成,尝试使用
Flux.generate
,但无法控制对SqlClient的异步调用次数 - 如果使用
Flux.interval
方法,则不了解如何正确实施退避策略 - 不喜欢
Thread.sleep
打电话有什么想法如何取代它? - 如果取消信号,如何正确停止循环?现在使用
sink.error
来解决这种情况。
2条答案
按热度按时间tp5buhyn1#
您对以下解决方案有何看法:
frebpwbc2#
以前的回答似乎没有抓住要点:
间隔轮询不符合标准,因此不应采用这种方法,因为无论是否有消息,它都会向SQS请求相同数量的查询。我们可以将其委托给SQS长轮询机制。
Future.get()
阻塞线程,导致饥饿并导致性能低下,您不应该使用future.get()
或.wait()
或任何阻塞操作,而是使用异步回调,或者在这种情况下,Flux.fromFuture
应该做到这一点。您可以使用Flux.generate,它将为来自下游的每个请求项请求(N)触发,因此您真正需要控制的是下游如何向上游请求。
您的Flux.generate可以只是一个
Flux<Mono<ReceiveMessageResponse>>
,这样generate就可以创建一个SQS的Mono(异步)查询。然后,您可以使用.flatMap async行为并设置并发,因此您可以在任何给定时间限制对SQS的并行查询量,防止您的应用向SQS发出太多请求。
因此,本质上,这样的东西应该起作用:
请记住.flatMap本质上是异步的,因此将并发设置为(4)将允许向上游(生成器)发送最多4个并发请求,本质上它是在订阅时转换(或执行)Mono。
如果你在中间放置任何东西,它可能会改变这种行为,有效地向生成器请求更多的元素,并打破sqs查询的并发限制。
其他注意事项:
Thread.sleep
获取线程并阻止它,相反,您可以返回Mono.delay
以防止有人请求更多项目,这可能很方便,例如SQS关闭并且查询失败,因为没有20秒的长轮询,而是会更快地失败。您可以将生成器Mono设置为来自AWS的响应或错误时的空响应,延迟一定时间,像1秒。希望能有所帮助。