001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.common.endpoints;
018:
019: import java.util.Date;
020:
021: import javax.jbi.JBIException;
022: import javax.jbi.servicedesc.ServiceEndpoint;
023: import javax.xml.namespace.QName;
024:
025: import org.apache.servicemix.common.DefaultComponent;
026: import org.apache.servicemix.common.ServiceUnit;
027: import org.apache.servicemix.components.varscheduler.ScheduleIterator;
028: import org.apache.servicemix.components.varscheduler.Scheduler;
029: import org.apache.servicemix.components.varscheduler.SchedulerTask;
030: import org.apache.servicemix.executors.Executor;
031:
032: /**
033: * An implementation inheritence class for an endpoint which polls some resource at periodic intervals to decide if
034: * there is an event to process.
035: *
036: * @version $Revision: 464478 $
037: */
038: public abstract class PollingEndpoint extends ConsumerEndpoint {
039:
040: private Executor executor;
041: private Scheduler scheduler;
042: private Date firstTime;
043: private long period = 5000;
044: private long delay;
045: private SchedulerTask schedulerTask;
046: private ScheduleIterator scheduleIterator;
047: private boolean started;
048: private boolean scheduleExecutedFlag;
049:
050: public PollingEndpoint() {
051: }
052:
053: public PollingEndpoint(ServiceUnit serviceUnit, QName service,
054: String endpoint) {
055: super (serviceUnit, service, endpoint);
056: }
057:
058: public PollingEndpoint(DefaultComponent component,
059: ServiceEndpoint endpoint) {
060: super (component.getServiceUnit(), endpoint.getServiceName(),
061: endpoint.getEndpointName());
062: }
063:
064: /**
065: * Polls the underlying resource to see if some event is required
066: *
067: * @throws JBIException
068: */
069: public abstract void poll() throws Exception;
070:
071: // Properties
072: // -------------------------------------------------------------------------
073: public Executor getExecutor() {
074: return executor;
075: }
076:
077: public long getDelay() {
078: return delay;
079: }
080:
081: public void setDelay(long delay) {
082: this .delay = delay;
083: }
084:
085: public Date getFirstTime() {
086: return firstTime;
087: }
088:
089: public void setFirstTime(Date firstTime) {
090: this .firstTime = firstTime;
091: }
092:
093: public long getPeriod() {
094: return period;
095: }
096:
097: public void setPeriod(long period) {
098: this .period = period;
099: }
100:
101: public Scheduler getScheduler() {
102: return scheduler;
103: }
104:
105: public void setScheduler(Scheduler scheduler) {
106: this .scheduler = scheduler;
107: }
108:
109: public synchronized void start() throws Exception {
110: if (!started) {
111: started = true;
112:
113: if (scheduler == null) {
114: scheduler = new Scheduler(true);
115: }
116: if (scheduleIterator == null) {
117: scheduleIterator = new PollingEndpoint.PollScheduleIterator();
118: }
119:
120: if (executor == null) {
121: executor = getServiceUnit().getComponent()
122: .getExecutor();
123: }
124: if (schedulerTask != null) {
125: schedulerTask.cancel();
126: }
127: schedulerTask = new PollingEndpoint.PollSchedulerTask();
128: this .scheduler.schedule(schedulerTask, scheduleIterator);
129: }
130: super .start();
131: }
132:
133: public synchronized void stop() throws Exception {
134: if (schedulerTask != null) {
135: schedulerTask.cancel();
136: schedulerTask = null;
137: }
138: scheduleExecutedFlag = false;
139: started = false;
140: scheduler.cancel();
141: scheduler = null;
142: scheduleIterator = null;
143: executor = null;
144: super .stop();
145: }
146:
147: // Implementation methods
148: // -------------------------------------------------------------------------
149:
150: private class PollSchedulerTask extends SchedulerTask {
151: public void run() {
152: try {
153: // lets run the work inside the JCA worker pools to ensure
154: // the threads are setup correctly when we actually do stuff
155: getExecutor().execute(new Runnable() {
156: public void run() {
157: try {
158: poll();
159: } catch (Exception e) {
160: handlePollException(e);
161: }
162: }
163: });
164: } catch (Throwable e) {
165: logger.error("Failed to schedule work: " + e, e);
166: }
167: }
168: }
169:
170: protected void handlePollException(Exception e) {
171: logger.error("Caught exception while polling: " + e, e);
172: }
173:
174: private class PollScheduleIterator implements ScheduleIterator {
175: public Date nextExecution() {
176: long nextTime = System.currentTimeMillis();
177: if (scheduleExecutedFlag) {
178: nextTime += period;
179: } else {
180: if (firstTime != null) {
181: nextTime = firstTime.getTime();
182: }
183: nextTime += delay;
184: scheduleExecutedFlag = true;
185: }
186: return (started) ? new Date(nextTime) : null;
187: }
188: }
189: }
|