Pause/seek the consumer from my listener for a short period of time

Posted by 6tdlim6h on 2021-06-07 in Kafka

My Spring Integration setup is shown below.

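In my service activator, is it possible to get hold of the listener for the message being handled and pause it for a while, for example when processing further downstream is slow? I need this to implement an overflow mechanism.
I saw that I could implement a new ConsumerSeekCallback, but in an Integration setup, as far as I know, I don't have access to the message-driven channel adapter.
I'm using a ConcurrentMessageListenerContainer linked to a message-driven channel adapter.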

<int-kafka:message-driven-channel-adapter
    id="kafkaListenertest" listener-container="containertest" auto-startup="true"
    phase="100" send-timeout="5000" channel="kafkaMessage" error-channel="overflow"  />

<bean id="containertest"
    class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
    <constructor-arg ref="kafkaConsumerFactory"/>
    <constructor-arg ref="consumerContainerPropertiestest" />
    <property name="concurrency" value="4"/>
</bean>

<bean id="consumerContainerPropertiestest"
    class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics" value="events.test" />
    <property name="ackMode" value="MANUAL_IMMEDIATE"></property>
</bean>

<int:service-activator input-channel="kafkaMessage"
    ref="MyListener" method="handleIncomingKafkaEvent" ></int:service-activator>

<int:channel id="kafkaMessage"></int:channel>
Answer #1 (2admgd59)

Starting with version 2.1.3, Spring Kafka provides the following methods on MessageListenerContainer:

/**
 * Pause this container before the next poll().
 * @since 2.1.3
 */
default void pause() {
    throw new UnsupportedOperationException("This container doesn't support pause");
}

/**
 * Resume this container, if paused, after the next poll().
 * @since 2.1.3
 */
default void resume() {
    throw new UnsupportedOperationException("This container doesn't support resume");
}

/**
 * Return true if {@link #pause()} has been called; the container might not have actually
 * paused yet.
 * @return true if pause has been requested.
 * @since 2.1.5
 */
default boolean isPauseRequested() {
    throw new UnsupportedOperationException("This container doesn't support pause/resume");
}

/**
 * Return true if {@link #pause()} has been called; and all consumers in this container
 * have actually paused.
 * @return true if the container is paused.
 * @since 2.1.5
 */
default boolean isContainerPaused() {
    throw new UnsupportedOperationException("This container doesn't support pause/resume");
}

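So you can indeed pause and resume the listener container from any point in your application, as long as you inject that containertest bean into the appropriate service.
The KafkaMessageDrivenChannelAdapter also exposes pause() and resume() hooks.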
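
To make the "pause for a while" part concrete, here is a minimal sketch of a helper that pauses a container and schedules the matching resume. It is only an illustration: PausableContainer is a hypothetical stand-in for the pause()/resume() pair on MessageListenerContainer shown above (so the snippet compiles with the JDK alone), and TimedPauser, pauseFor and the thread name are names invented here, not part of Spring Kafka.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the pause()/resume() methods of MessageListenerContainer. */
interface PausableContainer {
    void pause();
    void resume();
}

/** Hypothetical helper: pauses a container now and resumes it after a delay. */
class TimedPauser implements AutoCloseable {
    private final PausableContainer container;
    // Single daemon thread, used only to fire the delayed resume.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "container-resume");
                t.setDaemon(true);
                return t;
            });
    private ScheduledFuture<?> pendingResume;

    TimedPauser(PausableContainer container) {
        this.container = container;
    }

    /** Pause now and resume after the delay; a repeated call replaces any pending resume. */
    synchronized void pauseFor(long delay, TimeUnit unit) {
        if (pendingResume != null) {
            pendingResume.cancel(false);
        }
        container.pause();
        pendingResume = scheduler.schedule(container::resume, delay, unit);
    }

    /** Stops the scheduler; a resume that has not fired yet is dropped. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In the real application, the stand-in would presumably be replaced by the containertest bean injected into MyListener, with handleIncomingKafkaEvent calling something like pauseFor(30, TimeUnit.SECONDS) when downstream processing falls behind. Per the Javadoc above, pause() only takes effect before the next poll(), so records that were already fetched may still be delivered.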
Also, in the KafkaMessageDrivenChannelAdapter, the MessagingMessageConverter populates these headers into the message for downstream processing:

rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key());
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic());
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition());
rawHeaders.put(KafkaHeaders.OFFSET, record.offset());
rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, record.timestampType().name());
rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, record.timestamp());

if (acknowledgment != null) {
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
if (consumer != null) {
    rawHeaders.put(KafkaHeaders.CONSUMER, consumer);
}

So you can get the KafkaHeaders.CONSUMER header from the message and call the native KafkaConsumer pause()/resume() on it.
