Source code for PooledExecutor.java (package EDU.oswego.cs.dl.util.concurrent, as distributed with Laszlo 4.0.10)

/*
  File: PooledExecutor.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who                What
  19Jun1998  dl               Create public version
  29aug1998  dl               rely on ThreadFactoryUser,
                              remove ThreadGroup-based methods
                              adjusted locking policies
   3mar1999  dl               Worker threads sense decreases in pool size
  31mar1999  dl               Allow supplied channel in constructor;
                              add methods createThreads, drain
  15may1999  dl               Allow infinite keepalives
  21oct1999  dl               add minimumPoolSize methods
   7sep2000  dl               BlockedExecutionHandler now an interface,
                              new DiscardOldestWhenBlocked policy
  12oct2000  dl               add shutdownAfterProcessingCurrentlyQueuedTasks
  13nov2000  dl               null out task ref after run
  08apr2001  dl               declare inner class ctor protected
  12nov2001  dl               Better shutdown support
                              Blocked exec handlers can throw IE
                              Simplify locking scheme
  25jan2001  dl               {get,set}BlockedExecutionHandler now public
  17may2002  dl               null out task var in worker run to enable GC.
 */

package EDU.oswego.cs.dl.util.concurrent;

import java.util.*;

/**
 * A tunable, extensible thread pool class. The main supported public
 * method is <code>execute(Runnable command)</code>, which can be
 * called instead of directly creating threads to execute commands.
 *
 * <p>
 * Thread pools can be useful for several, usually intertwined
 * reasons:
 *
 * <ul>
 *
 *    <li> To bound resource use. A limit can be placed on the maximum
 *    number of simultaneously executing threads.
 *
 *    <li> To manage concurrency levels. A targeted number of threads
 *    can be allowed to execute simultaneously.
 *
 *    <li> To manage a set of threads performing related tasks.
 *
 *    <li> To minimize overhead, by reusing previously constructed
 *    Thread objects rather than creating new ones.  (Note however
 *    that pools are hardly ever cure-alls for performance problems
 *    associated with thread construction, especially on JVMs that
 *    themselves internally pool or recycle threads.)
 *
 * </ul>
 *
 * These goals introduce a number of policy parameters that are
 * encapsulated in this class. All of these parameters have defaults
 * and are tunable, either via get/set methods, or, in cases where
 * decisions should hold across lifetimes, via methods that can be
 * easily overridden in subclasses.  The main, most commonly set
 * parameters can be established in constructors.  Policy choices
 * across these dimensions can and do interact.  Be careful, and
 * please read this documentation completely before using!  See also
 * the usage examples below.
 *
 * <dl>
 *   <dt> Queueing
 *
 *   <dd> By default, this pool uses queueless synchronous channels
 *   to hand off work to threads. This is a safe, conservative policy
 *   that avoids lockups when handling sets of requests that might
 *   have internal dependencies. (In these cases, queuing one task
 *   could lock up another that would be able to continue if the
 *   queued task were to run.)  If you are sure that this cannot
 *   happen, then you can instead supply a queue of some sort (for
 *   example, a BoundedBuffer or LinkedQueue) in the constructor.
 *   This will cause new commands to be queued in cases where all
 *   MaximumPoolSize threads are busy. Queues are sometimes
 *   appropriate when each task is completely independent of others,
 *   so tasks cannot affect each other's execution, for example, in an
 *   HTTP server.  <p>
 *
 *   When given a choice, this pool always prefers adding a new thread
 *   rather than queueing if there are currently fewer than the
 *   current getMinimumPoolSize threads running, but otherwise always
 *   prefers queuing a request rather than adding a new thread. Thus,
 *   if you use an unbounded buffer, you will never have more than
 *   getMinimumPoolSize threads running. (Since the default
 *   minimumPoolSize is one, you will probably want to explicitly
 *   setMinimumPoolSize.)  <p>
 *
 *   While queuing can be useful in smoothing out transient bursts of
 *   requests, especially in socket-based services, it is not very
 *   well behaved when commands continue to arrive on average faster
 *   than they can be processed.  Using bounds for both the queue and
 *   the pool size, along with a run-when-blocked policy, is often a
 *   reasonable response to such possibilities.  <p>
 *
 *   Queue sizes and maximum pool sizes can often be traded off for
 *   each other. Using large queues and small pools minimizes CPU
 *   usage, OS resources, and context-switching overhead, but can lead
 *   to artificially low throughput. Especially if tasks frequently
 *   block (for example if they are I/O bound), a JVM and underlying
 *   OS may be able to schedule time for more threads than you
 *   otherwise allow. Use of small queues or queueless handoffs
 *   generally requires larger pool sizes, which keeps CPUs busier but
 *   may encounter unacceptable scheduling overhead, which also
 *   decreases throughput.  <p>
 *
 *   <dt> Maximum Pool size
 *
 *   <dd> The maximum number of threads to use, when needed.  The pool
 *   does not by default preallocate threads.  Instead, a thread is
 *   created, if necessary and if there are fewer than the maximum,
 *   only when an <code>execute</code> request arrives.  The default
 *   value is (for all practical purposes) infinite --
 *   <code>Integer.MAX_VALUE</code>, so should be set in the
 *   constructor or the set method unless you are just using the pool
 *   to minimize construction overhead.  Because task handoffs to idle
 *   worker threads require synchronization that in turn relies on JVM
 *   scheduling policies to ensure progress, it is possible that a new
 *   thread will be created even though an existing worker thread has
 *   just become idle but has not progressed to the point at which it
 *   can accept a new task. This phenomenon tends to occur on some
 *   JVMs when bursts of short tasks are executed.  <p>
 *
 *   <dt> Minimum Pool size
 *
 *   <dd> The minimum number of threads to use, when needed (default
 *   1).  When a new request is received, and fewer than the minimum
 *   number of threads are running, a new thread is always created to
 *   handle the request even if other worker threads are idly waiting
 *   for work. Otherwise, a new thread is created only if there are
 *   fewer than the maximum and the request cannot immediately be
 *   queued.  <p>
 *
 *   <dt> Preallocation
 *
 *   <dd> You can override lazy thread construction policies via
 *   method createThreads, which establishes a given number of warm
 *   threads. Be aware that these preallocated threads will time out
 *   and die (and later be replaced with others if needed) if not used
 *   within the keep-alive time window. If you use preallocation, you
 *   probably want to increase the keepalive time.  The difference
 *   between setMinimumPoolSize and createThreads is that
 *   createThreads immediately establishes threads, while setting the
 *   minimum pool size waits until requests arrive.  <p>
 *
 *   <dt> Keep-alive time
 *
 *   <dd> If the pool maintained references to a fixed set of threads
 *   in the pool, then it would impede garbage collection of otherwise
 *   idle threads. This would defeat the resource-management aspects
 *   of pools. One solution would be to use weak references.  However,
 *   this would impose costly and difficult synchronization issues.
 *   Instead, threads are simply allowed to terminate and thus be
 *   GCable if they have been idle for the given keep-alive time.  The
 *   value of this parameter represents a trade-off between GCability
 *   and construction time. In most current Java VMs, thread
 *   construction and cleanup overhead is on the order of
 *   milliseconds. The default keep-alive value is one minute, which
 *   means that the time needed to construct and then GC a thread is
 *   expended at most once per minute.
 *   <p>
 *
 *   To establish worker threads permanently, use a <em>negative</em>
 *   argument to setKeepAliveTime.  <p>
 *
 *   <dt> Blocked execution policy
 *
 *   <dd> If the maximum pool size or queue size is bounded, then it
 *   is possible for incoming <code>execute</code> requests to
 *   block. There are five supported policies for handling this
 *   problem, and mechanics (based on the Strategy Object pattern) to
 *   allow others in subclasses: <p>
 *
 *   <dl>
 *     <dt> Run (the default)
 *     <dd> The thread making the <code>execute</code> request
 *          runs the task itself. This policy helps guard against lockup.
 *     <dt> Wait
 *     <dd> Wait until a thread becomes available.
 *     <dt> Abort
 *     <dd> Throw a RuntimeException.
 *     <dt> Discard
 *     <dd> Throw away the current request and return.
 *     <dt> DiscardOldest
 *     <dd> Throw away the oldest request and return.
 *   </dl>
 *
 *   Other plausible policies include raising the maximum pool size
 *   after checking with some other objects that this is OK.  <p>
 *
 *   (These cases can never occur if the maximum pool size is unbounded
 *   or the queue is unbounded.  In these cases you instead face
 *   potential resource exhaustion.)  The execute method does not
 *   throw any checked exceptions in any of these cases since any
 *   errors associated with them must normally be dealt with via
 *   handlers or callbacks. (Although in some cases, these might be
 *   associated with throwing unchecked exceptions.)  You may wish to
 *   add special implementations even if you choose one of the listed
 *   policies. For example, the supplied Discard policy does not
 *   inform the caller of the drop. You could add your own version
 *   that does so.  Since choice of policies is normally a system-wide
 *   decision, selecting a policy affects all calls to
 *   <code>execute</code>.  If for some reason you would instead like
 *   to make per-call decisions, you could add variant versions of the
 *   <code>execute</code> method (for example,
 *   <code>executeIfWouldNotBlock</code>) in subclasses.  <p>
 *
 *   <dt> Thread construction parameters
 *
 *   <dd> A settable ThreadFactory establishes each new thread.  By
 *   default, it merely generates a new instance of class Thread, but
 *   can be changed to use a Thread subclass, to set priorities,
 *   ThreadLocals, etc.  <p>
 *
 *   <dt> Interruption policy
 *
 *   <dd> Worker threads check for interruption after processing each
 *   command, and terminate upon interruption.  Fresh threads will
 *   replace them if needed. Thus, new tasks will not start out in an
 *   interrupted state due to an uncleared interruption in a previous
 *   task. Also, unprocessed commands are never dropped upon
 *   interruption. It would conceptually suffice simply to clear
 *   interruption between tasks, but implementation characteristics of
 *   interruption-based methods are uncertain enough to warrant this
 *   conservative strategy. It is a good idea to be equally
 *   conservative in your code for the tasks running within pools.
 *   <p>
 *
 *   <dt> Shutdown policy
 *
 *   <dd> The interruptAll method interrupts, but does not disable the
 *   pool. Two different shutdown methods are supported for use when
 *   you do want to (permanently) stop processing tasks. Method
 *   shutdownAfterProcessingCurrentlyQueuedTasks waits until all
 *   current tasks are finished. The shutdownNow method interrupts
 *   current threads and leaves other queued requests unprocessed.
 *   <p>
 *
 *   <dt> Handling requests after shutdown
 *
 *   <dd> When the pool is shut down, new incoming requests are handled
 *   by the blockedExecutionHandler. By default, the handler is set to
 *   discard new requests, but this can be set with an optional
 *   argument to method
 *   shutdownAfterProcessingCurrentlyQueuedTasks. <p> Also, if you are
 *   using some form of queuing, you may wish to call method drain()
 *   to remove (and return) unprocessed commands from the queue after
 *   shutting down the pool and its clients. If you need to be sure
 *   these commands are processed, you can then run() each of the
 *   commands in the list returned by drain().
 *
 * </dl>
 * <p>
 *
 * <b>Usage examples.</b>
 * <p>
 *
 * Probably the most common use of pools is in statics or singletons
 * accessible from a number of classes in a package; for example:
 *
 * <pre>
 * class MyPool {
 *   // initialize to use a maximum of 8 threads.
 *   static PooledExecutor pool = new PooledExecutor(8);
 * }
 * </pre>
 * Here are some sample variants in initialization:
 * <ol>
 *  <li> Using a bounded buffer of 10 tasks, at least 4 threads (started only
 *       when needed due to incoming requests), but allowing
 *       up to 100 threads if the buffer gets full.
 *     <pre>
 *        pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *        pool.setMinimumPoolSize(4);
 *     </pre>
 *  <li> Same as (1), except pre-start 9 threads, allowing them to
 *        die if they are not used for five minutes.
 *     <pre>
 *        pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *        pool.setMinimumPoolSize(4);
 *        pool.setKeepAliveTime(1000 * 60 * 5);
 *        pool.createThreads(9);
 *     </pre>
 *  <li> Same as (2) except clients block if both the buffer is full and
 *       all 100 threads are busy:
 *     <pre>
 *        pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *        pool.setMinimumPoolSize(4);
 *        pool.setKeepAliveTime(1000 * 60 * 5);
 *        pool.waitWhenBlocked();
 *        pool.createThreads(9);
 *     </pre>
 *  <li> An unbounded queue serviced by exactly 5 threads:
 *     <pre>
 *        pool = new PooledExecutor(new LinkedQueue());
 *        pool.setKeepAliveTime(-1); // live forever
 *        pool.createThreads(5);
 *     </pre>
 *  </ol>
 *
 * <p>
 * <b>Usage notes.</b>
 * <p>
 *
 * Pools do not mesh well with using thread-specific storage via
 * java.lang.ThreadLocal.  ThreadLocal relies on the identity of a
 * thread executing a particular task. Pools use the same thread to
 * perform different tasks.  <p>
 *
 * If you need a policy not handled by the parameters in this class
 * consider writing a subclass.  <p>
 *
 * Version note: Previous versions of this class relied on
 * ThreadGroups for aggregate control. This has been removed, and the
 * method interruptAll added, to avoid differences in behavior across
 * JVMs.
 *
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 **/

public class PooledExecutor extends ThreadFactoryUser implements Executor {

    /**
     * The maximum pool size; used if not otherwise specified.  Default
     * value is essentially infinite (Integer.MAX_VALUE).
     **/
    public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE;

    /**
     * The minimum pool size; used if not otherwise specified.  Default
     * value is 1.
     **/
    public static final int DEFAULT_MINIMUMPOOLSIZE = 1;

    /**
     * The maximum time to keep worker threads alive waiting for new
     * tasks; used if not otherwise specified. Default value is one
     * minute (60000 milliseconds).
     **/
    public static final long DEFAULT_KEEPALIVETIME = 60 * 1000;

    /** The maximum number of threads allowed in pool. **/
    protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE;

    /** The minimum number of threads to maintain in pool. **/
    protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE;

    /**  Current pool size.  **/
    protected int poolSize_ = 0;

    /** The maximum time for an idle thread to wait for new task. **/
    protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME;

    /**
     * Shutdown flag - latches true when a shutdown method is called
     * in order to disable queuing/handoffs of new tasks.
     **/
    protected boolean shutdown_ = false;

    /**
     * The channel used to hand off the command to a thread in the pool.
     **/
    protected final Channel handOff_;

    /**
     * The set of active threads, declared as a map from workers to
     * their threads.  This is needed by the interruptAll method.  It
     * may also be useful in subclasses that need to perform other
     * thread management chores.
     **/
    protected final Map threads_;

    /** The current handler for unserviceable requests. **/
    protected BlockedExecutionHandler blockedExecutionHandler_;

    /**
     * Create a new pool with all default settings.
     **/
    public PooledExecutor() {
        this(new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE);
    }

    /**
     * Create a new pool with all default settings except
     * for maximum pool size.
     **/
    public PooledExecutor(int maxPoolSize) {
        this(new SynchronousChannel(), maxPoolSize);
    }

    /**
     * Create a new pool that uses the supplied Channel for queuing, and
     * with all default parameter settings.
     **/
    public PooledExecutor(Channel channel) {
        this(channel, DEFAULT_MAXIMUMPOOLSIZE);
    }

    /**
     * Create a new pool that uses the supplied Channel for queuing, and
     * with all default parameter settings except for maximum pool size.
     **/
    public PooledExecutor(Channel channel, int maxPoolSize) {
        maximumPoolSize_ = maxPoolSize;
        handOff_ = channel;
        runWhenBlocked();
        threads_ = new HashMap();
    }
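
/*
Illustrative sketch, not part of this class. The queueing notes in the class
documentation say that a pool given an unbounded queue prefers queueing over
adding threads, so it never grows past its minimum pool size. The same
preference can be observed with java.util.concurrent.ThreadPoolExecutor, used
here purely as a stand-in: treating its corePoolSize as the counterpart of
minimumPoolSize is an assumption of this sketch, and the class and method
names below are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; ThreadPoolExecutor stands in for PooledExecutor.
class QueuePreferenceSketch {

    // Submits `tasks` blocking tasks to a pool backed by an unbounded queue
    // and returns the pool size observed while all of them are still pending.
    static int poolSizeWithUnboundedQueue(int corePoolSize, int maxPoolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Tasks beyond the core size were queued, not given new threads.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeWithUnboundedQueue(2, 8, 10));
    }
}
```

With a core size of 2 and a maximum of 8, ten pending tasks still leave the
pool at 2 threads, which is why the documentation suggests setting the minimum
pool size explicitly whenever an unbounded queue is supplied.
*/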

    /**
     * Return the maximum number of threads to simultaneously execute.
     * New unqueued requests will be handled according to the current
     * blocking policy once this limit is exceeded.
     **/
    public synchronized int getMaximumPoolSize() {
        return maximumPoolSize_;
    }

    /**
     * Set the maximum number of threads to use. Decreasing the pool
     * size will not immediately kill existing threads, but they may
     * later die when idle.
     * @exception IllegalArgumentException if less than or equal to zero.
     * (It is not considered an error to set the maximum to be less than
     * the minimum. However, in this case there are no guarantees
     * about behavior.)
     **/
    public synchronized void setMaximumPoolSize(int newMaximum) {
        if (newMaximum <= 0)
            throw new IllegalArgumentException();
        maximumPoolSize_ = newMaximum;
    }

    /**
     * Return the minimum number of threads to simultaneously execute.
     * (Default value is 1).  If fewer than the minimum number are
     * running upon reception of a new request, a new thread is started
     * to handle this request.
     **/
    public synchronized int getMinimumPoolSize() {
        return minimumPoolSize_;
    }

    /**
     * Set the minimum number of threads to use.
     * @exception IllegalArgumentException if less than zero. (It is not
     * considered an error to set the minimum to be greater than the
     * maximum. However, in this case there are no guarantees about
     * behavior.)
     **/
    public synchronized void setMinimumPoolSize(int newMinimum) {
        if (newMinimum < 0)
            throw new IllegalArgumentException();
        minimumPoolSize_ = newMinimum;
    }

    /**
     * Return the current number of active threads in the pool.  This
     * number is just a snapshot, and may change immediately upon
     * returning.
     **/
    public synchronized int getPoolSize() {
        return poolSize_;
    }

    /**
     * Return the number of milliseconds to keep threads alive waiting
     * for new commands. A negative value means to wait forever. A zero
     * value means not to wait at all.
     **/
    public synchronized long getKeepAliveTime() {
        return keepAliveTime_;
    }

    /**
     * Set the number of milliseconds to keep threads alive waiting for
     * new commands. A negative value means to wait forever. A zero
     * value means not to wait at all.
     **/
    public synchronized void setKeepAliveTime(long msecs) {
        keepAliveTime_ = msecs;
    }

    /** Get the handler for blocked execution. **/
    public synchronized BlockedExecutionHandler getBlockedExecutionHandler() {
        return blockedExecutionHandler_;
    }

    /** Set the handler for blocked execution. **/
    public synchronized void setBlockedExecutionHandler(
            BlockedExecutionHandler h) {
        blockedExecutionHandler_ = h;
    }
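
/*
Illustrative sketch, not part of this class. The default blocked execution
policy described in the class documentation is "Run": when no worker can take
a request, the thread calling execute runs the task itself. The sketch below
shows that behavior with java.util.concurrent.ThreadPoolExecutor and its
CallerRunsPolicy, used here as an assumed analogue of runWhenBlocked(); the
class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; ThreadPoolExecutor stands in for PooledExecutor.
class RunWhenBlockedSketch {

    // Returns true if a task submitted to a saturated pool ran on the
    // submitting thread instead of on a pool thread.
    static boolean saturatedTaskRunsInCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),          // queueless handoff
                new ThreadPoolExecutor.CallerRunsPolicy()); // "run when blocked"
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            // Occupy the only worker thread.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // No idle worker and no room to grow: the handler runs this here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(saturatedTaskRunsInCaller());
    }
}
```

Running the overflow task in the caller slows the submitter down, which is one
way the policy throttles a producer that outpaces the pool.
*/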

    /**
     * Create and start a thread to handle a new command.  Call only
     * when holding lock.
     **/
    protected void addThread(Runnable command) {
        Worker worker = new Worker(command);
        Thread thread = getThreadFactory().newThread(worker);
        threads_.put(worker, thread);
        ++poolSize_;
        thread.start();
    }

    /**
     * Create and start up to numberOfThreads threads in the pool.
     * Return the number created. This may be less than the number
     * requested if creating more would exceed maximum pool size bound.
     **/
    public int createThreads(int numberOfThreads) {
        int ncreated = 0;
        for (int i = 0; i < numberOfThreads; ++i) {
            synchronized (this) {
                if (poolSize_ < maximumPoolSize_) {
                    addThread(null);
                    ++ncreated;
                } else
                    break;
            }
        }
        return ncreated;
    }
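
/*
Illustrative sketch, not part of this class. createThreads starts idle worker
threads ahead of demand and reports how many it actually started. A comparable
call in java.util.concurrent is ThreadPoolExecutor.prestartAllCoreThreads(),
shown below as a stand-in. One difference to keep in mind: createThreads is
capped by the maximum pool size, while prestartAllCoreThreads is capped by the
core pool size. The class and method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; ThreadPoolExecutor stands in for PooledExecutor.
class PrestartSketch {

    // Returns { threads started by the first call,
    //           threads started by a second call,
    //           pool size afterwards }.
    static int[] prestart(int coreThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreThreads, coreThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            int started = pool.prestartAllCoreThreads(); // warm up the pool
            int again = pool.prestartAllCoreThreads();   // already at the cap
            return new int[] { started, again, pool.getPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = prestart(5);
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

As with createThreads, the return value tells the caller how many threads were
really created, so a second call on a full pool reports zero.
*/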

    /**
     * Interrupt all threads in the pool, causing them all to
     * terminate. Assuming that executed tasks do not disable (clear)
     * interruptions, each thread will terminate after processing its
     * current task. Threads will terminate sooner if the executed tasks
     * themselves respond to interrupts.
     **/
    public synchronized void interruptAll() {
        for (Iterator it = threads_.values().iterator(); it.hasNext();) {
            Thread t = (Thread) (it.next());
            t.interrupt();
        }
    }

    /**
     * Interrupt all threads and disable construction of new
     * threads. Any tasks entered after this point will be discarded. A
     * shut down pool cannot be restarted.
     */
    public void shutdownNow() {
        shutdownNow(new DiscardWhenBlocked());
    }

    /**
     * Interrupt all threads and disable construction of new
     * threads. Any tasks entered after this point will be handled by
     * the given BlockedExecutionHandler.  A shut down pool cannot be
     * restarted.
     */
    public synchronized void shutdownNow(BlockedExecutionHandler handler) {
        setBlockedExecutionHandler(handler);
        shutdown_ = true; // don't allow new tasks
        minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
        interruptAll(); // interrupt all existing threads
    }

    /**
     * Terminate threads after processing all elements currently in
     * queue. Any tasks entered after this point will be discarded. A
     * shut down pool cannot be restarted.
     **/
    public void shutdownAfterProcessingCurrentlyQueuedTasks() {
        shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
    }

    /**
     * Terminate threads after processing all elements currently in
     * queue. Any tasks entered after this point will be handled by the
     * given BlockedExecutionHandler.  A shut down pool cannot be
     * restarted.
     **/
    public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(
            BlockedExecutionHandler handler) {
        setBlockedExecutionHandler(handler);
        shutdown_ = true;
        if (poolSize_ == 0) // disable new thread construction when idle
            minimumPoolSize_ = maximumPoolSize_ = 0;
    }
600:
601:            /** 
602:             * Return true if a shutDown method has succeeded in terminating all
603:             * threads.
604:             */
605:            public synchronized boolean isTerminatedAfterShutdown() {
606:                return shutdown_ && poolSize_ == 0;
607:            }
608:
609:            /**
610:             * Wait for a shutdown pool to fully terminate, or until the timeout
611:             * has expired. This method may only be called <em>after</em>
612:             * invoking shutdownNow or
613:             * shutdownAfterProcessingCurrentlyQueuedTasks.
614:             *
615:             * @param maxWaitTime  the maximum time in milliseconds to wait
616:             * @return true if the pool has terminated within the max wait period
617:             * @exception IllegalStateException if shutdown has not been requested
618:             * @exception InterruptedException if the current thread has been interrupted in the course of waiting
619:             */
620:            public synchronized boolean awaitTerminationAfterShutdown(
621:                    long maxWaitTime) throws InterruptedException {
622:                if (!shutdown_)
623:                    throw new IllegalStateException();
624:                if (poolSize_ == 0)
625:                    return true;
626:                long waitTime = maxWaitTime;
627:                if (waitTime <= 0)
628:                    return false;
629:                long start = System.currentTimeMillis();
630:                for (;;) {
631:                    wait(waitTime);
632:                    if (poolSize_ == 0)
633:                        return true;
634:                    waitTime = maxWaitTime
635:                            - (System.currentTimeMillis() - start);
636:                    if (waitTime <= 0)
637:                        return false;
638:                }
639:            }
640:
641:            /**
642:             * Wait for a shutdown pool to fully terminate.  This method may
643:             * only be called <em>after</em> invoking shutdownNow or
644:             * shutdownAfterProcessingCurrentlyQueuedTasks.
645:             *
646:             * @exception IllegalStateException if shutdown has not been requested
647:             * @exception InterruptedException if the current thread has been interrupted in the course of waiting
648:             */
649:            public synchronized void awaitTerminationAfterShutdown()
650:                    throws InterruptedException {
651:                if (!shutdown_)
652:                    throw new IllegalStateException();
653:                while (poolSize_ > 0)
654:                    wait();
655:            }
656:
657:            /**
658:             * Remove all unprocessed tasks from pool queue, and return them in
659:             * a java.util.List. This method should be used only when there are
660:             * not any active clients of the pool. Otherwise you face the
661:             * possibility that the method will loop pulling out tasks as
662:             * clients are putting them in.  This method can be useful after
663:             * shutting down a pool (via shutdownNow) to determine whether there
664:             * are any pending tasks that were not processed.  You can then, for
665:             * example, execute all unprocessed commands via code along the lines
666:             * of:
667:             *
668:             * <pre>
669:             *   List tasks = pool.drain();
670:             *   for (Iterator it = tasks.iterator(); it.hasNext();) 
671:             *     ( (Runnable)(it.next()) ).run();
672:             * </pre>
673:             **/
674:            public List drain() {
675:                boolean wasInterrupted = false;
676:                Vector tasks = new Vector();
677:                for (;;) {
678:                    try {
679:                        Object x = handOff_.poll(0);
680:                        if (x == null)
681:                            break;
682:                        else
683:                            tasks.addElement(x);
684:                    } catch (InterruptedException ex) {
685:                        wasInterrupted = true; // postpone re-interrupt until drained
686:                    }
687:                }
688:                if (wasInterrupted)
689:                    Thread.currentThread().interrupt();
690:                return tasks;
691:            }
692:
693:            /** 
694:             * Cleanup method called upon termination of worker thread.
695:             **/
696:            protected synchronized void workerDone(Worker w) {
697:                threads_.remove(w);
698:                if (--poolSize_ == 0 && shutdown_) {
699:                    maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
700:                    notifyAll(); // notify awaitTerminationAfterShutdown
701:                }
702:            }
703:
704:            /** 
705:             * Get a task from the handoff queue, or null if shutting down.
706:             **/
707:            protected Runnable getTask() throws InterruptedException {
708:                long waitTime;
709:                synchronized (this ) {
710:                    if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
711:                        return null;
712:                    waitTime = (shutdown_) ? 0 : keepAliveTime_;
713:                }
714:                if (waitTime >= 0)
715:                    return (Runnable) (handOff_.poll(waitTime));
716:                else
717:                    return (Runnable) (handOff_.take());
718:            }
719:
720:            /**
721:             * Class defining the basic run loop for pooled threads.
722:             **/
723:            protected class Worker implements  Runnable {
724:                protected Runnable firstTask_;
725:
726:                protected Worker(Runnable firstTask) {
727:                    firstTask_ = firstTask;
728:                }
729:
730:                public void run() {
731:                    try {
732:                        Runnable task = firstTask_;
733:                        firstTask_ = null; // enable GC
734:
735:                        if (task != null) {
736:                            task.run();
737:                            task = null;
738:                        }
739:
740:                        while ((task = getTask()) != null) {
741:                            task.run();
742:                            task = null;
743:                        }
744:                    } catch (InterruptedException ex) {
745:                    } // fall through
746:                    finally {
747:                        workerDone(this );
748:                    }
749:                }
750:            }
751:
752:            /**
753:             * Class for actions to take when execute() blocks. Uses Strategy
754:             * pattern to represent different actions. You can add more in
755:             * subclasses, and/or create subclasses of these. If so, you will
756:             * also want to add or modify the corresponding methods that set the
757:             * current blockedExecutionHandler_.
758:             **/
759:            public interface BlockedExecutionHandler {
760:                /** 
761:                 * Return true if successfully handled, so execute should
762:                 * terminate; else return false if the execute loop should be retried.
763:                 **/
764:                boolean blockedAction(Runnable command)
765:                        throws InterruptedException;
766:            }
767:
768:            /** Class defining Run action. **/
769:            protected class RunWhenBlocked implements  BlockedExecutionHandler {
770:                public boolean blockedAction(Runnable command) {
771:                    command.run();
772:                    return true;
773:                }
774:            }
775:
776:            /** 
777:             * Set the policy for blocked execution to be that the current
778:             * thread executes the command if there are no available threads in
779:             * the pool.
780:             **/
781:            public void runWhenBlocked() {
782:                setBlockedExecutionHandler(new RunWhenBlocked());
783:            }
784:
785:            /** Class defining Wait action. **/
786:            protected class WaitWhenBlocked implements  BlockedExecutionHandler {
787:                public boolean blockedAction(Runnable command)
788:                        throws InterruptedException {
789:                    handOff_.put(command);
790:                    return true;
791:                }
792:            }
793:
794:            /** 
795:             * Set the policy for blocked execution to be to wait until a thread
796:             * is available.
797:             **/
798:            public void waitWhenBlocked() {
799:                setBlockedExecutionHandler(new WaitWhenBlocked());
800:            }
801:
802:            /** Class defining Discard action. **/
803:            protected class DiscardWhenBlocked implements 
804:                    BlockedExecutionHandler {
805:                public boolean blockedAction(Runnable command) {
806:                    return true;
807:                }
808:            }
809:
810:            /** 
811:             * Set the policy for blocked execution to be to return without
812:             * executing the request.
813:             **/
814:            public void discardWhenBlocked() {
815:                setBlockedExecutionHandler(new DiscardWhenBlocked());
816:            }
817:
818:            /** Class defining Abort action. **/
819:            protected class AbortWhenBlocked implements  BlockedExecutionHandler {
820:                public boolean blockedAction(Runnable command) {
821:                    throw new RuntimeException("Pool is blocked");
822:                }
823:            }
824:
825:            /** 
826:             * Set the policy for blocked execution to be to
827:             * throw a RuntimeException.
828:             **/
829:            public void abortWhenBlocked() {
830:                setBlockedExecutionHandler(new AbortWhenBlocked());
831:            }
832:
833:            /**
834:             * Class defining DiscardOldest action.  Under this policy, at most
835:             * one old unhandled task is discarded.  If the new task can then be
836:             * handed off, it is.  Otherwise, the new task is run in the current
837:             * thread (i.e., RunWhenBlocked is used as a backup policy.)
838:             **/
839:            protected class DiscardOldestWhenBlocked implements 
840:                    BlockedExecutionHandler {
841:                public boolean blockedAction(Runnable command)
842:                        throws InterruptedException {
843:                    handOff_.poll(0);
844:                    if (!handOff_.offer(command, 0))
845:                        command.run();
846:                    return true;
847:                }
848:            }
849:
850:            /** 
851:             * Set the policy for blocked execution to be to discard the oldest
852:             * unhandled request.
853:             **/
854:            public void discardOldestWhenBlocked() {
855:                setBlockedExecutionHandler(new DiscardOldestWhenBlocked());
856:            }
857:
858:            /**
859:             * Arrange for the given command to be executed by a thread in this
860:             * pool.  The method normally returns when the command has been
861:             * handed off for (possibly later) execution.
862:             **/
863:            public void execute(Runnable command) throws InterruptedException {
864:                for (;;) {
865:                    synchronized (this ) {
866:                        if (!shutdown_) {
867:                            int size = poolSize_;
868:
869:                            // Ensure minimum number of threads
870:                            if (size < minimumPoolSize_) {
871:                                addThread(command);
872:                                return;
873:                            }
874:
875:                            // Try to give to existing thread
876:                            if (handOff_.offer(command, 0)) {
877:                                return;
878:                            }
879:
880:                            // If cannot handoff and still under maximum, create new thread
881:                            if (size < maximumPoolSize_) {
882:                                addThread(command);
883:                                return;
884:                            }
885:                        }
886:                    }
887:
888:                    // Cannot hand off and cannot create -- ask for help
889:                    if (getBlockedExecutionHandler().blockedAction(command)) {
890:                        return;
891:                    }
892:                }
893:            }
894:        }
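
The shutdown sequence documented above (interrupt the workers, wait for termination with a timeout, then collect tasks that never ran) can be sketched as follows. This is a minimal sketch, not PooledExecutor itself: it assumes the EDU.oswego package is not on the classpath and uses the JDK's java.util.concurrent.ThreadPoolExecutor instead, whose shutdownNow() and awaitTermination() play roughly the roles of shutdownNow(), awaitTerminationAfterShutdown(long) and drain() here. The class name ShutdownSketch, the Result holder, and the pool and queue sizes are hypothetical, chosen only for illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    /** Hypothetical holder for what the sketch observed. */
    public static final class Result {
        public final int pendingTasks;   // tasks still queued when shutdown was requested
        public final boolean terminated; // true if all workers exited within the timeout

        Result(int pendingTasks, boolean terminated) {
            this.pendingTasks = pendingTasks;
            this.terminated = terminated;
        }
    }

    public static Result shutdownAndDrain() {
        // One worker and a bounded queue (sizes are assumptions for the demo).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10));

        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch neverReleased = new CountDownLatch(1);
        try {
            // The first task occupies the only worker until it is interrupted.
            pool.execute(() -> {
                started.countDown();
                try {
                    neverReleased.await();
                } catch (InterruptedException e) {
                    // Responding to the interrupt lets the worker terminate promptly.
                }
            });
            started.await();

            // These tasks cannot run yet, so they wait in the queue.
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> { });
            }

            // Interrupt workers and take back the tasks that never started.
            List<Runnable> pending = pool.shutdownNow();

            // Bounded wait for the workers to finish.
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new Result(pending.size(), terminated);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while shutting down", e);
        }
    }

    public static void main(String[] args) {
        Result r = shutdownAndDrain();
        System.out.println("pending=" + r.pendingTasks + " terminated=" + r.terminated);
    }
}
```

As in the listing, the running task exits early only because it responds to the interrupt. A task that ignores interrupts would keep its worker alive until the task finished, and the timed wait could then return false.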