001: /*
002: * $Id: AbstractResourceManager.java 8077 2007-08-27 20:15:25Z aperepel $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.util.xa;
012:
013: import org.mule.config.i18n.CoreMessages;
014:
015: import java.util.ArrayList;
016: import java.util.Collection;
017: import java.util.Collections;
018: import java.util.Iterator;
019:
020: import javax.transaction.Status;
021:
022: import org.apache.commons.logging.Log;
023:
024: /**
025: * This code is based on code coming from the <a
026: * href="http://jakarta.apache.org/commons/transaction/">commons-transaction</a>
027: * project.
028: */
029: public abstract class AbstractResourceManager {
030:
031: /**
032: * Shutdown mode: Wait for all transactions to complete
033: */
034: public static final int SHUTDOWN_MODE_NORMAL = 0;
035:
036: /**
037: * Shutdown mode: Try to roll back all active transactions
038: */
039: public static final int SHUTDOWN_MODE_ROLLBACK = 1;
040:
041: /**
042: * Shutdown mode: Try to stop active transaction <em>NOW</em>, do no rollbacks
043: */
044: public static final int SHUTDOWN_MODE_KILL = 2;
045:
046: protected static final int OPERATION_MODE_STOPPED = 0;
047: protected static final int OPERATION_MODE_STOPPING = 1;
048: protected static final int OPERATION_MODE_STARTED = 2;
049: protected static final int OPERATION_MODE_STARTING = 3;
050: protected static final int OPERATION_MODE_RECOVERING = 4;
051:
052: protected static final int DEFAULT_TIMEOUT_MSECS = 5000;
053: protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2;
054:
055: protected Collection globalTransactions = Collections
056: .synchronizedCollection(new ArrayList());
057: protected int operationMode = OPERATION_MODE_STOPPED;
058: protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS;
059: protected Log logger = getLogger();
060: protected boolean dirty = false;
061:
062: protected abstract Log getLogger();
063:
064: public synchronized void start()
065: throws ResourceManagerSystemException {
066: logger.info("Starting ResourceManager");
067: operationMode = OPERATION_MODE_STARTING;
068: // TODO: recover and sync
069: doStart();
070: recover();
071: // sync();
072: operationMode = OPERATION_MODE_STARTED;
073: if (dirty) {
074: logger
075: .warn("Started ResourceManager, but in dirty mode only (Recovery of pending transactions failed)");
076: } else {
077: logger.info("Started ResourceManager");
078: }
079: }
080:
081: protected void doStart() throws ResourceManagerSystemException {
082: // template method
083: }
084:
085: protected void recover() throws ResourceManagerSystemException {
086: // nothing to do (yet?)
087: }
088:
089: public synchronized void stop()
090: throws ResourceManagerSystemException {
091: stop(SHUTDOWN_MODE_NORMAL);
092: }
093:
094: public synchronized boolean stop(int mode)
095: throws ResourceManagerSystemException {
096: return stop(mode, getDefaultTransactionTimeout()
097: * DEFAULT_COMMIT_TIMEOUT_FACTOR);
098: }
099:
100: public synchronized boolean stop(int mode, long timeOut)
101: throws ResourceManagerSystemException {
102: logger.info("Stopping ResourceManager");
103: operationMode = OPERATION_MODE_STOPPING;
104: // TODO: sync
105: // sync();
106: boolean success = shutdown(mode, timeOut);
107: // TODO: release
108: // releaseGlobalOpenResources();
109: if (success) {
110: operationMode = OPERATION_MODE_STOPPED;
111: logger.info("Stopped ResourceManager");
112: } else {
113: logger.warn("Failed to stop ResourceManager");
114: }
115:
116: return success;
117: }
118:
119: protected boolean shutdown(int mode, long timeoutMSecs) {
120: switch (mode) {
121: case SHUTDOWN_MODE_NORMAL:
122: return waitForAllTxToStop(timeoutMSecs);
123: case SHUTDOWN_MODE_ROLLBACK:
124: throw new UnsupportedOperationException();
125: // return rollBackOrForward();
126: case SHUTDOWN_MODE_KILL:
127: return true;
128: default:
129: return false;
130: }
131: }
132:
133: /**
134: * Gets the default transaction timeout in <em>milliseconds</em>.
135: */
136: public long getDefaultTransactionTimeout() {
137: return defaultTimeout;
138: }
139:
140: /**
141: * Sets the default transaction timeout.
142: *
143: * @param timeout timeout in <em>milliseconds</em>
144: */
145: public void setDefaultTransactionTimeout(long timeout) {
146: defaultTimeout = timeout;
147: }
148:
149: /**
150: * Starts a new transaction and associates it with the current thread. All
151: * subsequent changes in the same thread made to the map are invisible from other
152: * threads until {@link #commitTransaction()} is called. Use
153: * {@link #rollbackTransaction()} to discard your changes. After calling either
154: * method there will be no transaction associated to the current thread any
155: * longer. <br>
156: * <br>
157: * <em>Caution:</em> Be careful to finally call one of those methods, as
158: * otherwise the transaction will lurk around for ever.
159: *
160: * @see #prepareTransaction()
161: * @see #commitTransaction()
162: * @see #rollbackTransaction()
163: */
164: public AbstractTransactionContext startTransaction(Object session)
165: throws ResourceManagerException {
166: return createTransactionContext(session);
167: }
168:
169: public void beginTransaction(AbstractTransactionContext context)
170: throws ResourceManagerException {
171: // can only start a new transaction when not already stopping
172: assureStarted();
173:
174: synchronized (context) {
175: if (logger.isDebugEnabled()) {
176: logger.debug("Beginning transaction " + context);
177: }
178: doBegin(context);
179: context.status = Status.STATUS_ACTIVE;
180: if (logger.isDebugEnabled()) {
181: logger.debug("Began transaction " + context);
182: }
183: }
184: globalTransactions.add(context);
185: }
186:
187: public int prepareTransaction(AbstractTransactionContext context)
188: throws ResourceManagerException {
189: assureReady();
190: synchronized (context) {
191: if (logger.isDebugEnabled()) {
192: logger.debug("Preparing transaction " + context);
193: }
194: context.status = Status.STATUS_PREPARING;
195: int status = doPrepare(context);
196: context.status = Status.STATUS_PREPARED;
197: if (logger.isDebugEnabled()) {
198: logger.debug("Prepared transaction " + context);
199: }
200: return status;
201: }
202: }
203:
204: public void rollbackTransaction(AbstractTransactionContext context)
205: throws ResourceManagerException {
206: assureReady();
207: synchronized (context) {
208: if (logger.isDebugEnabled()) {
209: logger.debug("Rolling back transaction " + context);
210: }
211: try {
212: context.status = Status.STATUS_ROLLING_BACK;
213: doRollback(context);
214: context.status = Status.STATUS_ROLLEDBACK;
215: } catch (Error e) {
216: setDirty(context, e);
217: throw e;
218: } catch (RuntimeException e) {
219: setDirty(context, e);
220: throw e;
221: } catch (ResourceManagerSystemException e) {
222: setDirty(context, e);
223: throw e;
224: } finally {
225: globalTransactions.remove(context);
226: context.finalCleanUp();
227: // tell shutdown thread this tx is finished
228: context.notifyFinish();
229: }
230: if (logger.isDebugEnabled()) {
231: logger.debug("Rolled back transaction " + context);
232: }
233: }
234: }
235:
236: public void setTransactionRollbackOnly(
237: AbstractTransactionContext context)
238: throws ResourceManagerException {
239: context.status = Status.STATUS_MARKED_ROLLBACK;
240: }
241:
242: public void commitTransaction(AbstractTransactionContext context)
243: throws ResourceManagerException {
244: assureReady();
245: if (context.status == Status.STATUS_MARKED_ROLLBACK) {
246: throw new ResourceManagerException(CoreMessages
247: .transactionMarkedForRollback());
248: }
249: synchronized (context) {
250: if (logger.isDebugEnabled()) {
251: logger.debug("Committing transaction " + context);
252: }
253: try {
254: context.status = Status.STATUS_COMMITTING;
255: doCommit(context);
256: context.status = Status.STATUS_COMMITTED;
257: } catch (Error e) {
258: setDirty(context, e);
259: throw e;
260: } catch (RuntimeException e) {
261: setDirty(context, e);
262: throw e;
263: } catch (ResourceManagerSystemException e) {
264: setDirty(context, e);
265: throw e;
266: } catch (ResourceManagerException e) {
267: logger.warn("Could not commit tx " + context
268: + ", rolling back instead", e);
269: doRollback(context);
270: } finally {
271: globalTransactions.remove(context);
272: context.finalCleanUp();
273: // tell shutdown thread this tx is finished
274: context.notifyFinish();
275: }
276: if (logger.isDebugEnabled()) {
277: logger.debug("Committed transaction " + context);
278: }
279: }
280: }
281:
282: protected abstract AbstractTransactionContext createTransactionContext(
283: Object session);
284:
285: protected abstract void doBegin(AbstractTransactionContext context);
286:
287: protected abstract int doPrepare(AbstractTransactionContext context);
288:
289: protected abstract void doCommit(AbstractTransactionContext context)
290: throws ResourceManagerException;
291:
292: protected abstract void doRollback(
293: AbstractTransactionContext context)
294: throws ResourceManagerException;
295:
296: // TODO
297: // protected boolean rollBackOrForward() {
298: // }
299:
300: protected boolean waitForAllTxToStop(long timeoutMSecs) {
301: long startTime = System.currentTimeMillis();
302:
303: // be sure not to lock globalTransactions for too long, as we need to
304: // give
305: // txs the chance to complete (otherwise deadlocks are very likely to
306: // occur)
307: // instead iterate over a copy as we can be sure no new txs will be
308: // registered
309: // after operation level has been set to stopping
310:
311: Collection transactionsToStop;
312: synchronized (globalTransactions) {
313: transactionsToStop = new ArrayList(globalTransactions);
314: }
315: for (Iterator it = transactionsToStop.iterator(); it.hasNext();) {
316: long remainingTimeout = startTime
317: - System.currentTimeMillis() + timeoutMSecs;
318:
319: if (remainingTimeout <= 0) {
320: return false;
321: }
322:
323: AbstractTransactionContext context = (AbstractTransactionContext) it
324: .next();
325: synchronized (context) {
326: if (!context.finished) {
327: logger.info("Waiting for tx " + context
328: + " to finish for " + remainingTimeout
329: + " milli seconds");
330: }
331: while (!context.finished && remainingTimeout > 0) {
332: try {
333: context.wait(remainingTimeout);
334: } catch (InterruptedException e) {
335: return false;
336: }
337: remainingTimeout = startTime
338: - System.currentTimeMillis() + timeoutMSecs;
339: }
340: if (context.finished) {
341: logger.info("Tx " + context + " finished");
342: } else {
343: logger.warn("Tx " + context
344: + " failed to finish in given time");
345: }
346: }
347: }
348:
349: return (globalTransactions.size() == 0);
350: }
351:
352: /**
353: * Flag this resource manager as dirty. No more operations will be allowed until
354: * a recovery has been successfully performed.
355: *
356: * @param context
357: * @param t
358: */
359: protected void setDirty(AbstractTransactionContext context,
360: Throwable t) {
361: logger.error(
362: "Fatal error during critical commit/rollback of transaction "
363: + context
364: + ", setting resource manager to dirty.", t);
365: dirty = true;
366: }
367:
368: /**
369: * Check that the FileManager is started.
370: *
371: * @throws FileManagerSystemException if the FileManager is not started.
372: */
373: protected void assureStarted()
374: throws ResourceManagerSystemException {
375: if (operationMode != OPERATION_MODE_STARTED) {
376: throw new ResourceManagerSystemException(CoreMessages
377: .resourceManagerNotStarted());
378: }
379: // do not allow any further writing or commit or rollback when db is
380: // corrupt
381: if (dirty) {
382: throw new ResourceManagerSystemException(CoreMessages
383: .resourceManagerDirty());
384: }
385: }
386:
387: /**
388: * Check that the FileManager is ready.
389: *
390: * @throws FileManagerSystemException if the FileManager is neither started not
391: * stopping.
392: */
393: protected void assureReady() throws ResourceManagerSystemException {
394: if (operationMode != OPERATION_MODE_STARTED
395: && operationMode != OPERATION_MODE_STOPPING) {
396: throw new ResourceManagerSystemException(CoreMessages
397: .resourceManagerNotReady());
398: }
399: // do not allow any further writing or commit or rollback when db is
400: // corrupt
401: if (dirty) {
402: throw new ResourceManagerSystemException(CoreMessages
403: .resourceManagerDirty());
404: }
405: }
406:
407: }
|