?? mspastryprotocol.java
字號:
if ( (!this.nodeId.equals(nexthop)) && (nexthop != null)) { //send m to nexthop
transport = (UnreliableTransport) (Network.prototype).getProtocol(tid);
transport.send(nodeIdtoNode(this.nodeId), nodeIdtoNode(nexthop), m, mspastryid);
}
else receiveRoute(m);
}
//______________________________________________________________________________________________
/**
* Sort the nodes of the network by their nodeIds
*/
private void sortNet() {
    // Re-orders the global Network so that nodes appear in nodeId order.
    // The raw Comparator type is kept on purpose: the rest of this file is pre-generics code.
    Network.sort(new Comparator() {
        /**
         * Orders two Node instances by the MSPastry nodeId of their protocol instance.
         *
         * @param o1 first Node
         * @param o2 second Node
         * @return negative, zero or positive, as defined by String.compareTo on the put0 forms
         */
        public int compare(Object o1, Object o2) {
            Node n1 = (Node) o1;
            Node n2 = (Node) o2;
            MSPastryProtocol p1 = (MSPastryProtocol) (n1.getProtocol(mspastryid));
            MSPastryProtocol p2 = (MSPastryProtocol) (n2.getProtocol(mspastryid));
            // The ids are compared through their Util.put0() string form, not numerically
            // (a direct BigInteger comparison was the earlier, abandoned variant).
            // NOTE(review): presumably put0 left-pads to a fixed width so that the string
            // order matches the numeric order -- confirm against Util.
            return Util.put0(p1.nodeId).compareTo(Util.put0(p2.nodeId));
        }
        // equals() is deliberately NOT overridden. The previous override evaluated
        // compare(this, obj), which casts the comparator itself to Node and therefore always
        // failed with a ClassCastException. The identity equality inherited from Object
        // satisfies the Comparator.equals contract.
    });
}
//______________________________________________________________________________________________
/**
* Search for the node that is nearest to the specified node
* @param current Node
* @return Node
*/
private Node selectNeighbor(Node current) {
    // The seed is chosen the same way StateBuilder chooses the representatives:
    // it is the sampled node that has the lowest latency from "current".
    final int sampleCount = 10;
    int bestIndex = 0;
    long bestLatency = Long.MAX_VALUE;
    for (int attempt = 0; attempt < sampleCount; attempt++) {
        // Draw a random index, redrawing until the node at that index is up.
        int pick = CommonState.r.nextInt(Network.size());
        while (!Network.get(pick).isUp()) {
            pick = CommonState.r.nextInt(Network.size());
        }
        long latency = getTr(pick).getLatency(current, Network.get(pick));
        // Strict comparison: on a tie the earlier sample is kept.
        if (latency < bestLatency) {
            bestLatency = latency;
            bestIndex = pick;
        }
    }
    return Network.get(bestIndex);
}
//______________________________________________________________________________________________
/**
* Given that this node was correctly initialized (e.g. routing table and leafset created, and
* empty), it performs a join request to the MSPastry network according to the protocol specification
*/
public void join() {
    // A node that has no id yet receives a freshly generated one; the network is then
    // re-sorted so that it stays ordered by nodeId.
    if (this.nodeId == null) {
        UniformRandomGenerator idGenerator =
            new UniformRandomGenerator(MSPastryCommonConfig.BITS, CommonState.r);
        this.setNodeId(idGenerator.generate());
        sortNet();
    }

    // Build the join request: it carries this node's id and routing table and is
    // addressed to this node's own id.
    Message request = Message.makeJoinRequest(null);
    Message.BodyJoinRequestReply payload = new Message.BodyJoinRequestReply();
    request.body = payload;
    payload.joiner = this.nodeId;
    payload.rt = this.routingTable;
    request.dest = this.nodeId;

    // Hand the request to the seed node chosen by selectNeighbor, with zero delay.
    Node seed = selectNeighbor(nodeIdtoNode(this.nodeId));
    peersim.edsim.EDSimulator.add(0, request, seed, mspastryid);
}
//______________________________________________________________________________________________
/**
* shortcut for getting the MSPastry level of the node with index "i" in the network
* @param i int
* @return MSPastryProtocol
*/
public final MSPastryProtocol get(int i) {
    // Look up the i-th node of the network, then its protocol registered under mspastryid.
    Node node = Network.get(i);
    return (MSPastryProtocol) node.getProtocol(mspastryid);
}
//______________________________________________________________________________________________
/**
* shortcut for getting the Transport level of the node with index "i" in the network
* @param i int
* @return Transport
*/
public final Transport getTr(int i) {
    // Look up the i-th node of the network, then its protocol registered under tid.
    Node node = Network.get(i);
    return (Transport) node.getProtocol(tid);
}
//______________________________________________________________________________________________
/**
* This primitive provide the sending of the data to dest, by encapsulating it into a LOOKUP
* Message
*
* @param recipient BigInteger
* @param data Object
*/
public void send(BigInteger recipient, Object data) {
    // Wrap the payload in a Message stamped with the current simulation time,
    // this node as source and the recipient id as destination.
    Message envelope = new Message(data);
    envelope.timestamp = CommonState.getTime();
    envelope.src = this.nodeId;
    envelope.dest = recipient;

    // The message is first delivered, with zero delay, to the local node itself.
    EDSimulator.add(0, envelope, nodeIdtoNode(this.nodeId), mspastryid);
}
//______________________________________________________________________________________________
/**
 * Tells whether j is strictly closer to k than i is, using the absolute numeric difference.
 *
 * @param k reference value
 * @param i value currently held
 * @param j candidate value
 * @return true if |k - j| &lt; |k - i|
 */
private static final boolean cond1(BigInteger k, BigInteger i, BigInteger j) {
    BigInteger candidateDistance = k.subtract(j).abs();
    BigInteger currentDistance = k.subtract(i).abs();
    return candidateDistance.compareTo(currentDistance) < 0;
}
/**
 * Tells whether the value returned by Util.prefixLen(k, j) is at least r.
 *
 * @param k first identifier
 * @param j second identifier
 * @param r minimum required prefix length
 * @return true if Util.prefixLen(k, j) &gt;= r
 */
private static final boolean cond2(BigInteger k, BigInteger j, int r) {
    int sharedPrefix = Util.prefixLen(k, j);
    return sharedPrefix >= r;
}
//______________________________________________________________________________________________
/**
* @param myNode Node
* @param myPid int
* @param m Message
*/
void performJoinRequest(Node myNode, int myPid, Message m) {
    // Add to the routing table carried by the message the row N of myNode's routing table,
    // where N - 1 is the common prefix length between the joiner's id and this node's id.
    MSPastryProtocol local = (MSPastryProtocol) myNode.getProtocol(myPid);
    Message.BodyJoinRequestReply request = (Message.BodyJoinRequestReply) m.body;

    // Nothing is copied when the request comes from this very node.
    if (!nodeId.equals(request.joiner)) {
        // NOTE(review): the row index is prefixLen + 1 here, while performJoinReply indexes
        // rows with prefixLen directly -- confirm which convention copyRowFrom expects.
        int row = Util.prefixLen(nodeId, request.joiner) + 1;
        request.rt.copyRowFrom(local.routingTable, row);
    }
}
//______________________________________________________________________________________________
/**
* Send a leaf-set probe request (MSG_LSPROBEREQUEST) to every node listed in this node's leaf set
*/
private void probeLS() {
    e("probeLS\n");
    BigInteger[] members = this.leafSet.listAllNodes();
    int idx = 0;
    while (idx < members.length) {
        // The transport is re-read from the network prototype on every iteration.
        transport = (UnreliableTransport) (Network.prototype).getProtocol(tid);

        // m.dest is used here to carry the SOURCE of the probe request.
        Message probe = new Message(Message.MSG_LSPROBEREQUEST, null);
        probe.dest = this.nodeId;

        Node sender = nodeIdtoNode(this.nodeId);
        Node target = nodeIdtoNode(members[idx]);
        transport.send(sender, target, probe, mspastryid);
        idx++;
    }
}
//______________________________________________________________________________________________
/**
* see MSPastry protocol "performJoinReply" primitive
* @param myNode Node
* @param myPid int
* @param m Message
*/
void performJoinReply(Node myNode, int myPid, Message m) {
    // Ri.add(R u L)   (i = myself)
    // Li.add(L)
    Message.BodyJoinRequestReply body = (Message.BodyJoinRequestReply) m.body;

    // The routing table of the reply is adopted by reference (it is not cloned).
    this.routingTable = body.rt;

    // Every leaf listed in the reply is also stored in the routing table, in the cell
    // given by its common prefix length with this node (row) and by the digit found at
    // that position of its put0 form (column). An earlier version used position row + 1.
    BigInteger[] leaves = body.ls.listAllNodes();
    for (int k = 0; k < leaves.length; k++) {
        BigInteger leaf = leaves[k];
        int row = Util.prefixLen(this.nodeId, leaf);
        int col = Util.charToIndex(Util.put0(leaf).charAt(row));
        this.routingTable.set(row, col, leaf);
    }

    // Since this.leafSet is empty, the "add" is done by direct assignment of a clone.
    LeafSet adopted = (LeafSet) body.ls.clone();
    this.leafSet = adopted;
    adopted.nodeId = this.nodeId;

    probeLS();
}
//______________________________________________________________________________________________
/**
* see MSPastry protocol "performLSProbeRequest" primitive
* @param m Message
*/
private void performLSProbeRequest(Message m) {
    // m.dest carries the id of the node that sent the probe (see probeLS).
    BigInteger prober = m.dest;
    this.leafSet.push(prober);

    // Routing-table cell for the prober: row = common prefix length with this node,
    // column = digit at that position of the prober's put0 form
    // (an earlier version used position row + 1).
    int row = Util.prefixLen(this.nodeId, prober);
    int col = Util.charToIndex(Util.put0(prober).charAt(row));

    // An occupied cell is kept only when the prober has a strictly higher latency
    // than the node already stored there.
    boolean keepIncumbent = false;
    BigInteger incumbent = this.routingTable.get(row, col);
    if (incumbent != null) {
        transport = (UnreliableTransport) (Network.prototype).getProtocol(tid);
        long incumbentLatency =
            transport.getLatency(nodeIdtoNode(this.nodeId), nodeIdtoNode(incumbent));
        long proberLatency =
            transport.getLatency(nodeIdtoNode(this.nodeId), nodeIdtoNode(prober));
        keepIncumbent = proberLatency > incumbentLatency;
    }

    if (!keepIncumbent) {
        this.routingTable.set(row, col, prober);
    }
}
//______________________________________________________________________________________________
/**
* The cleaning service is called occasionally in order to remove failed entries from the
* tables of this node.
* @param myNode Node
* @param myPid int
* @param m Message
*/
private void cleaningService(Node myNode, int myPid, Message m) {
    // Routing table: clear every cell whose id resolves to no node or to a node that is down.
    for (int row = 0; row < routingTable.rows; row++) {
        for (int col = 0; col < routingTable.cols; col++) {
            Node owner = nodeIdtoNode(routingTable.get(row, col));
            boolean alive = (owner != null) && owner.isUp();
            if (!alive) {
                routingTable.set(row, col, null);
            }
        }
    }

    // Leaf set: same check, applied to the array returned by listAllNodes().
    BigInteger[] leaves = leafSet.listAllNodes();
    for (int k = 0; k < leaves.length; k++) {
        Node owner = nodeIdtoNode(leaves[k]);
        boolean alive = (owner != null) && owner.isUp();
        if (!alive) {
            leafSet.removeNodeId(leaves[k]);
        }
    }

    // Re-schedule the same service message for this node after 1000 plus a random extra delay.
    long delay = 1000 + CommonState.r.nextLong(1000);
    EDSimulator.add(delay, m, myNode, myPid);
}
//______________________________________________________________________________________________
/**
* manage the peersim receiving of the events
* @param myNode Node
* @param myPid int
* @param event Object
*/
public void processEvent(Node myNode, int myPid, Object event) {
    // On the first event handled, schedule the periodic cleaning service for this node.
    if (!cleaningScheduled) {
        Message poll = new Message(Message.MSG_SERVICEPOLL, "");
        poll.dest = nodeId;
        long firstDelay = 1000 + CommonState.r.nextLong(1000);
        EDSimulator.add(firstDelay, poll, myNode, myPid);
        cleaningScheduled = true;
    }

    // Remember the protocol id under which this instance is being invoked.
    this.mspastryid = myPid;

    // Parse the message and activate the handler that matches its type.
    Message m = (Message) event;
    switch (m.messageType) {
        case Message.MSG_SERVICEPOLL:
            cleaningService(myNode, myPid, m);
            break;

        case Message.MSG_LSPROBEREQUEST:
            performLSProbeRequest(m);
            break;

        case Message.MSG_JOINREPLY:
            performJoinReply(myNode, myPid, m);
            break;

        case Message.MSG_JOINREQUEST:
            // The join request is updated locally and then routed on like a lookup.
            performJoinRequest(myNode, myPid, m);
            route(m, myNode);
            break;

        case Message.MSG_LOOKUP:
            route(m, myNode);
            break;

        default:
            // Any other message type is ignored.
            break;
    }
}
//______________________________________________________________________________________________
/**
* set the current NodeId
*
* @param tmp BigInteger
*/
public void setNodeId(BigInteger tmp) {
    // The id is stored on the protocol first, then mirrored into its leaf set.
    this.nodeId = tmp;
    this.leafSet.nodeId = tmp;
}
//______________________________________________________________________________________________
/**
* debug only
* @param o Object
*/
private static void e(Object o) {
    // Prints to standard error, only when MSPastryCommonConfig.DEBUG is set.
    if (!MSPastryCommonConfig.DEBUG) {
        return;
    }
    System.err.println(o);
}
/**
* debug only
* @param o Object
*/
private static void o(Object o) {
    // Prints to standard output, only when MSPastryCommonConfig.DEBUG is set.
    if (!MSPastryCommonConfig.DEBUG) {
        return;
    }
    System.out.println(o);
}
//______________________________________________________________________________________________
} // End of class
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -