本文整理了Java中org.jgroups.Message.setSrc()方法的一些代码示例,展示了Message.setSrc()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.setSrc()方法的具体详情如下:
包路径:org.jgroups.Message
类名称:Message
方法名:setSrc
暂无
代码示例来源:origin: wildfly/wildfly
/**
 * Creates a reply to this message: a new {@link Message} constructed with {@code src_addr}
 * (presumably as its destination - confirm against the Message constructor), whose source is
 * set to {@code dest_addr} when that address is not null.
 *
 * @return the newly created reply message
 */
public Message makeReply() {
    final Message reply = new Message(src_addr);
    if (dest_addr == null) {
        return reply;
    }
    reply.setSrc(dest_addr);
    return reply;
}
代码示例来源:origin: wildfly/wildfly
/**
 * Stamps the local address onto a message that has no sender yet. An existing sender is never
 * overwritten, because the message may be sent on behalf of another member - e.g. a
 * retransmission after the original sender has crashed, or a FLUSH protocol that has to return
 * all unstable messages with the FLUSH_OK response.
 *
 * @param msg the message whose source address is filled in when it is missing
 */
protected void setSourceAddress(Message msg) {
    // In the shared transport case the source should already have been set by TP.ProtocolAdapter
    if (msg.getSrc() != null || local_addr == null) {
        return;
    }
    msg.setSrc(local_addr);
}
代码示例来源:origin: wildfly/wildfly
MessageInfo(MessageID messageID, Message message, long sequenceNumber) {
if (messageID == null) {
throw new NullPointerException("Message ID can't be null");
}
this.messageID = messageID;
this.message = message.copy(true, true);
this.sequenceNumber = sequenceNumber;
this.readyToDeliver = false;
this.message.setSrc(messageID.getAddress());
}
代码示例来源:origin: wildfly/wildfly
/**
 * Handles a message travelling down the stack. Messages that have a destination, or carry the
 * NO_TOTAL_ORDER or OOB flag, are handed to the protocol below unchanged. Every other message
 * gets the local address as source if it has none, is put into {@code fwd_queue}, and a seqno
 * request is sent when the request counter was 0 before being incremented.
 *
 * @param msg the message to send
 * @return the result of {@code down_prot.down(msg)} for passed-through messages, otherwise null
 * @throws RuntimeException wrapping the InterruptedException raised while queueing, if
 *         {@code running} is still true
 */
public Object down(Message msg) {
    final boolean passThrough = msg.getDest() != null
        || msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER)
        || msg.isFlagSet(Message.Flag.OOB);
    if (passThrough) {
        return down_prot.down(msg);
    }

    if (msg.getSrc() == null) {
        msg.setSrc(local_addr);
    }

    try {
        fwd_queue.put(msg);
        final boolean nonePending = seqno_reqs.getAndIncrement() == 0;
        if (nonePending) {
            // The counter is re-read after the increment to get the number of requests to ask for
            final int requested = seqno_reqs.get();
            sendSeqnoRequest(requested);
        }
    } catch (InterruptedException e) {
        if (running) {
            throw new RuntimeException(e);
        }
        // Not running: the interruption is swallowed and null is returned below
    }
    return null; // the message is queued here and not passed down
}
代码示例来源:origin: wildfly/wildfly
/**
 * Reads a list of messages from the input: destination address, source address, the cluster
 * name as raw bytes, the number of messages, and then the messages themselves.
 *
 * @param in           the input to read from
 * @param transport_id ID under which a {@code TpHeader} is attached to every message read
 * @return the messages read, in the order they appear in the input
 * @throws Exception if reading from the input or unmarshalling a message fails
 */
public static List<Message> readMessageList(DataInput in, short transport_id) throws Exception {
    final List<Message> result = new LinkedList<>();
    final Address dest = Util.readAddress(in);
    final Address src = Util.readAddress(in);

    // The cluster name is read as raw bytes (short length followed by the bytes) rather than as
    // an AsciiString; a negative length means that no name is present
    final short nameLength = in.readShort();
    byte[] cluster_name = null;
    if (nameLength >= 0) {
        cluster_name = new byte[nameLength];
        in.readFully(cluster_name, 0, cluster_name.length);
    }

    for (int remaining = in.readInt(); remaining > 0; remaining--) {
        final Message msg = new Message(false);
        msg.readFrom(in);
        msg.setDest(dest);
        if (msg.getSrc() == null) {
            msg.setSrc(src);
        }

        // The TpHeader was not marshalled, so it is added back here. A new header is created for
        // each message; all of them are built from the same cluster_name array.
        msg.putHeader(transport_id, new TpHeader(cluster_name));
        result.add(msg);
    }
    return result;
}
代码示例来源:origin: wildfly/wildfly
/**
 * Serializes the complete message into a buffer and passes down a copy of the message that
 * carries this buffer. The local address is used as source if the message has none.
 *
 * @param msg the message to send
 * @return whatever the protocol below returns for the wrapping message
 */
public Object down(Message msg) {
    if (msg.getSrc() == null) {
        msg.setSrc(local_addr);
    }

    final Buffer payload = Util.streamableToBuffer(msg);

    // Existing headers are excluded from the copy: they are seen again when the receiver
    // unmarshals the message from the payload
    final Message wrapper = msg.copy(false, false);
    return down_prot.down(wrapper.setBuffer(payload));
}
代码示例来源:origin: wildfly/wildfly
/**
 * Marshals the discovery request once and sends it as a datagram to {@code dest_addr} on every
 * port from {@code bind_port} to {@code bind_port + port_range} (inclusive). Failures are
 * logged and not propagated.
 *
 * @param msg the discovery request; the local address is used as source if it has none
 */
@Override
protected void sendMcastDiscoveryRequest(Message msg) {
    try {
        if (msg.getSrc() == null) {
            msg.setSrc(local_addr);
        }

        final int capacity = (int) msg.size();
        final ByteArrayDataOutputStream out = new ByteArrayDataOutputStream(capacity);
        msg.writeTo(out);

        int port = bind_port;
        while (port <= bind_port + port_range) {
            sock.send(new DatagramPacket(out.buffer(), 0, out.position(), dest_addr, port));
            port++;
        }
    } catch (Exception ex) {
        log.error(Util.getMessage("FailedSendingDiscoveryRequest"), ex);
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Reconstructs a message from the payload (raw buffer, offset, length) of the given message.
 * A destination or source missing in the reconstructed message is taken from {@code msg}.
 *
 * @param msg the message whose payload holds the serialized message
 * @return the reconstructed message
 * @throws Exception wrapping any failure, with a text naming the source of {@code msg}
 */
protected static Message deserialize(Message msg) throws Exception {
    try {
        final Message unpacked = Util.streamableFromBuffer(
            Message::new, msg.getRawBuffer(), msg.getOffset(), msg.getLength());

        // Addresses that are not set after unmarshalling are inherited from the carrier message
        if (unpacked.getDest() == null) {
            unpacked.setDest(msg.getDest());
        }
        if (unpacked.getSrc() == null) {
            unpacked.setSrc(msg.getSrc());
        }
        return unpacked;
    } catch (Exception cause) {
        final String text = String.format("failed deserialize message from %s", msg.src());
        throw new Exception(text, cause);
    }
}
}
代码示例来源:origin: wildfly/wildfly
/**
 * Handles a message travelling up the stack. A message without a {@code Frag3Header} is passed
 * up unchanged. A message with the header is given to {@code unfragment}; when that returns a
 * message, it is passed up with the source of {@code msg} and its length is added to
 * {@code avg_size_up}.
 *
 * @param msg the received message
 * @return the result of {@code up_prot.up(msg)} for messages without the header, otherwise null
 */
public Object up(Message msg) {
    final Frag3Header hdr = msg.getHeader(this.id);
    if (hdr == null) {
        return up_prot.up(msg); // no fragmentation header: nothing to defragment
    }

    final Message complete = unfragment(msg, hdr);
    if (complete != null) {
        // The source has to be set explicitly (original note: fragments have a null src)
        complete.setSrc(msg.getSrc());
        up_prot.up(complete);
        avg_size_up.add(complete.length());
    }
    return null;
}
代码示例来源:origin: apache/geode
/**
 * Builds a JGroups message that carries the given bytes as its object, with the
 * NO_RELIABILITY, NO_FC, DONT_BUNDLE and OOB flags set.
 *
 * @param msgBytes the bytes stored in the message via {@code setObject}
 * @param src      the source address of the message
 * @param dest     the destination address of the message
 * @param version  not used by this method
 * @return the newly created message
 */
private Message createJGMessage(byte[] msgBytes, Address src, Address dest, short version) {
    final Message jgMessage = new Message();
    jgMessage.setDest(dest);
    jgMessage.setSrc(src);
    jgMessage.setObject(msgBytes);

    final Message.Flag[] flags = {
        Message.Flag.NO_RELIABILITY,
        Message.Flag.NO_FC,
        Message.Flag.DONT_BUNDLE,
        Message.Flag.OOB
    };
    for (Message.Flag flag : flags) {
        jgMessage.setFlag(flag);
    }
    return jgMessage;
}
代码示例来源:origin: wildfly/wildfly
/**
 * Handles a message travelling up the stack. Only messages carrying a {@code FragHeader} are
 * treated here; all others go straight to the protocol above.
 *
 * @param msg the received message
 * @return null for messages carrying the header (whether or not {@code unfragment} returned a
 *         message), otherwise the result of {@code up_prot.up(msg)}
 */
public Object up(Message msg) {
    final FragHeader fragHeader = msg.getHeader(this.id);
    final boolean isFragment = fragHeader != null;
    if (!isFragment) {
        return up_prot.up(msg);
    }

    final Message whole = unfragment(msg, fragHeader);
    if (whole == null) {
        return null; // unfragment returned no message to deliver
    }

    // The source is set explicitly on the assembled message (original note: fragments have a null src)
    whole.setSrc(msg.getSrc());
    up_prot.up(whole);
    avg_size_up.add(whole.length());
    return null;
}
代码示例来源:origin: wildfly/wildfly
/**
 * Sends a multicast message to the {@code next} member instead of multicasting it. Messages
 * with a destination, and all messages while {@code next} is null, go down unchanged.
 * When {@code loopback} is set, the original message is also passed up locally on
 * {@code default_pool}.
 *
 * @param msg the message to send
 * @return the result of passing the message (or its copy addressed to {@code next}) down
 */
public Object down(Message msg) {
    // Only multicast messages are processed; while 'next' is null (the view hasn't been received
    // yet) the normal transport is used
    if (msg.getDest() != null || next == null) {
        return down_prot.down(msg);
    }

    // The message is copied because msg.setDest(next) cannot be done on the original: the next
    // retransmission would use 'next' as destination!
    final Message forwarded = msg.copy(true);
    final short ttl = (short) (loopback ? view_size - 1 : view_size);
    final DaisyHeader daisyHeader = new DaisyHeader(ttl);
    forwarded.setDest(next);
    forwarded.putHeader(getId(), daisyHeader);
    msgs_sent++;

    if (loopback) {
        if (log.isTraceEnabled()) {
            log.trace(new StringBuilder("looping back message ").append(msg));
        }
        if (msg.getSrc() == null) {
            msg.setSrc(local_addr);
        }
        default_pool.execute(() -> up_prot.up(msg));
    }
    return down_prot.down(forwarded);
}
代码示例来源:origin: wildfly/wildfly
/**
 * Sends a message to the member that requested its retransmission. Unless the original
 * multicast is simply re-sent ({@code use_mcast_xmit}), the message is copied together with
 * its payload and headers, so that the original message's properties such as src and headers
 * are preserved, and the copy is marked as a retransmit response.
 *
 * @param dest the requester the response is addressed to
 * @param msg  the message to retransmit; nothing is done when it is null
 */
protected void sendXmitRsp(Address dest, Message msg) {
    if (msg == null) {
        return;
    }

    if (stats) {
        xmit_rsps_sent.increment();
    }
    if (msg.getSrc() == null) {
        msg.setSrc(local_addr);
    }

    if (use_mcast_xmit) {
        // the original multicast message is simply sent again
        down_prot.down(msg);
    } else {
        final Message response = msg.copy(true, true).dest(dest); // payload and headers are copied
        final NakAckHeader2 original = response.getHeader(id);
        final NakAckHeader2 retransmit = original.copy();
        retransmit.type = NakAckHeader2.XMIT_RSP; // only the copy changes type, from MSG to XMIT_RSP
        response.putHeader(id, retransmit);
        down_prot.down(response);
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Handles a message travelling down the stack. Messages that have a destination, or carry the
 * NO_TOTAL_ORDER or OOB flag, are handed to the protocol below unchanged. Every other message
 * gets the local address as source if it has none, is tagged with a {@code SequencerHeader}
 * holding a freshly assigned seqno, and is forwarded to the coordinator. While
 * {@code flushing} is set, {@code block()} is called first.
 *
 * @param msg the message to send
 * @return the result of {@code down_prot.down(msg)} for passed-through messages, otherwise null
 */
public Object down(Message msg) {
    if (msg.getDest() != null || msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB)) {
        return down_prot.down(msg);
    }

    if (msg.getSrc() == null) {
        msg.setSrc(local_addr);
    }

    if (flushing) {
        block();
    }

    // A seqno is not used to establish ordering, but only to weed out duplicates; next_seqno doesn't need
    // to increase monotonically, but only to be unique (https://issues.jboss.org/browse/JGRP-1461) !
    long next_seqno = seqno.incrementAndGet();
    in_flight_sends.incrementAndGet();
    try {
        SequencerHeader hdr = new SequencerHeader(is_coord ? SequencerHeader.BCAST : SequencerHeader.WRAPPED_BCAST, next_seqno);
        msg.putHeader(this.id, hdr);
        if (log.isTraceEnabled()) {
            // Log the seqno assigned to this message. Printing the shared counter itself could show
            // a later value when other threads have incremented it in the meantime.
            log.trace("[" + local_addr + "]: forwarding " + local_addr + "::" + next_seqno + " to coord " + coord);
        }

        // We always forward messages to the coordinator, even if we're the coordinator. Having the coord
        // send its messages directly led to starvation of messages from other members. MPerf perf went up
        // from 20MB/sec/node to 50MB/sec/node with this change !
        forwardToCoord(next_seqno, msg);
    } catch (Exception ex) {
        log.error(Util.getMessage("FailedSendingMessage"), ex);
    } finally {
        in_flight_sends.decrementAndGet();
    }
    return null; // don't pass down
}
代码示例来源:origin: wildfly/wildfly
/**
 * Dispatches a received message according to the type in its relay header.
 * <ul>
 *   <li>DISSEMINATE: a copy of the message is passed up, with the header's original sender as
 *       source when that is not null</li>
 *   <li>FORWARD: the payload is forwarded, but only if {@code is_coord} is set</li>
 *   <li>VIEW: the payload is handed to {@code installView}</li>
 *   <li>BROADCAST_VIEW: nothing is done</li>
 * </ul>
 *
 * @param msg the received message
 * @param hdr the relay header of the message
 * @return the result of passing the copy up or of {@code installView}; null for FORWARD and
 *         BROADCAST_VIEW
 * @throws IllegalArgumentException if the header type is none of the above
 */
protected Object handleUpEvent(Message msg, RelayHeader hdr) {
    switch (hdr.type) {
        case DISSEMINATE: {
            final Message copy = msg.copy();
            if (hdr.original_sender != null) {
                copy.setSrc(hdr.original_sender);
            }
            return up_prot.up(copy);
        }
        case VIEW:
            return installView(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
        case FORWARD:
            if (is_coord) {
                forward(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
            }
            return null;
        case BROADCAST_VIEW:
            return null;
        default:
            throw new IllegalArgumentException(hdr.type + " is not a valid type");
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Wraps the message and sends it to the current coordinator. Nothing is sent when no
 * coordinator is known; when the local member is the coordinator, the serialized message is
 * handed to {@code forward} directly. Failures are logged and not propagated.
 *
 * @param msg the message to forward; a copy is serialized, the original is left untouched
 */
protected void forwardToCoord(Message msg) {
    // Only headers from building blocks are copied
    final Message copy = msg.copy(true, Global.BLOCKS_START_ID);
    if (copy.getSrc() == null) {
        copy.setSrc(local_addr);
    }

    try {
        final byte[] buf = Util.streamableToByteBuffer(copy);
        if (coord == null) {
            return;
        }

        // Optimization: if I'm the coord, simply relay to the remote cluster via the bridge
        if (coord.equals(local_addr)) {
            forward(buf, 0, buf.length);
            return;
        }

        final Message wrapper = new Message(coord, buf, 0, buf.length)
            .putHeader(id, new RelayHeader(RelayHeader.Type.FORWARD));
        down_prot.down(wrapper);
    } catch (Exception e) {
        log.error(Util.getMessage("FailedForwardingUnicastMessageToCoord"), e);
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Handles a batch travelling up the stack. Every message carrying a {@code Frag3Header} is
 * given to {@code unfragment}: if that returns a message, it replaces the fragment in the
 * batch, otherwise the fragment is removed from the batch. The batch is passed up unless it
 * ends up empty.
 *
 * @param batch the received batch; it is modified in place
 */
public void up(MessageBatch batch) {
    for (Message msg : batch) {
        final Frag3Header hdr = msg.getHeader(this.id);
        if (hdr == null) {
            continue; // no fragmentation header: the message stays in the batch as it is
        }

        final Message complete = unfragment(msg, hdr);
        if (complete == null) {
            batch.remove(msg);
            continue;
        }

        // The reassembled message has to be added in the right place
        // (https://issues.jboss.org/browse/JGRP-1648) and cannot be added to the tail of the batch
        complete.setSrc(batch.sender());
        batch.replace(msg, complete);
        avg_size_up.add(complete.length());
    }

    if (!batch.isEmpty()) {
        up_prot.up(batch);
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Defragments the messages of a batch in place. A message with a {@code FragHeader} is either
 * replaced by the message {@code unfragment} returns for it, or removed when
 * {@code unfragment} returns null. Messages without the header are left alone. A non-empty
 * batch is passed up afterwards.
 *
 * @param batch the received batch; it is modified in place
 */
public void up(MessageBatch batch) {
    for (Message fragment : batch) {
        final FragHeader fragHeader = fragment.getHeader(this.id);
        final boolean needsDefragmenting = fragHeader != null;
        if (!needsDefragmenting) {
            continue;
        }

        final Message whole = unfragment(fragment, fragHeader);
        if (whole != null) {
            // Replace rather than append: the reassembled message has to be added in the right
            // place (https://issues.jboss.org/browse/JGRP-1648), not at the tail of the batch
            whole.setSrc(batch.sender());
            batch.replace(fragment, whole);
            avg_size_up.add(whole.length());
        } else {
            batch.remove(fragment);
        }
    }

    final boolean anythingLeft = !batch.isEmpty();
    if (anythingLeft) {
        up_prot.up(batch);
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Marshals the discovery request into one datagram addressed to
 * {@code mcast_addr:mcast_port} and sends it on every socket in {@code mcast_send_sockets},
 * or on {@code mcast_sock} (if not null) when there are no send sockets. Failures are logged
 * and not propagated; a failure on one send socket does not stop the remaining ones.
 *
 * @param msg the discovery request; the local address is used as source if it has none
 */
@Override
protected void sendMcastDiscoveryRequest(Message msg) {
    try {
        if (msg.getSrc() == null) {
            msg.setSrc(local_addr);
        }

        final ByteArrayDataOutputStream out = new ByteArrayDataOutputStream((int) (msg.size() + 1));
        msg.writeTo(out);
        final Buffer buf = out.getBuffer();
        final DatagramPacket packet =
            new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), mcast_addr, mcast_port);

        if (mcast_send_sockets == null) {
            // DEFAULT path
            if (mcast_sock != null) {
                mcast_sock.send(packet);
            }
            return;
        }

        for (int i = 0; i < mcast_send_sockets.length; i++) {
            final MulticastSocket socket = mcast_send_sockets[i];
            try {
                socket.send(packet);
            } catch (Exception e) {
                // NOTE(review): only the socket is passed to the logger; the exception is dropped
                log.error(Util.getMessage("FailedSendingPacketOnSocket"), socket);
            }
        }
    } catch (Exception ex) {
        log.error(Util.getMessage("FailedSendingDiscoveryRequest"), ex);
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Unmarshals a message from the given bytes and sends it on the local cluster. A message whose
 * destination is not local is discarded with a warning. Otherwise the local address becomes
 * the source, the original sender is kept in a disseminate header, and the message is passed
 * down. Failures are logged and not propagated.
 *
 * @param buf    the array holding the serialized message
 * @param offset the offset of the serialized message in {@code buf}
 * @param length the number of bytes of the serialized message
 */
protected void sendOnLocalCluster(byte[] buf, int offset, int length) {
    try {
        final Message msg = Util.streamableFromByteBuffer(Message::new, buf, offset, length);
        final Address original_sender = msg.getSrc();
        final Address target = msg.getDest();

        if (!isLocal(target)) {
            if (log.isWarnEnabled()) {
                log.warn("[" + local_addr + "] dest=" + target + " is not local (site=" + this.site + "); discarding it");
            }
            return;
        }

        // I become the sender; later, in RELAY, the original sender is taken from the header and
        // made the sender again
        msg.setSrc(local_addr);
        msg.putHeader(id, RelayHeader.createDisseminateHeader(original_sender));

        if (log.isTraceEnabled()) {
            log.trace("received msg from " + original_sender + ", passing down the stack with dest=" +
                        msg.getDest() + " and src=" + msg.getSrc());
        }
        down_prot.down(msg);
    } catch (Exception e) {
        log.error(Util.getMessage("FailedSendingOnLocalCluster"), e);
    }
}
内容来源于网络,如有侵权,请联系作者删除!