001: package com.ice.jcvsweb.schedule;
002:
003: import java.text.SimpleDateFormat;
004: import java.util.ArrayList;
005:
006: import javax.servlet.ServletContext;
007:
008: public class Scheduler extends Thread {
009: public static final String NAME = "JCVSWebScheduler";
010: public static final String ATTR_NAME = "jcvsScheduler";
011:
012: private Object semaphore = null;
013: private boolean shutDownF = false;
014: private ServletContext context = null;
015:
016: // These are tasks that are queued up to run "asap".
017: // We just hand them out as pooled threads become
018: // available for processing.
019: //
020: private ArrayList whenPossible = null;
021: private WhenPossible whenPossibleThread = null;
022:
023: public Scheduler(ServletContext ctx) {
024: super (Scheduler.NAME);
025: this .context = ctx;
026: this .semaphore = new Object();
027:
028: this .whenPossible = new ArrayList();
029: this .whenPossibleThread = this .new WhenPossible("WhenPossible",
030: this .whenPossible);
031: }
032:
033: public void run() {
034: this .context.log("[JCVS] Scheduler starts");
035:
036: this .whenPossibleThread.start();
037:
038: for (;;) {
039: if (false)
040: this .context.log("[JCVS] Scheduler runs"
041: + " when possible Q size = "
042: + this .whenPossible.size());
043:
044: synchronized (this .semaphore) {
045: if (this .shutDownF) {
046: this .whenPossibleThread.shutdown();
047: break;
048: }
049: }
050:
051: try {
052: this .sleep(7 * 1000);
053: } catch (InterruptedException ex) {
054: this .context.log("sleep( 20 secs ) interrupted", ex);
055: }
056: }
057:
058: this .context.log("[JCVS] Scheduler ends.");
059: }
060:
061: public void runWhenPossible(ScheduledEvent event) {
062: this .context.log("[JCVS] runWhenPossible: ADD " + event);
063:
064: synchronized (this .whenPossible) {
065: this .whenPossible.add(event);
066: }
067:
068: this .context.log("[JCVS] runWhenPossible: ADDED " + event);
069: }
070:
071: public void shutdown() {
072: this .context.log("[JCVS] Scheduler shutdown requested.");
073:
074: synchronized (this .semaphore) {
075: this .shutDownF = true;
076: }
077: }
078:
079: private class WhenPossible extends Thread {
080: private volatile boolean end = false;
081: private volatile ScheduledEvent event = null;
082: private volatile ArrayList whenPossible = null;
083:
084: public WhenPossible(String name, ArrayList whenPossible) {
085: super (name);
086: this .whenPossible = whenPossible;
087: }
088:
089: public synchronized void shutdown() {
090: this .end = true;
091: }
092:
093: public void run() {
094: for (;;) {
095: if (false)
096: context.log("WhenPossibleThread wakes: "
097: + this .event);
098:
099: synchronized (this ) {
100: if (this .end)
101: break;
102: }
103:
104: this .event = null;
105: synchronized (this .whenPossible) {
106: if (this .whenPossible.size() > 0) {
107: this .event = (ScheduledEvent) this .whenPossible
108: .remove(0);
109: }
110: }
111:
112: if (this .event != null) {
113: context
114: .log("WhenPossibleThread run: "
115: + this .event);
116:
117: this .event.run();
118:
119: context.log("WhenPossibleThread done: "
120: + this .event);
121: } else {
122: try {
123: this .sleep(3 * 1000);
124: } catch (InterruptedException ex) {
125: context.log("sleep( 3 secs ) interrupted", ex);
126: }
127: }
128: }
129: }
130:
131: }
132:
133: }
|