org.apache.qpid.proton.message.Message.getBody()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.3k)|赞(0)|评价(0)|浏览(204)

本文整理了Java中org.apache.qpid.proton.message.Message.getBody()方法的一些代码示例,展示了Message.getBody()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.getBody()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:getBody

Message.getBody介绍

暂无

代码示例

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public void start() throws Exception {
  3. ProtonClient client = ProtonClient.create(vertx);
  4. client.connect("localhost", 5672, res -> {
  5. if(!res.succeeded()) {
  6. System.out.println("Connect failed: " + res.cause());
  7. return;
  8. }
  9. ProtonConnection connection = res.result();
  10. connection.open();
  11. connection.createReceiver(address).handler((delivery, msg) -> {
  12. String content = (String) ((AmqpValue) msg.getBody()).getValue();
  13. System.out.println("Received message with content: " + content);
  14. // By default, receivers automatically accept (and settle) the delivery
  15. // when the handler returns, if no other disposition has been applied.
  16. // To change this and always manage dispositions yourself, use the
  17. // setAutoAccept method on the receiver.
  18. }).open();
  19. });
  20. }
  21. }

代码示例来源:origin: vert-x3/vertx-examples

  1. Section body = msg.getBody();
  2. if (body instanceof AmqpValue) {
  3. String content = (String) ((AmqpValue) body).getValue();

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Checks if a message's body consists of an AMQP <em>Data</em> section.
  3. *
  4. * @param message The message to check.
  5. * @return {@code true} if the body consists of a Data section, {@code false} otherwise.
  6. * @throws NullPointerException If message is {@code null}.
  7. */
  8. public static boolean hasDataBody(final Message message) {
  9. Objects.requireNonNull(message);
  10. return message.getBody() instanceof Data;
  11. }

代码示例来源:origin: Azure/azure-service-bus-java

  1. public static Map getResponseBody(Message responseMessage)
  2. {
  3. return (Map)((AmqpValue)responseMessage.getBody()).getValue();
  4. }

代码示例来源:origin: EnMasseProject/enmasse

  1. public synchronized List<String> recvMessages(String address, int numMessages) {
  2. Queue<Message> queue = queues.get(address);
  3. if (queue == null) {
  4. return null;
  5. }
  6. List<String> messages = new ArrayList<>();
  7. while (numMessages > 0) {
  8. Message message = queue.poll();
  9. if (message == null) {
  10. throw new RuntimeException("No more messages, " + numMessages + " remains");
  11. }
  12. messages.add((String)((AmqpValue)message.getBody()).getValue());
  13. numMessages--;
  14. }
  15. return messages;
  16. }
  17. }

代码示例来源:origin: eclipse/hono

  1. private void handleMessage(final String msgType, final Message msg) {
  2. final Data body = (Data) msg.getBody();
  3. LOG.debug("Type: [{}] and Message: [{}]", msgType, body != null ? body.getValue().toString() : "");
  4. }

代码示例来源:origin: EnMasseProject/enmasse

  1. private String doOperationWithStringResult(String resource, String operation, Object ... parameters) throws TimeoutException {
  2. Message response = doOperation(resource, operation, parameters);
  3. String payload = (String) ((AmqpValue)response.getBody()).getValue();
  4. JsonArray json = new JsonArray(payload);
  5. return json.getString(0);
  6. }

代码示例来源:origin: EnMasseProject/enmasse

  1. public long getQueueMessageCount(String queueName) throws TimeoutException {
  2. log.info("Checking message count for queue {} on broker {}", queueName, syncRequestClient.getRemoteContainer());
  3. Message response = doAttribute("queue." + queueName, "messageCount");
  4. String payload = (String) ((AmqpValue)response.getBody()).getValue();
  5. JsonArray json = new JsonArray(payload);
  6. return json.getLong(0);
  7. }

代码示例来源:origin: EnMasseProject/enmasse

  1. public String getQueueAddress(String queueName) throws TimeoutException {
  2. log.info("Checking queue address for queue {} on broker {}", queueName, syncRequestClient.getRemoteContainer());
  3. Message response = doOperation("queue." + queueName, "getAddress");
  4. String payload = (String) ((AmqpValue)response.getBody()).getValue();
  5. JsonArray json = new JsonArray(payload);
  6. return json.getString(0);
  7. }

代码示例来源:origin: EnMasseProject/enmasse

  1. public Set<String> getDivertNames() throws TimeoutException {
  2. log.info("Retrieving divert names");
  3. Message response = doOperation("broker", "getDivertNames");
  4. Set<String> diverts = new LinkedHashSet<>();
  5. JsonArray payload = new JsonArray((String)((AmqpValue)response.getBody()).getValue());
  6. for (int i = 0; i < payload.size(); i++) {
  7. JsonArray inner = payload.getJsonArray(i);
  8. for (int j = 0; j < inner.size(); j++) {
  9. diverts.add(inner.getString(j));
  10. }
  11. }
  12. return diverts;
  13. }

代码示例来源:origin: EnMasseProject/enmasse

  1. @Override
  2. public List<String> getQueueNames(AmqpClient queueClient, Destination replyQueue, String topic) throws Exception {
  3. Message requestMessage = Message.Factory.create();
  4. Map<String, Object> appProperties = new HashMap<>();
  5. appProperties.put(resourceProperty, "address." + topic);
  6. appProperties.put(operationProperty, "getQueueNames");
  7. requestMessage.setAddress(managementAddress);
  8. requestMessage.setApplicationProperties(new ApplicationProperties(appProperties));
  9. requestMessage.setReplyTo(replyQueue.getAddress());
  10. requestMessage.setBody(new AmqpValue("[]"));
  11. Future<Integer> sent = queueClient.sendMessages(managementAddress, requestMessage);
  12. assertThat(String.format("Sender failed, expected %d messages", 1), sent.get(30, TimeUnit.SECONDS), is(1));
  13. log.info("request sent");
  14. Future<List<Message>> received = queueClient.recvMessages(replyQueue.getAddress(), 1);
  15. assertThat(String.format("Receiver failed, expected %d messages", 1),
  16. received.get(30, TimeUnit.SECONDS).size(), is(1));
  17. AmqpValue val = (AmqpValue) received.get().get(0).getBody();
  18. log.info("answer received: " + val.toString());
  19. String queues = val.getValue().toString();
  20. queues = queues.replaceAll("\\[|]|\"", "");
  21. return Arrays.asList(queues.split(","));
  22. }

代码示例来源:origin: EnMasseProject/enmasse

  1. private static List<List<String>> collectRouter(SyncRequestClient client, String entityType, List<String> attributeNames) throws Exception {
  2. Map<String, Object> properties = new LinkedHashMap<>();
  3. properties.put("operation", "QUERY");
  4. properties.put("entityType", entityType);
  5. Map<String, Object> body = new LinkedHashMap<>();
  6. body.put("attributeNames", attributeNames);
  7. Message message = Proton.message();
  8. message.setApplicationProperties(new ApplicationProperties(properties));
  9. message.setBody(new AmqpValue(body));
  10. Message response = client.request(message, 10, TimeUnit.SECONDS);
  11. AmqpValue value = (AmqpValue) response.getBody();
  12. Map<?,?> values = (Map<?,?>) value.getValue();
  13. @SuppressWarnings("unchecked")
  14. List<List<String>> results = (List<List<String>>) values.get("results");
  15. return results;
  16. }
  17. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return an AMQP_UNSUBSCRIBE message from the raw AMQP one
  3. *
  4. * @param message raw AMQP message
  5. * @return AMQP_UNSUBSCRIBE message
  6. */
  7. @SuppressWarnings("unchecked")
  8. public static AmqpUnsubscribeMessage from(Message message) {
  9. if (!message.getSubject().equals(AMQP_SUBJECT)) {
  10. throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
  11. }
  12. Section section = message.getBody();
  13. if ((section != null) && (section instanceof AmqpValue)) {
  14. List<String> topics = (List<String>) ((AmqpValue) section).getValue();
  15. return new AmqpUnsubscribeMessage(AmqpHelper.getClientIdFromPublishAddress((String) message.getCorrelationId()),
  16. topics);
  17. } else {
  18. throw new IllegalArgumentException("AMQP message wrong body type");
  19. }
  20. }

代码示例来源:origin: EnMasseProject/enmasse

  1. public Set<String> getQueueNames() throws TimeoutException {
  2. log.info("Retrieving queue names for broker {}", syncRequestClient.getRemoteContainer());
  3. Message response = doOperation("broker", "getQueueNames");
  4. Set<String> queues = new LinkedHashSet<>();
  5. JsonArray payload = new JsonArray((String)((AmqpValue)response.getBody()).getValue());
  6. for (int i = 0; i < payload.size(); i++) {
  7. JsonArray inner = payload.getJsonArray(i);
  8. for (int j = 0; j < inner.size(); j++) {
  9. String queueName = inner.getString(j);
  10. if (!queueName.equals(syncRequestClient.getReplyTo())) {
  11. queues.add(queueName);
  12. }
  13. }
  14. }
  15. return queues;
  16. }

代码示例来源:origin: EnMasseProject/enmasse

  1. public Set<String> getConnectorNames() throws TimeoutException {
  2. log.info("Retrieving conector names for broker {}", syncRequestClient.getRemoteContainer());
  3. Message response = doOperation("broker", "getConnectorServices");
  4. Set<String> connectors = new LinkedHashSet<>();
  5. JsonArray payload = new JsonArray((String)((AmqpValue)response.getBody()).getValue());
  6. for (int i = 0; i < payload.size(); i++) {
  7. JsonArray inner = payload.getJsonArray(i);
  8. for (int j = 0; j < inner.size(); j++) {
  9. String connector = inner.getString(j);
  10. if (!connector.equals("amqp-connector")) {
  11. connectors.add(connector);
  12. }
  13. }
  14. }
  15. return connectors;
  16. }
  17. }

代码示例来源:origin: io.vertx/vertx-amqp-bridge

  1. private void doJSON_to_AMQP_VerifyStringBodyTestImpl(boolean setBodyType) {
  2. String testContent = "myTestContent";
  3. JsonObject jsonObject = new JsonObject();
  4. jsonObject.put(AmqpConstants.BODY, testContent);
  5. if(setBodyType){
  6. jsonObject.put(AmqpConstants.BODY_TYPE, AmqpConstants.BODY_TYPE_VALUE);
  7. }
  8. Message protonMsg = translator.convertToAmqpMessage(jsonObject);
  9. assertNotNull("Expected converted msg", protonMsg);
  10. Section body = protonMsg.getBody();
  11. assertTrue("Unexpected body type", body instanceof AmqpValue);
  12. assertEquals("Unexpected message body value", testContent, ((AmqpValue) body).getValue());
  13. }

代码示例来源:origin: io.vertx/vertx-amqp-bridge

  1. @Test
  2. public void testJSON_to_AMQP_VerifyDataBody() {
  3. String testContent = "myTestContent";
  4. JsonObject jsonObject = new JsonObject();
  5. jsonObject.put(AmqpConstants.BODY, testContent.getBytes(StandardCharsets.UTF_8));
  6. jsonObject.put(AmqpConstants.BODY_TYPE, AmqpConstants.BODY_TYPE_DATA);
  7. Message protonMsg = translator.convertToAmqpMessage(jsonObject);
  8. assertNotNull("Expected converted msg", protonMsg);
  9. Section body = protonMsg.getBody();
  10. assertTrue("Unexpected body type", body instanceof Data);
  11. assertNotNull("Unexpected body content", body);
  12. assertEquals("Unexpected message body value", new Binary(testContent.getBytes(StandardCharsets.UTF_8)),
  13. ((Data) body).getValue());
  14. }

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

  1. @Test
  2. public void encodeDecodeLargeMessage() throws Exception {
  3. Message message = Message.Factory.create();
  4. message.setAddress("address");
  5. message.setSubject("subject");
  6. String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
  7. message.setBody(new AmqpValue(body));
  8. AmqpMessageCoder coder = AmqpMessageCoder.of();
  9. Message clone = CoderUtils.clone(coder, message);
  10. clone.getBody().toString().equals(message.getBody().toString());
  11. }
  12. }

代码示例来源:origin: apache/activemq-artemis

  1. private void validateMessage(byte[] expectedPayload, int msgNum, AmqpMessage message) {
  2. assertNotNull("failed at " + msgNum, message);
  3. Section body = message.getWrappedMessage().getBody();
  4. assertNotNull("No message body for msg " + msgNum, body);
  5. assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
  6. assertEquals("Unexpected body content for msg", new Binary(expectedPayload, 0, expectedPayload.length), ((Data) body).getValue());
  7. }

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

  1. @Test
  2. public void encodeDecode() throws Exception {
  3. Message message = Message.Factory.create();
  4. message.setBody(new AmqpValue("body"));
  5. message.setAddress("address");
  6. message.setSubject("test");
  7. AmqpMessageCoder coder = AmqpMessageCoder.of();
  8. Message clone = CoderUtils.clone(coder, message);
  9. assertEquals("AmqpValue{body}", clone.getBody().toString());
  10. assertEquals("address", clone.getAddress());
  11. assertEquals("test", clone.getSubject());
  12. }

相关文章