Controlling when an Azure Service Bus consumer starts or stops listening to a topic in Spring Boot

Posted by cbwuti44 on 2023-01-31 in Spring

What I want to achieve: control when the Service Bus consumer starts/stops receiving messages from a queue/topic.

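A detailed explanation follows.
Currently I have integrated Azure Service Bus into my application, and it starts listening for messages as soon as the Spring Boot application starts. I want to change this logic. On ApplicationReadyEvent, I want to keep the ServiceBusConsumer from listening, perform some tasks, and then enable the ServiceBusConsumer again so that it starts listening to the topic/queue.
How can I do that?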

application.yml

spring:
  cloud:
    azure:
      servicebus:
        namespace: **********
        
xxx:
  azure:
    servicebus:
      connection: ***********
      queue: **********

AzureConfiguration.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class AzureConfiguration{

    @Value("${xxx.azure.servicebus.connection}")
    private String serviceBusConnection;

    @Value("${xxx.azure.servicebus.queue}")
    private String serviceBusQueue;

    private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
    private static final String SENSOR_DATA_CHANNEL = "zzzzz";
    private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";

    @Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
    public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {

        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setConnectionString(serviceBusConnection);
        containerProperties.setEntityName(serviceBusQueue);
        containerProperties.setAutoComplete(true);
        return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
    }

    @Bean
    public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {

        ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        
        return adapter;
    }

    @Bean(name = SERVICE_BUS_INPUT_CHANNEL)
    public MessageChannel serviceBusInputChannel() {

        return new DirectChannel();
    }

    @Bean(name = SENSOR_DATA_CHANNEL)
    public MessageChannel sensorDataChannel() {

        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow serviceBusMessageFlow() {

        return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
                .<byte[], String>transform(String::new)
                .channel(SENSOR_DATA_CHANNEL)
                .get();
    }
}

AppEventListenerService.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{

   
    @EventListener(ApplicationReadyEvent.class)
    public void OnApplicationStarted() {
        log.debug("Enter OnApplicationStarted");
        // Disable Azure Bus Message Listener
        // do some task
        // Enable Azure Bus Message Listener
        log.debug("Exit OnApplicationStarted");
    }
}

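In the AppEventListenerService.java code above:
// Disable Azure Bus Message Listener: here I want to stop the ServiceBusConsumer from receiving messages from the topic/queue.
// Enable Azure Bus Message Listener: here I want to start the ServiceBusConsumer so that it receives messages from the topic/queue again.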

Answer #1 (eh57zj3b)

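  • Here is a solution that uses JMS to consume the Service Bus messages.
  • The reason for using JMS is that a listener declared with @JmsListener can be started and stopped.
  • To use JMS with Service Bus, refer to the Microsoft documentation.
  • Next, autowire the JmsListenerEndpointRegistry object, which is used to stop the listener.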
@Autowired  
JmsListenerEndpointRegistry registry;

To stop the JMS listeners, call the stop method:

registry.stop();
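Calling stop() on the registry stops every registered listener at once. If only one listener should be paused, Spring's JmsListenerEndpointRegistry also offers getListenerContainer(String id), where the id is the one given in @JmsListener(id = "..."). The sketch below models that lookup with plain Java stand-ins so that it runs without any dependencies; Container, Registry, and the ids "orders" and "alerts" are hypothetical names for illustration, not Spring classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Spring's MessageListenerContainer
// (only the start/stop/isRunning part of its lifecycle is modeled).
class Container {
    private boolean running = true;

    void start() { running = true; }
    void stop() { running = false; }
    boolean isRunning() { return running; }
}

// Hypothetical stand-in for JmsListenerEndpointRegistry:
// listener containers are kept in a map keyed by listener id.
class Registry {
    private final Map<String, Container> containers = new LinkedHashMap<>();

    // Registers a new, already running container under the given id.
    void register(String id) { containers.put(id, new Container()); }

    // Mirrors getListenerContainer(String id): returns null for an unknown id.
    Container getListenerContainer(String id) { return containers.get(id); }

    // Stops or starts every registered container.
    void stop() { containers.values().forEach(Container::stop); }
    void start() { containers.values().forEach(Container::start); }
}
```

With the real registry the equivalent call would be registry.getListenerContainer("orders").stop(), assuming the listener was declared with that id.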
  • Here I created two APIs that start and stop the JMS listener, together with the message receiver:
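import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// @RestController already implies @Component, so one annotation is enough.
@RestController
public class Reciever {

    private static final String QUEUE_NAME = "test";
    private final Logger logger = LoggerFactory.getLogger(Reciever.class);

    @Autowired
    JmsListenerEndpointRegistry registry;

    // Stops all registered JMS listener containers.
    @GetMapping("/stop")
    public String stop() {
        registry.stop();
        return "Stopped";
    }

    // Starts the JMS listener containers again.
    @GetMapping("/start")
    public String start() {
        registry.start();
        return "Started";
    }

    // Receives messages from the queue while the listener container is running.
    @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(String s) {
        logger.info("Received message: {}", s);
    }
}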
  • Now, when I first call the /stop API, JMS stops, and messages start arriving again only after the /start API is called.
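Applied to the question's ApplicationReadyEvent handler, the same stop/start calls could be arranged as in the sketch below. It is a minimal sketch, not the answer's own code: ListenerLifecycle is a hypothetical stand-in for the start/stop/isRunning methods that JmsListenerEndpointRegistry provides through Spring's Lifecycle interface, and StartupGate and RecordingListeners are illustrative names, chosen so that the example runs without Spring on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the lifecycle methods of JmsListenerEndpointRegistry.
interface ListenerLifecycle {
    void start();
    void stop();
    boolean isRunning();
}

// Runs a startup task while the listeners are paused.
class StartupGate {
    private final ListenerLifecycle listeners;

    StartupGate(ListenerLifecycle listeners) {
        this.listeners = listeners;
    }

    // In a Spring Boot application this method would carry
    // @EventListener(ApplicationReadyEvent.class) and the registry would be injected.
    void onApplicationReady(Runnable startupTask) {
        listeners.stop();           // "Disable Azure Bus Message Listener"
        try {
            startupTask.run();      // "do some task"
        } finally {
            listeners.start();      // "Enable Azure Bus Message Listener", also if the task throws
        }
    }
}

// In-memory implementation that records calls, used only for demonstration.
class RecordingListeners implements ListenerLifecycle {
    final List<String> calls = new ArrayList<>();
    private boolean running = true;

    @Override
    public void start() { running = true; calls.add("start"); }

    @Override
    public void stop() { running = false; calls.add("stop"); }

    @Override
    public boolean isRunning() { return running; }
}
```

Restarting in a finally block is a design choice: it keeps the consumer from staying silent after a failed task, but if messages must not be processed when the task fails, the start() call belongs after the task instead. Note also that listeners normally start during context refresh, before ApplicationReadyEvent fires, so a few messages may arrive before stop() takes effect; disabling auto-startup on the listener container factory (setAutoStartup(false)) and calling only start() after the task is one way to close that gap.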
