org.jgroups.Message.setSrc()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.3k)|赞(0)|评价(0)|浏览(178)

本文整理了Java中org.jgroups.Message.setSrc()方法的一些代码示例,展示了Message.setSrc()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.setSrc()方法的具体详情如下:
包路径:org.jgroups.Message
类名称:Message
方法名:setSrc

Message.setSrc介绍

暂无

代码示例

代码示例来源:origin: wildfly/wildfly

  1. public Message makeReply() {
  2. Message retval=new Message(src_addr);
  3. if(dest_addr != null)
  4. retval.setSrc(dest_addr);
  5. return retval;
  6. }

代码示例来源:origin: wildfly/wildfly

  1. /**
  2. * If the sender is null, set our own address. We cannot just go ahead and set the address
  3. * anyway, as we might be sending a message on behalf of someone else ! E.g. in case of
  4. * retransmission, when the original sender has crashed, or in a FLUSH protocol when we
  5. * have to return all unstable messages with the FLUSH_OK response.
  6. */
  7. protected void setSourceAddress(Message msg) {
  8. if(msg.getSrc() == null && local_addr != null) // should already be set by TP.ProtocolAdapter in shared transport case !
  9. msg.setSrc(local_addr);
  10. }

代码示例来源:origin: wildfly/wildfly

  1. MessageInfo(MessageID messageID, Message message, long sequenceNumber) {
  2. if (messageID == null) {
  3. throw new NullPointerException("Message ID can't be null");
  4. }
  5. this.messageID = messageID;
  6. this.message = message.copy(true, true);
  7. this.sequenceNumber = sequenceNumber;
  8. this.readyToDeliver = false;
  9. this.message.setSrc(messageID.getAddress());
  10. }

代码示例来源:origin: wildfly/wildfly

  1. public Object down(Message msg) {
  2. if(msg.getDest() != null || msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
  3. return down_prot.down(msg);
  4. if(msg.getSrc() == null)
  5. msg.setSrc(local_addr);
  6. try {
  7. fwd_queue.put(msg);
  8. if(seqno_reqs.getAndIncrement() == 0) {
  9. int num_reqs=seqno_reqs.get();
  10. sendSeqnoRequest(num_reqs);
  11. }
  12. }
  13. catch(InterruptedException e) {
  14. if(!running)
  15. return null;
  16. throw new RuntimeException(e);
  17. }
  18. return null; // don't pass down
  19. }

代码示例来源:origin: wildfly/wildfly

  1. public static List<Message> readMessageList(DataInput in, short transport_id) throws Exception {
  2. List<Message> list=new LinkedList<>();
  3. Address dest=Util.readAddress(in);
  4. Address src=Util.readAddress(in);
  5. // AsciiString cluster_name=Bits.readAsciiString(in); // not used here
  6. short length=in.readShort();
  7. byte[] cluster_name=length >= 0? new byte[length] : null;
  8. if(cluster_name != null)
  9. in.readFully(cluster_name, 0, cluster_name.length);
  10. int len=in.readInt();
  11. for(int i=0; i < len; i++) {
  12. Message msg=new Message(false);
  13. msg.readFrom(in);
  14. msg.setDest(dest);
  15. if(msg.getSrc() == null)
  16. msg.setSrc(src);
  17. // Now add a TpHeader back on, was not marshalled. Every message references the *same* TpHeader, saving memory !
  18. msg.putHeader(transport_id, new TpHeader(cluster_name));
  19. list.add(msg);
  20. }
  21. return list;
  22. }

代码示例来源:origin: wildfly/wildfly

  1. public Object down(Message msg) {
  2. if(msg.getSrc() == null)
  3. msg.setSrc(local_addr);
  4. Buffer serialized_msg=Util.streamableToBuffer(msg);
  5. // exclude existing headers, they will be seen again when we unmarshal the message at the receiver
  6. Message tmp=msg.copy(false, false).setBuffer(serialized_msg);
  7. return down_prot.down(tmp);
  8. }

代码示例来源:origin: wildfly/wildfly

  1. @Override
  2. protected void sendMcastDiscoveryRequest(Message msg) {
  3. try {
  4. if(msg.getSrc() == null)
  5. msg.setSrc(local_addr);
  6. ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)msg.size());
  7. msg.writeTo(out);
  8. for(int i=bind_port; i <= bind_port+port_range; i++) {
  9. DatagramPacket packet=new DatagramPacket(out.buffer(), 0, out.position(), dest_addr, i);
  10. sock.send(packet);
  11. }
  12. }
  13. catch(Exception ex) {
  14. log.error(Util.getMessage("FailedSendingDiscoveryRequest"), ex);
  15. }
  16. }

代码示例来源:origin: wildfly/wildfly

  1. protected static Message deserialize(Message msg) throws Exception {
  2. try {
  3. Message ret=Util.streamableFromBuffer(Message::new, msg.getRawBuffer(), msg.getOffset(), msg.getLength());
  4. if(ret.getDest() == null)
  5. ret.setDest(msg.getDest());
  6. if(ret.getSrc() == null)
  7. ret.setSrc(msg.getSrc());
  8. return ret;
  9. }
  10. catch(Exception e) {
  11. throw new Exception(String.format("failed deserialize message from %s", msg.src()), e);
  12. }
  13. }
  14. }

代码示例来源:origin: wildfly/wildfly

  1. public Object up(Message msg) {
  2. Frag3Header hdr=msg.getHeader(this.id);
  3. if(hdr != null) { // needs to be defragmented
  4. Message assembled_msg=unfragment(msg, hdr);
  5. if(assembled_msg != null) {
  6. assembled_msg.setSrc(msg.getSrc()); // needed ? YES, because fragments have a null src !!
  7. up_prot.up(assembled_msg);
  8. avg_size_up.add(assembled_msg.length());
  9. }
  10. return null;
  11. }
  12. return up_prot.up(msg);
  13. }

代码示例来源:origin: apache/geode

  1. private Message createJGMessage(byte[] msgBytes, Address src, Address dest, short version) {
  2. Message msg = new Message();
  3. msg.setDest(dest);
  4. msg.setSrc(src);
  5. msg.setObject(msgBytes);
  6. msg.setFlag(Message.Flag.NO_RELIABILITY);
  7. msg.setFlag(Message.Flag.NO_FC);
  8. msg.setFlag(Message.Flag.DONT_BUNDLE);
  9. msg.setFlag(Message.Flag.OOB);
  10. return msg;
  11. }

代码示例来源:origin: wildfly/wildfly

  1. public Object up(Message msg) {
  2. FragHeader hdr=msg.getHeader(this.id);
  3. if(hdr != null) { // needs to be defragmented
  4. Message assembled_msg=unfragment(msg, hdr);
  5. if(assembled_msg != null) {
  6. assembled_msg.setSrc(msg.getSrc()); // needed ? YES, because fragments have a null src !!
  7. up_prot.up(assembled_msg);
  8. avg_size_up.add(assembled_msg.length());
  9. }
  10. return null;
  11. }
  12. return up_prot.up(msg);
  13. }

代码示例来源:origin: wildfly/wildfly

  1. public Object down(Message msg) {
  2. if(msg.getDest() != null)
  3. return down_prot.down(msg); // only process multicast messages
  4. if(next == null) // view hasn'<></> been received yet, use the normal transport
  5. return down_prot.down(msg);
  6. // we need to copy the message, as we cannot do a msg.setSrc(next): the next retransmission
  7. // would use 'next' as destination !
  8. Message copy=msg.copy(true);
  9. short hdr_ttl=(short)(loopback? view_size -1 : view_size);
  10. DaisyHeader hdr=new DaisyHeader(hdr_ttl);
  11. copy.setDest(next);
  12. copy.putHeader(getId(), hdr);
  13. msgs_sent++;
  14. if(loopback) {
  15. if(log.isTraceEnabled()) log.trace(new StringBuilder("looping back message ").append(msg));
  16. if(msg.getSrc() == null)
  17. msg.setSrc(local_addr);
  18. default_pool.execute(() -> up_prot.up(msg));
  19. }
  20. return down_prot.down(copy);
  21. }

代码示例来源:origin: wildfly/wildfly

  1. /**
  2. * Sends a message msg to the requester. We have to wrap the original message into a retransmit message, as we need
  3. * to preserve the original message's properties, such as src, headers etc.
  4. * @param dest
  5. * @param msg
  6. */
  7. protected void sendXmitRsp(Address dest, Message msg) {
  8. if(msg == null)
  9. return;
  10. if(stats)
  11. xmit_rsps_sent.increment();
  12. if(msg.getSrc() == null)
  13. msg.setSrc(local_addr);
  14. if(use_mcast_xmit) { // we simply send the original multicast message
  15. down_prot.down(msg);
  16. return;
  17. }
  18. Message xmit_msg=msg.copy(true, true).dest(dest); // copy payload and headers
  19. NakAckHeader2 hdr=xmit_msg.getHeader(id);
  20. NakAckHeader2 newhdr=hdr.copy();
  21. newhdr.type=NakAckHeader2.XMIT_RSP; // change the type in the copy from MSG --> XMIT_RSP
  22. xmit_msg.putHeader(id, newhdr);
  23. down_prot.down(xmit_msg);
  24. }

代码示例来源:origin: wildfly/wildfly

  1. public Object down(Message msg) {
  2. if(msg.getDest() != null || msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
  3. return down_prot.down(msg);
  4. if(msg.getSrc() == null)
  5. msg.setSrc(local_addr);
  6. if(flushing)
  7. block();
  8. // A seqno is not used to establish ordering, but only to weed out duplicates; next_seqno doesn't need
  9. // to increase monotonically, but only to be unique (https://issues.jboss.org/browse/JGRP-1461) !
  10. long next_seqno=seqno.incrementAndGet();
  11. in_flight_sends.incrementAndGet();
  12. try {
  13. SequencerHeader hdr=new SequencerHeader(is_coord? SequencerHeader.BCAST : SequencerHeader.WRAPPED_BCAST, next_seqno);
  14. msg.putHeader(this.id, hdr);
  15. if(log.isTraceEnabled())
  16. log.trace("[" + local_addr + "]: forwarding " + local_addr + "::" + seqno + " to coord " + coord);
  17. // We always forward messages to the coordinator, even if we're the coordinator. Having the coord
  18. // send its messages directly led to starvation of messages from other members. MPerf perf went up
  19. // from 20MB/sec/node to 50MB/sec/node with this change !
  20. forwardToCoord(next_seqno, msg);
  21. }
  22. catch(Exception ex) {
  23. log.error(Util.getMessage("FailedSendingMessage"), ex);
  24. }
  25. finally {
  26. in_flight_sends.decrementAndGet();
  27. }
  28. return null; // don't pass down
  29. }

代码示例来源:origin: wildfly/wildfly

  1. protected Object handleUpEvent(Message msg, RelayHeader hdr) {
  2. switch(hdr.type) {
  3. case DISSEMINATE:
  4. Message copy=msg.copy();
  5. if(hdr.original_sender != null)
  6. copy.setSrc(hdr.original_sender);
  7. return up_prot.up(copy);
  8. case FORWARD:
  9. if(is_coord)
  10. forward(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
  11. break;
  12. case VIEW:
  13. return installView(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
  14. case BROADCAST_VIEW:
  15. break;
  16. default:
  17. throw new IllegalArgumentException(hdr.type + " is not a valid type");
  18. }
  19. return null;
  20. }

代码示例来源:origin: wildfly/wildfly

  1. /** Wraps the message annd sends it to the current coordinator */
  2. protected void forwardToCoord(Message msg) {
  3. Message tmp=msg.copy(true, Global.BLOCKS_START_ID); // // we only copy headers from building blocks
  4. if(tmp.getSrc() == null)
  5. tmp.setSrc(local_addr);
  6. try {
  7. byte[] buf=Util.streamableToByteBuffer(tmp);
  8. if(coord != null) {
  9. // optimization: if I'm the coord, simply relay to the remote cluster via the bridge
  10. if(coord.equals(local_addr)) {
  11. forward(buf, 0, buf.length);
  12. return;
  13. }
  14. tmp=new Message(coord, buf, 0, buf.length) // reusing tmp is OK here ...
  15. .putHeader(id, new RelayHeader(RelayHeader.Type.FORWARD));
  16. down_prot.down(tmp);
  17. }
  18. }
  19. catch(Exception e) {
  20. log.error(Util.getMessage("FailedForwardingUnicastMessageToCoord"), e);
  21. }
  22. }

代码示例来源:origin: wildfly/wildfly

  1. public void up(MessageBatch batch) {
  2. for(Message msg: batch) {
  3. Frag3Header hdr=msg.getHeader(this.id);
  4. if(hdr != null) { // needs to be defragmented
  5. Message assembled_msg=unfragment(msg,hdr);
  6. if(assembled_msg != null) {
  7. // the reassembled msg has to be add in the right place (https://issues.jboss.org/browse/JGRP-1648),
  8. // and canot be added to the tail of the batch !
  9. assembled_msg.setSrc(batch.sender());
  10. batch.replace(msg, assembled_msg);
  11. avg_size_up.add(assembled_msg.length());
  12. }
  13. else
  14. batch.remove(msg);
  15. }
  16. }
  17. if(!batch.isEmpty())
  18. up_prot.up(batch);
  19. }

代码示例来源:origin: wildfly/wildfly

  1. public void up(MessageBatch batch) {
  2. for(Message msg: batch) {
  3. FragHeader hdr=msg.getHeader(this.id);
  4. if(hdr != null) { // needs to be defragmented
  5. Message assembled_msg=unfragment(msg,hdr);
  6. if(assembled_msg != null) {
  7. // the reassembled msg has to be add in the right place (https://issues.jboss.org/browse/JGRP-1648),
  8. // and canot be added to the tail of the batch !
  9. assembled_msg.setSrc(batch.sender());
  10. batch.replace(msg, assembled_msg);
  11. avg_size_up.add(assembled_msg.length());
  12. }
  13. else
  14. batch.remove(msg);
  15. }
  16. }
  17. if(!batch.isEmpty())
  18. up_prot.up(batch);
  19. }

代码示例来源:origin: wildfly/wildfly

  1. @Override
  2. protected void sendMcastDiscoveryRequest(Message msg) {
  3. try {
  4. if(msg.getSrc() == null)
  5. msg.setSrc(local_addr);
  6. ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)(msg.size()+1));
  7. msg.writeTo(out);
  8. Buffer buf=out.getBuffer();
  9. DatagramPacket packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), mcast_addr, mcast_port);
  10. if(mcast_send_sockets != null) {
  11. MulticastSocket s;
  12. for(int i=0; i < mcast_send_sockets.length; i++) {
  13. s=mcast_send_sockets[i];
  14. try {
  15. s.send(packet);
  16. }
  17. catch(Exception e) {
  18. log.error(Util.getMessage("FailedSendingPacketOnSocket"), s);
  19. }
  20. }
  21. }
  22. else { // DEFAULT path
  23. if(mcast_sock != null)
  24. mcast_sock.send(packet);
  25. }
  26. }
  27. catch(Exception ex) {
  28. log.error(Util.getMessage("FailedSendingDiscoveryRequest"), ex);
  29. }
  30. }

代码示例来源:origin: wildfly/wildfly

  1. protected void sendOnLocalCluster(byte[] buf, int offset, int length) {
  2. try {
  3. Message msg=Util.streamableFromByteBuffer(Message::new, buf, offset, length);
  4. Address sender=msg.getSrc();
  5. Address dest=msg.getDest();
  6. if(!isLocal(dest)) {
  7. if(log.isWarnEnabled())
  8. log.warn("[" + local_addr + "] dest=" + dest + " is not local (site=" + this.site + "); discarding it");
  9. return;
  10. }
  11. // set myself to be the sender
  12. msg.setSrc(local_addr);
  13. // later, in RELAY, we'll take the original sender from the header and make it the sender
  14. msg.putHeader(id, RelayHeader.createDisseminateHeader(sender));
  15. if(log.isTraceEnabled())
  16. log.trace("received msg from " + sender + ", passing down the stack with dest=" +
  17. msg.getDest() + " and src=" + msg.getSrc());
  18. down_prot.down(msg);
  19. }
  20. catch(Exception e) {
  21. log.error(Util.getMessage("FailedSendingOnLocalCluster"), e);
  22. }
  23. }

相关文章