Usage and code examples of the org.apache.rocketmq.common.message.Message.getKeys() method

x33g5p2x, reposted on 2022-01-25 under Other

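This article collects code examples for the Java method org.apache.rocketmq.common.message.Message.getKeys() and shows how Message.getKeys() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Message.getKeys() method:
Package path: org.apache.rocketmq.common.message.Message
Class name: Message
Method name: getKeys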

About Message.getKeys

No description available.
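Since no description is provided, here is a minimal sketch of the behavior the examples in this article rely on: setKeys accepts a collection of keys, while getKeys returns a single String (or null when no keys were set). The class KeysSketch and its helper splitKeys are hypothetical stand-ins written for illustration, not RocketMQ code. The property name "KEYS" and the single-space separator are assumptions meant to mirror RocketMQ's MessageConst.PROPERTY_KEYS and MessageConst.KEY_SEPARATOR; check them against the version you use.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a message that stores its keys as one string property. */
final class KeysSketch {
    // Assumed values, chosen to mirror RocketMQ's MessageConst constants.
    static final String PROPERTY_KEYS = "KEYS";
    static final String KEY_SEPARATOR = " ";

    private final Map<String, String> properties = new HashMap<>();

    /** Joins several keys into one separator-delimited property value. */
    void setKeys(Collection<String> keys) {
        properties.put(PROPERTY_KEYS, String.join(KEY_SEPARATOR, keys));
    }

    /** Returns the raw key string, or null if no keys were ever set. */
    String getKeys() {
        return properties.get(PROPERTY_KEYS);
    }

    /** Splits a raw key string back into individual keys (hypothetical helper). */
    static List<String> splitKeys(String rawKeys) {
        if (rawKeys == null || rawKeys.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(rawKeys.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        KeysSketch message = new KeysSketch();
        message.setKeys(Arrays.asList("order-1", "order-2"));
        System.out.println(message.getKeys());
        System.out.println(splitKeys(message.getKeys()));
    }
}
```

Under these assumptions, callers should treat the result of getKeys as possibly null and as possibly containing more than one key, which is why several examples below null-check it or only log it.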

Code examples

Code example source: apache/storm

    /**
     * Generate Storm tuple values by Message and Scheme.
     *
     * @param msg RocketMQ Message
     * @param scheme Scheme for deserializing
     * @return tuple values
     */
    public static List<Object> generateTuples(Message msg, Scheme scheme) {
        List<Object> tup;
        String rawKey = msg.getKeys();
        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
        // Pass the key along only when one is set and the scheme can consume it.
        if (rawKey != null && scheme instanceof KeyValueScheme) {
            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
            tup = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, body);
        } else {
            tup = scheme.deserialize(body);
        }
        return tup;
    }

Code example source: apache/rocketmq

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // Messages that are themselves trace data are not recorded.
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        // Build the trace context.
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(context.getProducerGroup());
        // Build the data bean that describes the traced message.
        TraceBean traceBean = new TraceBean();
        traceBean.setTopic(context.getMessage().getTopic());
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }

Code example source: apache/rocketmq

    Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
    Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
    Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());

Code example source: javahongxi/whatsmars

    private SendResult sendOrderly(Message message) throws Exception {
        // The message keys are passed as the selector argument (arg).
        SendResult sendResult = this.defaultMQProducer.send(message,
            new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    long id = NumberUtils.toLong(String.valueOf(arg));
                    int index = (int) (id % mqs.size());
                    return mqs.get(index);
                }
            }, message.getKeys());
        log.debug("send result: {}", sendResult);
        return sendResult;
    }

Code example source: javahongxi/whatsmars

    private static SendResult sendOrderly(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
        // The message keys are passed as the selector argument (arg).
        return getProducer(producerGroup, sendMsgTimeout).send(message,
            new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    long id = NumberUtils.toLong(String.valueOf(arg));
                    int index = (int) (id % mqs.size());
                    return mqs.get(index);
                }
            }, message.getKeys());
    }
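The two whatsmars selectors assume that the value returned by getKeys() is a non-negative number. NumberUtils.toLong returns 0 for any string it cannot parse, so non-numeric keys would all map to the first queue, and a negative id would produce a negative index that mqs.get rejects. As a hedged alternative, the sketch below derives the index from the key's hash code. The class KeyQueueIndex and the method indexFor are hypothetical names introduced for illustration and are not part of RocketMQ or whatsmars.

```java
/** Hypothetical helper that maps a message key to a queue index. */
final class KeyQueueIndex {

    /**
     * Returns an index in the range [0, queueCount) for the given key.
     * A null key maps to index 0. Math.floorMod keeps the result
     * non-negative even when hashCode() is negative.
     */
    static int indexFor(String key, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        int hash = (key == null) ? 0 : key.hashCode();
        return Math.floorMod(hash, queueCount);
    }

    public static void main(String[] args) {
        // The same key always yields the same index for a fixed queue count.
        System.out.println(indexFor("order-42", 4));
        System.out.println(indexFor("order-42", 4));
    }
}
```

Inside a selector this could be called as mqs.get(KeyQueueIndex.indexFor(String.valueOf(arg), mqs.size())). The trade-off is that the mapping changes whenever the number of queues changes, which is also true of the modulo approach in the original examples.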

Code example source: maihaoche/rocketmq-spring-boot-starter

            @Override
            public void onException(Throwable e) {
                // TODO: decide how to persist data that failed to send, so that all trace data is recorded
                clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
            }
        }, 5000);

Code example source: didi/DDMQ

    Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
    Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
    Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());

Code example source: maihaoche/rocketmq-spring-boot-starter

    /**
     * Interface for sending data.
     *
     * @param keySet
     *            the key set contained in this batch
     * @param data
     *            the trace data of this batch
     */
    public void sendTraceDataByMQ(Set<String> keySet, String data) {
        String topic = OnsTraceConstants.traceTopic;
        final Message message = new Message(topic, data.getBytes());
        message.setKeys(keySet);
        try {
            traceProducer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                }

                @Override
                public void onException(Throwable e) {
                    // TODO: decide how to persist data that failed to send, so that all trace data is recorded
                    clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
                }
            }, 5000);
        } catch (Exception e) {
            clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
        }
    }

Code example source: org.apache.rocketmq/rocketmq-client

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // Messages that are themselves trace data are not recorded.
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        // Build the trace context.
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(context.getProducerGroup());
        // Build the data bean that describes the traced message.
        TraceBean traceBean = new TraceBean();
        traceBean.setTopic(context.getMessage().getTopic());
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }

Code example source: org.apache.rocketmq/rocketmq-spring-boot

    public static org.springframework.messaging.Message convertToSpringMessage(
        org.apache.rocketmq.common.message.Message message) {
        // Copy the RocketMQ message body and metadata, including its keys, into a Spring message.
        org.springframework.messaging.Message retMessage =
            MessageBuilder.withPayload(message.getBody())
                .setHeader(RocketMQHeaders.KEYS, message.getKeys())
                .setHeader(RocketMQHeaders.TAGS, message.getTags())
                .setHeader(RocketMQHeaders.TOPIC, message.getTopic())
                .setHeader(RocketMQHeaders.FLAG, message.getFlag())
                .setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId())
                .setHeader(RocketMQHeaders.PROPERTIES, message.getProperties())
                .build();
        return retMessage;
    }
