?? jobstoresupport.java
字號:
// Protect connection attributes we might change.
conn = getAttributeRestoringConnection(conn);
// Set any connection connection attributes we are to override.
try {
if (!isDontSetAutoCommitFalse()) {
conn.setAutoCommit(false);
}
if(isTxIsolationLevelSerializable()) {
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
}
} catch (SQLException sqle) {
getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle);
} catch (Throwable e) {
try { conn.close(); } catch(Throwable tt) {}
throw new JobPersistenceException(
"Failure setting up connection.", e);
}
return conn;
}
protected void releaseLock(Connection conn, String lockName, boolean doIt) {
if (doIt && conn != null) {
try {
getLockHandler().releaseLock(conn, lockName);
} catch (LockException le) {
getLog().error("Error returning lock: " + le.getMessage(), le);
}
}
}
/**
* Removes all volatile data.
*
* @throws JobPersistenceException If jobs could not be recovered.
*/
protected void cleanVolatileTriggerAndJobs()
throws JobPersistenceException {
executeInNonManagedTXLock(
LOCK_TRIGGER_ACCESS,
new VoidTransactionCallback() {
public void execute(Connection conn) throws JobPersistenceException {
cleanVolatileTriggerAndJobs(conn);
}
});
}
/**
* <p>
* Removes all volatile data.
* </p>
*
* @throws JobPersistenceException
* if jobs could not be recovered
*/
protected void cleanVolatileTriggerAndJobs(Connection conn)
throws JobPersistenceException {
try {
// find volatile jobs & triggers...
Key[] volatileTriggers = getDelegate().selectVolatileTriggers(conn);
Key[] volatileJobs = getDelegate().selectVolatileJobs(conn);
for (int i = 0; i < volatileTriggers.length; i++) {
removeTrigger(conn, null, volatileTriggers[i].getName(),
volatileTriggers[i].getGroup());
}
getLog().info(
"Removed " + volatileTriggers.length
+ " Volatile Trigger(s).");
for (int i = 0; i < volatileJobs.length; i++) {
removeJob(conn, null, volatileJobs[i].getName(),
volatileJobs[i].getGroup(), true);
}
getLog().info(
"Removed " + volatileJobs.length + " Volatile Job(s).");
// clean up any fired trigger entries
getDelegate().deleteVolatileFiredTriggers(conn);
} catch (Exception e) {
throw new JobPersistenceException("Couldn't clean volatile data: "
+ e.getMessage(), e);
}
}
/**
* Recover any failed or misfired jobs and clean up the data store as
* appropriate.
*
* @throws JobPersistenceException if jobs could not be recovered
*/
protected void recoverJobs() throws JobPersistenceException {
executeInNonManagedTXLock(
LOCK_TRIGGER_ACCESS,
new VoidTransactionCallback() {
public void execute(Connection conn) throws JobPersistenceException {
recoverJobs(conn);
}
});
}
/**
* <p>
* Will recover any failed or misfired jobs and clean up the data store as
* appropriate.
* </p>
*
* @throws JobPersistenceException
* if jobs could not be recovered
*/
protected void recoverJobs(Connection conn) throws JobPersistenceException {
try {
// update inconsistent job states
int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);
rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
getLog().info(
"Freed " + rows
+ " triggers from 'acquired' / 'blocked' state.");
// clean up misfired jobs
recoverMisfiredJobs(conn, true);
// recover jobs marked for recovery that were not fully executed
Trigger[] recoveringJobTriggers = getDelegate()
.selectTriggersForRecoveringJobs(conn);
getLog()
.info(
"Recovering "
+ recoveringJobTriggers.length
+ " jobs that were in-progress at the time of the last shut-down.");
for (int i = 0; i < recoveringJobTriggers.length; ++i) {
if (jobExists(conn, recoveringJobTriggers[i].getJobName(),
recoveringJobTriggers[i].getJobGroup())) {
recoveringJobTriggers[i].computeFirstFireTime(null);
storeTrigger(conn, null, recoveringJobTriggers[i], null, false,
STATE_WAITING, false, true);
}
}
getLog().info("Recovery complete.");
// remove lingering 'complete' triggers...
Key[] ct = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
for(int i=0; ct != null && i < ct.length; i++) {
removeTrigger(conn, null, ct[i].getName(), ct[i].getGroup());
}
getLog().info(
"Removed " + (ct != null ? ct.length : 0)
+ " 'complete' triggers.");
// clean up any fired trigger entries
int n = getDelegate().deleteFiredTriggers(conn);
getLog().info("Removed " + n + " stale fired job entries.");
} catch (JobPersistenceException e) {
throw e;
} catch (Exception e) {
throw new JobPersistenceException("Couldn't recover jobs: "
+ e.getMessage(), e);
}
}
protected long getMisfireTime() {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
return (misfireTime > 0) ? misfireTime : 0;
}
/**
* Helper class for returning the composite result of trying
* to recover misfired jobs.
*/
protected static class RecoverMisfiredJobsResult {
public static final RecoverMisfiredJobsResult NO_OP =
new RecoverMisfiredJobsResult(false, 0, Long.MAX_VALUE);
private boolean _hasMoreMisfiredTriggers;
private int _processedMisfiredTriggerCount;
private long _earliestNewTime;
public RecoverMisfiredJobsResult(
boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount, long earliestNewTime) {
_hasMoreMisfiredTriggers = hasMoreMisfiredTriggers;
_processedMisfiredTriggerCount = processedMisfiredTriggerCount;
_earliestNewTime = earliestNewTime;
}
public boolean hasMoreMisfiredTriggers() {
return _hasMoreMisfiredTriggers;
}
public int getProcessedMisfiredTriggerCount() {
return _processedMisfiredTriggerCount;
}
public long getEarliestNewTime() {
return _earliestNewTime;
}
}
protected RecoverMisfiredJobsResult recoverMisfiredJobs(
Connection conn, boolean recovering)
throws JobPersistenceException, SQLException {
// If recovering, we want to handle all of the misfired
// triggers right away.
int maxMisfiresToHandleAtATime =
(recovering) ? -1 : getMaxMisfiresToHandleAtATime();
List misfiredTriggers = new ArrayList();
long earliestNewTime = Long.MAX_VALUE;
// We must still look for the MISFIRED state in case triggers were left
// in this state when upgrading to this version that does not support it.
boolean hasMoreMisfiredTriggers =
getDelegate().selectMisfiredTriggersInStates(
conn, STATE_MISFIRED, STATE_WAITING, getMisfireTime(),
maxMisfiresToHandleAtATime, misfiredTriggers);
if (hasMoreMisfiredTriggers) {
getLog().info(
"Handling the first " + misfiredTriggers.size() +
" triggers that missed their scheduled fire-time. " +
"More misfired triggers remain to be processed.");
} else if (misfiredTriggers.size() > 0) {
getLog().info(
"Handling " + misfiredTriggers.size() +
" trigger(s) that missed their scheduled fire-time.");
} else {
getLog().debug(
"Found 0 triggers that missed their scheduled fire-time.");
return RecoverMisfiredJobsResult.NO_OP;
}
for (Iterator misfiredTriggerIter = misfiredTriggers.iterator(); misfiredTriggerIter.hasNext();) {
Key triggerKey = (Key) misfiredTriggerIter.next();
Trigger trig =
retrieveTrigger(conn, triggerKey.getName(), triggerKey.getGroup());
if (trig == null) {
continue;
}
doUpdateOfMisfiredTrigger(conn, null, trig, false, STATE_WAITING, recovering);
if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
earliestNewTime = trig.getNextFireTime().getTime();
signaler.notifyTriggerListenersMisfired(trig);
}
return new RecoverMisfiredJobsResult(
hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
}
protected boolean updateMisfiredTrigger(Connection conn,
SchedulingContext ctxt, String triggerName, String groupName,
String newStateIfNotComplete, boolean forceState) // TODO: probably
// get rid of
// this
throws JobPersistenceException {
try {
Trigger trig = getDelegate().selectTrigger(conn, triggerName,
groupName);
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
if (trig.getNextFireTime().getTime() > misfireTime) {
return false;
}
doUpdateOfMisfiredTrigger(conn, ctxt, trig, forceState, newStateIfNotComplete, false);
signaler.notifySchedulerListenersFinalized(trig);
return true;
} catch (Exception e) {
throw new JobPersistenceException(
"Couldn't update misfired trigger '" + groupName + "."
+ triggerName + "': " + e.getMessage(), e);
}
}
private void doUpdateOfMisfiredTrigger(Connection conn, SchedulingContext ctxt, Trigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
Calendar cal = null;
if (trig.getCalendarName() != null) {
cal = retrieveCalendar(conn, ctxt, trig.getCalendarName());
}
signaler.notifyTriggerListenersMisfired(trig);
trig.updateAfterMisfire(cal);
if (trig.getNextFireTime() == null) {
storeTrigger(conn, ctxt, trig,
null, true, STATE_COMPLETE, forceState, recovering);
} else {
storeTrigger(conn, ctxt, trig, null, true, newStateIfNotComplete,
forceState, false);
}
}
/**
* <p>
* Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
* </p>
*
* @param newJob
* The <code>JobDetail</code> to be stored.
* @param newTrigger
* The <code>Trigger</code> to be stored.
* @throws ObjectAlreadyExistsException
* if a <code>Job</code> with the same name/group already
* exists.
*/
public void storeJobAndTrigger(final SchedulingContext ctxt, final JobDetail newJob,
final Trigger newTrigger)
throws ObjectAlreadyExistsException, JobPersistenceException {
executeInLock(
(isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
new VoidTransactionCallback() {
public void execute(Connection conn) throws JobPersistenceException {
if (newJob.isVolatile() && !newTrigger.isVolatile()) {
JobPersistenceException jpe =
new JobPersistenceException(
"Cannot associate non-volatile trigger with a volatile job!");
jpe.setErrorCode(SchedulerException.ERR_CLIENT_ERROR);
throw jpe;
}
storeJob(conn, ctxt, newJob, false);
storeTrigger(conn, ctxt, newTrigger, newJob, false,
Constants.STATE_WAITING, false, false);
}
});
}
/**
* <p>
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -