Usage and code examples for the org.apache.rocketmq.common.message.Message.getBody() method

x33g5p2x, reposted on 2022-01-07 under Other

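This article collects code examples for the Java method org.apache.rocketmq.common.message.Message.getBody() and shows how Message.getBody() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Message.getBody() method:
Package path: org.apache.rocketmq.common.message.Message
Class name: Message
Method name: getBody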

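Introduction to Message.getBody

No description is available. In the examples collected here, the return value is used as a byte[] holding the message payload.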
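
Several of the collected examples decode the body with new String(msg.getBody()), which relies on the platform default charset. The following is a minimal sketch of decoding with an explicit charset instead. StandInMessage is a hypothetical stand-in so that the snippet compiles without the RocketMQ dependency; it is not the library class, and decodeUtf8 is an illustrative helper name.

```java
import java.nio.charset.StandardCharsets;

class BodyDecoding {

    /** Hypothetical stand-in for the RocketMQ Message class; only the body accessor is modeled. */
    static final class StandInMessage {
        private final byte[] body;

        StandInMessage(byte[] body) {
            this.body = body;
        }

        byte[] getBody() {
            return body;
        }
    }

    /** Decodes a body as UTF-8; a null body yields an empty string. */
    static String decodeUtf8(byte[] body) {
        if (body == null) {
            return "";
        }
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Encode and decode with the same explicit charset so the result
        // does not depend on the JVM's default encoding.
        StandInMessage msg = new StandInMessage("h\u00e9llo".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeUtf8(msg.getBody()));
    }
}
```

With a real Message, the same helper would presumably be called as decodeUtf8(msg.getBody()), provided the producer also encoded the payload as UTF-8.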

Code examples

Code example source: origin: spring-cloud-incubator/spring-cloud-alibaba

    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
        // Roll back when the user property "test" is "1"; otherwise commit.
        if ("1".equals(msg.getUserProperty("test"))) {
            System.out.println(new String(msg.getBody()) + " rollback");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        System.out.println(new String(msg.getBody()) + " commit");
        return LocalTransactionState.COMMIT_MESSAGE;
    }

Code example source: origin: apache/rocketmq-externals

    @Override
    public void onException(Throwable e) {
        latch.countDown();
        errorNum.incrementAndGet();
        try {
            log.error("Message publish failed,body=" + new String(message.getBody(), "UTF-8"), e);
        } catch (UnsupportedEncodingException e1) {
            // Log the encoding failure itself, not the original send exception.
            log.error("Encoding error", e1);
        }
    }

Code example source: origin: apache/rocketmq-externals

    @Override
    public void onSuccess(SendResult sendResult) {
        latch.countDown();
        if (log.isDebugEnabled()) {
            try {
                log.debug("Sent event,body={},sendResult={}", new String(message.getBody(), "UTF-8"), sendResult);
            } catch (UnsupportedEncodingException e) {
                log.error("Encoding error", e);
            }
        }
    }

Code example source: origin: apache/rocketmq

    public void asyncSend(Object msg) {
        Message metaqMsg = (Message) msg;
        try {
            producer.send(metaqMsg, sendCallback);
            // Record the body as text along with the original message object.
            msgBodys.addData(new String(metaqMsg.getBody()));
            originMsgs.addData(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Code example source: origin: apache/rocketmq

    public void asyncSend(Object msg, MessageQueueSelector selector, Object arg) {
        Message metaqMsg = (Message) msg;
        try {
            producer.send(metaqMsg, selector, arg, sendCallback);
            msgBodys.addData(new String(metaqMsg.getBody()));
            originMsgs.addData(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Code example source: origin: apache/rocketmq

    public void asyncSend(Object msg, MessageQueue mq) {
        Message metaqMsg = (Message) msg;
        try {
            producer.send(metaqMsg, mq, sendCallback);
            msgBodys.addData(new String(metaqMsg.getBody()));
            originMsgs.addData(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Code example source: origin: apache/rocketmq

    public void sendOneWay(Object msg, MessageQueueSelector selector, Object arg) {
        Message metaqMsg = (Message) msg;
        try {
            producer.sendOneway(metaqMsg, selector, arg);
            msgBodys.addData(new String(metaqMsg.getBody()));
            originMsgs.addData(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Code example source: origin: apache/rocketmq

    public void sendOneWay(Object msg) {
        Message metaqMsg = (Message) msg;
        try {
            producer.sendOneway(metaqMsg);
            msgBodys.addData(new String(metaqMsg.getBody()));
            originMsgs.addData(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Code example source: origin: apache/rocketmq

    public void sendOneWay(Object msg, MessageQueue mq) {
        Message metaqMsg = (Message) msg;
        try {
            producer.sendOneway(metaqMsg, mq);
            msgBodys.addData(new String(metaqMsg.getBody()));
            originMsgs.addData(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Code example source: origin: apache/rocketmq

    for (; nextIndex < messages.size(); nextIndex++) {
        Message message = messages.get(nextIndex);
        // Start the size estimate from the topic length plus the body length.
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {

Code example source: origin: apache/rocketmq

    public static byte[] encodeMessage(Message message) {
        byte[] body = message.getBody();
        int bodyLen = body.length;
        String properties = messageProperties2String(message.getProperties());

Code example source: origin: apache/storm

    /**
     * Generate Storm tuple values by Message and Scheme.
     * @param msg RocketMQ Message
     * @param scheme Scheme for deserializing
     * @return tuple values
     */
    public static List<Object> generateTuples(Message msg, Scheme scheme) {
        List<Object> tup;
        String rawKey = msg.getKeys();
        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
        if (rawKey != null && scheme instanceof KeyValueScheme) {
            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
            tup = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, body);
        } else {
            tup = scheme.deserialize(body);
        }
        return tup;
    }

Code example source: origin: apache/rocketmq

    /**
     * Validate message
     */
    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // topic
        Validators.checkTopic(msg.getTopic());
        // body
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }
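
The three body checks in checkMessage (null, empty, oversized) can be tried out without the RocketMQ client on the classpath. The sketch below restates them against a plain byte[]; BodyValidation and validate are hypothetical names, and returning a reason string instead of throwing MQClientException is a simplification made only so that the snippet is self-contained.

```java
class BodyValidation {

    /**
     * Returns null when the body passes all checks, otherwise a reason string.
     * The check order mirrors the excerpt: null first, then empty, then size.
     */
    static String validate(byte[] body, int maxSize) {
        if (body == null) {
            return "the message body is null";
        }
        if (body.length == 0) {
            return "the message body length is zero";
        }
        if (body.length > maxSize) {
            return "the message body size over max value, MAX: " + maxSize;
        }
        return null;
    }

    public static void main(String[] args) {
        // maxSize is an arbitrary illustrative limit, not a library default.
        int maxSize = 16;
        System.out.println(validate(null, maxSize));
        System.out.println(validate(new byte[0], maxSize));
        System.out.println(validate(new byte[17], maxSize));
        System.out.println(validate(new byte[16], maxSize));
    }
}
```

Reading the body into a local variable once, as validate does through its parameter, also avoids calling getBody() three times.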

Code example source: origin: apache/rocketmq

    public ResultWrapper send(Object msg, Object orderKey) {
        org.apache.rocketmq.client.producer.SendResult metaqResult = null;
        Message message = (Message) msg;
        try {
            // Measure the round-trip time of the synchronous send.
            long start = System.currentTimeMillis();
            metaqResult = producer.send(message);
            this.msgRTs.addData(System.currentTimeMillis() - start);
            if (isDebug) {
                logger.info(metaqResult);
            }
            sendResult.setMsgId(metaqResult.getMsgId());
            sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
            sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
            msgBodys.addData(new String(message.getBody()));
            originMsgs.addData(msg);
            // Index the send result by the body text.
            originMsgIndex.put(new String(message.getBody()), metaqResult);
        } catch (Exception e) {
            if (isDebug) {
                e.printStackTrace();
            }
            sendResult.setSendResult(false);
            sendResult.setSendException(e);
            errorMsgs.addData(msg);
        }
        return sendResult;
    }

Code example source: origin: apache/rocketmq

    public ResultWrapper sendMQ(Message msg, MessageQueue mq) {
        org.apache.rocketmq.client.producer.SendResult metaqResult = null;
        try {
            long start = System.currentTimeMillis();
            metaqResult = producer.send(msg, mq);
            this.msgRTs.addData(System.currentTimeMillis() - start);
            if (isDebug) {
                logger.info(metaqResult);
            }
            sendResult.setMsgId(metaqResult.getMsgId());
            sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
            sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
            msgBodys.addData(new String(msg.getBody()));
            originMsgs.addData(msg);
            originMsgIndex.put(new String(msg.getBody()), metaqResult);
        } catch (Exception e) {
            if (isDebug) {
                e.printStackTrace();
            }
            sendResult.setSendResult(false);
            sendResult.setSendException(e);
            errorMsgs.addData(msg);
        }
        return sendResult;
    }

Code example source: origin: apache/rocketmq

    public static Message cloneMessage(final Message msg) {
        // The new message is built from the same topic and body, then flag and properties are copied.
        Message newMsg = new Message(msg.getTopic(), msg.getBody());
        newMsg.setFlag(msg.getFlag());
        newMsg.setProperties(msg.getProperties());
        return newMsg;
    }
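
cloneMessage passes the result of getBody() straight into the new message. If the accessor returns the backing array rather than a copy (an assumption here; a plain getter would behave this way), the clone and the original share one byte[]. The sketch below illustrates the difference between sharing and copying. StandInMessage, shallowClone, and deepClone are hypothetical names, not RocketMQ APIs.

```java
import java.util.Arrays;

class BodyCopyDemo {

    /** Hypothetical stand-in whose getter returns the backing array directly. */
    static final class StandInMessage {
        private final byte[] body;

        StandInMessage(byte[] body) {
            this.body = body;
        }

        byte[] getBody() {
            return body;
        }
    }

    /** Shares the body array with the source, like passing getBody() to a constructor. */
    static StandInMessage shallowClone(StandInMessage msg) {
        return new StandInMessage(msg.getBody());
    }

    /** Copies the body so later changes to the source array are not visible in the clone. */
    static StandInMessage deepClone(StandInMessage msg) {
        byte[] body = msg.getBody();
        return new StandInMessage(body == null ? null : Arrays.copyOf(body, body.length));
    }

    public static void main(String[] args) {
        StandInMessage original = new StandInMessage(new byte[] {1, 2, 3});
        StandInMessage shared = shallowClone(original);
        StandInMessage copied = deepClone(original);

        // Mutate the original array in place and compare what each clone sees.
        original.getBody()[0] = 9;
        System.out.println(Arrays.toString(shared.getBody()));
        System.out.println(Arrays.toString(copied.getBody()));
    }
}
```

Sharing avoids an allocation and is harmless when bodies are treated as immutable; copying is the safer choice when either side may modify the array afterwards.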

Code example source: origin: apache/rocketmq

    request.setBody(msg.getBody());

Code example source: origin: apache/rocketmq

    private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            // batch does not support compressing right now
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            // Compress only when the body reaches the configured threshold.
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = UtilAll.compress(body, zipCompressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }
        return false;
    }
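
The threshold-based compression in tryToCompressMessage can be approximated with the JDK alone. The sketch below uses java.util.zip as a stand-in for UtilAll.compress, whose internals are not shown in this article; BodyCompression, compressIfLarge, the threshold, and the level are all illustrative assumptions rather than RocketMQ defaults.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class BodyCompression {

    /** Deflates src at the given level (0 to 9). */
    static byte[] compress(byte[] src, int level) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        Deflater deflater = new Deflater(level);
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(src);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // A caller-supplied Deflater is not released by the stream, so end it here.
            deflater.end();
        }
        return out.toByteArray();
    }

    /** Inflates data produced by compress. */
    static byte[] decompress(byte[] src) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Returns the same array when the body is null or below the threshold, else a compressed copy. */
    static byte[] compressIfLarge(byte[] body, int threshold, int level) {
        if (body == null || body.length < threshold) {
            return body;
        }
        return compress(body, level);
    }

    public static void main(String[] args) {
        byte[] large = new byte[5000];
        java.util.Arrays.fill(large, (byte) 'a');
        byte[] packed = compressIfLarge(large, 4096, 5);
        System.out.println(large.length + " -> " + packed.length);
        System.out.println(java.util.Arrays.equals(large, decompress(packed)));
    }
}
```

A consumer of such a body has to know that it was compressed before decoding it, so in practice the producer would record that fact alongside the message.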

Code example source: origin: apache/rocketmq

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // if the message is itself trace data, it is not recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        // build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(context.getProducerGroup());
        // build the data bean object of message trace
        TraceBean traceBean = new TraceBean();
        traceBean.setTopic(context.getMessage().getTopic());
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }

Code example source: origin: apache/rocketmq

    private MessageExtBrokerInner makeOpMessageInner(Message message, MessageQueue messageQueue) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        // Copy topic, body, queue, and tags from the source message.
        msgInner.setTopic(message.getTopic());
        msgInner.setBody(message.getBody());
        msgInner.setQueueId(messageQueue.getQueueId());
        msgInner.setTags(message.getTags());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
        msgInner.setSysFlag(0);
        MessageAccessor.setProperties(msgInner, message.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
        msgInner.setBornTimestamp(System.currentTimeMillis());
        msgInner.setBornHost(this.storeHost);
        msgInner.setStoreHost(this.storeHost);
        msgInner.setWaitStoreMsgOK(false);
        MessageClientIDSetter.setUniqID(msgInner);
        return msgInner;
    }
