001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.async.impl;
005:
006: import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
007: import EDU.oswego.cs.dl.util.concurrent.Channel;
008: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
009:
010: import com.tc.async.api.ConfigurationContext;
011: import com.tc.async.api.EventHandler;
012: import com.tc.async.api.Stage;
013: import com.tc.async.api.StageManager;
014: import com.tc.async.api.StageMonitor;
015: import com.tc.logging.DefaultLoggerProvider;
016: import com.tc.logging.TCLogger;
017: import com.tc.logging.TCLoggerProvider;
018: import com.tc.properties.TCPropertiesImpl;
019: import com.tc.stats.Stats;
020: import com.tc.text.StringFormatter;
021: import com.tc.util.Assert;
022: import com.tc.util.concurrent.ThreadUtil;
023:
024: import java.util.Arrays;
025: import java.util.Collections;
026: import java.util.HashMap;
027: import java.util.Iterator;
028: import java.util.LinkedList;
029: import java.util.List;
030: import java.util.Map;
031:
032: /**
033: * @author steve
034: */
035: public class StageManagerImpl implements StageManager {
036:
037: private static final String STAGE_MONITOR = "tc.stage.monitor.enabled";
038: private static final String STAGE_MONITOR_DELAY = "tc.stage.monitor.delay";
039:
040: private static final boolean MONITOR = TCPropertiesImpl
041: .getProperties().getBoolean(STAGE_MONITOR);
042: private static final long MONITOR_DELAY = TCPropertiesImpl
043: .getProperties().getLong(STAGE_MONITOR_DELAY);
044:
045: private Map stages = new HashMap();
046: private TCLoggerProvider loggerProvider;
047: private final ThreadGroup group;
048: private String[] stageNames = new String[] {};
049:
050: public StageManagerImpl(ThreadGroup threadGroup) {
051: this .loggerProvider = new DefaultLoggerProvider();
052: this .group = threadGroup;
053:
054: if (MONITOR) {
055: startMonitor();
056: }
057: }
058:
059: private void startMonitor() {
060: final TCLogger logger = loggerProvider.getLogger(getClass());
061: Thread t = new Thread("SEDA Stage Monitor") {
062: public void run() {
063: while (true) {
064: printStats();
065: ThreadUtil.reallySleep(MONITOR_DELAY);
066: }
067: }
068:
069: private void printStats() {
070: try {
071: Stats stats[] = StageManagerImpl.this .getStats();
072: logger.info("Stage Depths");
073: logger.info("=================================");
074: for (int i = 0; i < stats.length; i++) {
075: stats[i].logDetails(logger);
076: }
077: } catch (Throwable th) {
078: logger.error(th);
079: }
080: }
081: };
082: t.setDaemon(true);
083: t.start();
084: }
085:
086: public void setLoggerProvider(TCLoggerProvider loggerProvider) {
087: this .loggerProvider = loggerProvider;
088: }
089:
090: public synchronized Stage createStage(String name,
091: EventHandler handler, int threads, int maxSize) {
092: Channel q = maxSize > 0 ? (Channel) new BoundedLinkedQueue(
093: maxSize) : new LinkedQueue();
094: Stage s = new StageImpl(loggerProvider, name, handler,
095: new StageQueueImpl(loggerProvider, name, q), threads,
096: group);
097: addStage(name, s);
098: return s;
099: }
100:
101: private synchronized void addStage(String name, Stage s) {
102: Object prev = stages.put(name, s);
103: Assert.assertNull(prev);
104: s.getSink().enableStatsCollection(MONITOR);
105: stageNames = (String[]) stages.keySet().toArray(
106: new String[stages.size()]);
107: Arrays.sort(stageNames);
108: }
109:
110: public void startStage(Stage stage, ConfigurationContext context) {
111: stage.start(context);
112: }
113:
114: public synchronized void startAll(ConfigurationContext context) {
115: for (Iterator i = stages.values().iterator(); i.hasNext();) {
116: Stage s = (Stage) i.next();
117: s.start(context);
118: }
119: }
120:
121: public void stopStage(Stage stage) {
122: stage.destroy();
123: }
124:
125: public synchronized void stopAll() {
126: for (Iterator i = stages.values().iterator(); i.hasNext();) {
127: Stage s = (Stage) i.next();
128: s.destroy();
129: }
130: }
131:
132: public synchronized Stage getStage(String name) {
133: return (Stage) stages.get(name);
134: }
135:
136: public synchronized Stats[] getStats() {
137: final String[] names = stageNames;
138: final Stats[] stats = new Stats[names.length];
139:
140: for (int i = 0; i < names.length; i++) {
141: stats[i] = getStage(names[i]).getSink().getStats(
142: MONITOR_DELAY);
143: }
144: return stats;
145: }
146:
147: static class StageMonitors {
148:
149: private final List monitors = Collections
150: .synchronizedList(new LinkedList());
151: private final StringFormatter formatter = new StringFormatter();
152:
153: StageMonitors(final TCLogger logger) {
154: return;
155: }
156:
157: public StageMonitor newStageMonitor(String name) {
158: return new NullStageMonitor();
159: }
160:
161: public String toString() {
162: StringBuffer buf = new StringBuffer();
163: buf.append("StageMonitors").append(formatter.newline());
164: for (Iterator i = Collections.unmodifiableList(monitors)
165: .iterator(); i.hasNext();) {
166: buf
167: .append(
168: ((StageMonitorImpl) i.next())
169: .dumpAndFlush()).append(
170: formatter.newline());
171: }
172: return buf.toString();
173: }
174: }
175:
176: }
|