spring云流-如何处理下游块?

8ljdwjyq  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(408)

在kafka集群的计划停机期间,我们基本上遇到了以下问题:如何指定使用spring云流向rabbitmq发送消息的超时(显然是Kafka而不是拉比。
来自@garyrussell的答案是:
海峡 sendTimeout 仅当通道本身可以阻塞时适用,例如 QueueChannel 当前已满的有界队列;调用者将阻塞,直到队列中的任何一个空间变为可用,或者发生超时。
在这种情况下,块位于通道的下游,因此sendtimeout是不相关的(在任何情况下,它是一个directchannel,无论如何都不能阻止,订阅的处理程序直接在调用线程上调用)。
你看到的实际阻塞很可能在 socket.write() 在rabbitmq客户机中,没有超时且不可中断;调用线程无法执行任何操作来“超时”写入。
我知道的唯一可能的解决方案是通过调用 resetConnection() 在连接工厂。
很好地解释了为什么有问题的方法( org.springframework.integration.channel.AbstractSubscribableChannel#doSend )不接受 timeout 考虑到。不过,这对我来说还是有点奇怪。
spring-integration-kafka-3.2.1.RELEASE-sources.jar!/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java:566 ,我们可以看到,如果 sync 需要行为:

565    if (this.sync) {
566        Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
567        if (sendTimeout == null || sendTimeout < 0) {
568            future.get();
569        }
570        else {
571            try {
572                future.get(sendTimeout, TimeUnit.MILLISECONDS);
573            }
574            catch (TimeoutException te) {
575                throw new MessageTimeoutException(message, "Timeout waiting for response from KafkaProducer", te);
576            }
577        }
578    }

调用,其中考虑了超时。这个 sendTimeoutExpression 指定给默认值:

private static final long DEFAULT_SEND_TIMEOUT = 10000;
    private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);

然而,我们的堆栈跟踪揭示了一些不同的东西:

"pool-1-thread-3" - Thread t@108
   java.lang.Thread.State: TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <4ebda621> (a org.springframework.util.concurrent.SettableListenableFuture$SettableTask)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
    at java.util.concurrent.FutureTask.get(FutureTask.java:204)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:134)

* at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.processSendResult(KafkaProducerMessageHandler.java:572)

    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:414)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:69)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1035)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:69)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)

标记为的调用 * 对应于 future.get(sendTimeout, TimeUnit.MILLISECONDS); 打电话。
看看底层客户似乎是如何支持它的 future.get() 呼叫支持超时),如何设置?在活页夹引用中我能找到的仅有的两个属性是 spring.cloud.stream.kafka.binder.healthTimeout 以及 batchTimeout ,据我所知,这不会影响这个设置。
看看 KafkaProducerMessageHandler 建造于 org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ProducerConfigurationMessageHandler ,一个私有类,bean重写似乎不是推荐的方法。

5m1hhzi4

5m1hhzi41#

它似乎没有文档化,但类似于侦听器容器定制器https://docs.spring.io/spring-cloud-stream/docs/3.1.2/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration 您可以添加
ProducerMessageHandlerCustomizer @Bean 在消息处理程序上设置任意属性。
在较新版本的处理程序中,超时总是配置为至少与 ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG ,以避免误报(处理程序超时后发布成功)。

相关问题