001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.util.test;
023:
024: import java.util.Arrays;
025: import java.util.HashMap;
026: import java.util.HashSet;
027:
028: import org.jboss.util.threadpool.BasicThreadPool;
029: import org.jboss.util.threadpool.Task;
030: import org.apache.log4j.Logger;
031: import junit.framework.TestCase;
032:
033: /**
034: * Tests of thread pool with Tasks added to the pool
035: *
036: * @see org.jboss.util.threadpool.ThreadPool
037: * @author <a href="adrian@jboss.org">Adrian.Brock</a>
038: * @author Scott.Stark@jboss.org
039: * @version $Revision: 57211 $
040: */
041: public class ThreadPoolTaskUnitTestCase extends TestCase {
042: private static Logger log = Logger
043: .getLogger(ThreadPoolTaskUnitTestCase.class);
044:
045: /** Basic test */
046: static final int BASIC = 0;
047:
048: /** Hold the thread after start */
049: static final int HOLD_START = 1;
050:
051: /** The accepted stats */
052: Stats accepted = new Stats("Accepted");
053:
054: /** The rejected stats */
055: Stats rejected = new Stats("Rejected");
056:
057: /** The started stats */
058: Stats started = new Stats("Started");
059:
060: /** The completed stats */
061: Stats completed = new Stats("Completed");
062:
063: /** The stopped stats */
064: Stats stopped = new Stats("Stopped");
065:
066: /** The thread names */
067: HashMap threadNames = new HashMap();
068:
/**
 * Construct the test case for one named test.
 *
 * @param name the test to run
 */
public ThreadPoolTaskUnitTestCase(final String name) {
    super(name);
}
077:
078: protected void setUp() throws Exception {
079: log.debug("====> Starting test: " + getName());
080: }
081:
082: protected void tearDown() throws Exception {
083: log.debug("=====> Stopping test: " + getName());
084: }
085:
086: /**
087: * Basic test
088: */
089: public void testBasic() throws Exception {
090: BasicThreadPool pool = new BasicThreadPool();
091: try {
092: pool.runTask(new TestTask(BASIC, "test"));
093: completed.wait(1);
094: HashSet expected = makeExpected(new Object[] { "test" });
095: assertEquals(expected, accepted.tasks);
096: assertEquals(expected, started.tasks);
097: assertEquals(expected, completed.tasks);
098: } finally {
099: pool.stop(true);
100: }
101: }
102:
103: /**
104: * Multiple Basic test
105: */
106: public void testMultipleBasic() throws Exception {
107: BasicThreadPool pool = new BasicThreadPool();
108: try {
109: pool.runTask(new TestTask(BASIC, "test1"));
110: pool.runTask(new TestTask(BASIC, "test2"));
111: pool.runTask(new TestTask(BASIC, "test3"));
112: completed.wait(3);
113: HashSet expected = makeExpected(new Object[] { "test1",
114: "test2", "test3" });
115: assertEquals(expected, accepted.tasks);
116: assertEquals(expected, started.tasks);
117: assertEquals(expected, completed.tasks);
118: } finally {
119: pool.stop(true);
120: }
121: }
122:
123: /**
124: * Test pooling
125: */
126: public void testSimplePooling() throws Exception {
127: BasicThreadPool pool = new BasicThreadPool();
128: pool.setMaximumPoolSize(1);
129: try {
130: pool.runTask(new TestTask(BASIC, "test1"));
131: completed.wait(1);
132: pool.runTask(new TestTask(BASIC, "test2"));
133: completed.wait(2);
134: assertEquals(threadNames.get("test1"), threadNames
135: .get("test2"));
136: } finally {
137: pool.stop(true);
138: }
139: }
140:
141: /**
142: * Test multiple pooling
143: */
144: public void testMultiplePooling() throws Exception {
145: BasicThreadPool pool = new BasicThreadPool();
146: try {
147: pool.runTask(new TestTask(HOLD_START, "test1"));
148: started.wait(1);
149: pool.runTask(new TestTask(BASIC, "test2"));
150: completed.wait(1);
151: started.release("test1");
152: completed.wait(2);
153: assertTrue("Shouldn't run on the same thread",
154: threadNames.get("test1").equals(
155: threadNames.get("test2")) == false);
156: } finally {
157: pool.stop(true);
158: }
159: }
160:
161: /**
162: * Test maximum pool
163: */
164: public void testMaximumPool() throws Exception {
165: BasicThreadPool pool = new BasicThreadPool();
166: pool.setMaximumPoolSize(1);
167: try {
168: pool.runTask(new TestTask(HOLD_START, "test1"));
169: started.wait(1);
170: pool.runTask(new TestTask(BASIC, "test2"));
171: Thread.sleep(1000);
172: assertEquals(0, completed.tasks.size());
173: started.release("test1");
174: completed.wait(2);
175: assertEquals(
176: makeExpected(new Object[] { "test1", "test2" }),
177: completed.tasks);
178: } finally {
179: pool.stop(true);
180: }
181: }
182:
183: /**
184: * Test maximum cache
185: */
186: public void testMaximumQueue() throws Exception {
187: BasicThreadPool pool = new BasicThreadPool();
188: pool.setMaximumQueueSize(1);
189: pool.setMaximumPoolSize(1);
190: try {
191: pool.runTask(new TestTask(HOLD_START, "test1"));
192: started.wait(1);
193: pool.runTask(new TestTask(BASIC, "test2"));
194: assertEquals(0, rejected.tasks.size());
195: pool.runTask(new TestTask(BASIC, "test3"));
196: assertEquals(makeExpected(new Object[] { "test3" }),
197: rejected.tasks);
198:
199: started.release("test1");
200: completed.wait(2);
201: assertEquals(
202: makeExpected(new Object[] { "test1", "test2" }),
203: completed.tasks);
204: } finally {
205: pool.stop(true);
206: }
207: }
208:
209: /**
210: * Test maximum cache
211: */
212: public void testCompleteTimeout() throws Exception {
213: BasicThreadPool pool = new BasicThreadPool();
214: pool.setMaximumQueueSize(1);
215: pool.setMaximumPoolSize(1);
216: try {
217: /* Test that a task with a timeout that completes within its timeout
218: works as expected
219: */
220: TestTask task = new TestTask(HOLD_START, "test1", 0,
221: 10 * 1000, Task.WAIT_NONE);
222: pool.runTask(task);
223: started.wait(1);
224: started.release("test1");
225: completed.wait(1);
226:
227: /* Test a task with a timeout that does not complete within its timeout
228: is stopped
229: */
230: task = new TestTask(HOLD_START, "test2", 0, 10 * 1000,
231: Task.WAIT_NONE);
232: task.setRunSleepTime(12 * 1000);
233: pool.runTask(task);
234: started.wait(1);
235: started.release("test2");
236: stopped.wait(1);
237: completed.wait(1);
238:
239: // Test that another valid task completes as expected
240: task = new TestTask(HOLD_START, "test3", 0, 0,
241: Task.WAIT_NONE);
242: pool.runTask(task);
243: started.wait(1);
244: started.release("test3");
245: completed.wait(1);
246:
247: /* Test a task with a timeout that does not complete within its timeout
248: is stopped
249: */
250: task = new TestTask(HOLD_START, "test4", 0, 10 * 1000,
251: Task.WAIT_NONE);
252: task.setRunSleepTime(12 * 1000);
253: pool.runTask(task);
254: started.wait(1);
255: started.release("test4");
256: stopped.wait(1);
257: completed.wait(1);
258: } finally {
259: pool.stop(true);
260: }
261: }
262:
263: public void testCompleteTimeoutWithSpinLoop() throws Exception {
264: BasicThreadPool pool = new BasicThreadPool();
265: pool.setMaximumQueueSize(1);
266: pool.setMaximumPoolSize(1);
267: try {
268: /* Test that a task with a timeout that completes within its timeout
269: works as expected
270: */
271: TestTask task = new TestTask(HOLD_START, "test1", 0,
272: 10 * 1000, Task.WAIT_NONE);
273: task.setRunSleepTime(Long.MAX_VALUE);
274: pool.runTask(task);
275: started.wait(1);
276: started.release("test1");
277: stopped.wait(1);
278: completed.wait(1);
279: } finally {
280: pool.stop(true);
281: }
282: }
283:
284: /**
285: * Save the thread name
286: *
287: * @param data the test data
288: * @param name the thread name
289: */
290: public synchronized void saveRunnableThreadName(String data,
291: String name) {
292: threadNames.put(data, name);
293: }
294:
295: /**
296: * Make the expected result
297: *
298: * @param expected the results as an object array
299: * @return the expected result
300: */
301: public HashSet makeExpected(Object[] expected) {
302: return new HashSet(Arrays.asList(expected));
303: }
304:
305: /**
306: * Test task
307: */
308: public class TestTask implements Task {
309: /** The test to run */
310: private int test;
311: /** The data for the test */
312: private String data;
313: /** The start timeout */
314: private long startTimeout;
315: /** The completion timeout */
316: private long completionTimeout;
317: /** The time to sleep in execute */
318: private long runSleepTime;
319: /** The wait type */
320: private int waitType;
321:
322: /**
323: * Create a new TestTask
324: *
325: * @param test the test
326: * @param data the test data
327: */
328: public TestTask(int test, String data) {
329: this (test, data, 0, Task.WAIT_NONE);
330: }
331:
332: /**
333: * Create a new TestTask
334: *
335: * @param test the test
336: * @param data the test data
337: * @param startTimeout the start timeout
338: * @param waitType the wait type
339: */
340: public TestTask(int test, String data, long startTimeout,
341: int waitType) {
342: this (test, data, startTimeout, 0, waitType);
343: }
344:
345: public TestTask(int test, String data, long startTimeout,
346: long completionTimeout, int waitType) {
347: this .test = test;
348: this .data = data;
349: this .startTimeout = startTimeout;
350: this .completionTimeout = completionTimeout;
351: this .waitType = waitType;
352: }
353:
354: public void execute() {
355: saveThreadName();
356: log.info("Start execute");
357: if (runSleepTime > 0) {
358: log.info("Begin spin loop");
359: if (runSleepTime == Long.MAX_VALUE) {
360: while (true)
361: ;
362: } else {
363: log.info("Begin sleep");
364: try {
365: Thread.sleep(runSleepTime);
366: } catch (InterruptedException e) {
367: }
368: }
369: }
370: log.info("End execute");
371: }
372:
373: public void saveThreadName() {
374: saveRunnableThreadName(data, Thread.currentThread()
375: .getName());
376: }
377:
378: public void accepted(long time) {
379: accepted.notify(data, time);
380: }
381:
382: public void rejected(long time, Throwable throwable) {
383: rejected.notify(data, time, throwable);
384: }
385:
386: public void started(long time) {
387: started.notify(data, time);
388: if (test == HOLD_START)
389: started.waitForRelease(data);
390: }
391:
392: public void completed(long time, Throwable throwable) {
393: completed.notify(data, time, throwable);
394: }
395:
396: public long getCompletionTimeout() {
397: return completionTimeout;
398: }
399:
400: public int getPriority() {
401: return Thread.NORM_PRIORITY;
402: }
403:
404: public long getStartTimeout() {
405: return startTimeout;
406: }
407:
408: public int getWaitType() {
409: return waitType;
410: }
411:
412: public void stop() {
413: stopped.notify(data);
414: }
415:
416: public void setRunSleepTime(long runSleepTime) {
417: this .runSleepTime = runSleepTime;
418: }
419: }
420:
421: public class Stats {
422: /**
423: * The name
424: */
425: String name;
426:
427: /** The tasks */
428: HashSet tasks = new HashSet();
429:
430: /** The times */
431: HashMap times = new HashMap();
432:
433: /** The errors */
434: HashMap errors = new HashMap();
435:
436: /** The releases */
437: HashSet releases = new HashSet();
438:
439: public Stats(String name) {
440: this .name = name;
441: }
442:
443: /**
444: * Wait for expected
445: */
446: public void wait(int target) throws InterruptedException {
447: log.debug(Thread.currentThread().getName()
448: + ": Waiting for " + name + " target=" + target);
449: synchronized (ThreadPoolTaskUnitTestCase.this ) {
450: while (tasks.size() < target)
451: ThreadPoolTaskUnitTestCase.this .wait();
452: log.debug(Thread.currentThread().getName()
453: + ": Waited for " + name + " target=" + target);
454: }
455: }
456:
457: /**
458: * Release in waiting
459: *
460: * @param data the thread
461: */
462: public void release(String data) {
463: log.debug(Thread.currentThread().getName() + ": Releasing "
464: + name + " data=" + data);
465: synchronized (ThreadPoolTaskUnitTestCase.this ) {
466: releases.add(data);
467: ThreadPoolTaskUnitTestCase.this .notifyAll();
468: log.debug(Thread.currentThread().getName()
469: + ": Released " + name + " data=" + data);
470: }
471: }
472:
473: /**
474: * Wait for release
475: */
476: public void waitForRelease(String data) {
477: log
478: .debug(Thread.currentThread().getName()
479: + ": Waiting for release " + name
480: + " data=" + data);
481: synchronized (ThreadPoolTaskUnitTestCase.this ) {
482: try {
483: while (releases.contains(data) == false)
484: ThreadPoolTaskUnitTestCase.this .wait();
485: } catch (InterruptedException ignored) {
486: }
487: log.debug(Thread.currentThread().getName()
488: + ": Waited for release " + name + " data="
489: + data);
490: }
491: }
492:
493: /**
494: * Notify
495: */
496: public void notify(String data) {
497: log.debug(Thread.currentThread().getName() + ": Notifying "
498: + name + " data=" + data);
499: synchronized (ThreadPoolTaskUnitTestCase.this ) {
500: tasks.add(data);
501: ThreadPoolTaskUnitTestCase.this .notifyAll();
502: log.debug(Thread.currentThread().getName()
503: + ": Notified " + name + " data=" + data);
504: }
505: }
506:
507: /**
508: * Notify
509: */
510: public void notify(String data, long time) {
511: log.debug(Thread.currentThread().getName() + ": Notifying "
512: + name + " data=" + data + " time=" + time);
513: synchronized (ThreadPoolTaskUnitTestCase.this ) {
514: tasks.add(data);
515: times.put(data, new Long(time));
516: ThreadPoolTaskUnitTestCase.this .notifyAll();
517: }
518: log.debug(Thread.currentThread().getName() + ": Notified "
519: + name + " data=" + data + " time=" + time);
520: }
521:
522: /**
523: * Notify
524: */
525: public void notify(String data, long time, Throwable throwable) {
526: if (throwable != null)
527: log.debug(Thread.currentThread().getName()
528: + ": Notifying " + name + " data=" + data
529: + " time=" + time, throwable);
530: else
531: log.debug(Thread.currentThread().getName()
532: + ": Notifying " + name + " data=" + data
533: + " time=" + time + " throwable=null");
534: synchronized (ThreadPoolTaskUnitTestCase.this ) {
535: tasks.add(data);
536: times.put(data, new Long(time));
537: errors.put(data, throwable);
538: ThreadPoolTaskUnitTestCase.this .notifyAll();
539: }
540: if (throwable != null)
541: log.debug(Thread.currentThread().getName()
542: + ": Notified " + name + " data=" + data
543: + " time=" + time + " throwable="
544: + throwable.getMessage());
545: else
546: log.debug(Thread.currentThread().getName()
547: + ": Notified " + name + " data=" + data
548: + " time=" + time + " throwable=null");
549: }
550:
551: /**
552: * Clear
553: */
554: public void clear() {
555: log.debug(Thread.currentThread().getName() + ": Clearing "
556: + name);
557: synchronized (ThreadPoolTaskUnitTestCase.this ) {
558: tasks.clear();
559: log.debug(Thread.currentThread().getName()
560: + ": Cleared " + name);
561: }
562: }
563: }
564: }
|