001: /*******************************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *******************************************************************************/package org.ofbiz.service;
019:
020: import java.util.Map;
021: import javax.transaction.Status;
022: import javax.transaction.Transaction;
023: import javax.transaction.xa.XAException;
024: import javax.transaction.xa.Xid;
025:
026: import org.ofbiz.base.util.Debug;
027: import org.ofbiz.base.util.UtilValidate;
028: import org.ofbiz.entity.transaction.GenericTransactionException;
029: import org.ofbiz.entity.transaction.GenericXaResource;
030: import org.ofbiz.entity.transaction.TransactionUtil;
031:
032: /**
033: * ServiceXaWrapper - XA Resource wrapper for running services on commit() or rollback()
034: */
035: public class ServiceXaWrapper extends GenericXaResource {
036:
037: public static final String module = ServiceXaWrapper.class
038: .getName();
039: public static final int TYPE_ROLLBACK = 600;
040: public static final int TYPE_COMMIT = 500;
041: public static final int MODE_ASYNC = 100;
042: public static final int MODE_SYNC = 200;
043:
044: protected DispatchContext dctx = null;
045: protected String rollbackService = null;
046: protected String commitService = null;
047: protected String runAsUser = null;
048: protected Map rollbackContext = null;
049: protected Map commitContext = null;
050: protected boolean rollbackAsync = true;
051: protected boolean rollbackAsyncPersist = true;
052: protected boolean commitAsync = false;
053: protected boolean commitAsyncPersist = false;
054:
055: protected ServiceXaWrapper() {
056: }
057:
058: public ServiceXaWrapper(DispatchContext dctx) {
059: this .dctx = dctx;
060: }
061:
062: /**
063: * Sets the service to run on rollback()
064: * @param serviceName Name of service to run
065: * @param context Context to use when running
066: */
067: public void setCommitService(String serviceName, Map context) {
068: this .setCommitService(serviceName, null, context, commitAsync,
069: commitAsyncPersist);
070: }
071:
072: /**
073: * Sets the service to run on rollback()
074: * @param serviceName Name of service to run
075: * @param context Context to use when running
076: * @param async override default async behavior
077: */
078: public void setCommitService(String serviceName, Map context,
079: boolean async, boolean persist) {
080: this .setCommitService(serviceName, null, context, async,
081: persist);
082: }
083:
084: /**
085: * Sets the service to run on rollback()
086: * @param serviceName Name of service to run
087: * @param runAsUser UserLoginID to run as
088: * @param context Context to use when running
089: * @param async override default async behavior
090: */
091: public void setCommitService(String serviceName, String runAsUser,
092: Map context, boolean async, boolean persist) {
093: this .commitService = serviceName;
094: this .runAsUser = runAsUser;
095: this .commitContext = context;
096: this .commitAsync = async;
097: this .commitAsyncPersist = persist;
098: }
099:
100: /**
101: * @return The name of the service to run on rollback()
102: */
103: public String getCommitService() {
104: return this .commitService;
105: }
106:
107: /**
108: * @return The context used when running the rollback() service
109: */
110: public Map getCommitContext() {
111: return this .commitContext;
112: }
113:
114: /**
115: * Sets the service to run on rollback()
116: * @param serviceName Name of service to run
117: * @param context Context to use when running
118: */
119: public void setRollbackService(String serviceName, Map context) {
120: this .setRollbackService(serviceName, context, rollbackAsync,
121: rollbackAsyncPersist);
122: }
123:
124: /**
125: * Sets the service to run on rollback()
126: * @param serviceName Name of service to run
127: * @param context Context to use when running
128: * @param async override default async behavior
129: */
130: public void setRollbackService(String serviceName, Map context,
131: boolean async, boolean persist) {
132: this .rollbackService = serviceName;
133: this .rollbackContext = context;
134: this .rollbackAsync = async;
135: this .rollbackAsyncPersist = persist;
136: }
137:
138: /**
139: * @return The name of the service to run on rollback()
140: */
141: public String getRollbackService() {
142: return this .rollbackService;
143: }
144:
145: /**
146: * @return The context used when running the rollback() service
147: */
148: public Map getRollbackContext() {
149: return this .rollbackContext;
150: }
151:
152: public void enlist() throws XAException {
153: super .enlist();
154: Debug.log("Enlisted in transaction : " + this .toString(),
155: module);
156: }
157:
158: // -- XAResource Methods
159: /**
160: * @see javax.transaction.xa.XAResource#commit(javax.transaction.xa.Xid xid, boolean onePhase)
161: */
162: public void commit(Xid xid, boolean onePhase) throws XAException {
163: Debug.log("ServiceXaWrapper#commit() : " + onePhase + " / "
164: + xid.toString(), module);
165: // the commit listener
166: if (this .active) {
167: Debug.logWarning("commit() called without end()", module);
168: }
169: if (this .xid == null || !this .xid.equals(xid)) {
170: throw new XAException(XAException.XAER_NOTA);
171: }
172:
173: final String service = commitService;
174: final Map context = commitContext;
175: final boolean persist = commitAsyncPersist;
176: final boolean async = commitAsync;
177:
178: Thread thread = new Thread() {
179: public void run() {
180: try {
181: runService(service, context, persist,
182: (async ? MODE_ASYNC : MODE_SYNC),
183: TYPE_COMMIT);
184: } catch (XAException e) {
185: Debug.logError(e, module);
186: }
187: }
188: };
189: thread.start();
190:
191: this .xid = null;
192: this .active = false;
193: }
194:
195: /**
196: * @see javax.transaction.xa.XAResource#rollback(javax.transaction.xa.Xid xid)
197: */
198: public void rollback(Xid xid) throws XAException {
199: Debug.log("ServiceXaWrapper#rollback() : " + xid.toString(),
200: module);
201: // the rollback listener
202: if (this .active) {
203: Debug.logWarning("rollback() called without end()", module);
204: }
205: if (this .xid == null || !this .xid.equals(xid)) {
206: throw new XAException(XAException.XAER_NOTA);
207: }
208:
209: final String service = rollbackService;
210: final Map context = rollbackContext;
211: final boolean persist = rollbackAsyncPersist;
212: final boolean async = rollbackAsync;
213:
214: Thread thread = new Thread() {
215: public void run() {
216: try {
217: runService(service, context, persist,
218: (async ? MODE_ASYNC : MODE_SYNC),
219: TYPE_ROLLBACK);
220: } catch (XAException e) {
221: Debug.logError(e, module);
222: }
223: }
224: };
225: thread.start();
226:
227: this .xid = null;
228: this .active = false;
229: }
230:
231: public int prepare(Xid xid) throws XAException {
232: // overriding to log two phase commits
233: Debug.log("ServiceXaWrapper#prepare() : " + xid.toString(),
234: module);
235: int rtn;
236: try {
237: rtn = super .prepare(xid);
238: } catch (XAException e) {
239: Debug.logError(e, module);
240: throw e;
241: }
242: Debug.log("ServiceXaWrapper#prepare() : " + rtn + " / "
243: + (rtn == XA_OK), module);
244: return rtn;
245: }
246:
247: protected final void runService(String service, Map context,
248: boolean persist, int mode, int type) throws XAException {
249: // set the logging prefix
250: String msgPrefix = "[XaWrapper] ";
251: switch (type) {
252: case TYPE_ROLLBACK:
253: msgPrefix = "[Rollback] ";
254: break;
255: case TYPE_COMMIT:
256: msgPrefix = "[Commit] ";
257: break;
258: }
259:
260: // if a service exists; run it
261: if (UtilValidate.isNotEmpty(service)) {
262:
263: // suspend this transaction
264: Transaction parentTx = null;
265: boolean beganTx;
266:
267: try {
268: int currentTxStatus = Status.STATUS_UNKNOWN;
269: try {
270: currentTxStatus = TransactionUtil.getStatus();
271: } catch (GenericTransactionException e) {
272: Debug.logWarning(e, module);
273: }
274:
275: // suspend the parent tx
276: if (currentTxStatus != Status.STATUS_NO_TRANSACTION) {
277: parentTx = TransactionUtil.suspend();
278: }
279:
280: // begin the new tx
281: beganTx = TransactionUtil.begin();
282:
283: // configure and run the service
284: try {
285: // obtain the model and get the valid context
286: ModelService model = dctx.getModelService(service);
287: Map this Context = context;
288: if (model.validate) {
289: this Context = model.makeValid(context,
290: ModelService.IN_PARAM);
291: }
292:
293: // set the userLogin object
294: this Context
295: .put("userLogin", ServiceUtil.getUserLogin(
296: dctx, this Context, runAsUser));
297:
298: // invoke based on mode
299: switch (mode) {
300: case MODE_ASYNC:
301: Debug.log(msgPrefix + "Invoking [" + service
302: + "] via runAsync", module);
303: dctx.getDispatcher().runAsync(service,
304: this Context, persist);
305: break;
306:
307: case MODE_SYNC:
308: Debug.log(msgPrefix + "Invoking [" + service
309: + "] via runSyncIgnore", module);
310: dctx.getDispatcher().runSyncIgnore(service,
311: this Context);
312: break;
313: }
314: } catch (Throwable t) {
315: Debug.logError(t, "Problem calling " + msgPrefix
316: + "service : " + service + " / " + context,
317: module);
318: try {
319: TransactionUtil.rollback(beganTx, t
320: .getMessage(), t);
321: } catch (GenericTransactionException e) {
322: Debug.logError(e, module);
323: }
324:
325: // async calls are assumed to not effect this TX
326: if (mode != MODE_ASYNC) {
327: throw new XAException(XAException.XA_RBOTHER);
328: }
329: } finally {
330: // commit the transaction
331: try {
332: TransactionUtil.commit(beganTx);
333: } catch (GenericTransactionException e) {
334: Debug.logError(e, module);
335: }
336: }
337: } catch (GenericTransactionException e) {
338: Debug.logError(e, module);
339: } finally {
340: // resume the transaction
341: if (parentTx != null) {
342: try {
343: TransactionUtil.resume(parentTx);
344: } catch (Exception e) {
345: Debug.logError(e, module);
346: }
347: }
348: }
349: } else {
350: Debug.log("No " + msgPrefix
351: + "service defined; nothing to do", module);
352: }
353:
354: this .xid = null;
355: this .active = false;
356: }
357: }
|