我的spring集成图如下所示。
在我的serviceactivator中,是否可以根据我的消息获取侦听器,然后暂停一段时间?如果再往下看,我的处理速度会很慢?我需要这种方法来处理一些溢出机制。
我看到我可以实现一个新的ConsumerSekCallback,但是在集成设置中,据我所知,我无法访问messagedrivenchanneladapter。
我正在使用链接到消息驱动通道适配器的concurrentmessagelistenercontainer。
<int-kafka:message-driven-channel-adapter
id="kafkaListenertest" listener-container="containertest" auto-startup="true"
phase="100" send-timeout="5000" channel="kafkaMessage" error-channel="overflow" />
<bean id="containertest"
class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
<constructor-arg ref="kafkaConsumerFactory"/>
<constructor-arg ref="consumerContainerPropertiestest" />
<property name="concurrency" value="4"/>
</bean>
<bean id="consumerContainerPropertiestest"
class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="events.test" />
<property name="ackMode" value="MANUAL_IMMEDIATE"></property>
</bean>
<int:service-activator input-channel="kafkaMessage"
ref="MyListener" method="handleIncomingKafkaEvent" ></int:service-activator>
<int:channel id="kafkaMessage"></int:channel>
1条答案
按热度按时间2admgd591#
启动版本
2.1.3
,spring kafka在MessageListenerContainer
:因此,您确实可以从应用程序的任何一点暂停和恢复侦听器容器
containertest
提供适当的服务。这个
KafkaMessageDrivenChannelAdapter
也暴露了pause()
以及resume()
钩子。另外,在
KafkaMessageDrivenChannelAdapter
,的MessagingMessageConverter
将这些标头填充到消息中以进行下游处理:因此,您可以获得
KafkaHeaders.CONSUMER
标题并使用本机pause()/resume()
从KafkaConsumer
.