001: package org.enhydra.shark.toolagent;
002:
003: import java.util.HashMap;
004: import java.util.Iterator;
005: import java.util.List;
006: import java.util.Map;
007:
008: import javax.naming.InitialContext;
009: import javax.transaction.TransactionManager;
010:
011: import org.enhydra.shark.Shark;
012: import org.enhydra.shark.api.client.wfmc.wapi.WMConnectInfo;
013: import org.enhydra.shark.api.client.wfmc.wapi.WMSessionHandle;
014: import org.enhydra.shark.api.client.wfmodel.WfActivity;
015: import org.enhydra.shark.api.client.wfmodel.WfProcess;
016: import org.enhydra.shark.api.client.wfservice.ExecutionAdministration;
017: import org.enhydra.shark.api.client.wfservice.SharkConnection;
018: import org.enhydra.shark.api.client.wfservice.WMEntity;
019: import org.enhydra.shark.api.common.SharkConstants;
020: import org.enhydra.shark.api.internal.toolagent.AppParameter;
021: import org.enhydra.shark.api.internal.toolagent.ApplicationBusy;
022: import org.enhydra.shark.api.internal.toolagent.ApplicationNotDefined;
023: import org.enhydra.shark.api.internal.toolagent.ApplicationNotStarted;
024: import org.enhydra.shark.api.internal.toolagent.ToolAgent;
025: import org.enhydra.shark.api.internal.toolagent.ToolAgentGeneralException;
026: import org.enhydra.shark.api.internal.working.CallbackUtilities;
027: import org.enhydra.shark.xpdl.XPDLConstants;
028: import org.enhydra.shark.xpdl.elements.ExtendedAttribute;
029: import org.enhydra.shark.xpdl.elements.ExtendedAttributes;
030:
031: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
032: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
033:
034: /**
035: * <P>
036: * Tool agent class to schedule a ToolAgent call in separate pool of threads
037: * </P>
038: *
039: * @author Abe Achkinaz, BDNACorp.com
040: */
041: public class SchedulerToolAgent extends AbstractToolAgent {
042:
043: private final static String TOOL_AGENT_CLASS_EXT_ATTR_NAME = "ToolAgentClass";
044:
045: private final static String TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME = "ToolAgentClassProxy";
046:
047: /*
048: * Helper methods for logging
049: */
050: private void info(String infoString) {
051: if (null != cus) {
052: cus.info(shandle, infoString);
053: }
054: }
055:
056: private void error(String infoString) {
057: if (null != cus) {
058: cus.error(shandle, infoString);
059: }
060: }
061:
062: public String getInfo(WMSessionHandle shandle)
063: throws ToolAgentGeneralException {
064: String i = "Wraps a ToolAgent standard call and executes them a separate"
065: + "\nin thread-pool."
066: + "\n"
067: + "\nTo use define an ToolAgentClass extended attribute to the scheduler"
068: + "\nanother extended attribute 'ToolAgentClassProxy' to the actual"
069: + "\napplication to be called in a separet thread. For example: "
070: + "\n<ExtendedAttributes>"
071: + "\n <ExtendedAttribute Name=\"ToolAgentClass\" Value=\"org.enhydra.shark.toolagent.SchedulerToolAgent\"/>"
072: + "\n <ExtendedAttribute Name=\"ToolAgentClassProxy\" Value=\"org.enhydra.shark.toolagent.BshToolAgent\"/>"
073: + "\n <ExtendedAttribute Name=\"Script\" Value=\"System.out.println(\"I was called...\");\"/>"
074: + "\n</ExtendedAttributes>"
075: + "\n"
076: + "\nTo be able to work with SchedulerToolAgent, you must define some "
077: + "\nproperties, and here is a section from shark's configuration file \"Shark.conf\" "
078: + "\nthat defines these properties:"
079: + "\n# Configure number of threads to execute commands : "
080: + "\nSchedulerToolAgent.threadPoolSize=3" + "\n";
081: return i;
082: }
083:
084: public void invokeApplication(WMSessionHandle shandle, long handle,
085: WMEntity appInfo, WMEntity toolInfo,
086: String applicationName, String procInstId, String assId,
087: AppParameter[] parameters, Integer appMode)
088: throws ApplicationNotStarted, ApplicationNotDefined,
089: ApplicationBusy, ToolAgentGeneralException {
090:
091: super
092: .invokeApplication(shandle, handle, appInfo, toolInfo,
093: applicationName, procInstId, assId, parameters,
094: appMode);
095:
096: try {
097: /*
098: * Get proxy class name and replace in parameter[0]:
099: */
100: String extAttribs = (String) parameters[0].the_value;
101: ExtendedAttributes eas = readParamsFromExtAttributes(extAttribs);
102: ExtendedAttribute eaScheduler = eas
103: .getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_EXT_ATTR_NAME);
104: String schedulerClassName = eaScheduler.getVValue();
105: ExtendedAttribute eaProxy = eas
106: .getFirstExtendedAttributeForName(TOOL_AGENT_CLASS_PROXY_EXT_ATTR_NAME);
107: String proxyClassName = eaProxy.getVValue();
108: int idxSchedulerClassName = extAttribs
109: .indexOf(schedulerClassName);
110: String newExtAttribs = extAttribs.substring(0,
111: idxSchedulerClassName)
112: + proxyClassName
113: + extAttribs.substring(idxSchedulerClassName
114: + schedulerClassName.length());
115: parameters[0].the_value = newExtAttribs;
116: readParamsFromExtAttributes(newExtAttribs);
117:
118: Class cls = null;
119: try {
120: cls = Class.forName(proxyClassName);
121: } catch (ClassNotFoundException cnfe1) {
122: cls = ToolAgentLoader.load(cus, proxyClassName);
123: }
124:
125: ToolAgent ta = (ToolAgent) cls.newInstance();
126: ta.configure(cus);
127:
128: ToolAgentCmdProxy taCmdProxy = new ToolAgentCmdProxy(cus,
129: ta, proxyClassName, wmci, shandle, appInfo,
130: toolInfo, handle, applicationName, procInstId,
131: assId, parameters, appMode);
132:
133: SingletonPooledExecutor.getInstance(cus, shandle).execute(
134: taCmdProxy);
135:
136: /*
137: * Finished spawning the command, returned finished to engine otherwise it throws
138: * an exception!
139: */
140: status = AbstractToolAgent.APP_STATUS_FINISHED;
141: } catch (Throwable ex) {
142: ex.printStackTrace();
143: error("SchedulerToolAgent terminated incorrectly: " + ex);
144: status = AbstractToolAgent.APP_STATUS_INVALID;
145: throw new ToolAgentGeneralException(ex);
146: }
147: }
148: }
149:
150: /*
151: * <P> Wrap ToolAgent interface into runnable class used by SinglePooledExec </P> @author
152: * aachkinazi, BDNACorp.com
153: */
154: class ToolAgentCmdProxy implements Runnable {
155: CallbackUtilities m_cus;
156:
157: String m_taName;
158:
159: ToolAgent m_ta;
160:
161: WMConnectInfo m_wmci;
162:
163: WMSessionHandle m_shandle;
164:
165: WMEntity m_appInfo;
166:
167: WMEntity m_toolInfo;
168:
169: long m_handle;
170:
171: String m_applicationName;
172:
173: String m_procInstId;
174:
175: String m_assId;
176:
177: AppParameter[] m_parameters;
178:
179: AppParameter[] m_proxyParameters;
180:
181: Integer m_appMode;
182:
183: ToolAgentCmdProxy(CallbackUtilities cus, ToolAgent ta,
184: String taName, WMConnectInfo wmci, WMSessionHandle shandle,
185: WMEntity appInfo, WMEntity toolInfo, long handle,
186: String applicationName, String procInstId, String assId,
187: AppParameter[] parameters, Integer appMode) {
188: m_cus = cus;
189: m_ta = ta;
190: m_taName = taName;
191: m_wmci = wmci;
192: m_shandle = shandle;
193: m_appInfo = appInfo;
194: m_toolInfo = toolInfo;
195: m_handle = handle;
196: m_applicationName = applicationName;
197: m_procInstId = procInstId;
198: m_assId = assId;
199: m_parameters = parameters;
200: m_appMode = appMode;
201: }
202:
203: /*
204: * Helper logging methods
205: */
206: private void info(String infoString) {
207: if (null != m_cus) {
208: m_cus.info(m_shandle, infoString);
209: }
210: }
211:
212: private void error(String infoString) {
213: if (null != m_cus) {
214: m_cus.error(m_shandle, infoString);
215: }
216: }
217:
218: /**
219: * Return IN_OUT and OUT values to the calling activity
220: *
221: * @return
222: */
223: private Map getResults() throws Exception {
224: /*
225: * Build activity result map,
226: */
227: Map results = new HashMap();
228: for (int i = 0; i < m_parameters.length; ++i) {
229: AppParameter p = m_parameters[i];
230: if (p.the_mode
231: .equals(XPDLConstants.FORMAL_PARAMETER_MODE_INOUT)
232: || p.the_mode
233: .equals(XPDLConstants.FORMAL_PARAMETER_MODE_OUT)) {
234: results.put(p.the_actual_name, convertToProperType(
235: p.the_value, p.the_class));
236: }
237: }
238:
239: return results;
240: }
241:
242: private Object convertToProperType(Object toConvert,
243: Class desiredType) throws Exception {
244: if (null == toConvert || desiredType.isInstance(toConvert))
245: return toConvert;
246:
247: if (desiredType.equals(Integer.class)) {
248: return new Integer((new Integer(toConvert.toString()))
249: .intValue());
250: } else if (desiredType.equals(Long.class)) {
251: return new Long((new Double(toConvert.toString()))
252: .longValue());
253: } else if (desiredType.equals(Boolean.class)) {
254: return new Boolean(toConvert.toString());
255: } else if (desiredType.equals(Double.class)) {
256: return new Double(toConvert.toString());
257: } else if (desiredType.equals(java.util.Date.class)) {
258: return new java.util.Date(toConvert.toString());
259: }
260: return toConvert;
261: }
262:
263: /*
264: * (non-Javadoc)
265: *
266: * @see java.lang.Runnable#run()
267: */
268: public void run() {
269: Thread curThread = Thread.currentThread();
270: String oldThreadName = curThread.getName();
271: /*
272: * Invoke query application and set state of activity in proxy thread context:
273: */
274: long status = ToolAgent.APP_STATUS_INVALID;
275:
276: SingletonPooledExecutor spe = null;
277: try {
278: spe = SingletonPooledExecutor.getInstance(m_cus, m_shandle);
279: } catch (Exception _) {
280: }
281: if (null == spe) {
282: error("Unable to get thread-pool!");
283: return;
284: }
285:
286: Shark shark = Shark.getInstance();
287: if (null == shark) {
288: error("Unable to get Shark engine instance!");
289: spe.updateCompleteCount(this );
290: return;
291: }
292:
293: TransactionManager ut = null;
294: try {
295: curThread.setName(oldThreadName + "->" + m_taName);
296: WMSessionHandle taShandle;
297:
298: String XATransactionManagerLookupName = m_cus
299: .getProperty(
300: "SharkTxSynchronizationFactory.XATransactionManagerLookupName",
301: "javax.transaction.TransactionManager");
302: if (XATransactionManagerLookupName.startsWith("\"")) {
303: XATransactionManagerLookupName = XATransactionManagerLookupName
304: .substring(1);
305: }
306: if (XATransactionManagerLookupName.endsWith("\"")) {
307: XATransactionManagerLookupName = XATransactionManagerLookupName
308: .substring(0, XATransactionManagerLookupName
309: .length() - 1);
310: }
311:
312: ut = (TransactionManager) new InitialContext()
313: .lookup(XATransactionManagerLookupName);
314: ut.setTransactionTimeout(600);
315: ut.begin();
316:
317: /*
318: * Find activity (it could happen that we need to wait until previous transaction
319: * ends in order to see the activity in DB.
320: */
321: SharkConnection sconn = shark.getSharkConnection();
322: sconn.attachToHandle(spe.getSharkSessionHandle());
323: // System.err.println("Thread "+Thread.currentThread()+" STA -> gp");
324: WfProcess wfProc = sconn.getProcess(m_procInstId);
325: if (wfProc == null) {
326: ut.commit();
327: // retry 5 times
328: int i = 0;
329: while (i++ < 5 && wfProc == null) {
330: ut.setTransactionTimeout(600);
331: ut.begin();
332: // System.err.println("Sleeping for 500ms "+i+". time when searching for
333: // process "+m_procInstId);
334: Thread.sleep(500);
335: wfProc = sconn.getProcess(m_procInstId);
336: ut.commit();
337: }
338: if (wfProc == null) {
339: throw new Exception(
340: "SchedulerToolAgent -> Can't find process"
341: + m_procInstId);
342: }
343: ut.setTransactionTimeout(600);
344: ut.begin();
345: }
346: String activityId = Shark.getInstance().getAdminMisc()
347: .getAssignmentActivityId(sconn.getSessionHandle(),
348: m_procInstId, m_assId);
349:
350: // System.err.println("Thread "+Thread.currentThread()+" STA -> ga");
351: WfActivity wfActivity = sconn.getActivity(m_procInstId,
352: activityId);
353: if (wfActivity == null) {
354: ut.commit();
355: // retry 5 times
356: int i = 0;
357: while (i++ < 5 && wfActivity == null) {
358: ut.setTransactionTimeout(600);
359: ut.begin();
360: // System.err.println("Sleeping for 500ms "+i+". time when searching for
361: // activity "+activityId);
362: Thread.sleep(500);
363: wfActivity = sconn.getActivity(m_procInstId,
364: activityId);
365: ut.commit();
366: }
367: if (wfActivity == null) {
368: throw new Exception(
369: "SchedulerToolAgent -> Can't find activity "
370: + activityId + " for process "
371: + m_procInstId);
372: }
373: ut.setTransactionTimeout(600);
374: ut.begin();
375: }
376:
377: taShandle = m_ta.connect(m_wmci);
378:
379: /*
380: * Use null for appName because ToolAgent implementations check for appName being
381: * null to parse extended attributes and look for the name! This has the
382: * side-effect of initializing other ToolAgent specific extended attributes such
383: * as Script for BshToolAgent.
384: */
385: boolean excInjected = false;
386: try {
387: m_ta.invokeApplication(m_shandle, taShandle.getId(),
388: m_appInfo, m_toolInfo, null, m_procInstId,
389: m_assId, m_parameters, m_appMode);
390: } catch (ToolAgentGeneralException tage) {
391: ExecutionAdministration ea = Shark.getInstance()
392: .getExecutionAdministration();
393: ea.injectException(m_shandle, m_procInstId, wfActivity
394: .key(), tage);
395: excInjected = true;
396: status = ToolAgent.APP_STATUS_INVALID;
397: }
398: int finishCount = 0;
399: if (!excInjected) {
400: status = m_ta.requestAppStatus(m_shandle, taShandle
401: .getId(), m_toolInfo, m_procInstId, m_assId,
402: m_parameters);
403: m_ta.disconnect(taShandle);
404:
405: finishCount = spe.updateCompleteCount(this );
406:
407: // System.err.println("Thread "+Thread.currentThread()+" STA -> actsetres");
408: wfActivity.set_result(getResults());
409: } else {
410: finishCount = spe.resetCompleteCount(this );
411: }
412:
413: if (0 == finishCount) {
414: // System.err.println("Thread "+Thread.currentThread()+" STA -> actcomplete");
415: if (!excInjected) {
416: if (wfActivity
417: .state()
418: .equals(
419: SharkConstants.STATE_OPEN_NOT_RUNNING_NOT_STARTED)) {
420: wfActivity
421: .change_state(SharkConstants.STATE_OPEN_RUNNING);
422: }
423: wfActivity.complete();
424: } else {
425: wfActivity.terminate();
426: }
427: }
428:
429: sconn.disconnect();
430: ut.commit();
431: } catch (Throwable ex) {
432: ex.printStackTrace();
433: error("SchedulerToolAgent -> applicationProxy " + m_taName
434: + " terminated incorrectly: " + ex);
435: status = ToolAgent.APP_STATUS_INVALID;
436: try {
437: ut.rollback();
438: } catch (Exception _) { /* left blank */
439: }
440: } finally {
441: ut = null;
442: }
443:
444: /*
445: * Update activity with results and set complete state
446: */
447: curThread.setName(oldThreadName);
448: }
449:
450: /**
451: * Cancel an activity step that never got scheduled
452: */
453: public void cancel() {
454: /*
455: * There is no generic way to handle cancelling a pending application that never got
456: * a thread to run. Do nothing for now!
457: */
458: }
459:
460: /**
461: * @return
462: */
463: public Object getAssId() {
464: return m_assId;
465: }
466: }
467:
468: /*
469: * Define SingletonPoolExecutor to manager threads used by SchedulerToolAgent and
470: * controlling wfActivityComplete. @author aachkinazi, BDNACorp.com
471: */
472: class SingletonPooledExecutor extends PooledExecutor {
473: private static final Object classLock = SingletonPooledExecutor.class;
474:
475: /** Shutdown hook thread name. */
476: private static final String SHUTDOWN_HOOK_THREAD_NAME = "SingletonPooledExecShutdownHook";
477:
478: /**
479: * Global shutdown flag to expedite shutdown process. There is no need to synchronize
480: * access to this flag because the shutdown sequence does not rely or coordinate on the
481: * state of this flag. It simply indicates that the shutdown sequence has begun so the
482: * threads that happen to be in a good state to do so can shutdown early.
483: */
484: private static boolean s_shutdown = false;
485:
486: /** Thread to run as shutdown hook. */
487: private static Thread s_shutdownHook;
488:
489: /**
490: * Sigleton pool of threads
491: */
492: private static SingletonPooledExecutor m_spe = null;
493:
494: private static CallbackUtilities m_cus = null;
495:
496: /**
497: * Keep track of pending scheduled commands per assignment only call
498: * WfActivity.complete() when the count reaches 0. This handles the case where an
499: * activity has multiple SchedulerToolAgent steps.
500: *
501: * @see SingletonPooledExecutor#execute(ToolAgentCmdProxy)
502: * @see SingletonPooledExecutor#completeCmd(ToolAgentCmdProxy)
503: */
504: private Map m_assIdToCount = new HashMap();
505:
506: private WMSessionHandle m_shandle;
507:
508: /**
509: * @param queue
510: * @param threadPoolSize
511: */
512: private SingletonPooledExecutor(LinkedQueue queue,
513: int threadPoolSize) {
514: super (queue, threadPoolSize);
515: }
516:
517: /**
518: * @return
519: */
520: public WMSessionHandle getSharkSessionHandle() {
521: return m_shandle;
522: }
523:
524: /*
525: * Helper logging methods
526: */
527: private void info(String infoString) {
528: if (null != m_cus) {
529: m_cus.info(m_shandle, infoString);
530: } else {
531: System.out.println(infoString);
532: }
533: }
534:
535: private void error(String infoString) {
536: if (null != m_cus) {
537: m_cus.error(m_shandle, infoString);
538: } else {
539: System.out.println(infoString);
540: }
541: }
542:
543: private static void shutdown() {
544: s_shutdown = true;
545: if (null != m_spe) {
546: m_spe.shutdownNow();
547: try {
548: final long timeout = 2 * 1000L;
549: if (!m_spe.awaitTerminationAfterShutdown(timeout)) {
550: final int remainingThreads = m_spe.getPoolSize();
551: if (0 != remainingThreads) {
552: m_spe
553: .info("Threads remaining during shutdown: "
554: + remainingThreads);
555: }
556: }
557:
558: List pendingTasks = m_spe.drain();
559: for (Iterator itr = pendingTasks.iterator(); itr
560: .hasNext();) {
561: ToolAgentCmdProxy obj = (ToolAgentCmdProxy) itr
562: .next();
563: m_spe.info("Pending task: " + obj.toString());
564: obj.cancel();
565: }
566: } catch (Exception ex) {
567: m_spe.error("Exception during thread pool shutdown: "
568: + ex.toString());
569: }
570: m_spe = null;
571: }
572: }
573:
574: public static SingletonPooledExecutor getInstance(
575: CallbackUtilities cus, WMSessionHandle shandle)
576: throws Exception {
577: synchronized (classLock) {
578: if (null == m_spe) {
579: try {
580: /*
581: * Configure for the first time,
582: */
583: m_cus = cus;
584: int threadPoolSize = 3;
585: try {
586: String threadPoolSizeStr = cus.getProperty(
587: "SchedulerToolAgent.threadPoolSize",
588: Integer.toString(3));
589: threadPoolSize = Integer
590: .parseInt(threadPoolSizeStr);
591: } catch (Exception ex) {
592: // Left blank!
593: }
594:
595: /*
596: * Provide a shutdown hook to clean up thread pools
597: */
598: s_shutdownHook = new Thread() {
599: public void run() {
600: SingletonPooledExecutor.shutdown();
601: }
602:
603: };
604: s_shutdownHook
605: .setName(SingletonPooledExecutor.SHUTDOWN_HOOK_THREAD_NAME);
606: Runtime.getRuntime()
607: .addShutdownHook(s_shutdownHook);
608:
609: m_spe = new SingletonPooledExecutor(
610: new LinkedQueue(), threadPoolSize);
611: m_spe.waitWhenBlocked();
612: m_spe.createThreads(threadPoolSize);
613:
614: m_spe.m_shandle = shandle;
615:
616: } catch (Exception ex) {
617: String errorMsg = "Exception during thread pool init "
618: + ex.toString();
619: if (null != m_cus) {
620: m_cus.error(shandle, errorMsg);
621: } else {
622: System.out.println(errorMsg);
623: }
624: if (null != s_shutdownHook) {
625: Runtime.getRuntime().removeShutdownHook(
626: s_shutdownHook);
627: }
628: s_shutdownHook = null;
629: if (null != m_spe) {
630: SingletonPooledExecutor.shutdown();
631: }
632: throw ex;
633: }
634: }
635: return m_spe;
636: }
637: }
638:
639: /*
640: * (non-Javadoc)
641: *
642: * @see EDU.oswego.cs.dl.util.concurrent.Executor#execute(java.lang.Runnable)
643: */
644: public void execute(ToolAgentCmdProxy cmdProxy)
645: throws InterruptedException {
646: synchronized (m_assIdToCount) {
647: /*
648: * Keep track of outstanding SchedulerToolAgent calls per assId/Activity.
649: */
650: Object assId = cmdProxy.getAssId();
651: Integer curCnt = null == assId ? null
652: : (Integer) m_assIdToCount.get(assId);
653: int assIdCnt = null == curCnt ? 1 : curCnt.intValue() + 1;
654: m_assIdToCount.put(assId, new Integer(assIdCnt));
655: }
656: super .execute(cmdProxy);
657: }
658:
659: /**
660: * @param proxy
661: * @return
662: */
663: public int updateCompleteCount(ToolAgentCmdProxy cmdProxy) {
664: int returnCnt;
665: synchronized (m_assIdToCount) {
666: Object assId = cmdProxy.getAssId();
667: Integer curCnt = null == assId ? null
668: : (Integer) m_assIdToCount.get(assId);
669: if (null == curCnt) {
670: error("Unable to find cmd count for assId "
671: + assId.toString());
672: returnCnt = -1;
673: }
674:
675: /*
676: * Only complete the last scheduled activity
677: */
678: returnCnt = Math.max(curCnt.intValue() - 1, 0);
679: m_assIdToCount.put(assId, new Integer(returnCnt));
680: }
681:
682: return returnCnt;
683: }
684:
685: public int resetCompleteCount(ToolAgentCmdProxy cmdProxy) {
686: int returnCnt;
687: synchronized (m_assIdToCount) {
688: Object assId = cmdProxy.getAssId();
689: Integer curCnt = null == assId ? null
690: : (Integer) m_assIdToCount.get(assId);
691: if (null == curCnt) {
692: error("Unable to find cmd count for assId "
693: + assId.toString());
694: returnCnt = -1;
695: }
696:
697: /*
698: * Only complete the last scheduled activity
699: */
700: returnCnt = 0;
701: m_assIdToCount.put(assId, new Integer(returnCnt));
702: }
703:
704: return returnCnt;
705: }
706:
707: }
|