我要实现的目标- * 控制服务总线使用者开始/停止从队列/主题接收消息。*
以下是详细解释。
目前,我已将Azure Service Bus集成到我的应用程序中,我们将在Spring Boot 应用程序启动时侦听消息。现在,我希望修改此逻辑。在ApplicationReadyEvent
上,我希望禁用ServiceBusConsumer以开始侦听,然后我希望执行一些任务,之后我希望再次启用ServiceBusConsumer以从主题/队列开始侦听。
那么我如何才能做到呢?
应用程序.yml
spring:
cloud:
azure:
servicebus:
namespace: **********
xxx:
azure:
servicebus:
connection: ***********
queue: **********
蓝色配置.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
@Configuration
public class AzureConfiguration{
@Value("${xxx.azure.servicebus.connection}")
private String serviceBusConnection;
@Value("${xxx.azure.servicebus.queue}")
private String serviceBusQueue;
private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
private static final String SENSOR_DATA_CHANNEL = "zzzzz";
private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";
@Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setConnectionString(serviceBusConnection);
containerProperties.setEntityName(serviceBusQueue);
containerProperties.setAutoComplete(true);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean
public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
@Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean(name = SERVICE_BUS_INPUT_CHANNEL)
public MessageChannel serviceBusInputChannel() {
return new DirectChannel();
}
@Bean(name = SENSOR_DATA_CHANNEL)
public MessageChannel sensorDataChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow serviceBusMessageFlow() {
return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
.<byte[], String>transform(String::new)
.channel(SENSOR_DATA_CHANNEL)
.get();
}
}
应用事件侦听器服务.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{
@EventListener(ApplicationReadyEvent.class)
public void OnApplicationStarted() {
log.debug("Enter OnApplicationStarted");
// Disable Azure Bus Message Listener
// do some task
// Enable Azure Bus Message Listener
log.debug("Exit OnApplicationStarted");
}
}
在www.example.com的上述代码中AppEventListenerService.java,
//禁用Azure总线消息侦听器-在这里,我希望停止ServiceBusConsumer以从主题/队列接收消息
//启用Azure总线消息侦听器-在这里,我希望启动ServiceBusConsumer以从主题/队列接收消息。
1条答案
按热度按时间eh57zj3b1#
@JMSListener
时,我们可以开始停止它。Autowired
这个JmsListenerEndpointRegistry
对象并停止侦听器。要停止JMS,必须使用stop函数:
/stop
API,它将停止JMS,只有在调用/start
API后,消息才会开始到来。输出: