org.apache.qpid.proton.message.Message.setReplyTo()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(13.6k)|赞(0)|评价(0)|浏览(214)

本文整理了Java中org.apache.qpid.proton.message.Message.setReplyTo()方法的一些代码示例,展示了Message.setReplyTo()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.setReplyTo()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:setReplyTo

Message.setReplyTo介绍

暂无

代码示例

代码示例来源:origin: org.apache.qpid/proton

  1. private void adjustReplyTo(Message m)
  2. {
  3. String original = m.getReplyTo();
  4. if (original == null || original.length() == 0)
  5. {
  6. m.setReplyTo("amqp://" + _name);
  7. }
  8. else if (original.startsWith("~/"))
  9. {
  10. m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
  11. }
  12. }

代码示例来源:origin: Azure/azure-service-bus-java

  1. amqpMessage.setSubject(brokeredMessage.getLabel());
  2. amqpMessage.getProperties().setTo(brokeredMessage.getTo());
  3. amqpMessage.setReplyTo(brokeredMessage.getReplyTo());
  4. amqpMessage.setReplyToGroupId(brokeredMessage.getReplyToSessionId());
  5. amqpMessage.setGroupId(brokeredMessage.getSessionId());

代码示例来源:origin: org.apache.qpid/proton-j-impl

  1. private void adjustReplyTo(Message m)
  2. {
  3. String original = m.getReplyTo();
  4. if (original != null) {
  5. if (original.startsWith("~/"))
  6. {
  7. m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
  8. }
  9. else if (original.equals("~"))
  10. {
  11. m.setReplyTo("amqp://" + _name);
  12. }
  13. }
  14. }

代码示例来源:origin: com.microsoft.azure.iot/proton-j-azure-iot

  1. private void adjustReplyTo(Message m)
  2. {
  3. String original = m.getReplyTo();
  4. if (original != null) {
  5. if (original.startsWith("~/"))
  6. {
  7. m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
  8. }
  9. else if (original.equals("~"))
  10. {
  11. m.setReplyTo("amqp://" + _name);
  12. }
  13. }
  14. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return a raw AMQP message
  3. *
  4. * @return
  5. */
  6. public Message toAmqp() {
  7. Message message = ProtonHelper.message();
  8. message.setSubject(AMQP_SUBJECT);
  9. message.setCorrelationId(String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId));
  10. message.setReplyTo(String.format(AmqpHelper.AMQP_CLIENT_CONTROL_ADDRESS_TEMPLATE, this.clientId));
  11. return message;
  12. }

代码示例来源:origin: apache/activemq-artemis

  1. /**
  2. * Sets the replyTo address which is applied to the AMQP message reply-to field in the message properties
  3. *
  4. * @param address The replyTo address that should be applied in the Message To field.
  5. */
  6. public void setReplyToAddress(String address) {
  7. checkReadOnly();
  8. lazyCreateProperties();
  9. getWrappedMessage().setReplyTo(address);
  10. }

代码示例来源:origin: EnMasseProject/enmasse

  1. @Override
  2. public int getSubscriberCount(AmqpClient queueClient, Destination replyQueue, String queue) throws Exception {
  3. Message requestMessage = Message.Factory.create();
  4. Map<String, Object> appProperties = new HashMap<>();
  5. appProperties.put(resourceProperty, "queue." + queue);
  6. appProperties.put(operationProperty, "getConsumerCount");
  7. requestMessage.setAddress(managementAddress);
  8. requestMessage.setApplicationProperties(new ApplicationProperties(appProperties));
  9. requestMessage.setReplyTo(replyQueue.getAddress());
  10. requestMessage.setBody(new AmqpValue("[]"));
  11. Future<Integer> sent = queueClient.sendMessages(managementAddress, requestMessage);
  12. assertThat(String.format("Sender failed, expected %d messages", 1),
  13. sent.get(30, TimeUnit.SECONDS), is(1));
  14. log.info("request sent");
  15. Future<List<Message>> received = queueClient.recvMessages(replyQueue.getAddress(), 1);
  16. assertThat(String.format("Receiver failed, expected %d messages", 1),
  17. received.get(30, TimeUnit.SECONDS).size(), is(1));
  18. AmqpValue val = (AmqpValue) received.get().get(0).getBody();
  19. log.info("answer received: " + val.toString());
  20. String count = val.getValue().toString().replaceAll("\\[|]|\"", "");
  21. return Integer.valueOf(count);
  22. }
  23. }

代码示例来源:origin: EnMasseProject/enmasse

  1. @Override
  2. public List<String> getQueueNames(AmqpClient queueClient, Destination replyQueue, String topic) throws Exception {
  3. Message requestMessage = Message.Factory.create();
  4. Map<String, Object> appProperties = new HashMap<>();
  5. appProperties.put(resourceProperty, "address." + topic);
  6. appProperties.put(operationProperty, "getQueueNames");
  7. requestMessage.setAddress(managementAddress);
  8. requestMessage.setApplicationProperties(new ApplicationProperties(appProperties));
  9. requestMessage.setReplyTo(replyQueue.getAddress());
  10. requestMessage.setBody(new AmqpValue("[]"));
  11. Future<Integer> sent = queueClient.sendMessages(managementAddress, requestMessage);
  12. assertThat(String.format("Sender failed, expected %d messages", 1), sent.get(30, TimeUnit.SECONDS), is(1));
  13. log.info("request sent");
  14. Future<List<Message>> received = queueClient.recvMessages(replyQueue.getAddress(), 1);
  15. assertThat(String.format("Receiver failed, expected %d messages", 1),
  16. received.get(30, TimeUnit.SECONDS).size(), is(1));
  17. AmqpValue val = (AmqpValue) received.get().get(0).getBody();
  18. log.info("answer received: " + val.toString());
  19. String queues = val.getValue().toString();
  20. queues = queues.replaceAll("\\[|]|\"", "");
  21. return Arrays.asList(queues.split(","));
  22. }

代码示例来源:origin: EnMasseProject/enmasse

  1. public Message request(Message message, long timeout, TimeUnit timeUnit) {
  2. Map<String, Object> properties = new HashMap<>();
  3. if (message.getApplicationProperties() != null) {
  4. properties.putAll(message.getApplicationProperties().getValue());
  5. }
  6. message.setApplicationProperties(new ApplicationProperties(properties));
  7. if (message.getReplyTo() == null) {
  8. message.setReplyTo(replyTo);
  9. }
  10. context.runOnContext(h -> sender.send(message));
  11. try {
  12. return replies.poll(timeout, timeUnit);
  13. } catch (InterruptedException e) {
  14. throw new RuntimeException(e);
  15. }
  16. }

代码示例来源:origin: Azure/azure-event-hubs-java

  1. public void request(
  2. final Message message,
  3. final OperationResult<Message, Exception> onResponse) {
  4. if (message == null)
  5. throw new IllegalArgumentException("message cannot be null");
  6. if (message.getMessageId() != null)
  7. throw new IllegalArgumentException("message.getMessageId() should be null");
  8. if (message.getReplyTo() != null)
  9. throw new IllegalArgumentException("message.getReplyTo() should be null");
  10. message.setMessageId("request" + UnsignedLong.valueOf(this.requestId.incrementAndGet()).toString());
  11. message.setReplyTo(this.replyTo);
  12. this.inflightRequests.put(message.getMessageId(), onResponse);
  13. sendLink.delivery(UUID.randomUUID().toString().replace("-", StringUtil.EMPTY).getBytes());
  14. final int payloadSize = AmqpUtil.getDataSerializedSize(message) + 512; // need buffer for headers
  15. final byte[] bytes = new byte[payloadSize];
  16. final int encodedSize = message.encode(bytes, 0, payloadSize);
  17. receiveLink.flow(1);
  18. sendLink.send(bytes, 0, encodedSize);
  19. sendLink.advance();
  20. }

代码示例来源:origin: org.eclipse.hono/hono-client

  1. /**
  2. * Build a Proton message with a provided subject (serving as the operation that shall be invoked).
  3. * The message can be extended by arbitrary application properties passed in.
  4. * <p>
  5. * To enable specific message properties that are not considered here, the method can be overridden by subclasses.
  6. *
  7. * @param subject The subject system property of the message.
  8. * @param appProperties The map containing arbitrary application properties.
  9. * Maybe null if no application properties are needed.
  10. * @return The Proton message constructed from the provided parameters.
  11. * @throws NullPointerException if the subject is {@code null}.
  12. * @throws IllegalArgumentException if the application properties contain not AMQP 1.0 compatible values
  13. * (see {@link AbstractHonoClient#setApplicationProperties(Message, Map)}
  14. */
  15. private Message createMessage(final String subject, final Map<String, Object> appProperties) {
  16. Objects.requireNonNull(subject);
  17. final Message msg = ProtonHelper.message();
  18. final String messageId = createMessageId();
  19. AbstractHonoClient.setApplicationProperties(msg, appProperties);
  20. msg.setReplyTo(replyToAddress);
  21. msg.setMessageId(messageId);
  22. msg.setSubject(subject);
  23. return msg;
  24. }

代码示例来源:origin: Azure/azure-service-bus-java

  1. public CompletableFuture<Message> requestAysnc(Message requestMessage, TransactionContext transaction, Duration timeout)
  2. {
  3. this.throwIfClosed(null);
  4. CompletableFuture<Message> responseFuture = new CompletableFuture<Message>();
  5. RequestResponseWorkItem workItem = new RequestResponseWorkItem(requestMessage, transaction, responseFuture, timeout);
  6. String requestId = "request:" + this.requestCounter.incrementAndGet();
  7. requestMessage.setMessageId(requestId);
  8. requestMessage.setReplyTo(this.replyTo);
  9. this.pendingRequests.put(requestId, workItem);
  10. workItem.setTimeoutTask(this.scheduleRequestTimeout(requestId, timeout));
  11. TRACE_LOGGER.debug("Sending request with id:{}", requestId);
  12. this.amqpSender.sendRequest(requestId, false);
  13. // Check and recreate links if necessary
  14. if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE)
  15. && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE)))
  16. {
  17. this.ensureUniqueLinkRecreation();
  18. }
  19. return responseFuture;
  20. }

代码示例来源:origin: eclipse/hono

  1. private static Message givenAValidMessageWithoutBody(final CredentialsConstants.CredentialsAction action) {
  2. final Message msg = ProtonHelper.message();
  3. msg.setMessageId("msg");
  4. msg.setReplyTo("reply");
  5. msg.setSubject(action.toString());
  6. return msg;
  7. }
  8. }

代码示例来源:origin: eclipse/hono

  1. private Message givenAMessageHavingProperties(final TenantConstants.TenantAction action) {
  2. final Message msg = ProtonHelper.message();
  3. msg.setMessageId("msg");
  4. msg.setReplyTo("reply");
  5. msg.setSubject(action.toString());
  6. return msg;
  7. }
  8. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Verifies that the endpoint rejects request messages for operations the client
  3. * is not authorized to invoke.
  4. */
  5. @Test
  6. public void testHandleMessageRejectsUnauthorizedRequests() {
  7. final Message msg = ProtonHelper.message();
  8. msg.setSubject("unauthorized");
  9. msg.setReplyTo(REPLY_RESOURCE.toString());
  10. final ProtonConnection con = mock(ProtonConnection.class);
  11. final ProtonDelivery delivery = mock(ProtonDelivery.class);
  12. final AuthorizationService authService = mock(AuthorizationService.class);
  13. when(authService.isAuthorized(any(HonoUser.class), any(ResourceIdentifier.class), anyString())).thenReturn(Future.succeededFuture(Boolean.FALSE));
  14. final Future<Void> processingTracker = Future.future();
  15. final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true, processingTracker);
  16. endpoint.setAuthorizationService(authService);
  17. endpoint.onLinkAttach(con, sender, REPLY_RESOURCE);
  18. // WHEN a request for an operation is received that the client is not authorized to invoke
  19. endpoint.handleMessage(con, receiver, resource, delivery, msg);
  20. // THEN the the message is rejected
  21. final ArgumentCaptor<DeliveryState> deliveryState = ArgumentCaptor.forClass(DeliveryState.class);
  22. verify(delivery).disposition(deliveryState.capture(), booleanThat(is(Boolean.TRUE)));
  23. assertThat(deliveryState.getValue(), instanceOf(Rejected.class));
  24. verify(receiver, never()).close();
  25. verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, resource, "unauthorized");
  26. assertFalse(processingTracker.isComplete());
  27. }

代码示例来源:origin: eclipse/hono

  1. private static Message givenAMessageHavingProperties(final String deviceId, final String action) {
  2. final Message msg = ProtonHelper.message();
  3. msg.setMessageId("msg-id");
  4. msg.setReplyTo("reply");
  5. msg.setSubject(action);
  6. if (deviceId != null) {
  7. MessageHelper.addDeviceId(msg, deviceId);
  8. }
  9. return msg;
  10. }
  11. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Verifies that the endpoint processes request messages for operations the client
  3. * is authorized to invoke.
  4. */
  5. @Test
  6. public void testHandleMessageProcessesAuthorizedRequests() {
  7. final Message msg = ProtonHelper.message();
  8. msg.setSubject("get");
  9. msg.setReplyTo(REPLY_RESOURCE.toString());
  10. final ProtonConnection con = mock(ProtonConnection.class);
  11. final ProtonDelivery delivery = mock(ProtonDelivery.class);
  12. final AuthorizationService authService = mock(AuthorizationService.class);
  13. when(authService.isAuthorized(any(HonoUser.class), any(ResourceIdentifier.class), anyString())).thenReturn(Future.succeededFuture(Boolean.TRUE));
  14. final Future<Void> processingTracker = Future.future();
  15. final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true, processingTracker);
  16. endpoint.setAuthorizationService(authService);
  17. endpoint.onLinkAttach(con, sender, REPLY_RESOURCE);
  18. // WHEN a request for an operation is received that the client is authorized to invoke
  19. endpoint.handleMessage(con, receiver, resource, delivery, msg);
  20. // THEN then the message gets processed
  21. final ArgumentCaptor<DeliveryState> deliveryState = ArgumentCaptor.forClass(DeliveryState.class);
  22. verify(delivery).disposition(deliveryState.capture(), booleanThat(is(Boolean.TRUE)));
  23. assertThat(deliveryState.getValue(), instanceOf(Accepted.class));
  24. verify(receiver, never()).close();
  25. verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, resource, "get");
  26. assertTrue(processingTracker.isComplete());
  27. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Verifies that the filter detects a missing subject.
  3. */
  4. @Test
  5. public void testVerifyDetectsMissingSubject() {
  6. // GIVEN a request message without a subject
  7. final Message msg = ProtonHelper.message();
  8. msg.setMessageId("msg");
  9. msg.setReplyTo("reply");
  10. // WHEN receiving the message via a link with any tenant
  11. final ResourceIdentifier linkTarget = getResourceIdentifier(DEFAULT_TENANT);
  12. // THEN message validation fails
  13. assertFalse(TenantMessageFilter.verify(linkTarget, msg));
  14. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Verifies that a message that does not contain a message-id nor correlation-id
  3. * does not pass the filter.
  4. */
  5. @Test
  6. public void testVerifyFailsForMissingCorrelationId() {
  7. // GIVEN a message with an unsupported subject
  8. final Message msg = ProtonHelper.message();
  9. msg.setReplyTo("reply");
  10. msg.setBody(new AmqpValue(BILLIE_HASHED_PASSWORD));
  11. msg.setContentType("application/json");
  12. // WHEN receiving the message via a link with any tenant
  13. final boolean filterResult = CredentialsMessageFilter.verify(target, msg);
  14. // THEN message validation fails
  15. assertFalse(filterResult);
  16. }

代码示例来源:origin: io.vertx/vertx-amqp-bridge

  1. protonMsg.setReplyTo(destinationName);

相关文章