001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.async.impl;
006:
007: import com.tc.async.api.ConfigurationContext;
008: import com.tc.async.api.EventContext;
009: import com.tc.async.api.EventHandler;
010: import com.tc.async.api.EventHandlerException;
011: import com.tc.async.api.Sink;
012: import com.tc.async.api.Source;
013: import com.tc.async.api.SpecializedEventContext;
014: import com.tc.async.api.Stage;
015: import com.tc.exception.TCRuntimeException;
016: import com.tc.logging.TCLogger;
017: import com.tc.logging.TCLoggerProvider;
018:
019: import java.util.ArrayList;
020: import java.util.List;
021:
022: /**
023: * @author steve
024: */
025: public class StageImpl implements Stage {
026: private static final long pollTime = 3000; // This is the poor man's solution for stage
027: // shutdown
028: private static final EventContext PAUSE_TOKEN = new EventContext() {
029: //
030: };
031:
032: private final String name;
033: private final EventHandler handler;
034: private final Sink sink;
035: private final WorkerThread[] threads;
036: private final ThreadGroup group;
037: private final TCLogger logger;
038: private boolean isPaused = false;
039:
040: public StageImpl(TCLoggerProvider loggerProvider, String name,
041: EventHandler handler, Sink sink, int threadCount,
042: ThreadGroup group) {
043: this .logger = loggerProvider.getLogger(Stage.class.getName()
044: + ": " + name);
045: this .name = name;
046: this .handler = handler;
047: this .threads = new WorkerThread[threadCount];
048: this .sink = sink;
049: this .group = group;
050: }
051:
052: public void destroy() {
053: stopThreads();
054: }
055:
056: public void start(ConfigurationContext context) {
057: handler.initializeContext(context);
058: startThreads();
059: }
060:
061: public Sink getSink() {
062: return sink;
063: }
064:
065: public String getName() {
066: return name;
067: }
068:
069: private synchronized void startThreads() {
070: for (int i = 0; i < threads.length; i++) {
071: threads[i] = new WorkerThread("WorkerThread(" + name + ","
072: + i + ")", (Source) sink, handler, group);
073: threads[i].start();
074: }
075: }
076:
077: private void stopThreads() {
078: for (int i = 0; i < threads.length; i++) {
079: threads[i].shutdown();
080: }
081: }
082:
083: public String toString() {
084: return "StageImpl(" + name + ")";
085: }
086:
087: private static class WorkerThread extends Thread {
088: private static final com.tc.util.State RUNNING = new com.tc.util.State(
089: "RUNNING");
090: private static final com.tc.util.State PAUSED = new com.tc.util.State(
091: "PAUSED");
092:
093: private com.tc.util.State state;
094: private final Source source;
095: private final EventHandler handler;
096: private boolean shutdownRequested = false;
097:
098: public WorkerThread(String name, Source source,
099: EventHandler handler, ThreadGroup group) {
100: super (group, name);
101: setDaemon(true);
102: this .source = source;
103: this .handler = handler;
104: }
105:
106: public synchronized void shutdown() {
107: this .shutdownRequested = true;
108: }
109:
110: private synchronized boolean shutdownRequested() {
111: return this .shutdownRequested;
112: }
113:
114: synchronized void pause() {
115: while (state != PAUSED) {
116: try {
117: wait();
118: } catch (InterruptedException e) {
119: throw new TCRuntimeException(e);
120: }
121: }
122: }
123:
124: synchronized void unpause() {
125: if (state != PAUSED)
126: throw new AssertionError(
127: "Attempt to unpause when not paused: " + state);
128: state = RUNNING;
129: notifyAll();
130: }
131:
132: public void run() {
133: state = RUNNING;
134: while (!shutdownRequested()) {
135: EventContext ctxt;
136: try {
137: ctxt = source.poll(pollTime);
138: if (ctxt == PAUSE_TOKEN) {
139: synchronized (this ) {
140: state = PAUSED;
141: notifyAll();
142: while (state == PAUSED) {
143: wait();
144: }
145: }
146: } else if (ctxt != null) {
147: if (ctxt instanceof SpecializedEventContext) {
148: ((SpecializedEventContext) ctxt).execute();
149: } else {
150: handler.handleEvent(ctxt);
151: }
152: }
153: } catch (InterruptedException ie) {
154: if (shutdownRequested()) {
155: return;
156: }
157: throw new TCRuntimeException(ie);
158: } catch (EventHandlerException ie) {
159: if (shutdownRequested())
160: return;
161: throw new TCRuntimeException(ie);
162: } finally {
163: // Agressively null out the reference before going around the loop again. If you don't do this, the reference
164: // to the context will exist until another context comes in. This can potentially keep many objects in memory
165: // longer than necessary
166: ctxt = null;
167: }
168: }
169: }
170: }
171:
172: public void pause() {
173: if (isPaused)
174: throw new AssertionError(
175: "Attempt to pause while already paused.");
176:
177: log("Pausing...");
178:
179: List pauseTokens = new ArrayList(threads.length);
180: for (int i = 0; i < threads.length; i++) {
181: pauseTokens.add(PAUSE_TOKEN);
182: }
183: sink.pause(pauseTokens);
184: for (int i = 0; i < threads.length; i++) {
185: threads[i].pause();
186: }
187: isPaused = true;
188: log("Paused.");
189: }
190:
191: public void unpause() {
192: if (!isPaused)
193: throw new AssertionError(
194: "Attempt to unpause while not paused.");
195: log("Unpausing...");
196:
197: sink.unpause();
198: for (int i = 0; i < threads.length; i++) {
199: threads[i].unpause();
200: }
201:
202: isPaused = false;
203: log("Unpaused.");
204: }
205:
206: private void log(Object msg) {
207: logger.info("Stage " + name + ": " + msg);
208: }
209: }
|