// outputstreamhandler.java
// (Code-viewer page residue, not part of the program: "Font size:")
package com.ll.smsbeans;
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.ll.smsbeans.log.LogCommon;
/**
 * Output thread of a connection. Packets handed to {@link #send(Packet)} are
 * placed on an internal thread-safe queue and written to the output stream by
 * this thread, so that any number of threads may send packets concurrently.
 * <p>
 * Each packet is written as a 4-byte big-endian length (payload length plus
 * the 4 header bytes) followed by the payload returned by
 * <code>Packet.getContent()</code>.
 *
 * @author listlike <a href="mailto:listlike@hotmail.com">
 * <i>&lt;listlike@hotmail.com&gt;</i></a>
 */
public final class OutputStreamHandler extends Thread
{
    /** Stream the packets are written to; must be set before the thread is started. */
    private OutputStream out;

    /** Parent connection; receives the sent / sendFailed / unexpectedThreadDeath callbacks. */
    private final ConnectionBean.OutputStreamInterface osi;

    /** Queue of packets waiting to be written. */
    private final LockingQueue output = new LockingQueue();

    /**
     * Cleared to make the sending loop stop. Declared volatile because it is
     * written by the threads calling {@link #shutdown()} / {@link #start()}
     * and read by the sending thread and by every caller of
     * {@link #send(Packet)}.
     */
    private volatile boolean keepRunning = true;

    static Logger _log = Logger.getLogger("com.ll.smsbeans.OutputStreamHandler");

    /**
     * Creates a new <code>OutputStreamHandler</code>.
     *
     * @param osi callback interface of the owning connection
     */
    public OutputStreamHandler(ConnectionBean.OutputStreamInterface osi)
    {
        this.osi = osi;
    }

    /**
     * Sets the output stream. Must be called before {@link #start()}.
     *
     * @param out <code>OutputStream</code> to write packets to
     */
    public void setOutputStream(OutputStream out)
    {
        this.out = out;
    }

    /**
     * Queues a packet for sending. If the handler has been shut down, or the
     * queue no longer accepts packets, the packet is reported through
     * <code>sendFailed</code> instead of being dropped silently.
     *
     * @param p packet to send
     */
    public final void send(Packet p)
    {
        if (!keepRunning || !output.put(p))
            osi.sendFailed(p);
    }

    /**
     * Asks the sending thread to stop. A <code>null</code> marker is queued as
     * the last element so that a thread blocked on the empty queue wakes up.
     * Packets that are still queued when the loop exits are not written.
     */
    public void shutdown()
    {
        _log.fine("OutputStream: shutdown");
        keepRunning = false;
        output.putLast(null);
        _log.fine("OutputStream: closing queue");
    }

    /**
     * Handles the death of the sending thread: stops accepting packets,
     * reports the packet being sent and every packet still queued as failed,
     * and finally notifies the parent connection.
     *
     * @param ex <code>Exception</code> that caused the termination
     * @param p <code>Packet</code> that was being sent, or <code>null</code>
     */
    public void handleThreadDeath(Exception ex, Packet p)
    {
        _log.warning("OutputStream: thread death");
        // From now on send() must fail fast instead of queueing packets that
        // nobody will ever write.
        keepRunning = false;
        if (p != null)
            osi.sendFailed(p);
        // Take the pending packets out under the queue lock, but invoke the
        // callbacks outside of it.
        Object[] pending = output.drainAndClose();
        for (int i = 0; i < pending.length; i++)
        {
            // A null entry is the shutdown marker, not a packet.
            if (pending[i] != null)
                osi.sendFailed((Packet) pending[i]);
        }
        osi.unexpectedThreadDeath(ex);
    }

    /**
     * Handles the death of the sending thread when no packet was in flight.
     *
     * @param ex <code>Exception</code> that caused the termination
     */
    public void handleThreadDeath(Exception ex)
    {
        handleThreadDeath(ex, null);
    }

    /**
     * Sending loop: takes packets from the queue and writes them to the
     * output stream until {@link #shutdown()} is called, the queue is closed,
     * or an I/O error occurs.
     *
     * @throws RuntimeException if no output stream has been set
     */
    public final void run()
    {
        if (out == null)
            throw new RuntimeException("starting output thread without any IO set up to use");
        while (keepRunning)
        {
            // Must start as null on every iteration: if get() is interrupted,
            // the packet of the previous iteration must not be sent again.
            Packet p = null;
            try
            {
                p = (Packet) output.get();
            }
            catch (final InterruptedException e)
            {
                _log.severe("OutputStream: interrupted");
            }
            if (p == null)
            {
                // A closed queue returns null immediately on every call;
                // leave the loop instead of spinning.
                if (output.isClosed())
                    break;
                continue;
            }
            try
            {
                byte[] data = p.getContent();
                // The length field counts its own 4 bytes as well.
                byte[] header = encodeLength(data.length + 4);
                out.write(header);
                out.write(data);
                out.flush();
                if (_log.isLoggable(LogCommon.DEBUG_LEVEL))
                    _log.log(
                        LogCommon.DEBUG_LEVEL,
                        LogCommon.getLogBin("Sent Data", header, data));
            }
            catch (IOException e)
            {
                // Error handling: fail this packet and everything still queued.
                handleThreadDeath(e, p);
                return;
            }
            // Report the successful send.
            osi.sent(p);
        }
        _log.warning("OutputStream: stopped");
    }

    /**
     * Starts the sending thread.
     *
     * @see java.lang.Thread#start()
     */
    public synchronized void start()
    {
        keepRunning = true;
        super.start();
    }

    /** Encodes <code>len</code> as 4 bytes, most significant byte first. */
    private static byte[] encodeLength(int len)
    {
        byte[] header = new byte[4];
        header[0] = (byte) (len >>> 24);
        header[1] = (byte) (len >>> 16);
        header[2] = (byte) (len >>> 8);
        header[3] = (byte) len;
        return header;
    }

    /**
     * Thread-safe, unbounded FIFO queue. All state is guarded by the queue's
     * own monitor.
     *
     * @author listlike <a href="mailto:listlike@hotmail.com">
     * <i>&lt;listlike@hotmail.com&gt;</i></a>
     */
    private static final class LockingQueue
    {
        private final LinkedList queue = new LinkedList();
        /** Set once the queue is closed; get() then returns null. */
        private boolean closed = false;
        /** Set once the last element has been queued; further puts are refused. */
        private boolean reject = false;
        /** Number of threads currently blocked in get(). */
        private int waiting = 0;

        /**
         * Appends an item and wakes one waiting consumer.
         *
         * @param item object to queue
         * @return <code>true</code> if the item was queued, <code>false</code>
         *         if the queue is closed or no longer accepts items
         */
        public synchronized boolean put(Object item)
        {
            if (closed || reject)
                return false;
            queue.addLast(item);
            notify();
            return true;
        }

        /**
         * Appends the final item. Afterwards no further items are accepted,
         * and the queue closes itself once it has been emptied by get().
         *
         * @param item object to queue
         * @return <code>true</code> if the item was queued
         */
        public synchronized boolean putLast(Object item)
        {
            boolean accepted = put(item);
            reject = true;
            return accepted;
        }

        /**
         * Removes and returns the head of the queue, waiting for an item if
         * the queue is empty.
         *
         * @param timeout maximum time to wait in milliseconds; 0 waits
         *        indefinitely
         * @return the head of the queue, or <code>null</code> if the queue is
         *         closed
         * @throws InterruptedException if the thread is interrupted, or if
         *         the timeout elapses before an item becomes available
         */
        public synchronized Object get(long timeout)
            throws InterruptedException
        {
            if (closed)
                return null;
            final long deadline = System.currentTimeMillis() + timeout;
            if (queue.isEmpty())
            {
                ++waiting;
                try
                {
                    while (queue.isEmpty())
                    {
                        if (timeout > 0)
                        {
                            // Wait only for the time that is left, so that a
                            // wakeup without an item does not restart the
                            // whole timeout.
                            long remaining = deadline - System.currentTimeMillis();
                            if (remaining <= 0)
                                throw new InterruptedException("LockingQueue : timeout to dequeue");
                            wait(remaining);
                        }
                        else
                        {
                            wait(timeout);
                        }
                        if (closed)
                            return null;
                    }
                }
                finally
                {
                    // Keeps the counter correct on every exit path.
                    --waiting;
                }
            }
            Object head = queue.removeFirst();
            if (queue.isEmpty() && reject)
                close(); // the last item has been removed: close the queue
            return head;
        }

        /**
         * Removes and returns the head of the queue, waiting indefinitely for
         * an item if the queue is empty.
         *
         * @return the head of the queue, or <code>null</code> if the queue is
         *         closed
         * @throws InterruptedException if the thread is interrupted
         */
        public synchronized Object get() throws InterruptedException
        {
            return get(0);
        }

        /**
         * Removes every queued item and closes the queue in one atomic step.
         *
         * @return the items that were still queued, in queue order
         */
        public synchronized Object[] drainAndClose()
        {
            Object[] pending = queue.toArray();
            close();
            return pending;
        }

        /**
         * @return <code>true</code> if no item is queued
         */
        public synchronized boolean isEmpty()
        {
            return queue.isEmpty();
        }

        /**
         * @return <code>true</code> once the queue has been closed
         */
        public synchronized boolean isClosed()
        {
            return closed;
        }

        /**
         * @return the number of threads currently waiting in get() (not the
         *         number of queued items)
         */
        public synchronized int size()
        {
            return waiting;
        }

        /**
         * Closes the queue: discards any queued items and wakes every waiting
         * consumer.
         */
        public synchronized void close()
        {
            closed = true;
            queue.clear();
            notifyAll();
        }
    }
}
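// Keyboard shortcuts (code-viewer page residue, not part of the program):
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Toggle theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -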