spring集成-轮询器触发太频繁

py49o6xq  于 2021-08-25  发布在  Java
关注(0)|答案(1)|浏览(301)

我有以下代码:

@Bean
  public IntegrationFlow aggregatingFlow(
      AmqpInboundChannelAdapter aggregatorInboundAdapter,
      PollableChannel aggregatingChannel,
      AmqpOutboundEndpointEnhanced amqpOutboundEndpoint,
      PollSkipAdvice pollSkipAdvice) {
    return IntegrationFlows.from(aggregatorInboundAdapter)
        .wireTap(wtChannel())
        .channel(aggregatingChannel)
        .handle(
            amqpOutboundEndpoint,
            e ->
                e.poller(
                        Pollers.fixedDelay(1, TimeUnit.SECONDS, 1)
                            .maxMessagesPerPoll(DEFAULT_MESSAGES_PER_POLL)
                            .advice(pollSkipAdvice))
                    .id("pollingConsumer"))
        .get();
  }

这是哪里 pollSkipAdvice 定义为

@Bean
  public PollSkipAdvice pollSkipAdvice() {
    return new PollSkipAdvice(
        new PollSkipStrategy() {
          int currentPoll = 1;

          @Override
          public synchronized boolean skipPoll() {
            int hitRate = 0; //just not to add to the code; its dynamic
            if (currentPoll >= hitRate) {
              System.out.println(MessageFormat.format("{0} : Hit poll number {1} for message number of {2} | {3}",
                  DateTimeFormatter.ofPattern("HH:mm:ss. SSS")
                      .withZone(ZoneOffset.UTC)
                      .format(Instant.now()),
                  currentPoll,
                  currentMessagesInQueue, Thread.currentThread().getName()));
              currentPoll = 1;
              return false;
            }
            currentPoll++;
            return true;
          }
        });
  }

我面临的问题是轮询器每秒调用一次以上。这是我在测试代码时得到的日志:

21:41:47. 652 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 654 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=190)
21:41:47. 656 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=191)
21:41:47. 658 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 660 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=192)
21:41:47. 662 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=193)
21:41:47. 665 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=194)
21:41:47. 667 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=195)
21:41:47. 669 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 671 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=196)
EventChange(id=197)
EventChange(id=198)
EventChange(id=199)
21:41:48. 653 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:49. 656 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:50. 659 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:51. 661 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:52. 662 : Hit poll number 1 for message number of 0 | pool-3-thread-1

似乎每当我收到amqp的消息,这个轮询器就会触发很多次。您可以在日志中看到 21:41:47. 65221:41:47. 671 在1秒内触发10次。另一方面,一旦结束,它就会按它应该的方式工作(每秒一次)。
当我尝试打印这两种情况下的堆栈跟踪时,只有几毫秒的距离,我得到以下结果:

20:32:56. 406 : Hit poll number 2 for message number of 0 | pool-3-thread-1

java.lang.Thread.getStackTrace(Thread.java:1559)
com.a.configuration.component.EventAggregatorConfig$1.skipPoll(EventAggregatorConfig.java:313)
org.springframework.integration.scheduling.PollSkipAdvice.invoke(PollSkipAdvice.java:51)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
com.sun.proxy.$Proxy136.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

20:32:56. 408 : Hit poll number 1 for message number of 0 | pool-3-thread-1

java.lang.Thread.getStackTrace(Thread.java:1559)
com.a.configuration.component.EventAggregatorConfig$1.skipPoll(EventAggregatorConfig.java:313)
org.springframework.integration.scheduling.PollSkipAdvice.invoke(PollSkipAdvice.java:51)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
com.sun.proxy.$Proxy136.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

我是不是配置错了?我试图定义单独的taskscheduler,但它似乎根本没有使用那个。
更新
经过一点分析后,是否可能是由于Spring中的这一部分 AbstractPollingEndpoint 类别:

while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                        if (pollForMessage() == null) {
                            break;
                        }
                        count++;
                    }

所以基本上,即使我说每1秒进行一次民意调查,如果有10条信息,也要记录下来;但是如果没有消息,就什么也不做——如果没有找到消息,它会继续尝试轮询吗?
更新2
是的,似乎是这样 skipPoll 每次邮件轮询都会调用通知,而不是一次 maxMessagesPerPoll .

zaqlnxep

zaqlnxep1#

我不确定问题是什么。这一逻辑显然是我们从这一建议中所期望的:

private Runnable createPoller() {
    return () ->
            this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (this.maxMessagesPerPoll == 0) {
                        logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                        break;
                    }
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }
            });
}

那么,如果 pollForMessage() 返回 null 我们就这样打破了 while() 循环并退出当前轮询周期。那个 null 发生在 PollSkipAdvice 当它 PollSkipStrategy 返回 false .
您可能只是误解了轮询周期,并真正进行了轮询。第一个事实上是触发器触发时间和时间的时刻 maxMessagesPerPoll . 真实的 poll 确实是关于一个人的 pollForMessage 呼叫您可能需要修改自定义策略的逻辑,以满足应用此类建议的现有要求。

相关问题