Usage and code examples of the com.alibaba.rocketmq.common.message.Message.getKeys() method

x33g5p2x, reposted on 2022-01-25 under "Other"

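This article collects code examples of the Java method com.alibaba.rocketmq.common.message.Message.getKeys() and shows how Message.getKeys() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Message.getKeys() method are as follows:
Package path: com.alibaba.rocketmq.common.message.Message
Class name: Message
Method name: getKeys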

Introduction to Message.getKeys

No description available.
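In the absence of an official description, the following is a minimal sketch of what the method appears to do. It rests on an assumption that should be checked against the RocketMQ client version in use: `Message` keeps its keys as a single property string, `setKeys(Collection<String>)` joins the keys with a space, and `getKeys()` returns that string (or `null` when no keys were set). `SimpleMessage`, `PROPERTY_KEYS`, and `KEY_SEPARATOR` below are hypothetical stand-ins so the sketch runs without the RocketMQ dependency; they are not the real class.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for com.alibaba.rocketmq.common.message.Message.
// Assumption: the real class stores keys in one property, joined by spaces.
class SimpleMessage {
    private static final String PROPERTY_KEYS = "KEYS";
    private static final String KEY_SEPARATOR = " ";
    private final Map<String, String> properties = new HashMap<>();

    // Stores an already-joined key string.
    void setKeys(String keys) {
        properties.put(PROPERTY_KEYS, keys);
    }

    // Joins several keys into one string before storing them.
    void setKeys(Collection<String> keys) {
        setKeys(String.join(KEY_SEPARATOR, keys));
    }

    // Returns the joined key string, or null when no keys were set.
    String getKeys() {
        return properties.get(PROPERTY_KEYS);
    }
}

class GetKeysDemo {
    public static void main(String[] args) {
        SimpleMessage message = new SimpleMessage();
        message.setKeys(Arrays.asList("order-1001", "user-42"));
        System.out.println(message.getKeys()); // prints "order-1001 user-42"
    }
}
```

Under this assumption the return value is a plain `String`, which is why the examples below can pass it directly to log statements.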

Code examples

Code example source: origin: coffeewar/enode-master

        @Override
        public void onException(Throwable e) {
            // TODO: decide how to persist data that failed to send, so that all trace data is recorded
            clientlog.info("send trace data failed, the msgidSet is " + message.getKeys());
        }
    }, 5000);

Code example source: origin: coffeewar/enode-master

    @Override
    public void onSuccess(SendResult sendResult) {
        logger.info("ENode message async send success, keys:{}, sendResult: {}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), sendResult, routingKey, messageId, version);
        promise.complete(AsyncTaskResult.Success);
    }

Code example source: origin: coffeewar/enode-master

        @Override
        public void onException(Throwable ex) {
            logger.error("ENode message async send failed, keys:{}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), routingKey, messageId, version);
            promise.complete(new AsyncTaskResult(AsyncTaskStatus.IOException, ex.getMessage()));
        }
    });

Code example source: origin: coffeewar/enode-master

    public CompletableFuture<AsyncTaskResult> sendMessageAsync(Producer producer, Message message, String routingKey, String messageId, String version) {
        // The future is completed from the send callback or from the catch block below.
        CompletableFuture<AsyncTaskResult> promise = new CompletableFuture<>();
        logger.info("============= send rocketmq message,keys:{},messageid: {},routingKey: {}", message.getKeys(), messageId, routingKey);
        try {
            producer.send(message, this::messageQueueSelect, routingKey, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info("ENode message async send success, keys:{}, sendResult: {}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), sendResult, routingKey, messageId, version);
                    promise.complete(AsyncTaskResult.Success);
                }

                @Override
                public void onException(Throwable ex) {
                    logger.error("ENode message async send failed, keys:{}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), routingKey, messageId, version);
                    promise.complete(new AsyncTaskResult(AsyncTaskStatus.IOException, ex.getMessage()));
                }
            });
        } catch (Exception ex) {
            // send() itself threw before the callback could run
            logger.error(String.format("ENode message async send has exception,keys:%s,message: %s, routingKey: %s, messageId: %s, version: %s", message.getKeys(), message, routingKey, messageId, version), ex);
            promise.complete(new AsyncTaskResult(AsyncTaskStatus.IOException, ex.getMessage()));
        }
        return promise;
    }
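The method above bridges a callback-style send into a `CompletableFuture`. The pattern can be sketched without the RocketMQ dependency as follows. `SimpleSendCallback` and `FakeProducer` are hypothetical stand-ins (the real types are `SendCallback` and the project's `Producer`), and the sketch deliberately differs from the original in one respect: failures complete the future exceptionally, whereas the original completes it normally with an `AsyncTaskResult` carrying an `IOException` status.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback, shaped like a send callback with success and failure paths.
interface SimpleSendCallback {
    void onSuccess(String sendResult);
    void onException(Throwable ex);
}

// Hypothetical producer: succeeds unless the keys are missing or empty.
class FakeProducer {
    void send(String keys, SimpleSendCallback callback) {
        if (keys == null || keys.isEmpty()) {
            callback.onException(new IllegalArgumentException("keys must not be empty"));
        } else {
            callback.onSuccess("SEND_OK:" + keys);
        }
    }
}

class AsyncSendSketch {
    // Wraps the callback-based send in a future that callers can chain or join.
    static CompletableFuture<String> sendAsync(FakeProducer producer, String keys) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        try {
            producer.send(keys, new SimpleSendCallback() {
                @Override
                public void onSuccess(String sendResult) {
                    promise.complete(sendResult);
                }

                @Override
                public void onException(Throwable ex) {
                    promise.completeExceptionally(ex);
                }
            });
        } catch (Exception ex) {
            // Covers the case where send() throws before invoking the callback.
            promise.completeExceptionally(ex);
        }
        return promise;
    }

    public static void main(String[] args) {
        System.out.println(sendAsync(new FakeProducer(), "order-1001").join());
        System.out.println(sendAsync(new FakeProducer(), "").isCompletedExceptionally());
    }
}
```

Completing normally with a failure status, as the original does, keeps every outcome on the same code path for callers; completing exceptionally lets callers use `exceptionally` or `handle` instead. Which is preferable depends on how the surrounding code consumes the future.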

Code example source: origin: coffeewar/enode-master

    /**
     * Interface for sending data.
     *
     * @param keySet the key set contained in this batch
     * @param data   the trace data of this batch
     */
    public void sendTraceDataByMQ(Set<String> keySet, String data) {
        String topic = OnsTraceConstants.traceTopic + currentRegionId;
        // Note: getBytes() without arguments uses the platform default charset.
        final Message message = new Message(topic, data.getBytes());
        message.setKeys(keySet);
        try {
            traceProducer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                }

                @Override
                public void onException(Throwable e) {
                    // TODO: decide how to persist data that failed to send, so that all trace data is recorded
                    clientlog.info("send trace data failed, the msgidSet is " + message.getKeys());
                }
            }, 5000);
        } catch (Exception e) {
            clientlog.info("send trace data failed, the msgidSet is " + message.getKeys());
        }
    }
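In the example above a `Set<String>` goes in through `setKeys` and a single string comes back from `getKeys()`. Assuming the keys are joined with a single space (an assumption about the RocketMQ client that should be verified for the version in use), the round trip can be sketched as below. `KeySetRoundTrip`, `join`, and `split` are hypothetical helper names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

class KeySetRoundTrip {
    // Assumed separator: a single space between keys.
    static final String KEY_SEPARATOR = " ";

    // Mirrors what setKeys(Collection) is assumed to do: join keys into one string.
    static String join(Set<String> keySet) {
        return String.join(KEY_SEPARATOR, keySet);
    }

    // Recovers the individual keys from the string returned by getKeys().
    static Set<String> split(String keys) {
        Set<String> result = new LinkedHashSet<>();
        if (keys == null || keys.trim().isEmpty()) {
            return result;
        }
        result.addAll(Arrays.asList(keys.trim().split("\\s+")));
        return result;
    }

    public static void main(String[] args) {
        Set<String> keySet = new LinkedHashSet<>(Arrays.asList("msg-1", "msg-2", "msg-3"));
        String joined = join(keySet);
        System.out.println(joined);        // prints "msg-1 msg-2 msg-3"
        System.out.println(split(joined)); // prints "[msg-1, msg-2, msg-3]"
    }
}
```

One consequence of a space separator is that an individual key containing whitespace would be split into several keys on the way back, so keys are best kept free of spaces.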

Code example source: origin: beston123/Tarzan

        LOGGER.error("Failed to prepare message '{}', {}", message.getKeys(), result.getErrorMsg());
        return;
    LOGGER.info("Prepared message '" + message.getKeys() + "' successfully, transaction id=" + tid);
        LOGGER.info("Local transaction succeeded, committing message '" + message.getKeys() + "'.");
        testMessageNotifier.commitMessage(tid, message);
    } else {
        LOGGER.info("Local transaction failed, rolling back message '" + message.getKeys() + "'.");
        testMessageNotifier.rollbackMessage(tid);

Code example source: origin: beston123/Tarzan

    private RocketMQBody buildMQBody(Message message) {
        RocketMQBody mqBody = new RocketMQBody();
        mqBody.setProducerGroup(getGroupId());
        mqBody.setTopic(getTopic());
        mqBody.setTags(message.getTags());
        mqBody.setMessageKey(message.getKeys());
        mqBody.setMessageBody(message.getBody());
        return mqBody;
    }

Code example source: origin: coffeewar/enode-master

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // If this is the send path of the message trace itself, there is no need to record it again
        if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
            return;
        }
        OnsTraceContext onsContext = new OnsTraceContext();
        onsContext.setTraceBeans(new ArrayList<OnsTraceBean>(1));
        context.setMqTraceContext(onsContext);
        onsContext.setTraceType(OnsTraceType.Pub);
        onsContext.setGroupName(context.getProducerGroup());
        OnsTraceBean traceBean = new OnsTraceBean();
        traceBean.setTopic(context.getMessage().getTopic());
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        onsContext.getTraceBeans().add(traceBean);
    }
