001: /*
002: *
003: * Jsmtpd, Java SMTP daemon
004: * Copyright (C) 2006 Jean-Francois POUX, jf.poux@laposte.net
005: *
006: * This program is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU General Public License
008: * as published by the Free Software Foundation; either version 2
009: * of the License, or (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
019: *
020: */
021: package org.jsmtpd.generic.threadpool;
022:
023: import java.util.HashSet;
024: import java.util.Iterator;
025: import java.util.LinkedList;
026: import java.util.List;
027: import java.util.Set;
028:
029: import org.apache.commons.logging.Log;
030: import org.apache.commons.logging.LogFactory;
031:
032: /**
033: * This needs testing.
034: *
035: *
036: * A growable thread pool.
037: * It's like the generic thread pool, but :
038: * Starts at 10% of initial threads provided (or whatever you set with growthRatio)
039: * When the pool is exhausted and a new thread is required, it will start
040: * numThreads*growRatio new threads, and place them in the pool, if the pool
041: * has still lower than numThreads
042: *
043: * Each 5000 ms, (or whatever you set), a "control thread" will look for
044: * unused threads and decrease the size of the pool as needed. There must have been no
045: * creation since 4* cycleTime.
046: *
047: * @author Jean-Francois POUX
048: * @see org.jsmtpd.generic.threadpool.IThreadedClass
049: */
050: public class GrowingThreadPool extends Thread implements ThreadPool {
051: private Log log = LogFactory.getLog(GrowingThreadPool.class);
052: private List<ThreadWorker> threads = new LinkedList<ThreadWorker>();
053: private int maxSize;
054: private int minimumThreads;
055: private int growth;
056: private String displayThreadName;
057: private String threadClassName;
058: private boolean running = true;
059: private long lastUp;
060: private float growthRatio = 0.2f;
061: private float minRatio = 0.1f;
062: private int cycleTime = 5000;
063:
064: /**
065: *
066: * @param numThreads number of threads to be spawned
067: * @param threadClassName name of the class to be threaded, must impletement IThreadedClass
068: * @throws InstantiationException
069: * @throws IllegalAccessException
070: * @throws ClassNotFoundException
071: */
072: public GrowingThreadPool(int numThreads, String threadClassName,
073: String displayThreadName) throws InstantiationException,
074: IllegalAccessException, ClassNotFoundException {
075: ThreadWorker tmp;
076: IThreadedClass cls;
077:
078: this .displayThreadName = displayThreadName;
079: this .threadClassName = threadClassName;
080: maxSize = numThreads;
081: // Start at ratio of max size
082: minimumThreads = (int) Math.round(numThreads * minRatio);
083: if (minimumThreads == 0)
084: minimumThreads = 1;
085:
086: // Grow or lower by growthRatio
087: growth = (int) Math.round(numThreads * growthRatio);
088: if (growth == 0)
089: growth = 1;
090:
091: log.info("Starting initial pool of " + minimumThreads
092: + " threads");
093: for (int i = 0; i < minimumThreads; i++) {
094: tmp = new ThreadWorker();
095: cls = (IThreadedClass) Class.forName(threadClassName)
096: .newInstance();
097: tmp.setWorker(cls);
098: tmp.setName(displayThreadName + "-" + tmp.getId());
099: tmp.start();
100: while (!tmp.isFree()) {
101: //wait for thread to be up if not
102: Thread.yield();
103: }
104: log.debug("Thread " + tmp.getName() + " started");
105: threads.add(tmp);
106: }
107: lastUp = System.currentTimeMillis();
108: start();
109:
110: }
111:
112: /**
113: * Will gracefully shutdown each running thread
114: *
115: */
116: public synchronized void gracefullShutdown() {
117: running = false;
118: this .interrupt();
119: log.debug("Gracefull shutdown ...");
120: ThreadWorker tmp;
121: for (int i = 0; i < threads.size(); i++) {
122: tmp = (ThreadWorker) threads.get(i);
123: tmp.gracefullShutdown();
124: }
125: }
126:
127: /**
128: * Will force each thread to shutdown
129: *
130: */
131: public synchronized void forceShutdown() {
132: running = false;
133: this .interrupt();
134: log.debug("Forcing shutdown ...");
135: ThreadWorker tmp;
136: for (int i = 0; i < threads.size(); i++) {
137: tmp = (ThreadWorker) threads.get(i);
138: tmp.forceShutdown();
139: }
140: }
141:
142: /**
143: *
144: * @return true if any free thread
145: */
146: public synchronized boolean hasFreeThread() {
147: for (Iterator iter = threads.iterator(); iter.hasNext();) {
148: ThreadWorker element = (ThreadWorker) iter.next();
149: if (element.isFree())
150: return true;
151: }
152: if ((threads.size() + growth) <= maxSize)
153: return true;
154: return false;
155: }
156:
157: /**
158: *
159: */
160: public synchronized int countFreeThread() {
161: int count = 0;
162: for (Iterator iter = threads.iterator(); iter.hasNext();) {
163: ThreadWorker element = (ThreadWorker) iter.next();
164: if (element.isFree())
165: count++;
166: }
167: return count;
168: }
169:
170: /**
171: * passes the obj parameter to the thread instance, and runs its doJob mehtod
172: * @param obj the object to pass
173: * @throws BusyThreadPoolException when the pool is exhausted
174: */
175: public synchronized void assignFreeThread(Object obj)
176: throws BusyThreadPoolException {
177: if (countFreeThread() == 0) {
178: growPool();
179: }
180: int i = 0;
181: for (ThreadWorker worker : threads) {
182: if (worker.isFree()) {
183: log.debug("Worker " + worker.getName()
184: + " is free, assigning job");
185: worker.setParam(obj);
186: worker.wake();
187: return;
188: }
189: i++;
190: }
191: log.warn("Thread pool exhausted !");
192: throw new BusyThreadPoolException();
193: }
194:
195: public synchronized void growPool() {
196: log.debug("Trying to grow thread pool");
197: ThreadWorker tmp;
198: IThreadedClass cls;
199:
200: if ((threads.size() + growth) <= maxSize) {
201: try {
202: lastUp = System.currentTimeMillis();
203: log.info("Increasing number of threads by " + growth
204: + ", starting from " + threads.size());
205: for (int i = 0; i < growth; i++) {
206: tmp = new ThreadWorker();
207: cls = (IThreadedClass) Class.forName(
208: threadClassName).newInstance();
209: tmp.setWorker(cls);
210: tmp.setName(displayThreadName + "#" + tmp.getId());
211: tmp.start();
212: while (!tmp.isFree()) {
213: //wait for thread to be up
214: Thread.yield();
215: }
216: threads.add(tmp);
217: log.debug("Thread " + tmp.getName() + " started");
218: }
219: } catch (Exception e) {
220: log.error("Could not increase pool", e);
221: }
222: } else {
223: log.error("Thread pool too low to assign a thread");
224: }
225: }
226:
227: public synchronized void lowerPool() {
228: long diff = System.currentTimeMillis() - lastUp;
229: if ((diff > (cycleTime * 4))
230: && (threads.size() > minimumThreads)
231: && (countFreeThread() > growth)) {
232: // remove growth size
233: Set<ThreadWorker> toRemove = new HashSet<ThreadWorker>();
234: for (int i = threads.size() - 1; i > 0; i--) { // loop backward to avoid destroy first threads
235: ThreadWorker worker = threads.get(i);
236: if (worker.isFree())
237: toRemove.add(worker);
238: if (toRemove.size() == growth)
239: break;
240: }
241: log.info("Reducing number of threads by " + growth
242: + ", starting from " + threads.size());
243: for (ThreadWorker worker : toRemove) {
244: log.debug("Shutting down thread " + worker.getName());
245: worker.gracefullShutdown();
246: threads.remove(worker);
247: }
248: }
249: }
250:
251: public void run() {
252: setName("C-" + getId());
253: log.info("Control thread started");
254: while (running) {
255: try {
256: Thread.sleep(cycleTime);
257: } catch (InterruptedException e) {
258: }
259: log.debug("Current size " + threads.size() + "/" + maxSize
260: + ", " + countFreeThread() + " free threads");
261: lowerPool();
262: }
263: log.info("Control thread ended");
264: }
265:
266: /**
267: * Time in ms of wait time of control thread
268: */
269: public void setCycleTime(int cycleTime) {
270: this .cycleTime = cycleTime;
271: }
272:
273: /**
274: * Ratio between 0.1 and 0.9 of grow rate of the pool
275: */
276: public void setGrowthRatio(float growthRatio) {
277: this .growthRatio = growthRatio;
278: }
279:
280: /**
281: * Minimal ratio of pool (between 0.1 and 1)
282: */
283: public void setMinRatio(float minRatio) {
284: this.minRatio = minRatio;
285: }
286:
287: }
|