本文整理了Java中org.springframework.amqp.core.Message.getBody()方法的一些代码示例,展示了Message.getBody()的具体用法。
这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。
Message.getBody()方法的具体详情如下:
包路径:org.springframework.amqp.core.Message
类名称:Message
方法名:getBody
暂无
代码示例来源:origin: qiurunze123/miaosha
/**
 * Listener for the queue named by {@code MQConfig.MIAOSHATEST}: logs the incoming
 * message, decodes its body as UTF-8 text, acknowledges the delivery and hands the
 * deserialized {@code MiaoShaMessageVo} to {@code messageService.insertMs}.
 *
 * @param message the received AMQP message; its body is read as UTF-8 text
 * @param channel the channel used to acknowledge the delivery
 * @throws IOException if the acknowledgement cannot be sent
 */
@RabbitListener(queues=MQConfig.MIAOSHATEST)
public void receiveMiaoShaMessage(Message message, Channel channel) throws IOException {
    log.info("接受到的消息为:{}",message);
    // Use the charset constant rather than the "UTF-8" name: no by-name lookup and
    // no UnsupportedEncodingException path for a charset that is always available.
    String messRegister = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    // NOTE(review): the delivery is acknowledged (with multiple = true) before the
    // payload is stored, so a failure in insertMs leaves the message already acked.
    // Confirm that this at-most-once behavior is intended.
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    MiaoShaMessageVo msm = RedisService.stringToBean(messRegister, MiaoShaMessageVo.class);
    messageService.insertMs(msm);
}
}
代码示例来源:origin: prontera/spring-cloud-rest-tcc
/**
 * Handles a returned message: reads the {@code guid} field from the UTF-8 JSON body
 * and updates the matching publisher row with status {@code NO_ROUTE} or {@code ERROR}.
 * Any reply text other than {@code NO_ROUTE} (compared case-insensitively) is logged
 * through {@code logReturnedFault} and recorded as {@code ERROR}. If the body cannot
 * be parsed, the fault is logged and no update is made.
 */
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    final String payload = new String(message.getBody(), Charsets.UTF_8);
    try {
        final String eventGuid = Jacksons.getMapper().readTree(payload).get("guid").asText();
        final EventPublisher update = new EventPublisher();
        update.setGuid(eventGuid);
        final boolean unroutable = EventStatus.NO_ROUTE.name().equalsIgnoreCase(replyText);
        if (!unroutable) {
            logReturnedFault(replyCode, replyText, exchange, routingKey, payload);
        }
        update.setEventStatus(unroutable ? EventStatus.NO_ROUTE : EventStatus.ERROR);
        // Because basic.ack is invoked after basic.return, the author believes the NO_ROUTE
        // status may be wrongly converted to NOT_FOUND, so race conditions need not be considered here.
        publisherMapper.updateByGuidSelective(update);
    } catch (IOException parseFailure) {
        logReturnedFault(replyCode, replyText, exchange, routingKey, payload);
    }
}
代码示例来源:origin: yu199195/myth
/**
 * Creates the listener container for {@code queue()} on {@code connectionFactory}.
 * The container exposes the listener channel, runs 1 consumer (at most 2) and uses
 * manual acknowledgement. Each message body is passed to
 * {@code mythMqReceiveService.processMessage}; the delivery is acked individually
 * only when that call returns {@code true}.
 *
 * @return the configured simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
    final ChannelAwareMessageListener listener = (message, channel) -> {
        final byte[] payload = message.getBody();
        LOGGER.debug("motan 框架接收到的消息");
        final Boolean processed = mythMqReceiveService.processMessage(payload);
        // NOTE(review): when processing reports failure the delivery is neither acked
        // nor rejected here. Confirm that leaving it unacknowledged is intended.
        if (!processed) {
            return;
        }
        // acknowledge only after the message was consumed successfully
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    };

    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    listenerContainer.setQueues(queue());
    listenerContainer.setExposeListenerChannel(true);
    listenerContainer.setMaxConcurrentConsumers(2);
    listenerContainer.setConcurrentConsumers(1);
    // manual acknowledgement mode
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    listenerContainer.setMessageListener(listener);
    return listenerContainer;
}
代码示例来源:origin: yu199195/myth
/**
 * Builds a single-consumer listener container (1 concurrent, 1 max) for
 * {@code queue()} with manual acknowledgement. The listener forwards each message
 * body to {@code mythMqReceiveService.processMessage} and acks that one delivery
 * when the call returns {@code true}.
 *
 * @return the configured simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
    final ChannelAwareMessageListener ackingListener = (message, channel) -> {
        final byte[] rawBody = message.getBody();
        // ack only when the message was consumed successfully
        if (mythMqReceiveService.processMessage(rawBody)) {
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        }
    };

    SimpleMessageListenerContainer mqContainer = new SimpleMessageListenerContainer(connectionFactory);
    mqContainer.setQueues(queue());
    mqContainer.setExposeListenerChannel(true);
    mqContainer.setMaxConcurrentConsumers(1);
    mqContainer.setConcurrentConsumers(1);
    // acknowledgements are sent manually by the listener
    mqContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    mqContainer.setMessageListener(ackingListener);
    return mqContainer;
}
代码示例来源:origin: yu199195/myth
/**
 * Provides the listener container bound to {@code queue()}: listener channel exposed,
 * 1 concurrent consumer scaling to at most 3, manual acknowledgement. Incoming bodies
 * go to {@code mythMqReceiveService.processMessage}, and a delivery is acked on its
 * own once that call returns {@code true}.
 *
 * @return the configured simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
    SimpleMessageListenerContainer amqpContainer = new SimpleMessageListenerContainer(connectionFactory);
    amqpContainer.setQueues(queue());
    amqpContainer.setExposeListenerChannel(true);
    amqpContainer.setMaxConcurrentConsumers(3);
    amqpContainer.setConcurrentConsumers(1);
    // the listener below acknowledges deliveries itself
    amqpContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    amqpContainer.setMessageListener((ChannelAwareMessageListener) (delivery, amqpChannel) -> {
        final byte[] content = delivery.getBody();
        LogUtil.debug(LOGGER,()->"springcloud account服务 amqp接收消息");
        final Boolean handled = mythMqReceiveService.processMessage(content);
        if (!handled) {
            return;
        }
        // confirm the message only after successful consumption
        amqpChannel.basicAck(delivery.getMessageProperties().getDeliveryTag(), false);
    });
    return amqpContainer;
}
代码示例来源:origin: FlowCI/flow-platform
public PriorityMessage(Message message) {
super(message.getBody(), message.getMessageProperties());
this.timestamp = System.nanoTime();
}
代码示例来源:origin: spring-projects/spring-integration
// Checks an AmqpOutboundEndpoint declared inside the "chainWithRabbitOutbound" chain:
// its RabbitTemplate is swapped for a Mockito spy, "hello" is sent through the
// "amqpOutboundChannelAdapterWithinChain" channel, and exactly one send(...) call
// with a null routing key and null correlation data is expected.
@SuppressWarnings("rawtypes")
@Test
public void amqpOutboundChannelAdapterWithinChain() {
Object eventDrivenConsumer = context.getBean("chainWithRabbitOutbound");
List chainHandlers = TestUtils.getPropertyValue(eventDrivenConsumer, "handler.handlers", List.class);
// The first handler of the chain is expected to be the AMQP outbound endpoint.
AmqpOutboundEndpoint endpoint = (AmqpOutboundEndpoint) chainHandlers.get(0);
assertNull(TestUtils.getPropertyValue(endpoint, "defaultDeliveryMode"));
// Reflection handle used further down to inject the spy into the endpoint.
Field amqpTemplateField = ReflectionUtils.findField(AmqpOutboundEndpoint.class, "amqpTemplate");
amqpTemplateField.setAccessible(true);
RabbitTemplate amqpTemplate = TestUtils.getPropertyValue(endpoint, "amqpTemplate", RabbitTemplate.class);
amqpTemplate = Mockito.spy(amqpTemplate);
// Stubbed send(...): checks the outgoing AMQP message (body "hello", PERSISTENT delivery mode).
// NOTE(review): the stub matches with any(String.class)/any(CorrelationData.class), while the
// verification below expects isNull() for the routing key and correlation data. Depending on the
// Mockito version, any(Class) may not match null, in which case this answer (and its assertions)
// would not run - confirm.
Mockito.doAnswer(invocation -> {
Object[] args = invocation.getArguments();
org.springframework.amqp.core.Message amqpMessage = (org.springframework.amqp.core.Message) args[2];
MessageProperties properties = amqpMessage.getMessageProperties();
assertEquals("hello", new String(amqpMessage.getBody()));
assertEquals(MessageDeliveryMode.PERSISTENT, properties.getDeliveryMode());
return null;
})
.when(amqpTemplate).send(Mockito.any(String.class), Mockito.any(String.class),
Mockito.any(org.springframework.amqp.core.Message.class),
Mockito.any(CorrelationData.class));
// Install the spy only after it has been stubbed.
ReflectionUtils.setField(amqpTemplateField, endpoint, amqpTemplate);
MessageChannel requestChannel = context.getBean("amqpOutboundChannelAdapterWithinChain", MessageChannel.class);
Message<?> message = MessageBuilder.withPayload("hello").build();
requestChannel.send(message);
// Exactly one send, with a null routing key and null correlation data.
Mockito.verify(amqpTemplate, Mockito.times(1)).send(Mockito.any(String.class),
isNull(), Mockito.any(org.springframework.amqp.core.Message.class), isNull());
}
代码示例来源:origin: souyunku/SpringBootExamples
/**
 * Callback invoked when a published message is returned.
 * <p>
 * A message is returned when it was sent but no matching routed queue could be found;
 * if at least one routed queue accepts the delivery, the message is not returned.
 *
 * @param message    the returned message; its body is logged as UTF-8 text
 * @param replyCode  the reply code, included in the log line
 * @param replyText  the reply text, included in the log line
 * @param exchange   the exchange the message was published to
 * @param routingKey the routing key the message was published with
 */
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    // Decode with an explicit charset: new String(byte[]) uses the platform default,
    // which garbles non-ASCII payloads on hosts whose default is not UTF-8.
    final String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    logger.info("return--message:" + body + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
}
代码示例来源:origin: spring-projects/spring-integration
/**
 * Sends "hello" through {@code ctRequestChannel} twice - first with an
 * {@code application/json} content-type header, then without one - and checks the
 * body, content type and (for JSON) type-id header of the message received from the
 * queue. The queue is drained before and after the test.
 */
@Test
public void adapterWithContentType() throws Exception {
    RabbitTemplate rabbit = new RabbitTemplate(this.connectionFactory);
    rabbit.setDefaultReceiveQueue(this.queue.getName());
    while (rabbit.receive() != null) {
        // discard leftovers from earlier tests
    }

    // 1) explicit JSON content type
    Message<?> jsonRequest = MessageBuilder.withPayload("hello")
            .setHeader(AmqpHeaders.CONTENT_TYPE, "application/json")
            .build();
    this.ctRequestChannel.send(jsonRequest);
    org.springframework.amqp.core.Message jsonReply = receive(rabbit);
    assertNotNull(jsonReply);
    assertEquals("\"hello\"", new String(jsonReply.getBody(), "UTF-8"));
    assertEquals("application/json", jsonReply.getMessageProperties().getContentType());
    String typeIdHeader = JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, "");
    assertEquals("java.lang.String", jsonReply.getMessageProperties().getHeaders().get(typeIdHeader));

    // 2) no content type header
    Message<?> plainRequest = MessageBuilder.withPayload("hello").build();
    this.ctRequestChannel.send(plainRequest);
    org.springframework.amqp.core.Message plainReply = receive(rabbit);
    assertNotNull(plainReply);
    assertEquals("hello", new String(plainReply.getBody(), "UTF-8"));
    assertEquals("text/plain", plainReply.getMessageProperties().getContentType());

    while (rabbit.receive() != null) {
        // leave the queue empty
    }
}
代码示例来源:origin: spring-projects/spring-integration
assertEquals("\"hello\"", new String(received.getBody(), "UTF-8"));
assertEquals("application/json", received.getMessageProperties().getContentType());
assertEquals("java.lang.String", received.getMessageProperties().getHeaders()
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Decodes the message body to a {@code String} using the content encoding from
 * {@code properties}, or {@code this.defaultCharset} when none is set.
 *
 * @param message    the message whose body is decoded
 * @param properties the properties supplying the content encoding
 * @return the decoded text
 * @throws MessageConversionException if the charset name is not supported
 */
private Object asString(Message message, MessageProperties properties) {
    final String declared = properties.getContentEncoding();
    final String charsetName = (declared != null) ? declared : this.defaultCharset;
    try {
        return new String(message.getBody(), charsetName);
    }
    catch (UnsupportedEncodingException ex) {
        throw new MessageConversionException("failed to convert text-based Message content", ex);
    }
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Converts the message body, read as text in the platform default charset,
 * into a {@code long} value.
 */
@Override
public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
    final String digits = new String(message.getBody());
    final long parsed = Long.parseLong(digits);
    return parsed;
}
});
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Logs the received body at debug level and counts the latch down by one.
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    final byte[] raw = message.getBody();
    logger.debug("Receiving: " + new String(raw));
    latch.countDown();
}
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Publishes {@code message} on {@code channel}: the message properties are converted
 * with {@code this.messagePropertiesConverter} using {@code this.encoding}, then the
 * body is sent via {@code basicPublish}.
 *
 * @param channel    the channel to publish on
 * @param exchange   the target exchange
 * @param routingKey the routing key
 * @param mandatory  the mandatory flag passed to {@code basicPublish}
 * @param message    the message to publish
 * @throws IOException if publishing fails
 */
protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
        Message message) throws IOException {
    final BasicProperties amqpProperties =
            this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
    final byte[] payload = message.getBody();
    channel.basicPublish(exchange, routingKey, mandatory, amqpProperties, payload);
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Logs the body, acknowledges this single delivery on the channel and always counts
 * the latch down, even when logging or the ack throws.
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    final String text = new String(message.getBody());
    try {
        logger.debug("Acking: " + text);
        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }
    finally {
        latch.countDown();
    }
}
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Sends "foo" to "test.manual.container", waits up to 10 seconds for the manual
 * container latch and checks that the captured message body equals "foo".
 */
@Test
public void testManualContainer() throws Exception {
    this.rabbitTemplate.convertAndSend("test.manual.container", "foo");
    EnableRabbitConfig rabbitConfig = this.context.getBean(EnableRabbitConfig.class);
    boolean delivered = rabbitConfig.manualContainerLatch.await(10, TimeUnit.SECONDS);
    assertTrue(delivered);
    String receivedBody = new String(rabbitConfig.message.getBody());
    assertThat(receivedBody, equalTo("foo"));
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Converts a plain string through {@code convertMessageIfNecessary} and checks that
 * the resulting body decodes back to the same text with the converter's default charset.
 */
@Test
public void testConvertString() throws Exception {
    final String expected = "Hello, world!";
    RabbitTemplate rabbit = new RabbitTemplate();
    Message converted = rabbit.convertMessageIfNecessary(expected);
    String decoded = new String(converted.getBody(), SimpleMessageConverter.DEFAULT_CHARSET);
    assertEquals(expected, decoded);
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Sends a JSON message with body {@code "messaging"} to "test.messaging.message" and
 * checks the reply: non-null, body {@code {"field":"MESSAGING"}} and header
 * {@code foo=bar}.
 */
@Test
public void messagingMessageReturned() {
    final Message request = org.springframework.amqp.core.MessageBuilder
            .withBody("\"messaging\"".getBytes())
            .andProperties(MessagePropertiesBuilder.newInstance()
                    .setContentType("application/json")
                    .build())
            .build();
    final Message reply = this.rabbitTemplate.sendAndReceive("test.messaging.message", request);
    assertThat(reply, is(notNullValue()));
    final String replyBody = new String(reply.getBody());
    assertThat(replyBody, equalTo("{\"field\":\"MESSAGING\"}"));
    assertThat(reply.getMessageProperties().getHeaders().get("foo"), equalTo("bar"));
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Converts a messaging message with payload "Hello World" and checks that the AMQP
 * message has the text/plain content type and the same text as its body.
 */
@Test
public void toMessageWithTextMessage() {
    final MessageProperties targetProperties = new MessageProperties();
    org.springframework.amqp.core.Message amqpMessage =
            converter.toMessage(MessageBuilder.withPayload("Hello World").build(), targetProperties);
    assertEquals(MessageProperties.CONTENT_TYPE_TEXT_PLAIN,
            amqpMessage.getMessageProperties().getContentType());
    final String bodyText = new String(amqpMessage.getBody());
    assertEquals("Hello World", bodyText);
}
代码示例来源:origin: spring-projects/spring-amqp
/**
 * Converts the string "test" with a {@code SimpleMessageConverter} and checks that
 * the message is text/plain and that its body, decoded with the message's own
 * content encoding, equals "test".
 */
@Test
public void stringToMessage() throws Exception {
    Message converted = new SimpleMessageConverter().toMessage("test", new MessageProperties());
    final MessageProperties props = converted.getMessageProperties();
    final String actualType = props.getContentType();
    final String actualText = new String(converted.getBody(), props.getContentEncoding());
    assertEquals("text/plain", actualType);
    assertEquals("test", actualText);
}
内容来源于网络,如有侵权,请联系作者删除!