001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.qos.frame.topology;
028:
029: import java.util.HashMap;
030: import java.util.HashSet;
031: import java.util.Iterator;
032: import java.util.Properties;
033: import java.util.Set;
034: import java.util.StringTokenizer;
035:
036: import org.cougaar.core.agent.service.alarm.Alarm;
037: import org.cougaar.core.blackboard.IncrementalSubscription;
038: import org.cougaar.core.component.ServiceBroker;
039: import org.cougaar.core.mts.MessageAddress;
040: import org.cougaar.core.qos.frame.DataFrame;
041: import org.cougaar.core.qos.frame.FrameSet;
042: import org.cougaar.core.qos.frame.FrameSetService;
043: import org.cougaar.core.qos.metrics.Metric;
044: import org.cougaar.core.plugin.ParameterizedPlugin;
045: import org.cougaar.core.qos.rss.AgentTopologyService;
046: import org.cougaar.core.service.AlarmService;
047: import org.cougaar.core.service.BlackboardService;
048: import org.cougaar.core.service.LoggingService;
049: import org.cougaar.core.service.wp.WhitePagesService;
050: import org.cougaar.core.thread.SchedulableStatus;
051: import org.cougaar.core.wp.ListAllAgents;
052: import org.cougaar.util.UnaryPredicate;
053:
054: /**
055: * This class populates a frameset with frames that represent the host-node-agent hierarchy.
056: * The plugin polls the WP service for all agents in the society.
057: * For each agents, it polls the Metrics Topology Service to get the nodes and hosts which contain the agent.
058: * It then adds DateFrames for the agent, node and host and changes the containment relationship
059: * as they move around.
060: */
061: public class TopologyFrameUpdaterPlugin extends ParameterizedPlugin {
062: private static final long POLL_PERIOD = 3000; // 3 seconds
063: private LoggingService log;
064: private FrameSet frameSet;
065: private AlarmService alarmService;
066: private WhitePagesService wp;
067: private AgentTopologyService topologyService;
068:
069: private Set getAgentsFromWP() {
070: Set justAgents = new HashSet();
071: Set rawAgents = null;
072: try {
073: SchedulableStatus.beginWait("WP call for all Agents");
074: rawAgents = ListAllAgents.listAllAgents(wp);
075: } catch (Exception e) {
076: e.printStackTrace();
077: } finally {
078: SchedulableStatus.endBlocking();
079: }
080: String agentName;
081: for (Iterator iter = rawAgents.iterator(); iter.hasNext();) {
082: agentName = (String) iter.next();
083: try {
084: SchedulableStatus.beginWait("WP call for version slot");
085: if (wp.get(agentName, "version") != null) {
086: justAgents.add(agentName);
087: }
088: } catch (Exception e) {
089: e.printStackTrace();
090: } finally {
091: SchedulableStatus.endBlocking();
092: }
093: }
094: return justAgents;
095: }
096:
097: // Couaar Topolopy Frameset helper methods
098:
099: private DataFrame addEntityToFrameset(String type, String name) {
100: Properties prop = new Properties();
101: prop.setProperty("name", name);
102: return frameSet.makeFrame(type, prop);
103: }
104:
105: private DataFrame addRelationshipToFrameset(String type,
106: String parentName, String childName) {
107: Properties prop = new Properties();
108: prop.setProperty("parent-value", parentName);
109: prop.setProperty("child-value", childName);
110: return frameSet.makeFrame(type, prop);
111: }
112:
113: private void addIndicatorsToAgent(String agent) {
114: addEntityToFrameset("cpuIndicator", agent + "Cpu");
115: addEntityToFrameset("msgInIndicator", agent + "MsgIn");
116: addEntityToFrameset("msgOutIndicator", agent + "MsgOut");
117: addRelationshipToFrameset("IndicatorOnAgent", agent, agent
118: + "Cpu");
119: addRelationshipToFrameset("IndicatorOnAgent", agent, agent
120: + "MsgIn");
121: addRelationshipToFrameset("IndicatorOnAgent", agent, agent
122: + "MsgOut");
123: }
124:
125: private Agent addAgent(String agentName) {
126: addIndicatorsToAgent(agentName);
127: return (Agent) addEntityToFrameset("agent", agentName);
128: }
129:
130: private Node addNode(String nodeName) {
131: return (Node) addEntityToFrameset("node", nodeName);
132: }
133:
134: private Host addHost(String hostName) {
135: return (Host) addEntityToFrameset("host", hostName);
136: }
137:
138: private AgentInNode addAgentInNode(String nodeName, String agentName) {
139: return (AgentInNode) addRelationshipToFrameset("AgentInNode",
140: nodeName, agentName);
141: }
142:
143: private NodeOnHost addNodeOnHost(String hostName, String nodeName) {
144: return (NodeOnHost) addRelationshipToFrameset("NodeOnHost",
145: hostName, nodeName);
146: }
147:
148: private AgentInNode findOrMakeOrMoveAgentInNode(String nodeName,
149: String agentName) {
150: //Check that agent is contained in node
151: AgentInNode agentRelationship = (AgentInNode) frameSet
152: .findFrame("AgentInNode", "child-value", agentName);
153: if (agentRelationship == null)
154: agentRelationship = addAgentInNode(nodeName, agentName);
155: //TODO agentRelationship.relationshipParent() use this instead (with different query for agentonframe)
156: else if (agentRelationship.getParentValue() != nodeName) {
157: agentRelationship.setParentValue(nodeName);
158: if (log.isInfoEnabled()) {
159: log.info("Agent=" + agentName + " moved to node="
160: + nodeName);
161: }
162: }
163: return agentRelationship;
164: }
165:
166: private NodeOnHost findOrMakeOrMoveNodeOnHost(String hostName,
167: String nodeName) {
168: NodeOnHost nodeRelationship = (NodeOnHost) frameSet.findFrame(
169: "NodeOnHost", "child-value", nodeName);
170: if (nodeRelationship == null)
171: nodeRelationship = addNodeOnHost(hostName, nodeName);
172: else if (nodeRelationship.getParentValue() != hostName) {
173: frameSet.removeFrame(nodeRelationship);
174: nodeRelationship = addNodeOnHost(hostName, nodeName);
175: // if (log.isInfoEnabled()){
176: // log.info("Node=" + nodeName + " moved to host=" +hostName);
177: // }
178: }
179: return nodeRelationship;
180: }
181:
182: private Host findOrMakeHost(String hostName) {
183: Host hostFrame = (Host) frameSet.findFrame("host", "name",
184: hostName);
185: if (hostFrame == null) {
186: hostFrame = addHost(hostName);
187: }
188: return hostFrame;
189: }
190:
191: private Node findOrMakeNode(String nodeName) {
192: Node nodeFrame = (Node) frameSet.findFrame("node", "name",
193: nodeName);
194: if (nodeFrame == null) {
195: nodeFrame = addNode(nodeName);
196: }
197: return nodeFrame;
198: }
199:
200: private Agent findOrMakeAgent(String agentName) {
201: Agent agentFrame = (Agent) frameSet.findFrame("agent", "name",
202: agentName);
203: if (agentFrame == null) {
204: agentFrame = addAgent(agentName);
205: }
206: return agentFrame;
207: }
208:
209: private boolean assureAgentInFrameSet(String agentName) {
210: //check that agent->node->hostin WP, before adding to frameset
211: MessageAddress agentAddress = MessageAddress
212: .getMessageAddress(agentName);
213: String nodeName = topologyService.getAgentNode(agentAddress);
214: if (nodeName == null)
215: return false;
216: String hostName = topologyService.getAgentHost(agentAddress);
217: if (hostName == null)
218: return false;
219: // Assure agent->node->host in frameset
220: findOrMakeAgent(agentName);
221: findOrMakeNode(nodeName);
222: findOrMakeHost(hostName);
223: findOrMakeOrMoveAgentInNode(nodeName, agentName);
224: findOrMakeOrMoveNodeOnHost(hostName, nodeName);
225: return true;
226: }
227:
228: // TODO Determine the dyamic status of an indicator
229: // We have to use dynamic slots, because of indicatory slots are not defined
230: // as part of indicator prototype, to get around a bug in the frame code)
231: private String indicatorStatus(Indicator indicator) {
232: //JAZ no need to use the dynamic accessors
233: String watchSlot = (String) indicator.getValue("watchSlot");
234: Metric watchMetric = (Metric) indicator.getValue(watchSlot);
235: // If credibility too low declair status unknown
236: if (watchMetric == null || watchMetric.getCredibility() <= 0.1)
237: return "unknown";
238: // get thresholds
239: double idleThreshold = ((Double) indicator
240: .getValue("idleThreshold")).doubleValue();
241: double calmThreshold = ((Double) indicator
242: .getValue("calmThreshold")).doubleValue();
243: double normalThreshold = ((Double) indicator
244: .getValue("normalThreshold")).doubleValue();
245: double busyThreshold = ((Double) indicator
246: .getValue("busyThreshold")).doubleValue();
247: double franticThreshold = ((Double) indicator
248: .getValue("franticThreshold")).doubleValue();
249: double watchValue = watchMetric.doubleValue();
250: // test status
251: if (watchValue <= idleThreshold)
252: return "idle";
253: if (watchValue <= calmThreshold)
254: return "calm";
255: if (watchValue <= normalThreshold)
256: return "normal";
257: if (watchValue <= busyThreshold)
258: return "busy";
259: if (watchValue <= franticThreshold)
260: return "frantic";
261: return "frantic";
262: }
263:
264: private String hostStatus(Host host) {
265: //JAZ no need to use the dynamic accessors
266: Metric loadAverage = (Metric) host.getValue("loadAverage");
267: Metric cpuCount = (Metric) host.getValue("count");
268: if (loadAverage == null || cpuCount == null
269: || (loadAverage.getCredibility() <= 0.1)
270: || (cpuCount.getCredibility() <= 0.1))
271: return "unknown";
272: double norm = loadAverage.doubleValue()
273: / cpuCount.doubleValue();
274: if (norm <= 0.0)
275: return "idle";
276: if (norm <= 0.2)
277: return "calm";
278: if (norm <= 0.5)
279: return "normal";
280: if (norm <= 1.0)
281: return "busy";
282: if (norm <= 2.0)
283: return "frantic";
284: return "frantic";
285: }
286:
287: private String nodeStatus(Node node) {
288: //JAZ no need to use the dynamic accessors
289: Metric cpu = (Metric) node.getValue("cpuLoadAverage");
290: Metric msgIn = (Metric) node.getValue("msgIn");
291: Metric msgOut = (Metric) node.getValue("msgOut");
292: if (cpu == null || msgIn == null || msgOut == null
293: || (cpu.getCredibility() <= 0.1)
294: || (msgIn.getCredibility() <= 0.1)
295: || (msgOut.getCredibility() <= 0.1))
296: return "unknown";
297: double norm = 0.34 * cpu.doubleValue() + 0.0033
298: * msgIn.doubleValue() + 0.0033 * msgOut.doubleValue();
299: if (norm <= 0.0)
300: return "idle";
301: if (norm <= 0.2)
302: return "calm";
303: if (norm <= 0.5)
304: return "normal";
305: if (norm <= 1.0)
306: return "busy";
307: if (norm <= 2.0)
308: return "frantic";
309: return "frantic";
310: }
311:
312: private String agentStatus(Agent agent) {
313: Metric cpu = agent.getCpuLoadAverage();
314: Metric msgIn = agent.getMsgIn();
315: Metric msgOut = agent.getMsgOut();
316: // JAZ no need to use the dynamic accessors
317: // Metric cpu= (Metric) agent.getValue("cpuLoadAverage");
318: // Metric msgIn= (Metric) agent.getValue("msgIn");
319: // Metric msgOut = (Metric) agent.getValue("msgOut");
320: if (cpu == null || msgIn == null || msgOut == null
321: || (cpu.getCredibility() <= 0.1)
322: || (msgIn.getCredibility() <= 0.1)
323: || (msgOut.getCredibility() <= 0.1))
324: return "unknown";
325: double norm = 0.34 * cpu.doubleValue() + 0.0033
326: * msgIn.doubleValue() + 0.0033 * msgOut.doubleValue();
327: if (norm <= 0.0)
328: return "idle";
329: if (norm <= 0.2)
330: return "calm";
331: if (norm <= 0.5)
332: return "normal";
333: if (norm <= 1.0)
334: return "busy";
335: if (norm <= 2.0)
336: return "frantic";
337: return "frantic";
338: }
339:
340: private void updateIndicator(Indicator indicatorFrame) {
341: String oldStatus = indicatorFrame.getValue("status").toString();
342: String currentStatus = indicatorStatus(indicatorFrame);
343: // if (log.isInfoEnabled()){
344: // log.info("indicator=" +indicatorFrame.getName() +
345: // " oldStatus=" + oldStatus +
346: // " currentStatus=" + currentStatus );
347: // }
348: if (!oldStatus.equals(currentStatus)) {
349: indicatorFrame.setStatus(currentStatus);
350: }
351: }
352:
353: private void updateAgentStatus(Agent agentFrame) {
354: String oldStatus = agentFrame.getValue("status").toString();
355: String currentStatus = agentStatus(agentFrame);
356: if (!oldStatus.equals(currentStatus)) {
357: agentFrame.setStatus(currentStatus);
358: }
359: }
360:
361: private void updateNodeStatus(Node nodeFrame) {
362: String oldStatus = nodeFrame.getValue("status").toString();
363: String currentStatus = nodeStatus(nodeFrame);
364: if (!oldStatus.equals(currentStatus)) {
365: nodeFrame.setStatus(currentStatus);
366: }
367: }
368:
369: private void updateHostStatus(Host hostFrame) {
370: String oldStatus = hostFrame.getValue("status").toString();
371: String currentStatus = hostStatus(hostFrame);
372: if (!oldStatus.equals(currentStatus)) {
373: hostFrame.setStatus(currentStatus);
374: }
375: }
376:
377: private void updateAgentIndicators(Agent agentFrame) {
378: HashMap indicators = (HashMap) agentFrame
379: .findRelationshipFrames("child", "IndicatorOnAgent");
380: Iterator itr = indicators.values().iterator();
381: while (itr.hasNext()) {
382: Indicator indicatorFrame = (Indicator) itr.next();
383: updateIndicator(indicatorFrame);
384: }
385: }
386:
387: private class Poller implements Alarm {
388: private long expirationTime;
389: private long period;
390: private boolean expired = false;
391:
392: public Poller(long period) {
393: super ();
394: this .period = period;
395: this .expirationTime = System.currentTimeMillis() + period;
396: }
397:
398: public long getExpirationTime() {
399: return expirationTime;
400: }
401:
402: public boolean hasExpired() {
403: return expired;
404: }
405:
406: public boolean cancel() {
407: boolean was = expired;
408: expired = true;
409: return was;
410: }
411:
412: public void restart() {
413: this .expirationTime = System.currentTimeMillis() + period;
414: expired = false;
415: alarmService.addRealTimeAlarm(this );
416: }
417:
418: public void expire() {
419: // Poll WP for all agents in society and assure they are in FrameSet.
420: String agentName;
421: Set agents = getAgentsFromWP();
422: for (Iterator iter = agents.iterator(); iter.hasNext();) {
423: agentName = (String) iter.next();
424: assureAgentInFrameSet(agentName);
425: }
426: restart();
427: }
428: }
429:
430: public void load() {
431: super .load();
432: ServiceBroker sb = getServiceBroker();
433: log = (LoggingService) sb.getService(this ,
434: LoggingService.class, null);
435: alarmService = (AlarmService) sb.getService(this ,
436: AlarmService.class, null);
437: wp = (WhitePagesService) sb.getService(this ,
438: WhitePagesService.class, null);
439: topologyService = (AgentTopologyService) sb.getService(this ,
440: AgentTopologyService.class, null);
441: }
442:
443: public void start() {
444: String files = getParameter("frame-set-files",
445: "org/cougaar/core/qos/frame/topology/cougaar-topology-protos.xml");
446: String name = getParameter("frame-set", "societyTopology");
447:
448: if (files != null) {
449: // Get a list of files from frame-set-files parameter
450: StringTokenizer tk = new StringTokenizer(files, ",");
451: String[] xml_filenames = new String[tk.countTokens()];
452: int i = 0;
453: while (tk.hasMoreTokens())
454: xml_filenames[i++] = tk.nextToken();
455: // Create Frameset
456: ServiceBroker sb = getServiceBroker();
457: BlackboardService bbs = getBlackboardService();
458: FrameSetService fss = (FrameSetService) sb.getService(this ,
459: FrameSetService.class, null);
460: frameSet = fss.loadFrameSet(name, xml_filenames, sb, bbs);
461: // TODO remove Test Frames
462: // addHost("test");
463: // addNode("NODE2");
464: // addAgent("agent");
465: // addNodeOnHost("test","NODE2");
466: // addAgentInNode("NODE2","agent");
467: // Start Poller to discover agents from WP and add them to Frameset
468: alarmService.addRealTimeAlarm(new Poller(POLL_PERIOD));
469: sb.releaseService(this , FrameSetService.class, fss);
470: } else {
471: if (log.isWarnEnabled())
472: log.warn("No FrameSet XML files were specified");
473: }
474: super .start();
475: }
476:
477: // Plugin code
478: // The plugin does two things:
479: // 1) Since it owns the topology frame set, it must register changes with the blackboard
480: // using the frameset.ProcessQueue();
481: // 2) It also monitors host, node, agent, indicator frames for changes,
482: // and updates their stautus
483: // A better solution is needed for monitoring indicators.
484: // the current frameset implementation
485: // does not register changes to frames for changes in thier parent
486:
487: private IncrementalSubscription agentSubscription;
488: private IncrementalSubscription nodeSubscription;
489: private IncrementalSubscription hostSubscription;
490:
491: private static final UnaryPredicate agentPredicate = new UnaryPredicate() {
492: public boolean execute(Object o) {
493: return (o instanceof Agent);
494: }
495: };
496:
497: private static final UnaryPredicate nodePredicate = new UnaryPredicate() {
498: public boolean execute(Object o) {
499: return (o instanceof Node);
500: }
501: };
502:
503: private static final UnaryPredicate hostPredicate = new UnaryPredicate() {
504: public boolean execute(Object o) {
505: return (o instanceof Host);
506: }
507: };
508:
509: protected void setupSubscriptions() {
510: //JAZ can't directly monitor indcators, because slot changes do not propagate to childern
511: //indicatorSubscription = (IncrementalSubscription) blackboard.subscribe(indicatorPredicate);
512: agentSubscription = (IncrementalSubscription) blackboard
513: .subscribe(agentPredicate);
514: nodeSubscription = (IncrementalSubscription) blackboard
515: .subscribe(nodePredicate);
516: hostSubscription = (IncrementalSubscription) blackboard
517: .subscribe(hostPredicate);
518: }
519:
520: protected void execute() {
521: // Process all the Frame.set changes. this has to be done for any frameset
522: frameSet.processQueue();
523:
524: // Look for changes to specific types of frames
525: if (agentSubscription.hasChanged()) {
526: Iterator itr = agentSubscription.getChangedCollection()
527: .iterator();
528: while (itr.hasNext()) {
529: Agent agentFrame = (Agent) itr.next();
530: updateAgentIndicators(agentFrame);
531: updateAgentStatus(agentFrame);
532: }
533: }
534: if (nodeSubscription.hasChanged()) {
535: Iterator itr = nodeSubscription.getChangedCollection()
536: .iterator();
537: while (itr.hasNext()) {
538: Node nodeFrame = (Node) itr.next();
539: updateNodeStatus(nodeFrame);
540: }
541: }
542: if (hostSubscription.hasChanged()) {
543: Iterator itr = hostSubscription.getChangedCollection()
544: .iterator();
545: while (itr.hasNext()) {
546: Host hostFrame = (Host) itr.next();
547: updateHostStatus(hostFrame);
548: }
549: }
550: }
551:
552: }
|