001: /*******************************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *******************************************************************************/package org.ofbiz.service.job;
019:
020: import java.io.IOException;
021: import java.sql.Timestamp;
022: import java.util.ArrayList;
023: import java.util.Collection;
024: import java.util.Date;
025: import java.util.Iterator;
026: import java.util.List;
027: import java.util.Map;
028:
029: import javolution.util.FastMap;
030:
031: import org.ofbiz.base.util.Debug;
032: import org.ofbiz.base.util.GeneralRuntimeException;
033: import org.ofbiz.base.util.UtilDateTime;
034: import org.ofbiz.base.util.UtilMisc;
035: import org.ofbiz.base.util.UtilProperties;
036: import org.ofbiz.base.util.UtilValidate;
037: import org.ofbiz.entity.GenericDelegator;
038: import org.ofbiz.entity.GenericEntityException;
039: import org.ofbiz.entity.GenericValue;
040: import org.ofbiz.entity.condition.EntityCondition;
041: import org.ofbiz.entity.condition.EntityConditionList;
042: import org.ofbiz.entity.condition.EntityExpr;
043: import org.ofbiz.entity.condition.EntityOperator;
044: import org.ofbiz.entity.serialize.SerializeException;
045: import org.ofbiz.entity.serialize.XmlSerializer;
046: import org.ofbiz.entity.transaction.GenericTransactionException;
047: import org.ofbiz.entity.transaction.TransactionUtil;
048: import org.ofbiz.service.DispatchContext;
049: import org.ofbiz.service.GenericDispatcher;
050: import org.ofbiz.service.LocalDispatcher;
051: import org.ofbiz.service.calendar.RecurrenceInfo;
052: import org.ofbiz.service.calendar.RecurrenceInfoException;
053: import org.ofbiz.service.config.ServiceConfigUtil;
054:
055: /**
056: * JobManager
057: */
058: public class JobManager {
059:
060: public static final String instanceId = UtilProperties
061: .getPropertyValue("general.properties",
062: "unique.instanceId", "ofbiz0");
063: public static final Map updateFields = UtilMisc
064: .toMap("runByInstanceId", instanceId, "statusId",
065: "SERVICE_QUEUED");
066: public static final String module = JobManager.class.getName();
067: public static final String dispatcherName = "JobDispatcher";
068: public static Map registeredManagers = FastMap.newInstance();
069:
070: protected GenericDelegator delegator;
071: protected JobPoller jp;
072:
073: /** Creates a new JobManager object. */
074: public JobManager(GenericDelegator delegator) {
075: if (delegator == null) {
076: throw new GeneralRuntimeException(
077: "ERROR: null delegator passed, cannot create JobManager");
078: }
079: if (JobManager.registeredManagers.get(delegator
080: .getDelegatorName()) != null) {
081: throw new GeneralRuntimeException("JobManager for ["
082: + delegator.getDelegatorName()
083: + "] already running");
084: }
085:
086: this .delegator = delegator;
087: jp = new JobPoller(this );
088: JobManager.registeredManagers.put(delegator.getDelegatorName(),
089: this );
090: }
091:
092: /** Queues a Job to run now. */
093: public void runJob(Job job) throws JobManagerException {
094: if (job.isValid())
095: jp.queueNow(job);
096: }
097:
098: /** Returns the ServiceDispatcher. */
099: public LocalDispatcher getDispatcher() {
100: LocalDispatcher this Dispatcher = GenericDispatcher
101: .getLocalDispatcher(dispatcherName, delegator);
102: return this Dispatcher;
103: }
104:
105: /** Returns the GenericDelegator. */
106: public GenericDelegator getDelegator() {
107: return this .delegator;
108: }
109:
110: public synchronized Iterator poll() {
111: List poll = new ArrayList();
112: Collection jobEnt = null;
113:
114: // sort the results by time
115: List order = UtilMisc.toList("runTime");
116:
117: // basic query
118: List expressions = UtilMisc.toList(new EntityExpr("runTime",
119: EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime
120: .nowTimestamp()), new EntityExpr(
121: "startDateTime", EntityOperator.EQUALS, null),
122: new EntityExpr("cancelDateTime", EntityOperator.EQUALS,
123: null), new EntityExpr("runByInstanceId",
124: EntityOperator.EQUALS, null));
125:
126: // limit to just defined pools
127: List pools = ServiceConfigUtil.getRunPools();
128: List poolsExpr = UtilMisc.toList(new EntityExpr("poolId",
129: EntityOperator.EQUALS, null));
130: if (pools != null) {
131: Iterator poolsIter = pools.iterator();
132: while (poolsIter.hasNext()) {
133: String poolName = (String) poolsIter.next();
134: poolsExpr.add(new EntityExpr("poolId",
135: EntityOperator.EQUALS, poolName));
136: }
137: }
138:
139: // make the conditions
140: EntityCondition baseCondition = new EntityConditionList(
141: expressions, EntityOperator.AND);
142: EntityCondition poolCondition = new EntityConditionList(
143: poolsExpr, EntityOperator.OR);
144: EntityCondition mainCondition = new EntityConditionList(
145: UtilMisc.toList(baseCondition, poolCondition),
146: EntityOperator.AND);
147:
148: // we will loop until we have no more to do
149: boolean pollDone = false;
150:
151: while (!pollDone) {
152: boolean beganTransaction;
153: try {
154: beganTransaction = TransactionUtil.begin();
155: } catch (GenericTransactionException e) {
156: Debug
157: .logError(
158: e,
159: "Unable to start transaction; not polling for jobs",
160: module);
161: return null;
162: }
163: if (!beganTransaction) {
164: Debug
165: .logError(
166: "Unable to poll for jobs; transaction was not started by this process",
167: module);
168: return null;
169: }
170:
171: try {
172: // first update the jobs w/ this instance running information
173: delegator.storeByCondition("JobSandbox", updateFields,
174: mainCondition);
175:
176: // now query all the 'queued' jobs for this instance
177: jobEnt = delegator.findByAnd("JobSandbox",
178: updateFields, order);
179: //jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order);
180: } catch (GenericEntityException ee) {
181: Debug.logError(ee, "Cannot load jobs from datasource.",
182: module);
183: } catch (Exception e) {
184: Debug.logError(e, "Unknown error.", module);
185: }
186:
187: if (jobEnt != null && jobEnt.size() > 0) {
188: Iterator i = jobEnt.iterator();
189:
190: while (i.hasNext()) {
191: GenericValue v = (GenericValue) i.next();
192: DispatchContext dctx = getDispatcher()
193: .getDispatchContext();
194:
195: if (dctx == null) {
196: Debug
197: .logError(
198: "Unable to locate DispatchContext object; not running job!",
199: module);
200: continue;
201: }
202: Job job = new PersistedServiceJob(dctx, v, null); // todo fix the requester
203: try {
204: job.queue();
205: poll.add(job);
206: } catch (InvalidJobException e) {
207: Debug.logError(e, module);
208: }
209: }
210: } else {
211: pollDone = true;
212: }
213:
214: // finished this run; commit the transaction
215: try {
216: TransactionUtil.commit(beganTransaction);
217: } catch (GenericTransactionException e) {
218: Debug.logError(e, module);
219: }
220:
221: }
222: return poll.iterator();
223: }
224:
225: public synchronized void reloadCrashedJobs() {
226: String instanceId = UtilProperties.getPropertyValue(
227: "general.properties", "unique.instanceId", "ofbiz0");
228: List toStore = new ArrayList();
229: List crashed = null;
230:
231: List exprs = UtilMisc.toList(new EntityExpr("finishDateTime",
232: EntityOperator.EQUALS, null));
233: exprs.add(new EntityExpr("cancelDateTime",
234: EntityOperator.EQUALS, null));
235: exprs.add(new EntityExpr("runByInstanceId",
236: EntityOperator.EQUALS, instanceId));
237: try {
238: crashed = delegator.findByAnd("JobSandbox", exprs, UtilMisc
239: .toList("startDateTime"));
240: } catch (GenericEntityException e) {
241: Debug.logError(e, "Unable to load crashed jobs", module);
242: }
243:
244: if (crashed != null && crashed.size() > 0) {
245: Iterator i = crashed.iterator();
246: while (i.hasNext()) {
247: GenericValue job = (GenericValue) i.next();
248: long runtime = job.getTimestamp("runTime").getTime();
249: RecurrenceInfo ri = JobManager.getRecurrenceInfo(job);
250: if (ri != null) {
251: long next = ri.next();
252: if (next <= runtime) {
253: Timestamp now = UtilDateTime.nowTimestamp();
254: // only re-schedule if there is no new recurrences since last run
255: Debug.log("Scheduling Job : " + job, module);
256:
257: String newJobId = job.getDelegator()
258: .getNextSeqId("JobSandbox");
259: String pJobId = job.getString("parentJobId");
260: if (pJobId == null) {
261: pJobId = job.getString("jobId");
262: }
263: GenericValue newJob = GenericValue.create(job);
264: newJob.set("statusId", "SERVICE_PENDING");
265: newJob.set("runTime", now);
266: newJob.set("jobId", newJobId);
267: newJob.set("previousJobId", job
268: .getString("jobId"));
269: newJob.set("parentJobId", pJobId);
270: newJob.set("startDateTime", null);
271: newJob.set("runByInstanceId", null);
272: toStore.add(newJob);
273:
274: // set the cancel time on the old job to the same as the re-schedule time
275: job.set("statusId", "SERVICE_CRASHED");
276: job.set("cancelDateTime", now);
277: toStore.add(job);
278: }
279: }
280: }
281:
282: if (toStore.size() > 0) {
283: try {
284: delegator.storeAll(toStore);
285: } catch (GenericEntityException e) {
286: Debug.logError(e, module);
287: }
288: if (Debug.infoOn())
289: Debug.logInfo("-- " + toStore.size()
290: + " jobs re-scheduled", module);
291: }
292:
293: } else {
294: if (Debug.infoOn())
295: Debug.logInfo("No crashed jobs to re-schedule", module);
296: }
297: }
298:
299: /**
300: * Schedule a job to start at a specific time with specific recurrence info
301: *@param serviceName The name of the service to invoke
302: *@param context The context for the service
303: *@param startTime The time in milliseconds the service should run
304: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
305: *@param interval The interval of the frequency recurrence
306: *@param count The number of times to repeat
307: */
308: public void schedule(String serviceName, Map context,
309: long startTime, int frequency, int interval, int count)
310: throws JobManagerException {
311: schedule(serviceName, context, startTime, frequency, interval,
312: count, 0);
313: }
314:
315: /**
316: * Schedule a job to start at a specific time with specific recurrence info
317: *@param serviceName The name of the service to invoke
318: *@param context The context for the service
319: *@param startTime The time in milliseconds the service should run
320: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
321: *@param interval The interval of the frequency recurrence
322: *@param endTime The time in milliseconds the service should expire
323: */
324: public void schedule(String serviceName, Map context,
325: long startTime, int frequency, int interval, long endTime)
326: throws JobManagerException {
327: schedule(serviceName, context, startTime, frequency, interval,
328: -1, endTime);
329: }
330:
331: /**
332: * Schedule a job to start at a specific time with specific recurrence info
333: *@param serviceName The name of the service to invoke
334: *@param context The context for the service
335: *@param startTime The time in milliseconds the service should run
336: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
337: *@param interval The interval of the frequency recurrence
338: *@param count The number of times to repeat
339: *@param endTime The time in milliseconds the service should expire
340: */
341: public void schedule(String serviceName, Map context,
342: long startTime, int frequency, int interval, int count,
343: long endTime) throws JobManagerException {
344: schedule(null, serviceName, context, startTime, frequency,
345: interval, count, endTime);
346: }
347:
348: /**
349: * Schedule a job to start at a specific time with specific recurrence info
350: *@param poolName The name of the pool to run the service from
351: *@param serviceName The name of the service to invoke
352: *@param context The context for the service
353: *@param startTime The time in milliseconds the service should run
354: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
355: *@param interval The interval of the frequency recurrence
356: *@param count The number of times to repeat
357: *@param endTime The time in milliseconds the service should expire
358: */
359: public void schedule(String poolName, String serviceName,
360: Map context, long startTime, int frequency, int interval,
361: int count, long endTime) throws JobManagerException {
362: schedule(null, null, serviceName, context, startTime,
363: frequency, interval, count, endTime, -1);
364: }
365:
366: /**
367: * Schedule a job to start at a specific time with specific recurrence info
368: *@param jobName The name of the job
369: *@param poolName The name of the pool to run the service from
370: *@param serviceName The name of the service to invoke
371: *@param context The context for the service
372: *@param startTime The time in milliseconds the service should run
373: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
374: *@param interval The interval of the frequency recurrence
375: *@param count The number of times to repeat
376: *@param endTime The time in milliseconds the service should expire
377: *@param maxRetry The max number of retries on failure (-1 for no max)
378: */
379: public void schedule(String jobName, String poolName,
380: String serviceName, Map context, long startTime,
381: int frequency, int interval, int count, long endTime,
382: int maxRetry) throws JobManagerException {
383: if (delegator == null) {
384: Debug.logWarning(
385: "No delegator referenced; cannot schedule job.",
386: module);
387: return;
388: }
389:
390: // persist the context
391: String dataId = null;
392: try {
393: dataId = delegator.getNextSeqId("RuntimeData");
394: GenericValue runtimeData = delegator.makeValue(
395: "RuntimeData", UtilMisc.toMap("runtimeDataId",
396: dataId));
397:
398: runtimeData.set("runtimeInfo", XmlSerializer
399: .serialize(context));
400: delegator.create(runtimeData);
401: } catch (GenericEntityException ee) {
402: throw new JobManagerException(ee.getMessage(), ee);
403: } catch (SerializeException se) {
404: throw new JobManagerException(se.getMessage(), se);
405: } catch (IOException ioe) {
406: throw new JobManagerException(ioe.getMessage(), ioe);
407: }
408:
409: // schedule the job
410: schedule(jobName, poolName, serviceName, dataId, startTime,
411: frequency, interval, count, endTime, maxRetry);
412: }
413:
414: /**
415: * Schedule a job to start at a specific time with specific recurrence info
416: *@param poolName The name of the pool to run the service from
417: *@param serviceName The name of the service to invoke
418: *@param dataId The persisted context (RuntimeData.runtimeDataId)
419: *@param startTime The time in milliseconds the service should run
420: */
421: public void schedule(String poolName, String serviceName,
422: String dataId, long startTime) throws JobManagerException {
423: schedule(null, poolName, serviceName, dataId, startTime, -1, 0,
424: 1, 0, -1);
425: }
426:
427: /**
428: * Schedule a job to start at a specific time with specific recurrence info
429: *@param jobName The name of the job
430: *@param poolName The name of the pool to run the service from
431: *@param serviceName The name of the service to invoke
432: *@param dataId The persisted context (RuntimeData.runtimeDataId)
433: *@param startTime The time in milliseconds the service should run
434: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
435: *@param interval The interval of the frequency recurrence
436: *@param count The number of times to repeat
437: *@param endTime The time in milliseconds the service should expire
438: *@param maxRetry The max number of retries on failure (-1 for no max)
439: */
440: public void schedule(String jobName, String poolName,
441: String serviceName, String dataId, long startTime,
442: int frequency, int interval, int count, long endTime,
443: int maxRetry) throws JobManagerException {
444: if (delegator == null) {
445: Debug.logWarning(
446: "No delegator referenced; cannot schedule job.",
447: module);
448: return;
449: }
450:
451: // create the recurrence
452: String infoId = null;
453: if (frequency > -1 && count != 0) {
454: try {
455: RecurrenceInfo info = RecurrenceInfo.makeInfo(
456: delegator, startTime, frequency, interval,
457: count);
458: infoId = info.primaryKey();
459: } catch (RecurrenceInfoException e) {
460: throw new JobManagerException(e.getMessage(), e);
461: }
462: }
463:
464: // set the persisted fields
465: if (UtilValidate.isEmpty(jobName)) {
466: jobName = Long.toString((new Date().getTime()));
467: }
468: String jobId = delegator.getNextSeqId("JobSandbox");
469: Map jFields = UtilMisc.toMap("jobId", jobId, "jobName",
470: jobName, "runTime", new java.sql.Timestamp(startTime),
471: "serviceName", serviceName, "recurrenceInfoId", infoId,
472: "runtimeDataId", dataId);
473:
474: // set the pool ID
475: if (poolName != null && poolName.length() > 0) {
476: jFields.put("poolId", poolName);
477: } else {
478: jFields.put("poolId", ServiceConfigUtil.getSendPool());
479: }
480:
481: // set the loader name
482: jFields.put("loaderName", dispatcherName);
483:
484: // set the max retry
485: jFields.put("maxRetry", new Long(maxRetry));
486:
487: // create the value and store
488: GenericValue jobV = null;
489: try {
490: jobV = delegator.makeValue("JobSandbox", jFields);
491: delegator.create(jobV);
492: } catch (GenericEntityException e) {
493: throw new JobManagerException(e.getMessage(), e);
494: }
495: }
496:
497: /**
498: * Kill a JobInvoker Thread.
499: * @param threadName Name of the JobInvoker Thread to kill.
500: */
501: public void killThread(String threadName) {
502: jp.killThread(threadName);
503: }
504:
505: /**
506: * Get a List of each threads current state.
507: * @return List containing a Map of each thread's state.
508: */
509: public List processList() {
510: return jp.getPoolState();
511: }
512:
513: /** Close out the scheduler thread. */
514: public void shutdown() {
515: if (jp != null) {
516: jp.stop();
517: jp = null;
518: Debug.logInfo("JobManager: Stopped Scheduler Thread.",
519: module);
520: }
521: }
522:
523: public void finalize() throws Throwable {
524: this .shutdown();
525: super .finalize();
526: }
527:
528: /** gets the recurrence info object for a job. */
529: public static RecurrenceInfo getRecurrenceInfo(GenericValue job) {
530: try {
531: if (job != null
532: && !UtilValidate.isEmpty(job
533: .getString("recurrenceInfoId"))) {
534: if (job.get("cancelDateTime") != null) {
535: // cancel has been flagged, no more recurrence
536: return null;
537: }
538: GenericValue ri = job.getRelatedOne("RecurrenceInfo");
539:
540: if (ri != null) {
541: return new RecurrenceInfo(ri);
542: } else {
543: return null;
544: }
545: } else {
546: return null;
547: }
548: } catch (GenericEntityException e) {
549: e.printStackTrace();
550: Debug
551: .logError(
552: e,
553: "Problem getting RecurrenceInfo entity from JobSandbox",
554: module);
555: } catch (RecurrenceInfoException re) {
556: re.printStackTrace();
557: Debug.logError(re,
558: "Problem creating RecurrenceInfo instance: "
559: + re.getMessage(), module);
560: }
561: return null;
562: }
563:
564: }
|