Camel removes the consumer from the queue even with the cache level set to CACHE_CONSUMER

g6ll5ycj  posted on 2023-10-18  in Apache

I noticed that even after explicitly setting the property

<property name="cacheLevelName" value="CACHE_CONSUMER"/>

my consumers are still being removed.
See the log below:

23:14:25,180 | Usage | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 |     Main:memory:queue://reply:memory: usage change from: 0% of available memory, to: 1% of available memory
23:14:25,180 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 1, Inflight: 0, pagedInMessages.size 2, enqueueCount: 2, dequeueCount: 1
23:14:25,180 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:25,180 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:25,649 | ActiveMQMessageConsumer | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | remove: ID:TestESB-3364-1321307776420-8:1:1:84, lastDeliveredSequenceId:0
23:14:25,649 | AbstractRegion | 49 - org.apache.activemq.activemq-core -     5.5.0.fuse-00-43 | default removing consumer: ID:TestESB-3364-1321307776420-8:1:1:84 for destination: queue://routerResponse
23:14:25,649 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://routerResponse remove sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-8:1:1:84, destinations=1, dispatched=0, delivered=0, pending=0, lastDeliveredSeqId: 0, dequeues: 0, dispatched: 0, inflight: 0
23:14:25,649 | AbstractRegion | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | default adding consumer: ID:TestESB-3364-1321307776420-8:1:1:85 for destination: queue://routerResponse
23:14:25,649 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://routerResponse add sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-8:1:1:85, destinations=0, dispatched=0, delivered=0, pending=0, dequeues: 0, dispatched: 0, inflight: 0
23:14:26,165 | ActiveMQMessageConsumer | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | remove: ID:TestESB-3364-1321307776420-14:1:1:4, lastDeliveredSequenceId:0
23:14:26,165 | AbstractRegion | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | default removing consumer: ID:TestESB-3364-1321307776420-14:1:1:4 for destination: queue://reply
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://reply remove sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-14:1:1:4, destinations=1, dispatched=0, delivered=0, pending=0, lastDeliveredSeqId: 0, dequeues: 1, dispatched: 1, inflight: 0
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:26,165 | AbstractRegion | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | default adding consumer: ID:TestESB-3364-1321307776420-14:1:1:5 for destination: queue://reply
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://reply add sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-14:1:1:5, destinations=0, dispatched=0, delivered=0, pending=0, dequeues: 1, dispatched: 1, inflight: 0
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:26,649 | ActiveMQMessageConsumer | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | remove: ID:TestESB-3364-1321307776420-8:1:1:85, lastDeliveredSequenceId:0

Here is the Camel Spring configuration:

<bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!--<property name="brokerURL"><value>${aero-provider-broker-url}</value></property>-->
</bean>
<bean id="providerActiveMQConfig" class="org.apache.activemq.camel.component.ActiveMQConfiguration">
  <property name="connectionFactory" ref="providerConnectionFactory"/>
  <property name="cacheLevelName" value="CACHE_CONSUMER"/>
  <property name="concurrentConsumers" value="${jms-concurrent-consumers}"/>
  <property name="requestTimeout" value="${jms-request-timeout}"/>
  <property name="priority" value="${jms-message-priority}"/>
</bean>

<bean id="providerActivemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="providerActiveMQConfig"/>
  <property name="brokerURL"><value>${provider-broker-url}</value></property>
</bean>

Edit >>
Bean definitions:

<bean id="localJMSConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="alwaysSessionAsync" value="false"/>
    <property name="alwaysSyncSend" value="true"/>
    <property name="brokerURL"><value>${local-broker-url}</value></property>
    <property name="closeTimeout" value="150000"/>
    <property name="copyMessageOnSend" value="true"/>
    <property name="disableTimeStampsByDefault" value="false"/>
    <property name="dispatchAsync" value="false"/>
    <property name="objectMessageSerializationDefered" value="false"/>
    <property name="optimizeAcknowledge" value="false"/>
    <property name="optimizedMessageDispatch" value="true"/>
    <property name="producerWindowSize" value="0"/>
    <property name="statsEnabled" value="false"/>
    <property name="useAsyncSend" value="false"/>
    <property name="useCompression" value="false"/>
    <property name="sendTimeout" value="0"/>
</bean> 
<bean id="localJMSTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="localJMSConnectionFactory" />               
</bean> 
<bean id="localRequestJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="localJMSConnectionFactory"/>
    <property name="concurrentConsumers" value="${jms-concurrent-consumers}"/>
    <property name="deliveryPersistent" value="true"/>
    <property name="priority" value="9"/>
</bean> 
<bean id="localRequestJMS" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="localRequestJMSConfig"/>        
</bean>

Camel endpoint definitions:

<camel:endpoint id="jmsEndpoint" camelContextId="bohProcessorCC" uri="localRequestJMS:queue:${boh-processor-queue}"/>

<camel:endpoint id="providerEndpoint" camelContextId="bohProcessorCC" uri="providerActivemq:queue:${generic-http-provider-queue}?exchangePattern=InOut&amp;requestTimeout=${jms-request-timeout}&amp;replyTo=reply"/>

Camel route:

<from ref="jmsEndpoint"/>
 <bean ref="service" method="getSignInRequest"/> 
 <transform><simple>XML=${in.body}</simple></transform> 
 <inOut ref="providerEndpoint"/> 
 <bean ref="service" method="getSignInResponseData"/>  
 <bean ref="utilityServicesBean" method="process"/> 
 <bean ref="service" method="getPassword"/> 
 <inOut ref="providerEndpoint"/> 
 <bean ref="camelPropertyEnricherBean" method="addCustReference"/> 
 <bean ref="camelPropertyEnricherBean" method="normalizeXML"/> 
 <inOut ref="providerEndpoint"/>

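On the second call to the provider endpoint, the exchange times out. In most cases a response is sitting on the reply queue, but the logs show that its correlation ID does not match. I noticed that if I ask Camel to use the message ID as the correlation ID before calling the provider endpoint, a message usually comes back. Even then, it still fails occasionally under heavy traffic.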

ubby3x7f  #1

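You are using Camel to do request/reply over JMS, for example with inOut ref="providerEndpoint", and that endpoint uses a fixed replyTo queue name because you specified replyTo=reply.
This means Camel is forced to use CACHE_SESSION: the reply queue is treated as a shared reply queue, and Camel uses a JMS message selector to pick up the expected reply message based on the correlation ID. To update the JMS message selector, the consumer has to be re-created. That is why you see a consumer being removed in the logs; it is the consumer for the replies.
From Camel 2.9 onwards we introduced a replyToType option, which you can set to replyToType=Exclusive. The reply queue is then regarded as exclusive, which means Camel uses CACHE_CONSUMER.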
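
As a rough sketch of that suggestion, the providerEndpoint from the question could be changed as shown below. This assumes Camel 2.9 or later and assumes that no other endpoint or application instance consumes from the reply queue; the rest of the URI is copied unchanged from the question.

```xml
<!-- Sketch only: the endpoint from the question with replyToType=Exclusive appended. -->
<!-- Assumption: the "reply" queue is used by this endpoint alone, so no message selector is needed. -->
<camel:endpoint id="providerEndpoint" camelContextId="bohProcessorCC"
    uri="providerActivemq:queue:${generic-http-provider-queue}?exchangePattern=InOut&amp;requestTimeout=${jms-request-timeout}&amp;replyTo=reply&amp;replyToType=Exclusive"/>
```

If the reply queue really is shared with other consumers, Exclusive is probably not appropriate, since any reply could then be picked up by the wrong consumer.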
