?? resourcemanager.java
字號:
* @return int
*/
public int getGCMode() {
    // hand back the garbage collection mode currently held by this instance
    return this._gcMode;
}
/**
 * Determine whether garbage collection is switched off.
 *
 * @return boolean - true if the gc mode equals GC_DISABLED, false otherwise
 */
public boolean gcDisabled() {
    // the comparison already yields the boolean we want to return
    return _gcMode == GC_DISABLED;
}
/**
 * Record a published message against a global transaction so that it can
 * be passed through the system when that transaction commits.
 * <p>
 * The message is first prepared by the message manager and is then wrapped
 * and appended to the transaction data for the supplied xid.
 *
 * @param xid - the global transaction identity
 * @param message - the message published
 * @throws TransactionLogException - error adding the entry
 * @throws ResourceManagerException - error getting the transaction log
 * @throws JMSException - if there is an issue with prep'ing the message
 */
public synchronized void logPublishedMessage(Xid xid, MessageImpl message)
    throws TransactionLogException, ResourceManagerException, JMSException {
    // the message has to be prepared before it is wrapped and logged
    MessageMgr.instance().prepare(message);

    // switch to the internal xid representation used by the logs
    ExternalXid txid = new ExternalXid(xid);
    logTransactionData(txid, _rid, createPublishedMessageWrapper(message));
}
/**
 * Record that a message handle was delivered to a consumer within the
 * specified global transaction. The handle is acknowledged when the global
 * transaction commits; if the transaction is rolled back instead, the
 * handle is returned to the destination.
 *
 * @param xid - the global transaction identity
 * @param id - the consumer receiving this message
 * @param handle - the handle of the message received
 * @throws TransactionLogException - error adding the entry
 * @throws ResourceManagerException - error getting the transaction log
 */
public synchronized void logReceivedMessage(Xid xid, long id, MessageHandle handle)
    throws TransactionLogException, ResourceManagerException {
    // switch to the internal xid representation used by the logs
    ExternalXid txid = new ExternalXid(xid);

    // wrap the consumer id and handle, then append them to the tx data
    logTransactionData(txid, _rid, createReceivedMessageWrapper(id, handle));
}
/**
 * Record a transaction state change for the specified xid.
 * <ul>
 * <li>OPENED - the state is written to the current transaction log and an
 *     empty record list is cached for the transaction.</li>
 * <li>PREPARED - the state is only appended to the cached record list; the
 *     transaction must already be cached as active.</li>
 * <li>CLOSED - the state is written to the transaction's log, the log is
 *     closed (and destroyed when the gc mode is GC_SYNCHRONOUS) once it has
 *     no remaining transactions and is not the current log, and the
 *     transaction is dropped from the active cache.</li>
 * </ul>
 *
 * @param xid - the transaction identifier
 * @param state - the transaction log state
 * @throws TransactionLogException - error adding the entry
 * @throws ResourceManagerException - error getting the transaction log, the
 *         transaction is not active, or the state is not supported
 */
public synchronized void logTransactionState(Xid xid, TransactionState state)
    throws TransactionLogException, ResourceManagerException {
    final ExternalXid txid = new ExternalXid(xid);
    final int ordinal = state.getOrd();

    if (ordinal == TransactionState.OPENED_ORD) {
        TransactionLog current = getCurrentTransactionLog();
        addTridLogEntry(txid, current);
        current.logTransactionState(txid, _txExpiryTime * 1000, _rid, state);

        // start caching the records belonging to this transaction
        _activeTransactions.put(txid, new LinkedList());
    } else if (ordinal == TransactionState.PREPARED_ORD) {
        LinkedList entries = (LinkedList) _activeTransactions.get(txid);
        if (entries == null) {
            throw new ResourceManagerException("Trasaction " + txid +
                " is not active.");
        }

        // remember the state alongside the other cached records
        entries.add(state);
    } else if (ordinal == TransactionState.CLOSED_ORD) {
        TransactionLog owner = getTransactionLog(txid);
        owner.logTransactionState(txid, _txExpiryTime * 1000, _rid, state);
        removeTridLogEntry(txid, owner);

        synchronized (_cacheLock) {
            // a log is finished with once it holds no open transactions
            // and is no longer the log that new transactions are written to
            boolean finished = (_logToTridCache.get(owner) == null)
                && !isCurrentTransactionLog(owner);
            if (finished) {
                owner.close();

                // in synchronous gc mode the log file is removed right away
                if (_gcMode == GC_SYNCHRONOUS) {
                    try {
                        owner.destroy();
                    } catch (TransactionLogException exception) {
                        exception.printStackTrace();
                    }
                }
            }
        }

        // the transaction is complete, so stop tracking it as active
        _activeTransactions.remove(txid);
    } else {
        throw new ResourceManagerException("Cannot process tx state " +
            state);
    }
}
/**
 * Write a data entry for the specified txid and rid to the transaction's
 * log, and then append the same data to the cached record list of the
 * active transaction.
 *
 * @param txid - the transaction identifier
 * @param rid - the resource identifier
 * @param data - the data to log and cache
 * @throws TransactionLogException - error adding the entry
 * @throws ResourceManagerException - error getting the transaction log, or
 *         the transaction is not cached as active
 */
synchronized void logTransactionData(ExternalXid txid, String rid,
                                     Object data)
    throws ResourceManagerException, TransactionLogException {
    TransactionLog owner = getTransactionLog(txid);
    owner.logTransactionData(txid, _txExpiryTime * 1000, rid, data);

    // the cached record list must exist, i.e. the transaction was opened
    LinkedList entries = (LinkedList) _activeTransactions.get(txid);
    if (entries == null) {
        throw new ResourceManagerException("Trasaction " + txid +
            " is not active.");
    }
    entries.add(data);
}
/**
 * This is the entry point for the garbage collection callback. It scans
 * through each transaction log file, except the most recent one, and
 * destroys every log that reports it can be garbage collected.
 * <p>
 * Any failure aborts the current pass and is reported through the logger;
 * nothing is propagated to the caller.
 */
public void garbageCollect() {
    try {
        TreeSet copy;
        synchronized (_logs) {
            // if there are no transaction log files then return
            if (_logs.isEmpty()) {
                return;
            }

            // take the snapshot and exclude the last log under the same
            // lock, so that both refer to the same state of the log set.
            // The last log is skipped since it is likely to be the
            // current log file
            copy = new TreeSet(_logs);
            copy.remove(_logs.last());
        }

        // process each of the remaining log files
        int gcfiles = 0;
        while (!copy.isEmpty()) {
            TransactionLog log = (TransactionLog) copy.first();
            copy.remove(log);
            if (log.canGarbageCollect()) {
                // destroy the log
                log.destroy();

                // remove it from the log cache
                synchronized (_logs) {
                    _logs.remove(log);
                }

                // increment the number of garbage collected files
                ++gcfiles;
            }
        }

        // print an informative message
        _log.info("[RMGC] Collected " + gcfiles + " files.");
    } catch (Exception exception) {
        // report through the logger rather than dumping to stderr
        _log.error("[RMGC] Garbage collection failed", exception);
    }
}
/**
 * Ensure that a transaction with the specified xid is currently active and,
 * if so, commit it.
 * <p>
 * Every data record logged for the transaction is processed: published
 * messages are handed to the message manager and received message handles
 * are destroyed. The database work is committed as a unit and rolled back
 * if any step fails. Whatever the outcome, the transaction is afterwards
 * marked as closed.
 * <p>
 * Note that the onePhase flag is not consulted by this implementation.
 *
 * @param id - the xa transaction identity
 * @param onePhase - true if it is a one phase commit
 * @throws XAException - XAER_NOTA if the xid is null, XAER_PROTO if the
 *         transaction is not active, or a descriptive error if the commit
 *         or the closing of the transaction fails
 */
public synchronized void commit(Xid id, boolean onePhase)
    throws XAException {
    // check that the xid is not null
    if (id == null) {
        throw new XAException(XAException.XAER_NOTA);
    }

    // convert to our internal representation of an xid
    ExternalXid xid = new ExternalXid(id);

    // check to see that the transaction is active and open. We should
    // not be allowed to commit a committed transaction.
    if (!isTransactionActive(xid)) {
        throw new XAException(XAException.XAER_PROTO);
    }

    // the first failure encountered; it is thrown once cleanup is complete
    // so that a later cleanup error cannot mask it
    XAException failure = null;
    Connection connection = null;
    try {
        // get a connection to the database
        connection = DatabaseService.getConnection();

        // retrieve the records for the specified global transaction.
        // State records are ignored; only the data records, which are of
        // type TransactionalObjectWrapper, are processed.
        Object[] records = getTransactionRecords(xid, _rid);
        for (int index = 0; index < records.length; index++) {
            if (!(records[index] instanceof TransactionalObjectWrapper)) {
                continue;
            }

            TransactionalObjectWrapper wrapper =
                (TransactionalObjectWrapper) records[index];
            if (wrapper.isPublishedMessage()) {
                // send the published message to the message manager
                MessageMgr.instance().add(connection,
                    (MessageImpl) wrapper.getObject());
            } else if (wrapper.isReceivedMessage()) {
                // a received message handle is simply deleted, which
                // marks it as acknowledged
                MessageHandle handle =
                    ((ReceivedMessageWrapper) wrapper).getMessageHandle();
                if (handle.isPersistent()) {
                    handle.destroy(connection);
                } else {
                    handle.destroy();
                }
            }
        }
        connection.commit();
    } catch (PersistenceException exception) {
        if (connection != null) {
            SQLHelper.rollback(connection);
        }
        failure = new XAException("Failed in ResourceManager.commit : " +
            exception.toString());
        failure.initCause(exception);
    } catch (Exception exception) {
        // undo any partial database work before the connection is closed
        if (connection != null) {
            SQLHelper.rollback(connection);
        }
        failure = new XAException("Failed in ResourceManager.commit : " +
            exception.toString());
        failure.initCause(exception);
    } finally {
        SQLHelper.close(connection);

        // and now mark the transaction as closed
        try {
            logTransactionState(xid, TransactionState.CLOSED);
        } catch (Exception exception) {
            if (failure == null) {
                failure = new XAException("Error processing commit : " +
                    exception);
                failure.initCause(exception);
            } else {
                // keep the original commit failure as the one reported to
                // the caller, but do not lose this secondary error
                _log.error("Error processing commit : " + exception,
                    exception);
            }
        }
    }

    if (failure != null) {
        throw failure;
    }
}
/**
 * Ends the work performed on behalf of a transaction branch. The resource
 * manager disassociates the XA resource from the transaction branch
 * specified and lets the transaction be completed.
 * <p>
 * The transaction must be active for every supported flag. TMFAIL rolls the
 * transaction back; TMSUSPEND and TMSUCCESS only verify that the
 * transaction is active.
 *
 * @param id - the xa transaction identity
 * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND
 * @throws XAException - XAER_NOTA if the xid is null, XAER_PROTO if the
 *         flags are not valid or the transaction is not active, or if
 *         there is a problem completing the call
 */
public synchronized void end(Xid id, int flags)
    throws XAException {
    // check the xid is not null
    if (id == null) {
        throw new XAException(XAException.XAER_NOTA);
    }

    // convert to our internal representation of an xid
    ExternalXid xid = new ExternalXid(id);

    // check that the flags are valid for this method. The flags are
    // rejected only when they match none of the supported values (the
    // tests must be and-ed; or-ing them rejects every value).
    if ((flags != XAResource.TMSUSPEND) &&
        (flags != XAResource.TMSUCCESS) &&
        (flags != XAResource.TMFAIL)) {
        throw new XAException(XAException.XAER_PROTO);
    }

    // ending a branch only makes sense while the transaction is still
    // active, i.e. it has been opened and has not yet been committed,
    // rolled back or forgotten. This holds for TMSUCCESS as well, since
    // end is issued before the transaction is completed.
    if (!isTransactionActive(xid)) {
        throw new XAException(XAException.XAER_PROTO);
    }

    if (flags == XAResource.TMFAIL) {
        // do not process the associated data, simply rollback
        rollback(xid);
    }
}
/**
 * Tell the resource manager to forget about a heuristically completed
 * transaction branch. The transaction must be active; the work is
 * completed by rolling the transaction back.
 *
 * @param id - the xa transaction identity
 * @throws XAException - XAER_NOTA if the xid is null, XAER_PROTO if the
 *         transaction is not active, or if there is a problem completing
 *         the call
 */
public synchronized void forget(Xid id)
    throws XAException {
    // a null xid cannot identify a transaction
    if (id == null) {
        throw new XAException(XAException.XAER_NOTA);
    }

    // the activity check works on our internal representation of an xid
    ExternalXid txid = new ExternalXid(id);
    boolean active = isTransactionActive(txid);
    if (!active) {
        throw new XAException(XAException.XAER_PROTO);
    }

    // rollback completes the work
    rollback(id);
}
/**
 * Retrieve the transaction timeout configured for this resource manager
 * instance.
 *
 * @return int - the timeout in seconds
 * @throws XAException - if there is a problem completing the call
 */
public synchronized int getTransactionTimeout()
    throws XAException {
    // the expiry time is reported as held, without conversion
    final int timeout = _txExpiryTime;
    return timeout;
}
/**
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -