?? mpjrun.java
字號:
reader = new BufferedReader(new FileReader( machinesFile ));
}
catch (FileNotFoundException fnfe) {
throw new MPJRuntimeException ( "<"+ machinesFile + "> file cannot "+
" be found." +
" The starter module assumes "+
"it to be in the current directory.");
}
boolean loop = true;
String line = null;
int machineCount = 0 ;
while (machineCount < nprocs) {
line = reader.readLine();
if(DEBUG && logger.isDebugEnabled()) {
logger.debug("line <" + line + ">");
}
if(line == null) {
break ;
}
if (line.startsWith("#") || line.equals("") ||
(machineVector.size() == nprocs)) {
//loop = false;
continue ;
}
machineCount ++ ;
line = line.trim();
InetAddress address = InetAddress.getByName(line);
String addressT = address.getHostAddress();
String nameT = address.getHostName();
if(DEBUG && logger.isDebugEnabled()) {
logger.debug("nameT " + nameT);
logger.debug("addressT " + addressT);
}
boolean alreadyPresent = false;
for(int i=0 ; i<machineVector.size() ; i++) {
String machine = (String) machineVector.get(i);
if(machine.equals(nameT) || machine.equals(addressT)) {
alreadyPresent = true;
break ;
}
}
if(!alreadyPresent) {
//if( addressT or nameT already present, then you are buggered ) {
//}
/* What is the solution for this? */
//machineVector.add(addressT);
machineVector.add(nameT);
if(DEBUG && logger.isDebugEnabled()) {
logger.debug("Line " + line.trim() +
" added to vector " + machineVector);
}
}
}//end while.
}
/**
 * Connects to the MPJ daemon on every host listed in machineVector.
 * <p>
 * For each host a blocking SocketChannel is opened and connected to
 * D_SER_PORT; the connected channel is then handed to doConnect. If a
 * connection attempt fails, the half-open channel is closed so that the
 * socket is not leaked, and the underlying IOException is logged before
 * it is replaced by an MPJRuntimeException.
 *
 * @throws MPJRuntimeException if a daemon cannot be reached
 * @throws Exception any other failure, rethrown unchanged
 */
private void clientSocketInit() throws Exception {
  for (int i = 0; i < machineVector.size(); i++) {
    String daemon = (String) machineVector.get(i);
    // Declared outside the try so the failure paths can close it.
    SocketChannel channel = null;
    try {
      channel = SocketChannel.open();
      channel.configureBlocking(true);
      logger.debug("Connecting to " + daemon + "@" + D_SER_PORT);
      boolean connected =
          channel.connect(new InetSocketAddress(daemon, D_SER_PORT));
      if (!connected) {
        System.out.println(" home-made ...");
        throw daemonConnectFailure(daemon, channel);
      }
      doConnect(channel);
    }
    catch (IOException ioe) {
      // Keep the root cause visible: the exception thrown below carries
      // only a generic message.
      logger.debug("Connection attempt to " + daemon + "@" + D_SER_PORT
          + " failed: " + ioe);
      MPJRuntimeException failure = daemonConnectFailure(daemon, channel);
      System.out.println(" IOException in doConnect");
      throw failure;
    }
    catch (Exception ccn1) {
      System.out.println(" rest of the exceptions ");
      throw ccn1;
    }
  }
}

/**
 * Releases what a failed connection attempt left behind and builds the
 * exception that reports it: closes the channel (if one was opened),
 * deletes CONF_FILE when running on Windows, and returns the
 * MPJRuntimeException for the caller to throw.
 *
 * @param daemon host name of the daemon that could not be reached
 * @param channel channel opened for the attempt, or null if none was opened
 * @return the exception describing the failed connection
 */
private MPJRuntimeException daemonConnectFailure(String daemon,
                                                 SocketChannel channel) {
  if (channel != null) {
    try {
      channel.close();
    }
    catch (IOException closeFailure) {
      logger.debug("Could not close channel to " + daemon + ": "
          + closeFailure);
    }
  }
  if (System.getProperty("os.name").startsWith("Windows")) {
    CONF_FILE.delete();
  }
  return new MPJRuntimeException("Cannot connect to the daemon "+
      "at machine <"+daemon+"> and port <"+
      D_SER_PORT+">."+
      "Please make sure that the machine is reachable "+
      "and running the daemon in 'sane' state");
}
/**
 * This method cleans up the device environments, closes the selectors,
 * serverSocket, and all the other socketChannels.
 * <p>
 * Cleanup is best-effort: every step runs in its own try/catch so that a
 * failure in one step (for example closing cfos) does not prevent the
 * remaining resources from being released. Failures are logged at debug
 * level and never propagated to the caller.
 */
public void finish() {
  logger.debug("\n---finish---");

  try {
    if (cfos != null) {
      cfos.close();
    }
  }
  catch (Exception e) {
    logger.debug("finish: could not close cfos: " + e);
  }

  try {
    if (server != null) {
      server.stop();
      server.destroy();
    }
  }
  catch (Exception e) {
    logger.debug("finish: could not stop server: " + e);
  }

  try {
    if (mpjServer != null) {
      mpjServer.stop();
      mpjServer.destroy();
    }
  }
  catch (Exception e) {
    logger.debug("finish: could not stop mpjServer: " + e);
  }

  // Clear the flag BEFORE waking the selector thread. If the thread were
  // woken first it could still observe the flag as true, loop back into
  // select() and then hit the selector being closed underneath it.
  selectorFlag = false;
  try {
    logger.debug("Waking up the selector");
    selector.wakeup();
    logger.debug("Closing the selector");
    selector.close();
  }
  catch (Exception e) {
    logger.debug("finish: could not close the selector: " + e);
  }

  if (peerChannels != null) {
    for (int i = 0; i < peerChannels.size(); i++) {
      // One channel failing to close must not leave the others open.
      try {
        SocketChannel peerChannel = peerChannels.get(i);
        logger.debug("Closing the channel " + peerChannel);
        if (peerChannel.isOpen()) {
          peerChannel.close();
        }
      }
      catch (Exception e) {
        logger.debug("finish: could not close peer channel " + i + ": " + e);
      }
    }
  }
}
/**
 * Prepares a connected daemon channel for use by the selector thread.
 * <p>
 * The channel is switched to non-blocking mode, registered with the
 * selector for OP_READ and OP_WRITE, has TCP_NODELAY enabled on a
 * best-effort basis, and is appended to peerChannels. Once peerChannels
 * holds as many channels as machineVector has entries, Notify() is called.
 * <p>
 * A failure to configure or register the channel terminates the JVM.
 *
 * @param peerChannel the channel to configure and register
 */
private void doConnect(SocketChannel peerChannel) {
  logger.debug("---doConnect---");
  try {
    logger.debug("Configuring it to be non-blocking");
    peerChannel.configureBlocking(false);
  }
  catch (IOException ioe) {
    logger.debug("IOException while configuring non-blocking mode in doConnect: "
        + ioe);
    // NOTE(review): exits with status 0 although this is a failure path;
    // kept as-is because callers/scripts may depend on it - confirm.
    System.exit(0);
  }
  try {
    logger.debug("Registering for OP_READ & OP_WRITE event");
    peerChannel.register(selector,
        SelectionKey.OP_READ | SelectionKey.OP_WRITE);
  }
  catch (ClosedChannelException cce) {
    logger.debug("Closed Channel Exception in doConnect: " + cce);
    // NOTE(review): same status-0 exit on failure as above - confirm.
    System.exit(0);
  }
  try {
    peerChannel.socket().setTcpNoDelay(true);
  }
  catch (Exception e) {
    // TCP_NODELAY is only an optimisation, so carry on without it, but
    // leave a trace instead of discarding the failure silently.
    logger.debug("Could not enable TCP_NODELAY on " + peerChannel + ": " + e);
  }
  peerChannels.add(peerChannel);
  logger.debug("Adding the channel " + peerChannel + " to " + peerChannels);
  logger.debug("Size of Peer Channels vector " + peerChannels.size());
  if (peerChannels.size() == machineVector.size()) {
    Notify();
  }
}
/**
 * Program entry point: builds an MPJRun from the command-line arguments.
 * Any exception raised by the constructor propagates to the caller
 * unchanged.
 *
 * @param args command-line arguments, passed straight to the constructor
 * @throws Exception whatever the MPJRun constructor throws
 */
public static void main(String args[]) throws Exception {
  new MPJRun(args);
}
/**
 * Registers a JVM shutdown hook that sends "kill" to every daemon channel
 * in peerChannels, stops and destroys server and mpjServer if they exist,
 * and closes cfos.
 * <p>
 * Each step is isolated in its own try/catch: a daemon whose channel can
 * no longer be written to must not stop the remaining daemons from being
 * told to kill their processes, nor stop the local servers and cfos from
 * being released. Failures are logged at debug level only.
 */
private void addShutdownHook() {
  Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
      if (peerChannels != null) {
        for (int j = 0; j < peerChannels.size(); j++) {
          try {
            SocketChannel socketChannel = peerChannels.get(j);
            buffer.clear();
            buffer.put("kill".getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
          }
          catch (Exception e) {
            logger.debug("Shutdown hook: could not send kill on channel "
                + j + ": " + e);
          }
        }
      }

      try {
        if (server != null) {
          server.stop();
          server.destroy();
        }
      }
      catch (Exception e) {
        logger.debug("Shutdown hook: could not stop server: " + e);
      }

      try {
        if (mpjServer != null) {
          mpjServer.stop();
          mpjServer.destroy();
        }
      }
      catch (Exception e) {
        logger.debug("Shutdown hook: could not stop mpjServer: " + e);
      }

      try {
        if (cfos != null) {
          cfos.close();
        }
      }
      catch (Exception e) {
        logger.debug("Shutdown hook: could not close cfos: " + e);
      }
    }
  });
}
/**
 * Event loop run by the selector thread. It blocks on the selector,
 * dispatches each ready key by type (accept / connect / read / write),
 * echoes everything read from a channel to standard output, and calls
 * Notify() once every machine in machineVector has either sent a chunk
 * ending in "EXIT" or reached end-of-stream. Any exception inside the
 * loop is logged, its stack trace printed, and the JVM is terminated.
 */
Runnable selectorThread = new Runnable() {
/* This is the selector thread's main loop. */
public void run() {
logger.debug("selector Thread started ");
Set readyKeys = null;
Iterator readyItor = null;
SelectionKey key = null;
SelectableChannel keyChannel = null;
SocketChannel socketChannel = null;
// NOTE(review): lilBuffer is allocated but never used in this block.
ByteBuffer lilBuffer = ByteBuffer.allocateDirect(4);
// Scratch buffer that every read lands in; it is cleared after each read.
ByteBuffer bigBuffer = ByteBuffer.allocateDirect(10000);
try {
// select() returns a non-negative count, so the "> -1" test always holds;
// the loop effectively runs until selectorFlag is cleared.
while (selector.select() > -1 && selectorFlag == true) {
readyKeys = selector.selectedKeys();
readyItor = readyKeys.iterator();
while (readyItor.hasNext()) {
key = (SelectionKey) readyItor.next();
// Remove the key from the selected set so it is not handled again on
// the next pass.
readyItor.remove();
keyChannel = (SelectableChannel) key.channel();
logger.debug("\n---selector EVENT---");
if (key.isAcceptable()) {
// Accept events are only logged; the accept handler is disabled.
//doAccept(keyChannel);
logger.debug("ACCEPT_EVENT");
}
else if (key.isConnectable()) {
logger.debug("CONNECT_EVENT");
// NOTE(review): the guarded statement is only a cast, which is not
// expected to raise NoConnectionPendingException - confirm whether this
// handler can ever run.
try {
socketChannel = (SocketChannel) keyChannel;
}
catch (NoConnectionPendingException e) {
continue;
}
// Complete a pending connect; if that fails, skip this key without
// registering the channel.
if (socketChannel.isConnectionPending()) {
try {
socketChannel.finishConnect();
}
catch (IOException e) {
continue;
}
}
doConnect(socketChannel);
}
else if (key.isReadable()) {
//logger.debug("READ_EVENT");
socketChannel = (SocketChannel) keyChannel;
int read = socketChannel.read(bigBuffer);
/*
 * It would be ideal if this piece of code were called, but it appears
 * that it never is. Maybe the shutdown behaviour that we saw was
 * Linux-dependent?
 */
// End-of-stream: count it, and call Notify() once every machine in
// machineVector has reached end-of-stream.
if (read == -1) {
logger.debug("END_OF_STREAM signal at starter from "+
"channel "+socketChannel) ;
streamEndedCount ++ ;
if (streamEndedCount == machineVector.size()) {
logger.debug("The starter has received "+
machineVector.size() +"signals");
logger.debug("This means its time to exit");
Notify();
}
}
bigBuffer.flip();
// On end-of-stream the JVM exits here with status 0, whether or not
// streamEndedCount has reached machineVector.size().
if(read == -1) {
System.exit(0);
}
// Copy the bytes just read into a String (decoded with the platform
// default charset) and reset the buffer for the next read.
byte[] tempArray = new byte[read];
//logger.debug("bigBuffer " + bigBuffer);
bigBuffer.get(tempArray, 0, read);
String line = new String(tempArray);
bigBuffer.clear();
//RECEIVED
//logger.debug("line <" + line + ">");
// Echo the received text to standard output exactly as received.
System.out.print(line);
//logger.debug("Does it endup with EXIT ? ==>" +
// line.endsWith("EXIT"));
// A chunk ending in "EXIT" counts as one finished machine; Notify() is
// called once the count equals machineVector.size().
// NOTE(review): this relies on "EXIT" arriving at the very end of a single
// read - confirm it can never be split across, or followed by data in,
// one read.
if (line.endsWith("EXIT")) {
endCount++;
logger.debug("endCount " + endCount);
logger.debug("machineVector.size() " + machineVector.size());
if (endCount == machineVector.size()) {
logger.debug("Notify and exit");
Notify();
}
}
} //end if key.isReadable()
else if (key.isWritable()) {
// Reached only when the key is not readable (else-if chain): drop
// OP_WRITE interest so that only reads are selected from now on.
logger.debug(
"In, WRITABLE, so changing the interestOps to READ_ONLY");
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
catch (Exception ioe1) {
// Any failure in the loop is treated as fatal: log it, print the stack
// trace and terminate the JVM (with exit status 0).
logger.debug("Exception in selector thread ");
ioe1.printStackTrace();
System.exit(0);
}
logger.debug("Thread getting out");
}
};
}
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -