我正在订阅rabbitMQ服务器上的一个队列,并使用其上的消息来检索主体和一个特定的头部。这是在Spring云数据流源应用程序上完成的,问题是当它启动时,它会一直执行并返回最后一条消息。我需要添加什么来使应用程序只在新消息到达正在侦听的队列时执行?
以下是我的供应商代码:
@Log4j2
@EnableConfigurationProperties({ RabbitMQProperties.class })
@Configuration
public class ReceiveMessageConfiguration {
private final static String QUEUE_NAME = "UniversalId";
String payload;
Connection connection;
Channel channel;
@Bean
public Supplier<String> receiverMessage(RabbitMQProperties rabbitMQProperties) throws Exception {
return () -> {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setRequestedHeartbeat(0);
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
// Get headers from properties
AMQP.BasicProperties properties = delivery.getProperties();
Map<String, Object> headers = properties.getHeaders();
// Extract and print payload and header
for (Map.Entry<String, Object> header : headers.entrySet()) {
if (header.getKey().toString().equals("UniversalId")) {
// log.info("ID nedeed: " + header.getValue());
}
}
payload = message;
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
log.info(payload);
if (payload != null) {
return payload;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return new String("No message");
};
}
}
我的属性文件是这样的:
spring.cloud.stream.function.bindings.receiverMessage-in-0=input
spring.cloud.stream.function.bindings.receiverMessage-out-0=output
spring.cloud.stream.bindings.input.destination=receiverMessage-input
spring.cloud.stream.bindings.output.destination=receiverMessage-output
更新:以下是控制台消息的示例:
2022-12-21T16:25:19.725-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ main][0;39m [36mm.c.n.r.RabbitMqCustomSourceApplication [0;39m [2m:[0;39m Started RabbitMqCustomSourceApplication in 7.888 seconds (process running for 9.482)
[2m2022-12-21T16:25:20.432-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:21.478-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:22.523-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:23.568-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:24.604-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:25.913-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:27.056-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:28.124-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m null
[2m2022-12-21T16:25:29.167-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m [
{
"MailRequest": {
"action": "fourth",
"mails": []
}
}
]
[2m2022-12-21T16:25:30.233-06:00[0;39m [32m INFO[0;39m [35m21600[0;39m [2m---[0;39m [2m[ scheduling-1][0;39m [36mm.c.n.r.s.ReceiveMessageConfiguration [0;39m [2m:[0;39m [
{
"MailRequest": {
"action": "fourth",
"mails": []
}
}
]
我已经尝试从RabbitMQ API更改为AMQP API,但行为是相同的。
1条答案
按热度按时间vfh0ocws1#
阿雷奥拉,
因为这是一个直接使用Rabbit/AMQP的自定义函数,SCDF没有影响。因此这更多的是一个Rabbit/AMQP问题。
此外,您可能想要看一看RabbitSource提供的应用程序,它可以作为替代品。