How do I browse messages in a queue using Apache Camel?

r6hnlfcb  posted on 2022-11-07  in Apache

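I need to browse messages from ActiveMQ using a Camel route, without consuming them.
The messages in the JMS queue should be read (browsed only, not consumed) and moved to a database, while the original queue stays unchanged.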

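// Package names below match Camel 2.x/3.x with javax.jms; they may differ in other versions.
import java.util.List;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.spi.BrowsableEndpoint;

public class CamelStarter {

    private static CamelContext camelContext;

    public static void main(String[] args) throws Exception {
        camelContext = new DefaultCamelContext();
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);

        camelContext.addComponent("jms", JmsComponent.jmsComponent(connectionFactory));

        camelContext.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Consumes from testQueue, records each exchange in the browse endpoint,
                // then forwards it to testQueue1
                from("jms:queue:testQueue").to("browse:orderReceived").to("jms:queue:testQueue1");
            }
        });

        camelContext.start();

        // Give the route a moment to process messages before inspecting them
        Thread.sleep(1000);

        inspectReceivedOrders();

        camelContext.stop();
    }

    public static void inspectReceivedOrders() {
        BrowsableEndpoint browse = camelContext.getEndpoint("browse:orderReceived", BrowsableEndpoint.class);
        List<Exchange> exchanges = browse.getExchanges();
        System.out.println("Browsing queue: " + browse.getEndpointUri() + " size: " + exchanges.size());
        for (Exchange exchange : exchanges) {
            String payload = exchange.getIn().getBody(String.class);
            String msgId = exchange.getIn().getHeader("JMSMessageID", String.class);
            System.out.println(msgId + "=" + payload);
        }
    }
}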

ilmyapht1#

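As far as I know, it is not possible to read (without consuming!) JMS messages in Camel...
The only workaround I found (in a JEE application) is to define a startup EJB with a timer that holds a QueueBrowser and delegates the message processing to a Camel route: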

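import java.util.Enumeration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.*;
import javax.inject.Inject;
import javax.jms.*;

import org.apache.camel.ProducerTemplate;
import org.apache.camel.cdi.Uri;

@Singleton
@Startup
public class MyQueueBrowser {

    // Assumed value: the original does not show where frequencyInMin is defined
    private static final int frequencyInMin = 5;

    @Resource
    private TimerService timerService;

    @Resource(mappedName="java:/jms/queue/com.company.myqueue")
    private Queue sourceQueue;

    @Inject
    @JMSConnectionFactory("java:/ConnectionFactory")
    private JMSContext jmsContext;

    @Inject
    @Uri("direct:readMessage")
    private ProducerTemplate camelEndpoint;

    @PostConstruct
    private void init() {
        // Non-persistent calendar timer that fires every frequencyInMin minutes
        TimerConfig timerConfig = new TimerConfig(null, false);
        ScheduleExpression se = new ScheduleExpression().hour("*").minute("*/" + frequencyInMin);
        timerService.createCalendarTimer(se, timerConfig);
    }

    @Timeout
    public void scheduledExecution(Timer timer) throws Exception {
        QueueBrowser browser = null;
        try {
            // A QueueBrowser reads messages without removing them from the queue
            browser = jmsContext.createBrowser(sourceQueue);
            Enumeration<?> msgs = browser.getEnumeration();
            while (msgs.hasMoreElements()) {
                Message jmsMsg = (Message) msgs.nextElement();
                // Read body and/or properties of jmsMsg here. Illustrative choice, not in the
                // original: send the body as a String with the message ID in a header
                // ("sourceMessageId" is a made-up header name).
                String body = jmsMsg.getBody(String.class);
                camelEndpoint.sendBodyAndHeader(body, "sourceMessageId", jmsMsg.getJMSMessageID());
            }
        } catch (JMSRuntimeException jmsException) {
            // ...
        } finally {
            // Guard against createBrowser having failed before browser was assigned
            if (browser != null) {
                browser.close();
            }
        }
    }

}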
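
One practical consequence of the timer plus QueueBrowser approach: browsing leaves the messages on the queue, so every timer run will see the same messages again until something else removes them. If each message should reach the database only once, some bookkeeping is needed. Below is a minimal sketch of that idea, assuming the JMSMessageID is used as the key. `BrowseDeduplicator` and `filterNew` are hypothetical names for illustration (not JMS or Camel API), and the sketch uses only the JDK.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not JMS or Camel API): remembers which message IDs were already forwarded. */
public class BrowseDeduplicator {

    // IDs seen in earlier browse passes; kept in memory only, so it is lost on restart
    private final Set<String> seenIds = new LinkedHashSet<>();

    /**
     * Takes the result of one browse pass (message ID -> body) and returns only
     * the entries whose ID has not been seen before, in input order.
     * The returned IDs are recorded as seen.
     */
    public Map<String, String> filterNew(Map<String, String> browsed) {
        Map<String, String> fresh = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : browsed.entrySet()) {
            // Set.add returns false when the ID is already present
            if (seenIds.add(entry.getKey())) {
                fresh.put(entry.getKey(), entry.getValue());
            }
        }
        return fresh;
    }
}
```

In the timer method, the browsed messages would be collected into a map first and only the result of `filterNew` sent on to the Camel route. Note that the set grows without bound and does not survive a restart, so a real setup would more likely rely on a unique key in the target database table.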

jchrr9hc2#

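The Apache Camel browse component is designed exactly for this. See the documentation here.
Since you have not provided any further information, there is not much more I can say.
Suppose you have a route like this

from("activemq:somequeue").to("bean:someBean")

from("activemq:somequeue").process(exchange -> {})

All you have to do is put a browse endpoint in the middle, like this

from("activemq:somequeue").to("browse:someHandler").to("bean:someBean")

Then write a class like this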

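import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.BrowsableEndpoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class BrowseQueue {

    @Autowired
    CamelContext camelContext;

    public void inspect() {
        // Look up the browse endpoint used in the route and list the exchanges it has recorded
        BrowsableEndpoint browse = camelContext.getEndpoint("browse:someHandler", BrowsableEndpoint.class);
        List<Exchange> exchanges = browse.getExchanges();

        for (Exchange exchange : exchanges) {
            // ......
        }
    }

}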
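
Worth keeping in mind with a route like this: `from("activemq:somequeue")` is still an ordinary consumer, so messages are taken off the queue, and the browse endpoint holds copies of the exchanges that have passed through it. The difference between consuming and browsing can be sketched with a plain in-memory queue. This is a JDK-only model for illustration; `BrowseVsConsume` and its methods are hypothetical and are not the Camel or JMS API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for a message queue; NOT the JMS or Camel API. */
public class BrowseVsConsume {

    private final Deque<String> messages = new ArrayDeque<>();

    /** Appends a message to the tail of the queue. */
    public void send(String body) {
        messages.addLast(body);
    }

    /** Behaves like a consumer: removes and returns the head, or null if the queue is empty. */
    public String consume() {
        return messages.pollFirst();
    }

    /** Behaves like a browser: returns a snapshot copy and leaves the queue untouched. */
    public List<String> browse() {
        return new ArrayList<>(messages);
    }

    /** Number of messages still on the queue. */
    public int size() {
        return messages.size();
    }
}
```

Calling `browse()` any number of times leaves `size()` unchanged, while each `consume()` shrinks the queue by one. On the Camel side, JMS queue endpoints also implement `BrowsableEndpoint` as far as I know, so looking up the queue endpoint itself and calling `getExchanges()` on it may be an option; check the JMS component documentation for your Camel version.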
