001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.aspects.txlock;
023:
024: import org.jboss.logging.Logger;
025: import org.jboss.util.deadlock.DeadlockDetector;
026: import org.jboss.util.deadlock.Resource;
027:
028: import javax.transaction.Status;
029: import javax.transaction.Synchronization;
030: import javax.transaction.Transaction;
031: import java.util.HashMap;
032: import java.util.LinkedList;
033:
034: /**
035: * This class is holds threads awaiting the transactional lock to be free
036: * in a fair FIFO transactional queue. Non-transactional threads
037: * are also put in this wait queue as well. This class pops the next waiting transaction from the queue
038: * and notifies only those threads waiting associated with that transaction.
039: *
040: * As of 04/10/2002, you can now specify in jboss.xml method attributes that define
041: * methods as read-only. read-only methods(and read-only beans) will release transactional
042: * locks at the end of the invocation. This decreases likelyhood of deadlock and increases
043: * performance.
044: *
045: * FIXME marcf: we should get solid numbers on this locking, bench in multi-thread environments
046: * We need someone with serious SUN hardware to run this lock into the ground
047: *
048: * @author <a href="marc.fleury@jboss.org">Marc Fleury</a>
049: * @author <a href="bill@burkecentral.com">Bill Burke</a>
050: *
051: * @version $Revision: 57186 $
052: */
053: public class QueuedTxLock implements Resource {
054: public static final String TXLOCK = "TxLock";
055: public static final String TIMEOUT = "timeout";
056:
057: private HashMap txLocks = new HashMap();
058: private LinkedList txWaitQueue = new LinkedList();
059:
060: /** Transaction holding lock on bean */
061: private Transaction tx = null;
062: private boolean synched = false;
063: private boolean isSynchronized = false;
064: private Logger log = Logger.getLogger(this .getClass());
065:
066: public Object getResourceHolder() {
067: return tx;
068: }
069:
070: public void sync() throws InterruptedException {
071: synchronized (this ) {
072: while (synched) {
073: this .wait();
074: }
075: synched = true;
076: }
077: }
078:
079: public void releaseSync() {
080: synchronized (this ) {
081: synched = false;
082: this .notify();
083: }
084: }
085:
086: /**
087: * The setTransaction associates a transaction with the lock.
088: * The current transaction is associated by the schedule call.
089: */
090: public void setTransaction(Transaction tx) {
091: this .tx = tx;
092: }
093:
094: public Transaction getTransaction() {
095: return tx;
096: }
097:
098: private class TxLock {
099:
100: public Transaction waitingTx;
101: public String threadName;
102: public boolean isQueued;
103:
104: public TxLock(Transaction trans) {
105: this .threadName = Thread.currentThread().toString();
106: this .waitingTx = trans;
107: this .isQueued = true;
108: }
109:
110: public boolean equals(Object obj) {
111: if (obj == this )
112: return true;
113:
114: TxLock lock = (TxLock) obj;
115:
116: return lock.waitingTx.equals(this .waitingTx);
117: }
118:
119: public int hashCode() {
120: return waitingTx.hashCode();
121: }
122:
123: public String toString() {
124: StringBuffer buffer = new StringBuffer(100);
125: buffer.append("TXLOCK waitingTx=").append(waitingTx);
126: buffer.append(" thread=").append(threadName);
127: buffer.append(" queued=").append(isQueued);
128: return buffer.toString();
129: }
130: }
131:
132: protected TxLock getTxLock(Transaction miTx) {
133: TxLock lock = null;
134: TxLock key = new TxLock(miTx);
135: lock = (TxLock) txLocks.get(key);
136: if (lock == null) {
137: txLocks.put(key, key);
138: txWaitQueue.addLast(key);
139: lock = key;
140: }
141: return lock;
142: }
143:
144: protected boolean isTxExpired(Transaction miTx) throws Exception {
145: if (miTx != null
146: && miTx.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
147: return true;
148: }
149: return false;
150: }
151:
152: public boolean lockNoWait(Transaction transaction) throws Exception {
153: this .sync();
154: if (log.isTraceEnabled())
155: log
156: .trace("lockNoWait tx=" + transaction + " "
157: + toString());
158: try {
159: // And are we trying to enter with another transaction?
160: if (getTransaction() != null
161: && !getTransaction().equals(transaction)) {
162: return false;
163: }
164: setTransaction(transaction);
165: return true;
166: } finally {
167: this .releaseSync();
168: }
169: }
170:
171: /**
172: * doSchedule(Invocation)
173: *
174: * doSchedule implements a particular policy for scheduling the threads coming in.
175: * There is always the spec required "serialization" but we can add custom scheduling in here
176: *
177: * Synchronizing on lock: a failure to get scheduled must result in a wait() call and a
178: * release of the lock. Schedulation must return with lock.
179: *
180: */
181: public void schedule(Transaction miTx,
182: org.jboss.aop.joinpoint.Invocation mi) throws Exception {
183: boolean trace = log.isTraceEnabled();
184: this .sync();
185: try {
186: if (trace)
187: log.trace("Begin schedule");
188:
189: if (isTxExpired(miTx)) {
190: log.error("Saw rolled back tx=" + miTx);
191: throw new RuntimeException(
192: "Transaction marked for rollback, possibly a timeout");
193: }
194:
195: //Next test is independent of whether the context is locked or not, it is purely transactional
196: // Is the instance involved with another transaction? if so we implement pessimistic locking
197: waitForTx(mi, miTx, trace);
198: if (!isSynchronized) {
199: isSynchronized = true;
200: miTx
201: .registerSynchronization(new TxLockSynchronization());
202: }
203: } finally {
204: this .releaseSync();
205: }
206:
207: }
208:
209: /**
210: * Wait until no other transaction is running with this lock.
211: *
212: * @return Returns true if this thread was scheduled in txWaitQueue
213: */
214: protected void waitForTx(org.jboss.aop.joinpoint.Invocation mi,
215: Transaction miTx, boolean trace) throws Exception {
216: boolean wasScheduled = false;
217: // Do we have a running transaction with the context?
218: // We loop here until either until success or until transaction timeout
219: // If we get out of the loop successfully, we can successfully
220: // set the transaction on this puppy.
221: TxLock txLock = null;
222: while (getTransaction() != null &&
223: // And are we trying to enter with another transaction?
224: !getTransaction().equals(miTx)) {
225: // Check for a deadlock on every cycle
226: try {
227: DeadlockDetector.singleton
228: .deadlockDetection(miTx, this );
229: } catch (Exception e) {
230: // We were queued, not any more
231: if (txLock != null && txLock.isQueued) {
232: txLocks.remove(txLock);
233: txWaitQueue.remove(txLock);
234: }
235: throw e;
236: }
237:
238: wasScheduled = true;
239: // That's no good, only one transaction per context
240: // Let's put the thread to sleep the transaction demarcation will wake them up
241: if (trace)
242: log.trace("Transactional contention on context miTx="
243: + miTx + " " + toString());
244:
245: if (txLock == null)
246: txLock = getTxLock(miTx);
247:
248: if (trace)
249: log.trace("Begin wait on " + txLock + " " + toString());
250:
251: // And lock the threads on the lock corresponding to the Tx in MI
252: synchronized (txLock) {
253: releaseSync();
254: try {
255: int txTimeout = 0;
256: Integer timeout = (Integer) mi.getMetaData(TXLOCK,
257: TIMEOUT);
258: if (timeout != null)
259: txTimeout = timeout.intValue();
260: txLock.wait(txTimeout);
261: } catch (InterruptedException ignored) {
262: }
263: } // end synchronized(txLock)
264:
265: this .sync();
266:
267: if (trace)
268: log.trace("End wait on " + txLock + " " + toString());
269: if (isTxExpired(miTx)) {
270: log.error(Thread.currentThread()
271: + "Saw rolled back tx=" + miTx
272: + " waiting for txLock");
273: if (txLock.isQueued) {
274: // Remove the TxLock from the queue because this thread is exiting.
275: // Don't worry about notifying other threads that share the same transaction.
276: // They will timeout and throw the below RuntimeException
277: txLocks.remove(txLock);
278: txWaitQueue.remove(txLock);
279: } else if (getTransaction() != null
280: && getTransaction().equals(miTx)) {
281: // We're not qu
282: nextTransaction(trace);
283: }
284: if (miTx != null) {
285: DeadlockDetector.singleton.removeWaiting(miTx);
286: }
287: throw new RuntimeException(
288: "Transaction marked for rollback, possibly a timeout");
289: }
290: } // end while(tx!=miTx)
291:
292: // If we get here, this means that we have the txlock
293: if (!wasScheduled)
294: setTransaction(miTx);
295: return;
296: }
297:
298: /*
299: * nextTransaction()
300: *
301: * nextTransaction will
302: * - set the current tx to null
303: * - schedule the next transaction by notifying all threads waiting on the transaction
304: * - setting the thread with the new transaction so there is no race with incoming calls
305: */
306: protected void nextTransaction(boolean trace) {
307: if (!synched) {
308: throw new IllegalStateException(
309: "do not call nextTransaction while not synched!");
310: }
311:
312: setTransaction(null);
313: // is there a waiting list?
314: TxLock thelock = null;
315: if (!txWaitQueue.isEmpty()) {
316: thelock = (TxLock) txWaitQueue.removeFirst();
317: txLocks.remove(thelock);
318: thelock.isQueued = false;
319: // The new transaction is the next one, important to set it up to avoid race with
320: // new incoming calls
321: if (thelock.waitingTx != null)
322: DeadlockDetector.singleton
323: .removeWaiting(thelock.waitingTx);
324: setTransaction(thelock.waitingTx);
325: synchronized (thelock) {
326: // notify All threads waiting on this transaction.
327: // They will enter the methodLock wait loop.
328: thelock.notifyAll();
329: }
330: }
331: if (trace)
332: log.trace("nextTransaction: " + thelock + " " + toString());
333: }
334:
335: public void endTransaction() {
336: boolean trace = log.isTraceEnabled();
337: if (trace)
338: log.trace("endTransaction: " + toString());
339: nextTransaction(trace);
340: }
341:
342: public void endInvocation(Transaction thetx) {
343: if (log.isTraceEnabled())
344: log
345: .trace("endInvocation: miTx=" + thetx + " "
346: + toString());
347: }
348:
349: public String toString() {
350: StringBuffer buffer = new StringBuffer(100);
351: buffer.append(" hash=").append(hashCode());
352: buffer.append(" tx=").append(getTransaction());
353: buffer.append(" synched=").append(synched);
354: buffer.append(" queue=").append(txWaitQueue);
355: return buffer.toString();
356: }
357:
358: private final class TxLockSynchronization implements
359: Synchronization {
360: public void beforeCompletion() {
361: }
362:
363: public void afterCompletion(int status) {
364: try {
365: sync();
366: } catch (InterruptedException ignored) {
367: }
368: isSynchronized = false;
369: endTransaction();
370: releaseSync();
371: }
372: }
373: }
|