?? selectormanager.java
字號:
package org.mortbay.io.nio;import java.io.IOException;import java.nio.channels.CancelledKeyException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import org.mortbay.component.AbstractLifeCycle;import org.mortbay.io.Connection;import org.mortbay.io.EndPoint;import org.mortbay.log.Log;import org.mortbay.thread.Timeout;/* ------------------------------------------------------------ *//** * The Selector Manager manages and number of SelectSets to allow * NIO scheduling to scale to large numbers of connections. * * @author gregw * */public abstract class SelectorManager extends AbstractLifeCycle{ private boolean _delaySelectKeyUpdate=true; private long _maxIdleTime; private long _lowResourcesConnections; private long _lowResourcesMaxIdleTime; private transient SelectSet[] _selectSet; private int _selectSets=1; private volatile int _set; /* ------------------------------------------------------------ */ /** * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. * @see {@link #setLowResourcesMaxIdleTime(long)} */ public void setMaxIdleTime(long maxIdleTime) { _maxIdleTime=maxIdleTime; } /* ------------------------------------------------------------ */ /** * @param selectSets */ public void setSelectSets(int selectSets) { long lrc = _lowResourcesConnections * _selectSets; _selectSets=selectSets; _lowResourcesConnections=lrc/_selectSets; } /* ------------------------------------------------------------ */ /** * @return */ public long getMaxIdleTime() { return _maxIdleTime; } /* ------------------------------------------------------------ */ /** * @return */ public int getSelectSets() { return _selectSets; } /* ------------------------------------------------------------ */ /** * @return */ public boolean isDelaySelectKeyUpdate() { return _delaySelectKeyUpdate; } /* ------------------------------------------------------------ */ /** Register a channel * @param channel * @param att Attached Object * @throws IOException */ public void register(SocketChannel channel, Object att) throws IOException { int s=_set++; s=s%_selectSets; SelectSet set=_selectSet[s]; set.addChange(channel,att); set.wakeup(); } /* ------------------------------------------------------------ */ /** Register a serverchannel * @param acceptChannel * @return * @throws IOException */ public void register(ServerSocketChannel acceptChannel) throws IOException { int s=_set++; s=s%_selectSets; SelectSet set=_selectSet[s]; set.addChange(acceptChannel); set.wakeup(); } /* ------------------------------------------------------------ */ /** * @return the lowResourcesConnections */ public long getLowResourcesConnections() { return _lowResourcesConnections*_selectSets; } /* ------------------------------------------------------------ */ /** * Set the number of connections, which if exceeded places this manager in low resources state. * This is not an exact measure as the connection count is averaged over the select sets. * @param lowResourcesConnections the number of connections * @see {@link #setLowResourcesMaxIdleTime(long)} */ public void setLowResourcesConnections(long lowResourcesConnections) { _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; } /* ------------------------------------------------------------ */ /** * @return the lowResourcesMaxIdleTime */ public long getLowResourcesMaxIdleTime() { return _lowResourcesMaxIdleTime; } /* ------------------------------------------------------------ */ /** * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} * @see {@link #setMaxIdleTime(long)} */ public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) { _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; } /* ------------------------------------------------------------ */ /** * @param acceptorID * @throws IOException */ public void doSelect(int acceptorID) throws IOException { SelectSet[] sets= _selectSet; if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null) sets[acceptorID].doSelect(); } /* ------------------------------------------------------------ */ /** * @param delaySelectKeyUpdate */ public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate) { _delaySelectKeyUpdate=delaySelectKeyUpdate; } /* ------------------------------------------------------------ */ /** * @param key * @return * @throws IOException */ protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException; /* ------------------------------------------------------------------------------- */ public abstract boolean dispatch(Runnable task) throws IOException; /* ------------------------------------------------------------ */ /* (non-Javadoc) * @see org.mortbay.component.AbstractLifeCycle#doStart() */ protected void doStart() throws Exception { _selectSet = new SelectSet[_selectSets]; for (int i=0;i<_selectSet.length;i++) _selectSet[i]= new SelectSet(i); super.doStart(); } /* ------------------------------------------------------------------------------- */ protected void doStop() throws Exception { SelectSet[] sets= _selectSet; _selectSet=null; if (sets!=null) for (int i=0;i<sets.length;i++) sets[i].stop(); super.doStop(); } /* ------------------------------------------------------------ */ /** * @param endpoint */ protected abstract void endPointClosed(SelectChannelEndPoint endpoint); /* ------------------------------------------------------------ */ /** * @param endpoint */ protected abstract void endPointOpened(SelectChannelEndPoint endpoint); /* ------------------------------------------------------------------------------- */ protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint); /* ------------------------------------------------------------ */ /** * @param channel * @param selectSet * @param sKey * @return * @throws IOException */ protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; /* ------------------------------------------------------------------------------- */ protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) { Log.warn(ex); } /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ public class SelectSet { private transient int _change; private transient List[] _changes; private transient Timeout _idleTimeout; private transient int _nextSet; private transient Timeout _retryTimeout; private transient Selector _selector; private transient int _setID; private transient boolean _selecting; private transient int _jvmBug; /* ------------------------------------------------------------ */ SelectSet(int acceptorID) throws Exception { _setID=acceptorID; _idleTimeout = new Timeout(); _idleTimeout.setDuration(getMaxIdleTime()); _retryTimeout = new Timeout(); _retryTimeout.setDuration(0L); // create a selector; _selector = Selector.open(); _changes = new ArrayList[] {new ArrayList(),new ArrayList()}; _change=0; } /* ------------------------------------------------------------ */ public void addChange(Object point) { synchronized (_changes) { _changes[_change].add(point); if (point instanceof SocketChannel) _changes[_change].add(null); } } /* ------------------------------------------------------------ */ public void addChange(SocketChannel channel, Object att) { synchronized (_changes) { _changes[_change].add(channel); _changes[_change].add(att); } } /* ------------------------------------------------------------ */ public void cancelIdle(Timeout.Task task) { synchronized (this) { task.cancel(); } } /* ------------------------------------------------------------ */ /** * Select and dispatch tasks found from changes and the selector. * * @throws IOException */ public void doSelect() throws IOException { SelectionKey key=null; try { List changes; synchronized (_changes) { changes=_changes[_change]; _change=_change==0?1:0; _selecting=true; } // Make any key changes required for (int i = 0; i < changes.size(); i++) { try { Object o = changes.get(i); if (o instanceof EndPoint) { // Update the operations for a key. SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o; endpoint.doUpdateKey(); } else if (o instanceof Runnable) { dispatch((Runnable)o); } else if (o instanceof SocketChannel) { // finish accepting/connecting this connection SocketChannel channel=(SocketChannel)o; Object att = changes.get(++i); if (channel.isConnected()) { key = channel.register(_selector,SelectionKey.OP_READ,att);
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -