001: /*
002: * The contents of this file are subject to the Sapient Public License
003: * Version 1.0 (the "License"); you may not use this file except in compliance
004: * with the License. You may obtain a copy of the License at
005: * http://carbon.sf.net/License.html.
006: *
007: * Software distributed under the License is distributed on an "AS IS" basis,
008: * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
009: * the specific language governing rights and limitations under the License.
010: *
011: * The Original Code is The Carbon Component Framework.
012: *
013: * The Initial Developer of the Original Code is Sapient Corporation
014: *
015: * Copyright (C) 2003 Sapient Corporation. All Rights Reserved.
016: */
017:
018: package org.sape.carbon.services.threadpool;
019:
020: import java.util.ArrayList;
021: import java.util.Collections;
022: import java.util.List;
023:
024: import org.apache.commons.logging.Log;
025: import org.apache.commons.logging.LogFactory;
026:
027: import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
028: import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
029: import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
030:
031: import org.sape.carbon.core.component.Component;
032: import org.sape.carbon.core.component.ComponentConfiguration;
033: import org.sape.carbon.core.component.lifecycle.Configurable;
034: import org.sape.carbon.core.component.lifecycle.Destroyable;
035: import org.sape.carbon.core.component.lifecycle.Initializable;
036: import org.sape.carbon.core.component.lifecycle.Startable;
037: import org.sape.carbon.core.component.lifecycle.Suspendable;
038: import org.sape.carbon.core.config.InvalidConfigurationException;
039: import org.sape.carbon.core.exception.ExceptionUtility;
040: import org.sape.carbon.core.exception.InvalidParameterException;
041:
042: /**
043: * This implementation of ThreadPool relys on Doug Lea's PooledExecutor thread
044: * pool thread pool implementation. The PooledExecutor is a robust, highly
045: * configurable thread pool. Some configurations do not guarantee task
046: * execution, however. See
047: * <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html">
048: * Overview of package util.concurrent</a> for more information about the
049: * PooledExecutor and other related classes.
050: *
051: * <br>Copyright 2002 Sapient
052: * @since carbon 2.1
053: * @author Douglas Voet, Nov 5, 2002
054: * @version $Revision: 1.8 $($Author: dvoet $ / $Date: 2003/11/20 18:49:58 $)
055: */
056: public class DefaultThreadPoolImpl implements ThreadPool, Configurable,
057: Initializable, Startable, Suspendable, Destroyable {
058:
059: private ThreadPoolConfiguration config;
060: private PooledExecutor threadPool = null;
061: private List failedTasks;
062: private Component this Component;
063: // explicitly a BoundedBuffer because it supports the size() method
064: private BoundedBuffer taskQueue = null;
065:
066: public TaskInfo execute(Runnable task, String taskName) {
067: return execute(task, taskName, null);
068: }
069:
070: public TaskInfo execute(Runnable task, String taskName,
071: TaskCallback callback) {
072:
073: if (task == null) {
074: throw new InvalidParameterException(this .getClass(),
075: "task cannot be null");
076: }
077:
078: try {
079: TaskInfoImpl info = new TaskInfoImpl(task, taskName,
080: callback, this .this Component.getComponentName());
081:
082: TaskRunner taskRunner = new TaskRunner(info);
083: this .threadPool.execute(taskRunner);
084: return info;
085:
086: } catch (InterruptedException ie) {
087: Thread.currentThread().interrupt();
088: throw new ThreadPoolRuntimeException(
089: this .getClass(),
090: "Interrupted while queuing task [" + taskName + "]",
091: ie);
092: }
093: }
094:
095: public List getFailedTasks() {
096: return Collections.unmodifiableList(this .failedTasks);
097: }
098:
099: public Integer getQueueSize() {
100: return new Integer(this .taskQueue.size());
101: }
102:
103: public Integer getPoolSize() {
104: return new Integer(this .threadPool.getPoolSize());
105: }
106:
107: /**
108: * initializes this.failedTasks and saves thisComponent for later
109: */
110: public void initialize(Component this Component) throws Exception {
111: this .failedTasks = Collections
112: .synchronizedList(new ArrayList());
113: this .this Component = this Component;
114: }
115:
116: /**
117: * validates the configuration object and stores it
118: */
119: public void configure(ComponentConfiguration configuration)
120: throws Exception {
121:
122: this .config = (ThreadPoolConfiguration) configuration;
123:
124: // validate configuration
125: if (this .config.getFailureListCapacity() < 0) {
126: throw new InvalidConfigurationException(this .getClass(),
127: configuration.getConfigurationName(),
128: "FailureListCapacity", "Must be >= 0");
129: }
130: if (this .config.getThreadPoolSize() <= 0) {
131: throw new InvalidConfigurationException(this .getClass(),
132: configuration.getConfigurationName(),
133: "ThreadPoolSize", "Must be > 0");
134: }
135: if (this .config.getKeepAliveTime() < 0) {
136: throw new InvalidConfigurationException(this .getClass(),
137: configuration.getConfigurationName(),
138: "KeepAliveTime", "Must be >= 0");
139: }
140: if (this .config.getTaskQueueSize() <= 0) {
141: throw new InvalidConfigurationException(this .getClass(),
142: configuration.getConfigurationName(),
143: "TaskQueueSize", "Must be > 0");
144: }
145: if (this .config.getInitialThreadCount() < 0) {
146: throw new InvalidConfigurationException(this .getClass(),
147: configuration.getConfigurationName(),
148: "InitialThreadCount", "Must be >= 0");
149: }
150: if (this .config.getShutdownWaitTime() < 0) {
151: throw new InvalidConfigurationException(this .getClass(),
152: configuration.getConfigurationName(),
153: "ShutdownWaitTime", "Must be >= 0");
154: }
155:
156: if (this .threadPool != null) {
157: // we must be in a suspended state because stop sets threadPool = null
158: configureThreadPool();
159: }
160: }
161:
162: /**
163: * Creates the thread pool object and configures it then creates the
164: * configured number of threads
165: */
166: public void start() throws Exception {
167: this .taskQueue = new BoundedBuffer(this .config
168: .getTaskQueueSize());
169: this .threadPool = new PooledExecutor(this .taskQueue);
170:
171: configureThreadPool();
172:
173: this .threadPool.createThreads(this .config
174: .getInitialThreadCount());
175: }
176:
177: /**
178: * Stops the thread pool and if configured, will wait for queued tasks
179: * to execute.
180: */
181: public void stop() throws Exception {
182: if (this .config.getDiscardQueuedTasksOnShutdown()) {
183: this .threadPool.shutdownNow();
184:
185: } else {
186: this .threadPool
187: .shutdownAfterProcessingCurrentlyQueuedTasks();
188: this .threadPool.awaitTerminationAfterShutdown(this .config
189: .getShutdownWaitTime());
190: }
191:
192: this .threadPool = null;
193: this .taskQueue = null;
194: this .failedTasks.clear();
195: }
196:
197: /**
198: * Resumes executing queued tasks by starting a thread for each queued task
199: * up to the max number of threads for the pool.
200: */
201: public void resume() throws Exception {
202: this .threadPool.createThreads(this .taskQueue.size());
203: }
204:
205: /** interrupts all the threads allowing them to terminate */
206: public void suspend() throws Exception {
207: this .threadPool.interruptAll();
208: }
209:
210: /** shutsdown the pool, not waiting to complete any queued tasks */
211: public void destroy() throws Exception {
212: if (this .threadPool != null) {
213: this .threadPool.shutdownNow();
214: }
215: }
216:
217: /** adds a task to the failed list */
218: public void taskFailed(TaskInfo task) {
219: if (this .config.getFailureListCapacity() == 0) {
220: return;
221: }
222:
223: if (this .failedTasks.size() == this .config
224: .getFailureListCapacity()) {
225: this .failedTasks.remove(0);
226: }
227:
228: this .failedTasks.add(task);
229: }
230:
231: /** configures this.threadPool using values from this.config */
232: private void configureThreadPool() {
233: this .threadPool.setThreadFactory(new ThreadFactory() {
234: public Thread newThread(Runnable command) {
235: String threadName = DefaultThreadPoolImpl.this .this Component
236: .getComponentName();
237: Thread newThread = new Thread(command, threadName);
238: newThread.setDaemon(DefaultThreadPoolImpl.this .config
239: .isUseDaemonThreads());
240: return newThread;
241: }
242: });
243:
244: this .threadPool.setMinimumPoolSize(this .config
245: .getThreadPoolSize());
246: this .threadPool.setMaximumPoolSize(this .config
247: .getThreadPoolSize());
248: this .threadPool
249: .setKeepAliveTime(this .config.getKeepAliveTime());
250:
251: switch (this .config.getQueueFullPolicy().getOrdinal()) {
252: case QueueFullPolicyEnum.RUN_ORDINAL:
253: this .threadPool.runWhenBlocked();
254: break;
255:
256: case QueueFullPolicyEnum.WAIT_ORDINAL:
257: this .threadPool.waitWhenBlocked();
258: break;
259:
260: case QueueFullPolicyEnum.ABORT_ORDINAL:
261: this .threadPool.abortWhenBlocked();
262: break;
263:
264: case QueueFullPolicyEnum.DISCARD_ORDINAL:
265: this .threadPool.discardWhenBlocked();
266: break;
267:
268: case QueueFullPolicyEnum.DISCARD_OLDEST_ORDINAL:
269: this .threadPool.discardOldestWhenBlocked();
270: break;
271: }
272: }
273:
274: /**
275: * Implementation of Runnable that wraps the tasks passed into the
276: * execute method and is responsible for tracking the task as it is
277: * queued and executed.
278: */
279: private class TaskRunner implements Runnable {
280: private TaskInfoImpl info;
281:
282: public TaskRunner(TaskInfoImpl info) {
283: this .info = info;
284: }
285:
286: public void run() {
287: try {
288: info.setExecuting();
289: this .info.getTask().run();
290: info.setSuccess();
291:
292: } catch (Throwable t) {
293: taskFailed(info);
294: info.setFailure(t);
295: }
296: }
297: }
298:
299: }
|