?? router.java
字號:
/*
* Copyright (C) 2000-2001 Ken McCrary
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Email: jkmccrary@yahoo.com
*/
package com.kenmccrary.jtella;
import java.util.Collections;
import java.util.List;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Stack;
import java.util.Vector;
import java.util.Enumeration;
import java.util.Iterator;
import java.io.IOException;
import com.kenmccrary.jtella.util.Log;
import com.kenmccrary.jtella.util.BoundedQueue;
/**
* Routes messages read from the network to appropriate
* Connections
*
*/
class Router extends Thread
{
// TODO flush dead connections from routing tables
private static int MAX_ROUTER_TABLE = 5000;
private static byte MAX_HOPS = (byte)7;
private static byte MAX_TTL = (byte)50;
private ConnectionList connectionList;
private HostCache hostCache;
private RouteTable pingRouteTable;
private RouteTable queryRouteTable;
private RouteTable queryHitRouteTable;
private OriginateTable originateTable;
private Vector searchReceivers;
private Vector pushReceivers;
private BoundedQueue messageQueue;
private boolean shutDownFlag;
/**
* Collection of active connections to the network
*
* @param the list of connections in the system
* @param cache of available hosts in the system
*/
Router(ConnectionList connectionList, HostCache hostCache)
{
super("RouterThread");
this.connectionList = connectionList;
this.hostCache = hostCache;
pingRouteTable = new RouteTable(MAX_ROUTER_TABLE);
queryRouteTable = new RouteTable(MAX_ROUTER_TABLE);
queryHitRouteTable = new RouteTable(MAX_ROUTER_TABLE);
originateTable = new OriginateTable();
messageQueue = new BoundedQueue(1000);
searchReceivers = new Vector();
pushReceivers = new Vector();
}
/**
* Stops the operation of the router
*
*/
void shutdown()
{
shutDownFlag = true;
}
/**
* Routes a message, used by Connections
*
* @return false if routing failed because of overload
*/
boolean route(Message m, NodeConnection connection)
{
if ( m.getTTL() < 1 )
{
// expired message, no failure signal required
return true;
}
RouteMessage message = new RouteMessage(m, connection);
boolean result = true;
synchronized (this)
{
result = messageQueue.enqueue(message);
// notify in either case, either a new message on the queue or
// the queue is full
notify();
}
return result;
}
/**
* Record a message we originate, so we can route it back
*
*
*/
void routeBack(Message m, MessageReceiver receiver)
{
originateTable.put(m.getGUID(), receiver);
}
/**
* Removes a message sender's origination data
*
* @param messasgeGUIDs the originated message guids
*/
void removeMessageSender(List messageGUIDs)
{
Iterator iterator = messageGUIDs.iterator();
while ( iterator.hasNext() )
{
GUID guid = (GUID)iterator.next();
originateTable.remove(guid);
}
}
/**
* Adds a search listener
*
* @param receiver search receiver
*/
void addSearchMessageReceiver(MessageReceiver receiver)
{
searchReceivers.addElement(receiver);
}
/**
* Removes a search receiver
*
* @param receiver message receiver
*/
void removeSearchMessageReceiver(MessageReceiver receiver)
{
searchReceivers.removeElement(receiver);
}
/**
* Adds a push listener
*
* @param receiver push message receiver
*/
void addPushMessageReceiver(MessageReceiver receiver)
{
pushReceivers.addElement(receiver);
}
/**
* Removes a push receiver
*
* @param receiver message receiver
*/
void removePushMessageReceiver(MessageReceiver receiver)
{
pushReceivers.removeElement(receiver);
}
/**
* Query the next message to route, blocks if no message are available
*
* @return message to route
*/
RouteMessage getNextMessage()
{
synchronized (this)
{
while ( messageQueue.empty() )
{
try
{
wait();
}
catch (InterruptedException ie)
{
ie.printStackTrace();
}
}
return (RouteMessage)messageQueue.dequeue();
}
}
/**
* Runs along routing messages
*
*/
public void run()
{
while ( !shutDownFlag )
{
try
{
RouteMessage routeMessage = getNextMessage();
if ( null == routeMessage )
{
Log.getLog().logError("Null message in router");
continue;
}
//-----------------------------------------------------------
// Check if this is a response to a message we generated
//-----------------------------------------------------------
if ( originateTable.containsGUID( routeMessage.getMessage().getGUID()) )
{
Log.getLog().logInformation("Routing response to originated message");
// Retrieve the message receiver
Message m = routeMessage.getMessage();
MessageReceiver receiver = originateTable.get(m.getGUID());
if ( m instanceof SearchReplyMessage)
{
receiver.receiveSearchReply((SearchReplyMessage)m);
}
else
{
// TODO PONG REPLY
Log.getLog().logError("Routeback unknown message");
}
continue;
}
//-----------------------------------------------------------
// Don't forward invalid messages
//-----------------------------------------------------------
if ( !validateMessage(routeMessage.getMessage()))
{
continue;
}
//-----------------------------------------------------------
// Route the network traffic to our connections
//-----------------------------------------------------------
switch ( routeMessage.getMessage().getType() )
{
case Message.PING:
{
Log.getLog().logInformation("Routing ping message");
routePingMessage(routeMessage);
break;
}
case Message.PONG:
{
Log.getLog().logInformation("Routing pong message");
routePongMessage(routeMessage);
break;
}
case Message.PUSH:
{
Log.getLog().logInformation("Routing push message");
routePushMessage(routeMessage);
break;
}
case Message.QUERY:
{
Log.getLog().logInformation("Routing query message");
routeQueryMessage(routeMessage);
break;
}
case Message.QUERYREPLY:
{
Log.getLog().logInformation("Routing query reply message");
routeQueryReplyMessage(routeMessage);
break;
}
}
}
catch (Exception e)
{
// keep running
Log.getLog().log(e);
}
}
}
/**
* Get the source of a previously received message query message
*
* @param message a search message
*/
NodeConnection getQuerySource(SearchMessage message)
{
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -