?? resourcemanager.java
字號:
* Ask the resource manager to prepare for a transaction commit of the
* transaction specified in xid
*
* @param xares
* @return int - XA_RDONLY or XA_OK
* @throws XAException - if there is a problem completing the call
*/
public synchronized boolean isSameRM(XAResource xares)
throws XAException {
boolean result = false;
if ((xares == this) ||
((xares instanceof ResourceManager) &&
(((ResourceManager) xares)._rid.equals(_rid)))) {
result = true;
}
return result;
}
/**
* Obtain a list of prepared transaction branches from a resource manager.
* The transaction manager calls this method during recovery to obtain the
* list of transaction branches that are currently in prepared or
* heuristically completed states.
*
* @throws XAException - if there is a problem completing the call
*/
public synchronized int prepare(Xid id)
throws XAException {
//check the xid is not null
if (id == null) {
throw new XAException(XAException.XAER_NOTA);
}
// covert to our internal representation of an xid
ExternalXid xid = new ExternalXid(id);
// check to see that the xid actually exists
if (!isTransactionActive(xid)) {
throw new XAException(XAException.XAER_PROTO);
}
// can a prepare for the same resource occur multiple times
// ????
try {
logTransactionState(xid, TransactionState.PREPARED);
} catch (Exception exception) {
throw new XAException("Error processing prepare : " + exception);
}
return XAResource.XA_OK;
}
/**
* Inform the resource manager to roll back work done on behalf of a
* transaction branch
*
* @throws XAException - if there is a problem completing the call
*/
public synchronized Xid[] recover(int flag)
throws XAException {
Xid[] result = new Xid[0];
if ((flag == XAResource.TMNOFLAGS) ||
(flag == XAResource.TMSTARTRSCAN) ||
(flag == XAResource.TMENDRSCAN)) {
LinkedList xids = new LinkedList();
Iterator iter = _activeTransactions.keySet().iterator();
while (iter.hasNext()) {
Xid xid = (Xid) iter.next();
LinkedList list = (LinkedList) _activeTransactions.get(xid);
if (list.size() > 1) {
// need at least a start in the chain.
Object last = list.getLast();
if ((last instanceof StateTransactionLogEntry) &&
(((StateTransactionLogEntry) last).getState().isPrepared())) {
xids.add(xid);
}
}
}
result = (Xid[]) xids.toArray();
}
return result;
}
/**
* Set the current transaction timeout value for this XAResource instance.
*
* @throws XAException - if there is a problem completing the call
*/
public synchronized void rollback(Xid id)
throws XAException {
//check the xid is not null
if (id == null) {
throw new XAException(XAException.XAER_NOTA);
}
// covert to our internal representation of an xid
ExternalXid xid = new ExternalXid(id);
// check to see that the xid actually exists
if (!isTransactionActive(xid)) {
throw new XAException(XAException.XAER_PROTO);
}
// process the data in that transaction. If it was a published message
// then drop it. If it was a consumed message then return it back to
// the destination.
Connection connection = null;
try {
// get a connection to the database
connection = DatabaseService.getConnection();
// retrieve a list of recrods for the specified global transaction
// and process them. Ignore the state records and only process the
// data records, which are of type TransacitonalObjectWrapper.
Object[] records = getTransactionRecords(xid, _rid);
for (int index = 0; index < records.length; index++) {
if (records[index] instanceof TransactionalObjectWrapper) {
TransactionalObjectWrapper wrapper =
(TransactionalObjectWrapper) records[index];
if (wrapper.isPublishedMessage()) {
// we don't need to process these messages since the
// global transaction has been rolled back.
} else if (wrapper.isReceivedMessage()) {
ReceivedMessageWrapper rmsg_wrapper =
(ReceivedMessageWrapper) wrapper;
MessageHandle handle =
(MessageHandle) rmsg_wrapper.getObject();
DestinationManager mgr = DestinationManager.instance();
DestinationCache cache =
mgr.getDestinationCache(handle.getDestination());
cache.returnMessageHandle(handle);
}
} else {
// ignore since it is a state records.
}
}
connection.commit();
} catch (PersistenceException exception) {
if (connection != null) {
try {
connection.rollback();
} catch (Exception nested) {
// ignore
}
}
throw new XAException("Failed in ResourceManager.rollback : " +
exception.toString());
} catch (Exception exception) {
throw new XAException("Failed in ResourceManager.rollback : " +
exception.toString());
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception nested) {
// ignore
}
}
// and now mark the transaction as closed
try {
logTransactionState(xid, TransactionState.CLOSED);
} catch (Exception exception) {
throw new XAException("Error processing rollback : " + exception);
}
}
}
/**
* Start work on behalf of a transaction branch specified in xid If TMJOIN
* is specified, the start is for joining a transaction previously seen by
* the resource manager
*
* @throws XAException - if there is a problem completing the call
*/
public synchronized boolean setTransactionTimeout(int seconds)
throws XAException {
_txExpiryTime = seconds;
return true;
}
// implementation of XAResource.start
public synchronized void start(Xid id, int flags)
throws XAException {
//check the xid is not null
if (id == null) {
throw new XAException(XAException.XAER_NOTA);
}
// covert to our internal representation of an xid
ExternalXid xid = new ExternalXid(id);
// check that the flags are valid for this method
if ((flags != XAResource.TMNOFLAGS) ||
(flags != XAResource.TMJOIN) ||
(flags != XAResource.TMRESUME)) {
throw new XAException(XAException.XAER_PROTO);
}
switch (flags) {
case XAResource.TMNOFLAGS:
// check to see that the xid does not already exist
if (isTransactionActive(xid)) {
throw new XAException(XAException.XAER_DUPID);
}
// otherwise log the start of the transaction
try {
logTransactionState(xid, TransactionState.OPENED);
} catch (Exception exception) {
throw new XAException("Error processing start : " + exception);
}
break;
case XAResource.TMJOIN:
case XAResource.TMRESUME:
// joining a transaction previously seen by the resource
// manager
if (!isTransactionActive(xid)) {
throw new XAException(XAException.XAER_PROTO);
}
break;
}
}
// override ServiceManager.start
public void start()
throws ServiceException {
this.setState(ServiceState.RUNNING);
}
// override ServiceManager.stop
public void stop()
throws ServiceException {
this.setState(ServiceState.STOPPED);
}
// override ServiceManager.run
public void run() {
// do nothing
}
/**
* Return the resource manager identity
*
* @return the resource manager identity
*/
public String getResourceManagerId() {
return _rid;
}
/**
* Create the next {@link TransactionLog} and add it to the list of
* managed transaction logs.
* <p>
* The method will throw ResourceManagerException if there is a
* problem completing the request.
*
* @throws ResourceManagerException
*/
protected TransactionLog createNextTransactionLog()
throws ResourceManagerException {
TransactionLog newlog = null;
synchronized (_logs) {
try {
// get the last log number
long last = 1;
if (!_logs.isEmpty()) {
last = getSequenceNumber(((TransactionLog) _logs.last()).getName());
}
// now that we have the last log number, increment it and use
// it to build the name of the next log file.
String name = _logDirectory + System.getProperty("file.separator") +
RM_LOGFILE_PREFIX + Long.toString(++last) + RM_LOGFILE_EXTENSION;
// create a transaction log and add it to the collection
newlog = new TransactionLog(name, true);
_logs.add(newlog);
} catch (TransactionLogException exception) {
throw new ResourceManagerException(
"Error in createNextTransactionLog " + exception);
}
}
return newlog;
}
/**
* Build a list of all log files in the specified log directory
*
* @throws IllegalArgumentException - if the directory does not exist.
*/
protected void buildLogFileList() {
File dir = new File(_logDirectory);
if ((!dir.exists()) ||
(!dir.isDirectory())) {
throw new IllegalArgumentException(_logDirectory +
" is not a directory");
}
try {
File[] list = dir.listFiles(new FilenameFilter() {
// implementation of FilenameFilter.accept
public boolean accept(File dir, String name) {
boolean result = false;
if ((name.startsWith(RM_LOGFILE_PREFIX)) &&
(name.endsWith(RM_LOGFILE_EXTENSION))) {
result = true;
}
return result;
}
});
// add the files to the list
synchronized (_logs) {
for (int index = 0; index < list.length; index++) {
_logs.add(new TransactionLog(list[index].getPath(), false));
}
}
} catch (Exception exception) {
// replace this with the exception strategy
exception.printStackTrace();
}
}
/**
* This method will process all the transaction logs, in the log diretory
* and call recover on each of them.
*
* @throws ResourceManagerException - if there is a problem recovering
*/
private synchronized void recover()
throws ResourceManagerException {
try {
if (!_logs.isEmpty()) {
Iterator iter = _logs.iterator();
while (iter.hasNext()) {
TransactionLog log = (TransactionLog) iter.next();
HashMap records = log.recover();
}
}
} catch (Exception exception) {
throw new ResourceManagerException("Error in recover " +
exception.toString());
}
}
/**
* Retrieve the transaction log for the specified transaction id
*
* @param txid - the transaction identity
* @return TransactionLog
* @throws TransactionLogException - if there is tx log exception
* @throws ResourceManagerException - if there is a resource problem.
*/
private TransactionLog getTransactionLog(ExternalXid txid)
throws TransactionLogException, ResourceManagerException {
TransactionLog log = (TransactionLog) _tridToLogCache.get(txid);
if (log == null) {
log = getCurrentTransactionLog();
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -