我有一个带有activeMQ Artemis队列的Kubernetes集群,我正在使用hpa来自动扩展微服务。这些消息通过Qpidlets发送,并通过JMSWindow接收。
消息传递工作,但我不能配置队列/队列的方式,自动缩放工程作为expacted。
这是我的Qpid发送器
public static void send(String avroMessage, String task) throws JMSException, NamingException {
Connection connection = createConnection();
connection.start();
Session session = createSession(connection);
MessageProducer messageProducer = createProducer(session);
TextMessage message = session.createTextMessage(avroMessage);
message.setStringProperty("task", task);
messageProducer.send(
message,
DeliveryMode.NON_PERSISTENT,
Message.DEFAULT_PRIORITY,
Message.DEFAULT_TIME_TO_LIVE);
connection.close();
}
private static MessageProducer createProducer(Session session) throws JMSException {
Destination producerDestination =
session.createQueue("queue?consumer.prefetchSize=1&heartbeat='10000'");
return session.createProducer(producerDestination);
}
private static Session createSession(Connection connection) throws JMSException {
return connection.createSession(Session.AUTO_ACKNOWLEDGE);
}
private static Connection createConnection() throws NamingException, JMSException {
Hashtable<Object, Object> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
env.put("connectionfactory.factoryLookup", amqUrl);
Context context = new javax.naming.InitialContext(env);
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(connectionFactory);
pooledConnectionFactory.setMaxConnections(10);
return connectionFactory.createConnection(amqUsername, amqPassword);
}
这是我的配置文件
@Bean
public JmsConnectionFactory jmsConnection() {
JmsConnectionFactory jmsConnection = new JmsConnectionFactory();
jmsConnection.setRemoteURI(this.amqUrl);
jmsConnection.setUsername(this.amqUsername);
jmsConnection.setPassword(this.amqPassword);
return jmsConnection;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(jmsConnection());
return factory;
}
这是我的
@JmsListener(
destination = "queue?consumer.prefetchSize=1&heartbeat='10000'",
selector = "task = 'myTask'"
)
public void receiveMsg(Message message) throws IOException, JMSException {
message.acknowledge();
doStuff();
}
我这样发信息QpidSender.send(avroMessage, "myTask");
这个设置有效。我可以发送不同的消息,一旦消息超过2条,我的服务的第二个示例就会启动并消耗消息。如果稍后消息计数低于2,则终止服务。
问题是:我不希望消息在doStuff()之前被确认。因为如果出现错误,或者服务在完成doStuff()之前终止,消息就会丢失(对吗?).
但如果我重新排序
doStuff();
message.acknowledge();
只要第一个服务仍然在doStuff()
中并且还没有确认消息,第二个示例就不能从代理接收消息。
我如何配置这种方式,使多个示例可以从队列中消费消息,但消息不会丢失,如果服务被终止或其他东西在doStuff()
上失败?
1条答案
按热度按时间ee7vknir1#
使用
factory.setSessionTransacted(true)
。请参阅
DefaultMessageListenerContainer
的javadoc: