Spring Boot 如何在ActiveMQ上配置Jmsource,以便使用Qpid进行自动缩放

sqserrrh  于 2023-10-16  发布在  Spring
关注(0)|答案(1)|浏览(101)

我有一个带有activeMQ Artemis队列的Kubernetes集群,我正在使用hpa来自动扩展微服务。这些消息通过Qpidlets发送,并通过JMSWindow接收。
消息传递工作,但我不能配置队列/队列的方式,自动缩放工程作为expacted。
这是我的Qpid发送器

public static void send(String avroMessage, String task) throws JMSException, NamingException {
    Connection connection = createConnection();
    connection.start();

    Session session = createSession(connection);
    MessageProducer messageProducer = createProducer(session);

    TextMessage message = session.createTextMessage(avroMessage);
    message.setStringProperty("task", task);
    messageProducer.send(
        message, 
        DeliveryMode.NON_PERSISTENT, 
        Message.DEFAULT_PRIORITY, 
        Message.DEFAULT_TIME_TO_LIVE);

    connection.close();
}

private static MessageProducer createProducer(Session session) throws JMSException {
    Destination producerDestination = 
       session.createQueue("queue?consumer.prefetchSize=1&heartbeat='10000'");
    return session.createProducer(producerDestination);
}

private static Session createSession(Connection connection) throws JMSException {
    return connection.createSession(Session.AUTO_ACKNOWLEDGE);
}

private static Connection createConnection() throws NamingException, JMSException {
    Hashtable<Object, Object> env = new Hashtable<>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    env.put("connectionfactory.factoryLookup", amqUrl);
    Context context = new javax.naming.InitialContext(env);

    ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(connectionFactory);
    pooledConnectionFactory.setMaxConnections(10);

    return connectionFactory.createConnection(amqUsername, amqPassword);
}

这是我的配置文件

@Bean
public JmsConnectionFactory jmsConnection() {
    JmsConnectionFactory jmsConnection = new JmsConnectionFactory();
    jmsConnection.setRemoteURI(this.amqUrl);
    jmsConnection.setUsername(this.amqUsername);
    jmsConnection.setPassword(this.amqPassword);
    return jmsConnection;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConnection());

    return factory;
}

这是我的

@JmsListener(
        destination = "queue?consumer.prefetchSize=1&heartbeat='10000'",
        selector = "task = 'myTask'"
)
public void receiveMsg(Message message) throws IOException, JMSException {
    message.acknowledge();
    doStuff();
}

我这样发信息
QpidSender.send(avroMessage, "myTask");
这个设置有效。我可以发送不同的消息,一旦消息超过2条,我的服务的第二个示例就会启动并消耗消息。如果稍后消息计数低于2,则终止服务。
问题是:我不希望消息在doStuff()之前被确认。因为如果出现错误,或者服务在完成doStuff()之前终止,消息就会丢失(对吗?).
但如果我重新排序

doStuff();
message.acknowledge();

只要第一个服务仍然在doStuff()中并且还没有确认消息,第二个示例就不能从代理接收消息。
我如何配置这种方式,使多个示例可以从队列中消费消息,但消息不会丢失,如果服务被终止或其他东西在doStuff()上失败?

ee7vknir

ee7vknir1#

使用factory.setSessionTransacted(true)
请参阅DefaultMessageListenerContainer的javadoc:

* <p><b>It is strongly recommended to either set {@link #setSessionTransacted
 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
 * javadoc for details on acknowledge modes and native transaction options, as
 * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
 * on configuring an external transaction manager. Note that for the default
 * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
 * before listener execution, with no redelivery in case of an exception.

相关问题