001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.synapse.transport.base;
020:
021: import org.apache.axis2.description.AxisService;
022: import org.apache.axis2.description.Parameter;
023:
024: import java.util.TimerTask;
025: import java.util.Timer;
026: import java.util.Map;
027: import java.util.HashMap;
028:
029: public abstract class AbstractPollingTransportListener extends
030: AbstractTransportListener {
031:
032: /** the parameter in the services.xml that specifies the poll interval for a service */
033: public static final String TRANSPORT_POLL_INTERVAL = "transport.PollInterval";
034: /** the default poll interval */
035: public static final int DEFAULT_POLL_INTERVAL = 5 * 60; // 5 mins by default
036:
037: /** default interval in ms before polls */
038: protected int pollInterval = DEFAULT_POLL_INTERVAL;
039: /** The main timer that runs as a daemon thread */
040: protected final Timer timer = new Timer("PollTimer", true);
041: /** is a poll already executing? */
042: protected boolean pollInProgress = false;
043: /** a lock to prevent concurrent execution of polling */
044: protected final Object pollLock = new Object();
045: /** a map that keeps track of services to the timer tasks created for them */
046: protected Map serviceToTimerTaskMap = new HashMap();
047:
048: /**
049: * Schedule a repeated poll at the specified interval for the given service
050: * @param service the service to be polled
051: * @param pollInterval the interval between successive polls in seconds
052: */
053: public void schedulePoll(AxisService service, long pollInterval) {
054: pollInterval *= 1000; // convert to millisecs
055:
056: TimerTask task = (TimerTask) serviceToTimerTaskMap.get(service);
057:
058: // if a timer task exists, cancel it first and create a new one
059: if (task != null) {
060: task.cancel();
061: }
062:
063: task = new TimerTask() {
064: public void run() {
065: if (pollInProgress) {
066: if (log.isDebugEnabled()) {
067: log
068: .debug("Transport "
069: + transportName
070: + " onPoll() trigger : already executing poll..");
071: }
072: return;
073: }
074:
075: workerPool.execute(new Runnable() {
076: public void run() {
077: synchronized (pollLock) {
078: pollInProgress = true;
079: try {
080: onPoll();
081: } finally {
082: pollInProgress = false;
083: }
084: }
085: }
086: });
087: }
088: };
089: serviceToTimerTaskMap.put(service, task);
090: timer.scheduleAtFixedRate(task, pollInterval, pollInterval);
091: }
092:
093: /**
094: * Cancel any pending timer tasks for the given service
095: * @param service the service for which the timer task should be cancelled
096: */
097: public void cancelPoll(AxisService service) {
098: TimerTask task = (TimerTask) serviceToTimerTaskMap.get(service);
099: if (task != null) {
100: task.cancel();
101: }
102: }
103:
104: public void onPoll() {
105: }
106:
107: protected void startListeningForService(AxisService service) {
108: Parameter param = service.getParameter(TRANSPORT_POLL_INTERVAL);
109: long pollInterval = DEFAULT_POLL_INTERVAL;
110: if (param != null && param.getValue() instanceof String) {
111: try {
112: pollInterval = Integer.parseInt(param.getValue()
113: .toString());
114: } catch (NumberFormatException e) {
115: log.error("Invalid poll interval : " + param.getValue()
116: + " for service : " + service.getName()
117: + " Using defaults", e);
118: }
119: }
120: schedulePoll(service, pollInterval);
121: }
122:
123: protected void stopListeningForService(AxisService service) {
124: cancelPoll(service);
125: }
126:
127: public int getPollInterval() {
128: return pollInterval;
129: }
130:
131: public void setPollInterval(int pollInterval) {
132: this.pollInterval = pollInterval;
133: }
134: }
|