001: /*******************************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *******************************************************************************/package org.ofbiz.service.job;
019:
020: import java.io.IOException;
021: import java.sql.Timestamp;
022: import java.util.Calendar;
023: import java.util.Date;
024: import java.util.HashMap;
025: import java.util.Map;
026:
027: import javax.xml.parsers.ParserConfigurationException;
028:
029: import org.ofbiz.base.util.Debug;
030: import org.ofbiz.base.util.UtilDateTime;
031: import org.ofbiz.base.util.UtilMisc;
032: import org.ofbiz.base.util.UtilProperties;
033: import org.ofbiz.base.util.UtilValidate;
034: import org.ofbiz.entity.GenericDelegator;
035: import org.ofbiz.entity.GenericEntityException;
036: import org.ofbiz.entity.GenericValue;
037: import org.ofbiz.entity.serialize.SerializeException;
038: import org.ofbiz.entity.serialize.XmlSerializer;
039: import org.ofbiz.service.DispatchContext;
040: import org.ofbiz.service.GenericRequester;
041: import org.ofbiz.service.ServiceUtil;
042: import org.ofbiz.service.calendar.RecurrenceInfo;
043: import org.ofbiz.service.config.ServiceConfigUtil;
044: import org.xml.sax.SAXException;
045:
046: /**
047: * Entity Service Job - Store => Schedule => Run
048: */
049: public class PersistedServiceJob extends GenericServiceJob {
050:
051: public static final String module = PersistedServiceJob.class
052: .getName();
053:
054: private transient GenericDelegator delegator = null;
055: private Timestamp storedDate = null;
056: private long nextRecurrence = -1;
057: private long maxRetry = -1;
058:
059: /**
060: * Creates a new PersistedServiceJob
061: * @param dctx
062: * @param jobValue
063: * @param req
064: */
065: public PersistedServiceJob(DispatchContext dctx,
066: GenericValue jobValue, GenericRequester req) {
067: super (jobValue.getString("jobId"), jobValue
068: .getString("jobName"));
069: this .delegator = dctx.getDelegator();
070: this .requester = req;
071: this .dctx = dctx;
072: this .storedDate = jobValue.getTimestamp("runTime");
073: this .runtime = storedDate.getTime();
074: this .maxRetry = jobValue.get("maxRetry") != null ? jobValue
075: .getLong("maxRetry").longValue() : -1;
076: }
077:
078: public void queue() throws InvalidJobException {
079: super .queue();
080:
081: // refresh the job object
082: GenericValue jobValue = null;
083: try {
084: jobValue = this .getJob();
085: jobValue.refresh();
086: } catch (GenericEntityException e) {
087: runtime = -1;
088: throw new InvalidJobException(
089: "Unable to refresh Job object", e);
090: }
091:
092: // make sure it isn't already set/cancelled
093: if (runtime != -1) {
094: Timestamp cancelTime = jobValue
095: .getTimestamp("cancelDateTime");
096: Timestamp startTime = jobValue
097: .getTimestamp("startDateTime");
098: if (cancelTime != null || startTime != null) {
099: // job not available
100: runtime = -1;
101: throw new InvalidJobException("Job [" + getJobId()
102: + "] is not available");
103:
104: } else {
105: // set the start time to now
106: jobValue.set("startDateTime", UtilDateTime
107: .nowTimestamp());
108: jobValue.set("statusId", "SERVICE_RUNNING");
109: try {
110: jobValue.store();
111: } catch (GenericEntityException e) {
112: runtime = -1;
113: throw new InvalidJobException(
114: "Unable to set the startDateTime on the current job ["
115: + getJobId() + "]; not running!", e);
116:
117: }
118: }
119: }
120: }
121:
122: /**
123: * @see org.ofbiz.service.job.GenericServiceJob#init()
124: */
125: protected void init() throws InvalidJobException {
126: super .init();
127:
128: // configure any addition recurrences
129: GenericValue job = this .getJob();
130: RecurrenceInfo recurrence = JobManager.getRecurrenceInfo(job);
131:
132: String instanceId = UtilProperties.getPropertyValue(
133: "general.properties", "unique.instanceId", "ofbiz0");
134: if (!instanceId.equals(job.getString("runByInstanceId"))) {
135: throw new InvalidJobException(
136: "Job has been accpeted by a different instance!");
137: }
138:
139: try {
140: if (recurrence != null) {
141: recurrence.incrementCurrentCount();
142: long next = recurrence.next();
143: createRecurrence(job, next);
144: }
145: } catch (GenericEntityException e) {
146: throw new RuntimeException(e.getMessage());
147: }
148: if (Debug.infoOn())
149: Debug.logInfo(this .toString() + "[" + getJobId()
150: + "] -- Next runtime: " + nextRecurrence, module);
151: }
152:
153: private void createRecurrence(GenericValue job, long next)
154: throws GenericEntityException {
155: if (Debug.verboseOn())
156: Debug.logVerbose("Next runtime returned: " + next, module);
157:
158: if (next > runtime) {
159: String newJobId = job.getDelegator().getNextSeqId(
160: "JobSandbox");
161: String pJobId = job.getString("parentJobId");
162: if (pJobId == null) {
163: pJobId = job.getString("jobId");
164: }
165: GenericValue newJob = GenericValue.create(job);
166: newJob.set("jobId", newJobId);
167: newJob.set("previousJobId", job.getString("jobId"));
168: newJob.set("parentJobId", pJobId);
169: newJob.set("statusId", "SERVICE_PENDING");
170: newJob.set("startDateTime", null);
171: newJob.set("runByInstanceId", null);
172: newJob.set("runTime", new java.sql.Timestamp(next));
173: nextRecurrence = next;
174: delegator.create(newJob);
175: if (Debug.verboseOn())
176: Debug.logVerbose("Created next job entry: " + newJob,
177: module);
178: }
179: }
180:
181: /**
182: * @see org.ofbiz.service.job.GenericServiceJob#finish()
183: */
184: protected void finish() throws InvalidJobException {
185: super .finish();
186:
187: // set the finish date
188: GenericValue job = getJob();
189: String status = job.getString("statusId");
190: if (status == null || "SERVICE_RUNNING".equals(status)) {
191: job.set("statusId", "SERVICE_FINISHED");
192: }
193: job.set("finishDateTime", UtilDateTime.nowTimestamp());
194: try {
195: job.store();
196: } catch (GenericEntityException e) {
197: Debug.logError(e, "Cannot update the job [" + getJobId()
198: + "] sandbox", module);
199: }
200: }
201:
202: /**
203: * @see org.ofbiz.service.job.GenericServiceJob#failed(Throwable)
204: */
205: protected void failed(Throwable t) throws InvalidJobException {
206: super .failed(t);
207:
208: GenericValue job = getJob();
209: // if the job has not been re-scheduled; we need to re-schedule and run again
210: if (nextRecurrence == -1) {
211: if (this .canRetry()) {
212: // create a recurrence
213: Calendar cal = Calendar.getInstance();
214: cal.setTime(new Date());
215: cal.add(Calendar.MINUTE, ServiceConfigUtil
216: .getFailedRetryMin());
217: long next = cal.getTimeInMillis();
218: try {
219: createRecurrence(job, next);
220: } catch (GenericEntityException gee) {
221: Debug.logError(gee,
222: "ERROR: Unable to re-schedule job ["
223: + getJobId() + "] to re-run : "
224: + job, module);
225: }
226: Debug.log("Persisted Job [" + getJobId()
227: + "] Failed Re-Scheduling : " + next, module);
228: } else {
229: Debug
230: .logWarning(
231: "Persisted Job ["
232: + getJobId()
233: + "] Failed - Max Retry Hit; not re-scheduling",
234: module);
235: }
236: }
237: // set the failed status
238: job.set("statusId", "SERVICE_FAILED");
239: job.set("finishDateTime", UtilDateTime.nowTimestamp());
240: try {
241: job.store();
242: } catch (GenericEntityException e) {
243: Debug.logError(e, "Cannot update the job sandbox", module);
244: }
245: }
246:
247: /**
248: * @see org.ofbiz.service.job.GenericServiceJob#getServiceName()
249: */
250: protected String getServiceName() throws InvalidJobException {
251: GenericValue jobObj = getJob();
252: if (jobObj == null || jobObj.get("serviceName") == null) {
253: return null;
254: }
255: return jobObj.getString("serviceName");
256: }
257:
258: /**
259: * @see org.ofbiz.service.job.GenericServiceJob#getContext()
260: */
261: protected Map getContext() throws InvalidJobException {
262: Map context = null;
263: try {
264: GenericValue jobObj = getJob();
265: if (!UtilValidate
266: .isEmpty(jobObj.getString("runtimeDataId"))) {
267: GenericValue contextObj = jobObj
268: .getRelatedOne("RuntimeData");
269: if (contextObj != null) {
270: context = (Map) XmlSerializer.deserialize(
271: contextObj.getString("runtimeInfo"),
272: delegator);
273: }
274: }
275:
276: if (context == null) {
277: context = new HashMap();
278: }
279:
280: // check the runAsUser
281: if (!UtilValidate.isEmpty(jobObj.get("runAsUser"))) {
282: context.put("userLogin", ServiceUtil.getUserLogin(dctx,
283: context, jobObj.getString("runAsUser")));
284: }
285: } catch (GenericEntityException e) {
286: Debug
287: .logError(
288: e,
289: "PersistedServiceJob.getContext(): Entity Exception",
290: module);
291: } catch (SerializeException e) {
292: Debug
293: .logError(
294: e,
295: "PersistedServiceJob.getContext(): Serialize Exception",
296: module);
297: } catch (ParserConfigurationException e) {
298: Debug
299: .logError(
300: e,
301: "PersistedServiceJob.getContext(): Parse Exception",
302: module);
303: } catch (SAXException e) {
304: Debug.logError(e,
305: "PersistedServiceJob.getContext(): SAXException",
306: module);
307: } catch (IOException e) {
308: Debug.logError(e,
309: "PersistedServiceJob.getContext(): IOException",
310: module);
311: }
312: if (context == null) {
313: Debug.logError("Job context is null", module);
314: }
315:
316: return context;
317: }
318:
319: // gets the job value object
320: private GenericValue getJob() throws InvalidJobException {
321: try {
322: Map fields = UtilMisc.toMap("jobId", getJobId());
323: GenericValue jobObj = delegator.findByPrimaryKey(
324: "JobSandbox", fields);
325:
326: if (jobObj == null) {
327: throw new InvalidJobException("Job [" + getJobId()
328: + "] came back null from datasource");
329: }
330: return jobObj;
331: } catch (GenericEntityException e) {
332: throw new InvalidJobException("Cannot get job definition ["
333: + getJobId() + "] from entity", e);
334: }
335: }
336:
337: // returns the number of current retries
338: private long getRetries() throws InvalidJobException {
339: GenericValue job = this .getJob();
340: String pJobId = job.getString("parentJobId");
341: if (pJobId == null) {
342: return 0;
343: }
344:
345: Map fields = UtilMisc.toMap("parentJobId", pJobId, "statusId",
346: "SERVICE_FAILED");
347: long count = 0;
348: try {
349: count = delegator.findCountByAnd("JobSandbox", fields);
350: } catch (GenericEntityException e) {
351: Debug.logError(e, module);
352: }
353:
354: return count + 1; // add one for the parent
355: }
356:
357: private boolean canRetry() throws InvalidJobException {
358: if (maxRetry == -1) {
359: return true;
360: }
361: if (this .getRetries() < maxRetry) {
362: return true;
363: }
364: return false;
365: }
366: }
|