本文整理了Java中org.apache.qpid.proton.message.Message.setReplyTo()
方法的一些代码示例,展示了Message.setReplyTo()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.setReplyTo()
方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:setReplyTo
暂无
代码示例来源:origin: org.apache.qpid/proton
private void adjustReplyTo(Message m)
{
String original = m.getReplyTo();
if (original == null || original.length() == 0)
{
m.setReplyTo("amqp://" + _name);
}
else if (original.startsWith("~/"))
{
m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
}
}
代码示例来源:origin: Azure/azure-service-bus-java
amqpMessage.setSubject(brokeredMessage.getLabel());
amqpMessage.getProperties().setTo(brokeredMessage.getTo());
amqpMessage.setReplyTo(brokeredMessage.getReplyTo());
amqpMessage.setReplyToGroupId(brokeredMessage.getReplyToSessionId());
amqpMessage.setGroupId(brokeredMessage.getSessionId());
代码示例来源:origin: org.apache.qpid/proton-j-impl
private void adjustReplyTo(Message m)
{
String original = m.getReplyTo();
if (original != null) {
if (original.startsWith("~/"))
{
m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
}
else if (original.equals("~"))
{
m.setReplyTo("amqp://" + _name);
}
}
}
代码示例来源:origin: com.microsoft.azure.iot/proton-j-azure-iot
private void adjustReplyTo(Message m)
{
String original = m.getReplyTo();
if (original != null) {
if (original.startsWith("~/"))
{
m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
}
else if (original.equals("~"))
{
m.setReplyTo("amqp://" + _name);
}
}
}
代码示例来源:origin: EnMasseProject/enmasse
/**
* Return a raw AMQP message
*
* @return
*/
public Message toAmqp() {
Message message = ProtonHelper.message();
message.setSubject(AMQP_SUBJECT);
message.setCorrelationId(String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId));
message.setReplyTo(String.format(AmqpHelper.AMQP_CLIENT_CONTROL_ADDRESS_TEMPLATE, this.clientId));
return message;
}
代码示例来源:origin: apache/activemq-artemis
/**
* Sets the replyTo address which is applied to the AMQP message reply-to field in the message properties
*
* @param address The replyTo address that should be applied in the Message To field.
*/
public void setReplyToAddress(String address) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setReplyTo(address);
}
代码示例来源:origin: EnMasseProject/enmasse
@Override
public int getSubscriberCount(AmqpClient queueClient, Destination replyQueue, String queue) throws Exception {
Message requestMessage = Message.Factory.create();
Map<String, Object> appProperties = new HashMap<>();
appProperties.put(resourceProperty, "queue." + queue);
appProperties.put(operationProperty, "getConsumerCount");
requestMessage.setAddress(managementAddress);
requestMessage.setApplicationProperties(new ApplicationProperties(appProperties));
requestMessage.setReplyTo(replyQueue.getAddress());
requestMessage.setBody(new AmqpValue("[]"));
Future<Integer> sent = queueClient.sendMessages(managementAddress, requestMessage);
assertThat(String.format("Sender failed, expected %d messages", 1),
sent.get(30, TimeUnit.SECONDS), is(1));
log.info("request sent");
Future<List<Message>> received = queueClient.recvMessages(replyQueue.getAddress(), 1);
assertThat(String.format("Receiver failed, expected %d messages", 1),
received.get(30, TimeUnit.SECONDS).size(), is(1));
AmqpValue val = (AmqpValue) received.get().get(0).getBody();
log.info("answer received: " + val.toString());
String count = val.getValue().toString().replaceAll("\\[|]|\"", "");
return Integer.valueOf(count);
}
}
代码示例来源:origin: EnMasseProject/enmasse
@Override
public List<String> getQueueNames(AmqpClient queueClient, Destination replyQueue, String topic) throws Exception {
Message requestMessage = Message.Factory.create();
Map<String, Object> appProperties = new HashMap<>();
appProperties.put(resourceProperty, "address." + topic);
appProperties.put(operationProperty, "getQueueNames");
requestMessage.setAddress(managementAddress);
requestMessage.setApplicationProperties(new ApplicationProperties(appProperties));
requestMessage.setReplyTo(replyQueue.getAddress());
requestMessage.setBody(new AmqpValue("[]"));
Future<Integer> sent = queueClient.sendMessages(managementAddress, requestMessage);
assertThat(String.format("Sender failed, expected %d messages", 1), sent.get(30, TimeUnit.SECONDS), is(1));
log.info("request sent");
Future<List<Message>> received = queueClient.recvMessages(replyQueue.getAddress(), 1);
assertThat(String.format("Receiver failed, expected %d messages", 1),
received.get(30, TimeUnit.SECONDS).size(), is(1));
AmqpValue val = (AmqpValue) received.get().get(0).getBody();
log.info("answer received: " + val.toString());
String queues = val.getValue().toString();
queues = queues.replaceAll("\\[|]|\"", "");
return Arrays.asList(queues.split(","));
}
代码示例来源:origin: EnMasseProject/enmasse
public Message request(Message message, long timeout, TimeUnit timeUnit) {
Map<String, Object> properties = new HashMap<>();
if (message.getApplicationProperties() != null) {
properties.putAll(message.getApplicationProperties().getValue());
}
message.setApplicationProperties(new ApplicationProperties(properties));
if (message.getReplyTo() == null) {
message.setReplyTo(replyTo);
}
context.runOnContext(h -> sender.send(message));
try {
return replies.poll(timeout, timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
代码示例来源:origin: Azure/azure-event-hubs-java
public void request(
final Message message,
final OperationResult<Message, Exception> onResponse) {
if (message == null)
throw new IllegalArgumentException("message cannot be null");
if (message.getMessageId() != null)
throw new IllegalArgumentException("message.getMessageId() should be null");
if (message.getReplyTo() != null)
throw new IllegalArgumentException("message.getReplyTo() should be null");
message.setMessageId("request" + UnsignedLong.valueOf(this.requestId.incrementAndGet()).toString());
message.setReplyTo(this.replyTo);
this.inflightRequests.put(message.getMessageId(), onResponse);
sendLink.delivery(UUID.randomUUID().toString().replace("-", StringUtil.EMPTY).getBytes());
final int payloadSize = AmqpUtil.getDataSerializedSize(message) + 512; // need buffer for headers
final byte[] bytes = new byte[payloadSize];
final int encodedSize = message.encode(bytes, 0, payloadSize);
receiveLink.flow(1);
sendLink.send(bytes, 0, encodedSize);
sendLink.advance();
}
代码示例来源:origin: org.eclipse.hono/hono-client
/**
* Build a Proton message with a provided subject (serving as the operation that shall be invoked).
* The message can be extended by arbitrary application properties passed in.
* <p>
* To enable specific message properties that are not considered here, the method can be overridden by subclasses.
*
* @param subject The subject system property of the message.
* @param appProperties The map containing arbitrary application properties.
* Maybe null if no application properties are needed.
* @return The Proton message constructed from the provided parameters.
* @throws NullPointerException if the subject is {@code null}.
* @throws IllegalArgumentException if the application properties contain not AMQP 1.0 compatible values
* (see {@link AbstractHonoClient#setApplicationProperties(Message, Map)}
*/
private Message createMessage(final String subject, final Map<String, Object> appProperties) {
Objects.requireNonNull(subject);
final Message msg = ProtonHelper.message();
final String messageId = createMessageId();
AbstractHonoClient.setApplicationProperties(msg, appProperties);
msg.setReplyTo(replyToAddress);
msg.setMessageId(messageId);
msg.setSubject(subject);
return msg;
}
代码示例来源:origin: Azure/azure-service-bus-java
public CompletableFuture<Message> requestAysnc(Message requestMessage, TransactionContext transaction, Duration timeout)
{
this.throwIfClosed(null);
CompletableFuture<Message> responseFuture = new CompletableFuture<Message>();
RequestResponseWorkItem workItem = new RequestResponseWorkItem(requestMessage, transaction, responseFuture, timeout);
String requestId = "request:" + this.requestCounter.incrementAndGet();
requestMessage.setMessageId(requestId);
requestMessage.setReplyTo(this.replyTo);
this.pendingRequests.put(requestId, workItem);
workItem.setTimeoutTask(this.scheduleRequestTimeout(requestId, timeout));
TRACE_LOGGER.debug("Sending request with id:{}", requestId);
this.amqpSender.sendRequest(requestId, false);
// Check and recreate links if necessary
if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE)
&& (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE)))
{
this.ensureUniqueLinkRecreation();
}
return responseFuture;
}
代码示例来源:origin: eclipse/hono
private static Message givenAValidMessageWithoutBody(final CredentialsConstants.CredentialsAction action) {
final Message msg = ProtonHelper.message();
msg.setMessageId("msg");
msg.setReplyTo("reply");
msg.setSubject(action.toString());
return msg;
}
}
代码示例来源:origin: eclipse/hono
private Message givenAMessageHavingProperties(final TenantConstants.TenantAction action) {
final Message msg = ProtonHelper.message();
msg.setMessageId("msg");
msg.setReplyTo("reply");
msg.setSubject(action.toString());
return msg;
}
}
代码示例来源:origin: eclipse/hono
/**
* Verifies that the endpoint rejects request messages for operations the client
* is not authorized to invoke.
*/
@Test
public void testHandleMessageRejectsUnauthorizedRequests() {
final Message msg = ProtonHelper.message();
msg.setSubject("unauthorized");
msg.setReplyTo(REPLY_RESOURCE.toString());
final ProtonConnection con = mock(ProtonConnection.class);
final ProtonDelivery delivery = mock(ProtonDelivery.class);
final AuthorizationService authService = mock(AuthorizationService.class);
when(authService.isAuthorized(any(HonoUser.class), any(ResourceIdentifier.class), anyString())).thenReturn(Future.succeededFuture(Boolean.FALSE));
final Future<Void> processingTracker = Future.future();
final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true, processingTracker);
endpoint.setAuthorizationService(authService);
endpoint.onLinkAttach(con, sender, REPLY_RESOURCE);
// WHEN a request for an operation is received that the client is not authorized to invoke
endpoint.handleMessage(con, receiver, resource, delivery, msg);
// THEN the the message is rejected
final ArgumentCaptor<DeliveryState> deliveryState = ArgumentCaptor.forClass(DeliveryState.class);
verify(delivery).disposition(deliveryState.capture(), booleanThat(is(Boolean.TRUE)));
assertThat(deliveryState.getValue(), instanceOf(Rejected.class));
verify(receiver, never()).close();
verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, resource, "unauthorized");
assertFalse(processingTracker.isComplete());
}
代码示例来源:origin: eclipse/hono
private static Message givenAMessageHavingProperties(final String deviceId, final String action) {
final Message msg = ProtonHelper.message();
msg.setMessageId("msg-id");
msg.setReplyTo("reply");
msg.setSubject(action);
if (deviceId != null) {
MessageHelper.addDeviceId(msg, deviceId);
}
return msg;
}
}
代码示例来源:origin: eclipse/hono
/**
* Verifies that the endpoint processes request messages for operations the client
* is authorized to invoke.
*/
@Test
public void testHandleMessageProcessesAuthorizedRequests() {
final Message msg = ProtonHelper.message();
msg.setSubject("get");
msg.setReplyTo(REPLY_RESOURCE.toString());
final ProtonConnection con = mock(ProtonConnection.class);
final ProtonDelivery delivery = mock(ProtonDelivery.class);
final AuthorizationService authService = mock(AuthorizationService.class);
when(authService.isAuthorized(any(HonoUser.class), any(ResourceIdentifier.class), anyString())).thenReturn(Future.succeededFuture(Boolean.TRUE));
final Future<Void> processingTracker = Future.future();
final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true, processingTracker);
endpoint.setAuthorizationService(authService);
endpoint.onLinkAttach(con, sender, REPLY_RESOURCE);
// WHEN a request for an operation is received that the client is authorized to invoke
endpoint.handleMessage(con, receiver, resource, delivery, msg);
// THEN then the message gets processed
final ArgumentCaptor<DeliveryState> deliveryState = ArgumentCaptor.forClass(DeliveryState.class);
verify(delivery).disposition(deliveryState.capture(), booleanThat(is(Boolean.TRUE)));
assertThat(deliveryState.getValue(), instanceOf(Accepted.class));
verify(receiver, never()).close();
verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, resource, "get");
assertTrue(processingTracker.isComplete());
}
代码示例来源:origin: eclipse/hono
/**
* Verifies that the filter detects a missing subject.
*/
@Test
public void testVerifyDetectsMissingSubject() {
// GIVEN a request message without a subject
final Message msg = ProtonHelper.message();
msg.setMessageId("msg");
msg.setReplyTo("reply");
// WHEN receiving the message via a link with any tenant
final ResourceIdentifier linkTarget = getResourceIdentifier(DEFAULT_TENANT);
// THEN message validation fails
assertFalse(TenantMessageFilter.verify(linkTarget, msg));
}
代码示例来源:origin: eclipse/hono
/**
* Verifies that a message that does not contain a message-id nor correlation-id
* does not pass the filter.
*/
@Test
public void testVerifyFailsForMissingCorrelationId() {
// GIVEN a message with an unsupported subject
final Message msg = ProtonHelper.message();
msg.setReplyTo("reply");
msg.setBody(new AmqpValue(BILLIE_HASHED_PASSWORD));
msg.setContentType("application/json");
// WHEN receiving the message via a link with any tenant
final boolean filterResult = CredentialsMessageFilter.verify(target, msg);
// THEN message validation fails
assertFalse(filterResult);
}
代码示例来源:origin: io.vertx/vertx-amqp-bridge
protonMsg.setReplyTo(destinationName);
内容来源于网络,如有侵权,请联系作者删除!