我的理解是,当在生产者中指定AcknowledgementMode.ClientAcknowledge时,我应该能够停止和重新启动消费者,代理将重新发送关于该主题的所有未确认的消息。但是这个不行。(我也尝试过在消费者中使用onMessage侦听器,但我得到了相同的行为。)任何帮助都是感激的。这是我的制作人:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "producer";
using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge)) {
IDestination destination = SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic);
IMessageProducer producer = session.CreateProducer(destination);
while (!Console.KeyAvailable) {
string message = "[" + DateTime.UtcNow.ToString("yyyy/MM/dd HH:mm:ss.fff") + "]";
ITextMessage xtext_message = session.CreateTextMessage(message);
producer.Send(xtext_message, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(1, 0, 0));
Console.WriteLine("Sent message:" + message);
Thread.Sleep(1000);
}
}
}
}
这是我的消费者:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616/");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "consumer";
connection.Start();
using (ISession session = connection.CreateSession()) {
IMessageConsumer consumer = session.CreateConsumer(SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic));
while (!Console.KeyAvailable) {
ITextMessage receivedMsg = consumer.Receive() as ITextMessage;
if (receivedMsg != null) {
Console.WriteLine("--> received: " + receivedMsg.Text);
}
}
}
}
}
1条答案
按热度按时间yvgpqqbh1#
客户端确认对消息生产者没有影响,他们向代理发送消息并以相同的方式等待代理(只有事务会话模式对生产者有影响)。确认模式影响消息的消费者,在客户端确认的情况下,需要专门确认消息,以便从队列中消费它们。
如果使用客户端确认模式的消费者未能确认并被关闭,则传递给它的消息将被分派给另一个消费者(如果存在),或者将简单地保留在队列中,直到另一个消费者到达。