?? transactionmanager.java
字號:
/* * Copyright (c) 2004 jPOS.org * * See terms of license at http://jpos.org/license.html * */package org.jpos.q2.transaction;import java.io.Serializable;import java.util.Map;import java.util.List;import java.util.HashMap;import java.util.ArrayList;import java.util.Iterator;import java.util.StringTokenizer;import org.jpos.util.NameRegistrar;import org.jdom.Element;import org.jpos.space.Space;import org.jpos.space.JDBMSpace;import org.jpos.space.SpaceFactory;import org.jpos.space.SpaceUtil;import org.jpos.q2.QFactory;import org.jpos.q2.QBeanSupport;import org.jpos.core.ConfigurationException;import org.jpos.util.Logger;import org.jpos.util.LogEvent;import org.jpos.transaction.TransactionConstants;import org.jpos.transaction.TransactionParticipant;import org.jpos.transaction.GroupSelector;import org.jpos.transaction.AbortParticipant;import org.jpos.transaction.ContextRecovery;public class TransactionManager extends QBeanSupport implements Runnable, TransactionConstants, TransactionManagerMBean{ Space sp; Space psp; String queue; String tailLock; Map groups; long head, tail, lastGC; public static final String HEAD = "$HEAD"; public static final String TAIL = "$TAIL"; public static final String CONTEXT = "$CONTEXT."; public static final String STATE = "$STATE."; public static final String GROUPS = "$GROUPS."; public static final String TAILLOCK = "$TAILLOCK"; public static final Integer PREPARING = new Integer (0); public static final Integer COMMITTING = new Integer (1); public static final Integer DONE = new Integer (2); public static final String DEFAULT_GROUP = ""; public static final long MAX_PARTICIPANTS = 1000; // loop prevention public void initService () throws ConfigurationException { queue = cfg.get ("queue", null); if (queue == null) throw new ConfigurationException ("queue property not specified"); sp = SpaceFactory.getSpace (cfg.get ("space")); psp = SpaceFactory.getSpace (cfg.get ("persistent-space")); tail = initCounter (TAIL, cfg.getLong ("initial-tail", 1)); head = Math.max (initCounter (HEAD, tail), tail); tailLock = TAILLOCK + "." + Integer.toString (this.hashCode()); initTailLock (); groups = new HashMap(); initParticipants (getPersist()); } public void startService () { NameRegistrar.register (getName (), this); recover (); long sessions = cfg.getLong ("sessions", 1); for (int i=0; i<sessions; i++) { Thread t = new Thread (this); t.setName (getName() + "-" + i); t.start (); } } public void stopService () { NameRegistrar.unregister (getName ()); long sessions = cfg.getLong ("sessions", 1); for (int i=0; i<sessions; i++) sp.out (queue, this, 60*1000); } public void queue (Serializable context) { sp.out (queue, context); } public void run () { long id; String threadName = Thread.currentThread().getName(); getLog().info (threadName + " start"); while (running()) { try { Object obj = sp.in (queue); if (obj == this) continue; // stopService ``hack'' if (!(obj instanceof Serializable)) { getLog().error ( "non serializable '" + obj.getClass().getName() + "' on queue '" + queue + "'" ); continue; } id = nextId (); List members = new ArrayList (); Serializable context = (Serializable) obj; snapshot (id, context, PREPARING); int action = prepare (id, context, members); switch (action) { case PREPARED: setState (id, COMMITTING); commit (id, context, members, false); break; case ABORTED: abort (id, context, members, false); break; case NO_JOIN: break; } snapshot (id, null, DONE); if (id == tail) { checkTail (); } } catch (Throwable t) { getLog().fatal (t); // should never happen } } getLog().info (threadName + " stop"); } public long getTail () { return tail; } public long getHead () { return head; } protected void commit (long id, Serializable context, List members, boolean recover) { Iterator iter = members.iterator(); while (iter.hasNext ()) { TransactionParticipant p = (TransactionParticipant) iter.next(); if (recover && p instanceof ContextRecovery) context = ((ContextRecovery) p).recover (id, context, true); commit (p, id, context); } } protected void abort (long id, Serializable context, List members, boolean recover) { Iterator iter = members.iterator(); while (iter.hasNext ()) { TransactionParticipant p = (TransactionParticipant) iter.next(); if (recover && p instanceof ContextRecovery) context = ((ContextRecovery) p).recover (id, context, false); abort (p, id, context); } } protected int prepareForAbort (TransactionParticipant p, long id, Serializable context) { try { if (p instanceof AbortParticipant) return ((AbortParticipant)p).prepareForAbort (id, context); } catch (Throwable t) { getLog().warn ("PREPARE-FOR-ABORT: " + Long.toString (id), t); } return ABORTED | NO_JOIN; } protected int prepare (TransactionParticipant p, long id, Serializable context) { try { return p.prepare (id, context); } catch (Throwable t) { getLog().warn ("PREPARE: " + Long.toString (id), t); } return ABORTED; } protected void commit (TransactionParticipant p, long id, Serializable context) { try { p.commit (id, context); } catch (Throwable t) { getLog().warn ("COMMIT: " + Long.toString (id), t); } } protected void abort (TransactionParticipant p, long id, Serializable context) { try { p.abort (id, context); } catch (Throwable t) { getLog().warn ("ABORT: " + Long.toString (id), t); } } protected int prepare (long id, Serializable context, List members) { boolean abort = false; Iterator iter = getParticipants (DEFAULT_GROUP).iterator(); for (int i=0; iter.hasNext (); i++) { int action = 0; if (i > MAX_PARTICIPANTS) { getLog().warn ( "loop detected - transaction " +id + " aborted." ); return ABORTED; } TransactionParticipant p = (TransactionParticipant) iter.next(); if (abort) { action = prepareForAbort (p, id, context); } else { action = prepare (p, id, context); abort = (action & PREPARED) == ABORTED; } if ((action & READONLY) == 0) { snapshot (id, context); } if ((action & NO_JOIN) == 0) { members.add (p); } if (p instanceof GroupSelector) { String groupName = ((GroupSelector)p).select (id, context); if (groupName != null) { StringTokenizer st = new StringTokenizer (groupName, " ,");
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -