001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.qos.metrics;
028:
029: import java.util.HashMap;
030: import java.util.Iterator;
031:
032: import org.cougaar.core.component.ServiceBroker;
033: import org.cougaar.core.mts.AgentStatusService;
034: import org.cougaar.core.mts.MessageAddress;
035: import org.cougaar.core.node.NodeIdentificationService;
036: import org.cougaar.core.plugin.ComponentPlugin;
037: import org.cougaar.core.service.ThreadService;
038: import org.cougaar.core.thread.Schedulable;
039:
040: /**
041: * This Plugin converts the AgentStatusService records into
042: * Metrics. It snapshots the service periodically, and converts the
043: * deltas into rates. This Plugin is laaded as part of the standard
044: * metrics service rules and is responsible for publishing the message
045: * traffic metrics for Agents and Nodes into the MetricsUpdateService.
046: *
047: * In the <a
048: * href="../../../../../OnlineManual/MetricsService/sensors.html">Sensor
049: * Data Flow</a> pattern this class plays the role of <b>Sensor</b>
050: * for message counts and size among Agents and Nodes.
051: *
052: * @see AgentLoadServlet
053: * @see AgentStatusService
054: */
055: public class AgentStatusRatePlugin extends ComponentPlugin implements
056: Runnable, Constants {
057: private static final int LOCAL = 0;
058: private static final int REMOTE = 1;
059: private static final int BASE_PERIOD = 10; //10SecAVG
060:
061: private AgentStatusService agentStatusService;
062: private MetricsUpdateService metricsUpdate;
063: private NodeHistory nodeHistory;
064: private HashMap agentLocalHistories;
065: private HashMap agentRemoteHistories;
066: private MessageAddress nodeID;
067: private Schedulable schedulable;
068:
069: public AgentStatusRatePlugin() {
070: super ();
071: agentLocalHistories = new HashMap();
072: agentRemoteHistories = new HashMap();
073: }
074:
075: private static class AgentSnapShot extends DecayingHistory.SnapShot {
076: AgentStatusService.AgentState state;
077:
078: AgentSnapShot(AgentStatusService.AgentState state) {
079: super ();
080: this .state = state;
081: }
082: }
083:
084: private abstract class History extends DecayingHistory {
085: MessageAddress agent;
086:
087: History(MessageAddress address, HashMap store) {
088: super (10, 3, BASE_PERIOD);
089: this .agent = address;
090: if (store != null)
091: store.put(address, this );
092: }
093:
094: }
095:
096: private class LocalHistory extends History {
097: String msgInKey;
098: String msgOutKey;
099: String bytesInKey;
100: String bytesOutKey;
101:
102: LocalHistory(MessageAddress address, String type, HashMap store) {
103: super (address, store);
104: String agentKey = type + KEY_SEPR + address + KEY_SEPR;
105: msgInKey = (agentKey + MSG_IN).intern();
106: addKey(msgInKey);
107: msgOutKey = (agentKey + MSG_OUT).intern();
108: addKey(msgOutKey);
109: bytesInKey = (agentKey + BYTES_IN).intern();
110: addKey(bytesInKey);
111: bytesOutKey = (agentKey + BYTES_OUT).intern();
112: addKey(bytesOutKey);
113: }
114:
115: public void newAddition(KeyMap keys,
116: DecayingHistory.SnapShot rawNow,
117: DecayingHistory.SnapShot rawLast) {
118: AgentSnapShot now = (AgentSnapShot) rawNow;
119: AgentSnapShot last = (AgentSnapShot) rawLast;
120: updateMetric(keys.getKey(msgInKey), msgInRate(now, last),
121: "msg/sec");
122: updateMetric(keys.getKey(msgOutKey), msgOutRate(now, last),
123: "msg/sec");
124: updateMetric(keys.getKey(bytesInKey),
125: bytesInRate(now, last), "bytes/sec");
126: updateMetric(keys.getKey(bytesOutKey), bytesOutRate(now,
127: last), "bytes/sec");
128: }
129: }
130:
131: private class NodeHistory extends LocalHistory {
132: NodeHistory() {
133: super (nodeID, "Node", null);
134: }
135: }
136:
137: private class AgentLocalHistory extends LocalHistory {
138:
139: AgentLocalHistory(MessageAddress address) {
140: super (address, "Agent", agentLocalHistories);
141: }
142: }
143:
144: private class AgentRemoteHistory extends History {
145: String msgFromKey;
146: String msgToKey;
147: String bytesFromKey;
148: String bytesToKey;
149:
150: AgentRemoteHistory(MessageAddress address) {
151: super (address, agentRemoteHistories);
152: String agentKey = "Node" + KEY_SEPR + nodeID + KEY_SEPR
153: + "Destination" + KEY_SEPR + agent + KEY_SEPR;
154: msgFromKey = (agentKey + MSG_FROM).intern();
155: addKey(msgFromKey);
156: msgToKey = (agentKey + MSG_TO).intern();
157: addKey(msgToKey);
158: bytesFromKey = (agentKey + BYTES_FROM).intern();
159: addKey(bytesFromKey);
160: bytesToKey = (agentKey + BYTES_TO).intern();
161: addKey(bytesToKey);
162: }
163:
164: public void newAddition(KeyMap keys,
165: DecayingHistory.SnapShot rawNow,
166: DecayingHistory.SnapShot rawLast) {
167: AgentSnapShot now = (AgentSnapShot) rawNow;
168: AgentSnapShot last = (AgentSnapShot) rawLast;
169: updateMetric(keys.getKey(msgFromKey), msgInRate(now, last),
170: "msg/sec");
171: updateMetric(keys.getKey(msgToKey), msgOutRate(now, last),
172: "msg/sec");
173: updateMetric(keys.getKey(bytesFromKey), bytesInRate(now,
174: last), "bytes/sec");
175: updateMetric(keys.getKey(bytesToKey), bytesOutRate(now,
176: last), "bytes/sec");
177: // JAZ ADD QUEUE Metric
178: }
179:
180: }
181:
182: private synchronized History getAgentHistory(MessageAddress agent,
183: int kind) {
184: HashMap map = kind == LOCAL ? agentLocalHistories
185: : agentRemoteHistories;
186: History history = (History) map.get(agent);
187: if (history != null)
188: return history;
189: else if (kind == LOCAL)
190: return new AgentLocalHistory(agent);
191: else
192: return new AgentRemoteHistory(agent);
193: }
194:
195: private void updateMetric(String key, double value, String units) {
196: Metric metric = new MetricImpl(value, SECOND_MEAS_CREDIBILITY,
197: units, "AgentStatusRatePlugin");
198: metricsUpdate.updateValue(key, metric);
199: }
200:
201: private double deltaSec(AgentSnapShot now, AgentSnapShot last) {
202: return (now.timestamp - last.timestamp) / 1000.0;
203: }
204:
205: private double msgInRate(AgentSnapShot now, AgentSnapShot last) {
206: double deltaT = deltaSec(now, last);
207: if (deltaT > 0) {
208: return (now.state.receivedCount - last.state.receivedCount)
209: / deltaT;
210: } else
211: return 0.0;
212: }
213:
214: private double msgOutRate(AgentSnapShot now, AgentSnapShot last) {
215: double deltaT = deltaSec(now, last);
216: if (deltaT > 0) {
217: return (now.state.deliveredCount - last.state.deliveredCount)
218: / deltaT;
219: } else
220: return 0.0;
221: }
222:
223: private double bytesOutRate(AgentSnapShot now, AgentSnapShot last) {
224: double deltaT = deltaSec(now, last);
225: if (deltaT > 0) {
226: return (now.state.deliveredBytes - last.state.deliveredBytes)
227: / deltaT;
228: } else
229: return 0.0;
230: }
231:
232: private double bytesInRate(AgentSnapShot now, AgentSnapShot last) {
233: double deltaT = deltaSec(now, last);
234: if (deltaT > 0) {
235: return (now.state.receivedBytes - last.state.receivedBytes)
236: / deltaT;
237: } else
238: return 0.0;
239: }
240:
241: public void load() {
242: super .load();
243:
244: ServiceBroker sb = getServiceBroker();
245: agentStatusService = (AgentStatusService) sb.getService(this ,
246: AgentStatusService.class, null);
247:
248: metricsUpdate = (MetricsUpdateService) sb.getService(this ,
249: MetricsUpdateService.class, null);
250:
251: NodeIdentificationService nis = (NodeIdentificationService) sb
252: .getService(this , NodeIdentificationService.class, null);
253: nodeID = nis.getMessageAddress();
254:
255: nodeHistory = new NodeHistory();
256:
257: // Start a 1 second poller, if the required services exist.
258: if (agentStatusService != null && metricsUpdate != null) {
259: ThreadService tsvc = (ThreadService) sb.getService(this ,
260: ThreadService.class, null);
261: schedulable = tsvc.getThread(this , this , "AgentStatus");
262: schedulable.schedule(0, BASE_PERIOD * 1000);
263: sb.releaseService(this , ThreadService.class, tsvc);
264: }
265: }
266:
267: // The body of the Schedulable
268: public void run() {
269: Iterator itr = agentStatusService.getLocalAgents().iterator();
270: while (itr.hasNext()) {
271: MessageAddress addr = (MessageAddress) itr.next();
272: AgentStatusService.AgentState state = agentStatusService
273: .getLocalAgentState(addr);
274: if (state != null) {
275: AgentSnapShot record = new AgentSnapShot(state);
276: getAgentHistory(addr, LOCAL).add(record);
277: }
278: }
279:
280: itr = agentStatusService.getRemoteAgents().iterator();
281: while (itr.hasNext()) {
282: MessageAddress addr = (MessageAddress) itr.next();
283: AgentStatusService.AgentState state = agentStatusService
284: .getRemoteAgentState(addr);
285: if (state != null) {
286: AgentSnapShot record = new AgentSnapShot(state);
287: getAgentHistory(addr, REMOTE).add(record);
288: }
289: }
290:
291: AgentStatusService.AgentState nodeState = agentStatusService
292: .getNodeState();
293: if (nodeState != null) {
294: // snapshot
295: AgentSnapShot record = new AgentSnapShot(nodeState);
296: nodeHistory.add(record);
297: } else {
298: // Can't happen
299: }
300:
301: }
302:
303: protected void setupSubscriptions() {
304: }
305:
306: protected void execute() {
307: //System.out.println("Executed AgentStatusRatePlugin");
308: }
309:
310: }
|