Source Code Cross Referenced for Blackboard.java in  » Science » Cougaar12_4 » org » cougaar » core » blackboard » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.blackboard 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 1997-2004 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:
0027:        package org.cougaar.core.blackboard;
0028:
0029:        import java.util.ArrayList;
0030:        import java.util.Collection;
0031:        import java.util.Collections;
0032:        import java.util.Enumeration;
0033:        import java.util.HashMap;
0034:        import java.util.HashSet;
0035:        import java.util.Iterator;
0036:        import java.util.List;
0037:        import java.util.Map;
0038:        import java.util.Set;
0039:        import java.util.Vector;
0040:
0041:        import org.cougaar.bootstrap.SystemProperties;
0042:        import org.cougaar.core.agent.Agent;
0043:        import org.cougaar.core.agent.service.MessageSwitchService;
0044:        import org.cougaar.core.component.ServiceBroker;
0045:        import org.cougaar.core.logging.LoggingServiceWithPrefix;
0046:        import org.cougaar.core.mts.MessageAddress;
0047:        import org.cougaar.core.mts.MessageAttributes;
0048:        import org.cougaar.core.persist.BlackboardPersistence;
0049:        import org.cougaar.core.persist.Persistence;
0050:        import org.cougaar.core.persist.PersistenceException;
0051:        import org.cougaar.core.persist.PersistenceNotEnabledException;
0052:        import org.cougaar.core.persist.PersistenceObject;
0053:        import org.cougaar.core.service.AlarmService;
0054:        import org.cougaar.core.service.DomainForBlackboardService;
0055:        import org.cougaar.core.service.LoggingService;
0056:        import org.cougaar.core.service.ThreadService;
0057:        import org.cougaar.core.service.community.CommunityChangeAdapter;
0058:        import org.cougaar.core.service.community.CommunityChangeEvent;
0059:        import org.cougaar.core.service.community.CommunityService;
0060:        import org.cougaar.core.thread.Schedulable;
0061:        import org.cougaar.multicast.AttributeBasedAddress;
0062:        import org.cougaar.util.UnaryPredicate;
0063:        import org.cougaar.util.log.Logger;
0064:        import org.cougaar.util.log.Logging;
0065:
0066:        /**
0067:         * A {@link Subscriber} created by the {@link StandardBlackboard}
0068:         * that maintains a view of all published objects, invokes
0069:         * {@link org.cougaar.core.domain.Domain} {@link
0070:         * org.cougaar.core.domain.LogicProvider}s, and monitors
0071:         * community membership changes.
0072:         *
0073:         * @property org.cougaar.core.agent.savePriorPublisher
0074:         * When set to <em>true</em>, will collect extra 
0075:         * information on each publish to detect problems with multiple adds,
0076:         * deletes, etc by complaining
0077:         * about unexpected state changes.  This adds significant runtime overhead.
0078:         * @property org.cougaar.core.agent.enablePublishException
0079:         * When set to <em>true</em>, collects stack frames
0080:         * for each published object in order to pinpoint both sides of
0081:         * publish conflicts.  This is <em>extremely</em> 
0082:         * expensive.
0083:         * @property org.cougaar.core.persistence.enable
0084:         * When set to <em>true</em> will enable blackboard persistence.
0085:         * @property org.cougaar.core.blackboard.waitForNewCommChangeNotifications Time in 
0086:         * milliseconds to wait for more community changes before asking the community 
0087:         * service for them. Default is 1,000.
0088:         */
0089:        public class Blackboard extends Subscriber implements 
0090:                BlackboardServesDomain, BlackboardClient, PrivilegedClaimant {
0091:            protected CollectionSubscription everything;
0092:            protected MessageAddress self;
0093:            private Distributor myDistributor;
0094:            protected ServiceBroker myServiceBroker;
0095:            protected AlarmService alarmService;
0096:            protected DomainForBlackboardService myDomainService;
0097:            protected ThreadService threadS;
0098:            protected LoggingService logger;
0099:
0100:            public static final String INSERTION_POINT = Agent.INSERTION_POINT
0101:                    + ".Blackboard";
0102:
0103:            public MessageAddress getCID() {
0104:                return self;
0105:            }
0106:
0107:            public static final boolean isSavePriorPublisher = SystemProperties
0108:                    .getBoolean("org.cougaar.core.agent.savePriorPublisher");
0109:            public static final boolean enablePublishException = SystemProperties
0110:                    .getBoolean("org.cougaar.core.agent.enablePublishException");
0111:
0112:            /** 
0113:             * @property org.cougaar.core.blackboard.pedantic When true (the default) enables a variety
0114:             * of extra checks for suspicious blackboard activity.  None of these checks are especially
0115:             * expensive, so it is generally recommended that pedantic be left enabled.
0116:             *
0117:             */
0118:            public static final boolean PEDANTIC = SystemProperties.getBoolean(
0119:                    "org.cougaar.core.blackboard.pedantic", true);
0120:
0121:            /** the queue of messages to send */
0122:            private List sendQueue = new ArrayList();
0123:
0124:            // mark the envelopes which we emit so that we can detect them later.
0125:            protected Envelope createEnvelope() {
0126:                if (isTimestamped()) {
0127:                    return new TimestampedPlanEnvelopeImpl();
0128:                } else {
0129:                    return new PlanEnvelopeImpl();
0130:                }
0131:            }
0132:
0133:            /**
0134:             * Marked Envelope <i>interface</i> so that we can detect envelopes which we've
0135:             * emitted.
0136:             *
0137:             * This isn't an Envelope, since Envelope is a class, but this interface
0138:             * will only be applied to the two Envelope subclasses listed below.
0139:             */
0140:            interface PlanEnvelope {
0141:            }
0142:
0143:            private static final class PlanEnvelopeImpl extends Envelope
0144:                    implements  PlanEnvelope {
0145:            }
0146:
0147:            private static final class TimestampedPlanEnvelopeImpl extends
0148:                    TimestampedEnvelope implements  PlanEnvelope {
0149:                public boolean isBlackboard() {
0150:                    return true;
0151:                }
0152:            }
0153:
0154:            /** override to immediately publish deltas rather than delay until transaction close */
0155:            protected EnvelopeTuple clientAddedObject(Object o) {
0156:                EnvelopeTuple tup = super .clientAddedObject(o);
0157:                consumeTuple(tup);
0158:                return tup;
0159:            }
0160:
0161:            /** override to immediately publish deltas rather than delay until transaction close */
0162:            protected EnvelopeTuple clientRemovedObject(Object o) {
0163:                EnvelopeTuple tup = super .clientRemovedObject(o);
0164:                consumeTuple(tup);
0165:                return tup;
0166:            }
0167:
0168:            /** override to immediately publish deltas rather than delay until transaction close */
0169:            protected EnvelopeTuple clientChangedObject(Object o, List changes) {
0170:                EnvelopeTuple tup = super .clientChangedObject(o, changes);
0171:                consumeTuple(tup);
0172:                return tup;
0173:            }
0174:
0175:            /** invoked via client*Object while executing an LP */
0176:            private final boolean consumeTuple(EnvelopeTuple tup) {
0177:                boolean somethingFired = false;
0178:                synchronized (subscriptions) {
0179:                    for (int i = 0, n = subscriptions.size(); i < n; i++) {
0180:                        Subscription subscription = (Subscription) subscriptions
0181:                                .get(i);
0182:                        somethingFired |= tup.applyToSubscription(subscription,
0183:                                true);
0184:                    }
0185:                }
0186:                // recurses
0187:                callLogicProviders(tup, false);
0188:                return somethingFired;
0189:            }
0190:
0191:            /** is the object non-null? */
0192:            private static final UnaryPredicate anythingP = new AnythingPredicate();
0193:
0194:            private static final class AnythingPredicate implements 
0195:                    UnaryPredicate {
0196:                public boolean execute(Object o) {
0197:                    return (o != null);
0198:                }
0199:            }
0200:
0201:            public Blackboard(MessageSwitchService msgSwitch, ServiceBroker sb,
0202:                    Object state) {
0203:                myServiceBroker = sb;
0204:                self = msgSwitch.getMessageAddress();
0205:                myDistributor = createDistributor(msgSwitch, state);
0206:                setClientDistributor((BlackboardClient) this , myDistributor);
0207:                setName("<blackboard>");
0208:                logger = (LoggingService) sb.getService(this ,
0209:                        LoggingService.class, null);
0210:                logger = LoggingServiceWithPrefix.add(logger, self + ": ");
0211:                myDomainService = (DomainForBlackboardService) sb.getService(
0212:                        this , DomainForBlackboardService.class, null);
0213:                if (myDomainService == null) {
0214:                    RuntimeException re = new RuntimeException(
0215:                            "Couldn't get DomainForBlackboardService!");
0216:                    re.printStackTrace();
0217:                    throw re;
0218:                }
0219:                alarmService = (AlarmService) sb.getService(this ,
0220:                        AlarmService.class, null);
0221:                threadS = (ThreadService) sb.getService(this ,
0222:                        ThreadService.class, null);
0223:            }
0224:
0225:            public void stop() {
0226:                // FIXME: Stop the cacheClearer thread
0227:                // This for bug 3704
0228:                myDistributor = null;
0229:            }
0230:
0231:            private static class AllObjectsSet extends HashSet {
0232:                Map stacks = createStackMap();
0233:
0234:                protected Map createStackMap() {
0235:                    if (isSavePriorPublisher) {
0236:                        return new HashMap();
0237:                    } else {
0238:                        return null; // Don't keep prior publishing info
0239:                    }
0240:                }
0241:
0242:                public AllObjectsSet(int size) {
0243:                    super (size);
0244:                }
0245:
0246:                public boolean add(Object o) {
0247:                    boolean result = super .add(o);
0248:                    if (!result) {
0249:                        PublishStack priorStack = null;
0250:                        if (stacks != null) {
0251:                            priorStack = (PublishStack) stacks.get(o);
0252:                        }
0253:                        throw new PublishException(
0254:                                "Blackboard.everything.add object already published: "
0255:                                        + o.toString(), priorStack,
0256:                                stacks != null);
0257:                    } else if (stacks != null) {
0258:                        stacks.put(o, new PublishStack("Prior publisher: "));
0259:                    }
0260:                    return result;
0261:                }
0262:
0263:                public boolean remove(Object o) {
0264:                    boolean result = super .remove(o);
0265:                    if (!result) {
0266:                        PublishStack priorStack = null;
0267:                        if (stacks != null) {
0268:                            priorStack = (PublishStack) stacks.get(o);
0269:                        }
0270:                        throw new PublishException(
0271:                                "Blackboard.everything.remove object not published: "
0272:                                        + o.toString(), priorStack,
0273:                                stacks != null);
0274:                    } else if (stacks != null) {
0275:                        stacks.put(o, new PublishStack("Prior remover: "));
0276:                    }
0277:                    return result;
0278:                }
0279:            }
0280:
0281:            public final void init() {
0282:                everything = new CollectionSubscription(anythingP,
0283:                        enablePublishException ? new AllObjectsSet(111)
0284:                                : new HashSet(111));
0285:                subscribe(everything);
0286:                setReadyToPersist();
0287:            }
0288:
0289:            // Subscription Client interface
0290:            public String getBlackboardClientName() {
0291:                return getClass().getName();
0292:            }
0293:
0294:            /**
0295:             * Provide a new subscription with its initial fill. Called under
0296:             * the protection of the distributor lock so we are guaranteed that
0297:             * the allPlanObjects won't change.
0298:             */
0299:            public void fillSubscription(Subscription subscription) {
0300:                if (subscription == everything) {
0301:                    return; // Don't fill ourselves
0302:                }
0303:                Envelope envelope = createQueryEnvelope(subscription);
0304:                envelope.bulkAddObject(everything.getCollection());
0305:                subscription.fill(envelope);
0306:            }
0307:
0308:            public void fillQuery(Subscription subscription) {
0309:                Envelope envelope = createQueryEnvelope(subscription);
0310:                envelope.bulkAddObject(everything.getCollection());
0311:                subscription.fill(envelope);
0312:            }
0313:
0314:            private Envelope createQueryEnvelope(Subscription subscription) {
0315:                if (isTimestamped()) {
0316:                    TimestampedEnvelope te = new TimestampedEnvelope();
0317:                    Subscriber subscriber = subscription.getSubscriber();
0318:                    if (subscriber != null) {
0319:                        te.setName(subscriber.getName());
0320:                    }
0321:                    long nowTime = System.currentTimeMillis();
0322:                    te.setTransactionOpenTime(nowTime);
0323:                    // should we wait until after the query to set the close time?
0324:                    te.setTransactionCloseTime(nowTime);
0325:                    return te;
0326:                } else {
0327:                    return new Envelope();
0328:                }
0329:            }
0330:
0331:            /** Alias for sendDirective(aDirective, null); */
0332:            public void sendDirective(Directive aDirective) {
0333:                if (aDirective == null) {
0334:                    throw new IllegalArgumentException(
0335:                            "directive must not be null.");
0336:                } else {
0337:                    sendQueue.add(aDirective);
0338:                }
0339:            }
0340:
0341:            /**
0342:             * Submit a directive with attached ChangeReports for transmission 
0343:             * from this agent. We fill in the ContentsId with the next available number.
0344:             */
0345:            public void sendDirective(Directive aDirective, Collection c) {
0346:                if (aDirective == null) {
0347:                    throw new IllegalArgumentException(
0348:                            "directive must not be null.");
0349:                } else {
0350:                    if (c != null && ((Collection) c).size() > 0) {
0351:                        DirectiveMessage.DirectiveWithChangeReports dd = new DirectiveMessage.DirectiveWithChangeReports(
0352:                                aDirective, c);
0353:                        aDirective = dd;
0354:                    }
0355:                    sendQueue.add(aDirective);
0356:                }
0357:            }
0358:
0359:            public long currentTimeMillis() {
0360:                return alarmService.currentTimeMillis();
0361:            }
0362:
0363:            /**
0364:             * Add Object to the Blackboard Collection
0365:             */
0366:            public void add(Object o) {
0367:                publishAdd(o);
0368:            }
0369:
0370:            /** Removed Object to the Blackboard Collection */
0371:            public void remove(Object o) {
0372:                publishRemove(o);
0373:            }
0374:
0375:            /** Change Object to the Blackboard Collection */
0376:            public void change(Object o) {
0377:                publishChange(o, null);
0378:            }
0379:
0380:            public void change(Object o, Collection changes) {
0381:                publishChange(o, changes);
0382:            }
0383:
0384:            public Enumeration searchBlackboard(UnaryPredicate predicate) {
0385:                Vector vec = new Vector();
0386:
0387:                for (Iterator i = everything.getCollection().iterator(); i
0388:                        .hasNext();) {
0389:                    Object o = i.next();
0390:                    if (predicate.execute(o)) {
0391:                        vec.addElement(o);
0392:                    }
0393:                }
0394:                return vec.elements();
0395:            }
0396:
0397:            public int countBlackboard(Class cl) {
0398:                // could optimize by maintaining an LRU table
0399:                int c = 0;
0400:                for (Iterator i = everything.getCollection().iterator(); i
0401:                        .hasNext();) {
0402:                    Object o = i.next();
0403:                    if (o != null && cl.isAssignableFrom(o.getClass())) {
0404:                        c++;
0405:                    }
0406:                }
0407:                return c;
0408:            }
0409:
0410:            public int countBlackboard(UnaryPredicate predicate) {
0411:                int c = 0;
0412:                for (Iterator i = everything.getCollection().iterator(); i
0413:                        .hasNext();) {
0414:                    Object o = i.next();
0415:                    if (predicate.execute(o)) {
0416:                        c++;
0417:                    }
0418:                }
0419:                return c;
0420:            }
0421:
0422:            public int getBlackboardSize() {
0423:                return everything.size();
0424:            }
0425:
0426:            /**
0427:             * Process incoming directive messages. All messages have been
0428:             * blessed by the message manager. The messages are implicitly
0429:             * acknowledged by this method. The envelope of published events
0430:             * resulting from handling the messages is returned.
0431:             */
0432:            public final Envelope receiveMessages(List msgs) {
0433:                //try {
0434:                //  startTransaction();
0435:                for (Iterator iter = msgs.iterator(); iter.hasNext();) {
0436:                    DirectiveMessage msg = (DirectiveMessage) iter.next();
0437:                    applyMessageAgainstLogicProviders(msg);
0438:                }
0439:
0440:                checkUnpostedChangeReports();
0441:                // There really should not be any change tracking subscriptions, at
0442:                // least not in the base classes!!!  MT
0443:                resetSubscriptionChanges(); // clear change tracking subscriptions
0444:
0445:                return privateGetPublishedChanges();
0446:                //} finally {
0447:                //  stopTransaction();
0448:                //}
0449:            }
0450:
0451:            private final List oneEnvelope = new ArrayList(1);
0452:
0453:            /**
0454:             * called by distributor to prepare for "receiveEnvelope(..)" calls.
0455:             */
0456:            public final void prepareForEnvelopes() {
0457:                setTransactionOpenTime();
0458:            }
0459:
0460:            /** 
0461:             * Called by transaction close within the thread of Plugins.  
0462:             * Also called at the end of an LP pseudo-transaction, but
0463:             * most of the logic here is disabled in that case.
0464:             */
0465:            public final Envelope receiveEnvelope(Envelope envelope) {
0466:                oneEnvelope.add(envelope);
0467:                super .receiveEnvelopes(oneEnvelope, false); // Move to our inbox
0468:                oneEnvelope.clear();
0469:
0470:                if (!(envelope instanceof  PlanEnvelope)) {
0471:                    // although we aways consume envelopes, we only act on them
0472:                    // when we didn't generate 'em
0473:                    privateUpdateSubscriptions();
0474:
0475:                    try {
0476:                        boolean isPersistenceEnvelope = envelope instanceof  PersistenceEnvelope;
0477:                        List tuples = envelope.getRawDeltas();
0478:                        int l = tuples.size();
0479:                        for (int i = 0; i < l; i++) {
0480:                            try {
0481:                                callLogicProviders((EnvelopeTuple) tuples
0482:                                        .get(i), isPersistenceEnvelope);
0483:                            } catch (Exception e) {
0484:                                System.err.println("Caught " + e
0485:                                        + " while running logic providers.");
0486:                                e.printStackTrace();
0487:                            }
0488:                        }
0489:                    } finally {
0490:                        // clear subscriptions deltas, just in case.
0491:                        resetSubscriptionChanges();
0492:                    }
0493:                }
0494:
0495:                return privateGetPublishedChanges();
0496:            }
0497:
0498:            private static class DestinationKey {
0499:                public MessageAddress cid;
0500:                public MessageAttributes attrs;
0501:                private int hc;
0502:
0503:                public DestinationKey(MessageAddress cid,
0504:                        MessageAttributes attrs) {
0505:                    this .cid = cid;
0506:                    this .attrs = attrs;
0507:                    hc = cid.hashCode() + attrs.hashCode();
0508:                }
0509:
0510:                public int hashCode() {
0511:                    return hc;
0512:                }
0513:
0514:                public boolean equals(Object o) {
0515:                    if (o instanceof  DestinationKey) {
0516:                        DestinationKey that = (DestinationKey) o;
0517:                        return this .cid.equals(that.cid)
0518:                                && this .attrs.equals(that.attrs);
0519:                    }
0520:                    return false;
0521:                }
0522:            }
0523:
0524:            private MessageAddress getDirectiveDestinationOfKey(Object key) {
0525:                if (key instanceof  MessageAddress) {
0526:                    return (MessageAddress) key;
0527:                } else {
0528:                    DestinationKey dkey = (DestinationKey) key;
0529:                    return dkey.cid;
0530:                }
0531:            }
0532:
0533:            private Object getDirectiveKeyOfDestination(MessageAddress dest) {
0534:                MessageAttributes attrs = dest.getMessageAttributes();
0535:                if (attrs == null)
0536:                    return dest;
0537:                return new DestinationKey(dest, attrs);
0538:            }
0539:
0540:            /*
0541:             * Builds up hashmap of arrays of directives for each agent, <code>MessageAddress</code>.
0542:             * Modified to handle destinations of <code>AttributeBasedAddress</code>es, so that these are 
0543:             * sent properly as well. 
0544:             */
0545:            public void appendMessagesToSend(List messages) {
0546:                HashMap directivesByDestination = new HashMap(89);
0547:
0548:                // FIXME - prefill cache of aba roles to addresses here, instead of building up a cache
0549:                // fillCache();
0550:
0551:                for (Iterator iter = sendQueue.iterator(); iter.hasNext();) {
0552:                    Directive dir = (Directive) iter.next();
0553:                    MessageAddress dest = dir.getDestination();
0554:
0555:                    // get all destinations
0556:
0557:                    /**
0558:                     * If dest is an ABA, get all agent_names from cache or 
0559:                     * nameserver and fills in the hashmap of directives
0560:                     * Short and easy way to handle ABA destinations
0561:                     */
0562:                    ArrayList dirs;
0563:
0564:                    if (dest instanceof  AttributeBasedAddress) {
0565:                        //System.out.println("-------BLACKBOARD ENCOUNTERED ABA-----");
0566:                        MessageAttributes qosAttributes = dest
0567:                                .getMessageAttributes();
0568:                        Collection agents = getABAAddresses((AttributeBasedAddress) dest); // List of CIs
0569:                        // for all destinations, add a new directive array and insert a new directive, or add to 
0570:                        // an existing array in the destinations hashmap
0571:                        for (Iterator i = agents.iterator(); i.hasNext();) {
0572:                            MessageAddress agentAddress = (MessageAddress) i
0573:                                    .next();
0574:                            if (qosAttributes != null) {
0575:                                agentAddress = MessageAddress
0576:                                        .getMessageAddress(
0577:                                                (MessageAddress) agentAddress,
0578:                                                qosAttributes);
0579:                            }
0580:                            Object key = getDirectiveKeyOfDestination(agentAddress);
0581:                            dirs = (ArrayList) directivesByDestination.get(key);
0582:                            if (dirs == null) {
0583:                                dirs = new ArrayList(1);
0584:                                directivesByDestination.put(key, dirs);
0585:                            }
0586:                            dirs.add(dir);
0587:                        }
0588:                    } // done with aba handling
0589:
0590:                    /**
0591:                     * dest is regular address so proceed as before 
0592:                     */
0593:                    else {
0594:                        Object key = getDirectiveKeyOfDestination(dest);
0595:                        dirs = (ArrayList) directivesByDestination.get(key);
0596:                        if (dirs == null) {
0597:                            dirs = new ArrayList(1);
0598:                            directivesByDestination.put(key, dirs);
0599:                        }
0600:                        dirs.add(dir);
0601:                    }
0602:                }
0603:                /**
0604:                 * By now directivesByDestination only has ArrayLists of MessageAddresss,
0605:                 * so we can set their directives as before. 
0606:                 */
0607:                for (Iterator iter = directivesByDestination.entrySet()
0608:                        .iterator(); iter.hasNext();) {
0609:                    Map.Entry entry = (Map.Entry) iter.next();
0610:                    MessageAddress tmpci = getDirectiveDestinationOfKey(entry
0611:                            .getKey());
0612:                    ArrayList dirs = (ArrayList) entry.getValue();
0613:                    int size = dirs.size();
0614:                    if (size > 0) {
0615:                        Directive[] directives = (Directive[]) dirs
0616:                                .toArray(new Directive[size]);
0617:                        DirectiveMessage ndm = new DirectiveMessage(directives);
0618:                        ndm.setDestination(tmpci);
0619:                        ndm.setSource(self);
0620:                        messages.add(ndm);
0621:                        dirs.clear();
0622:                    }
0623:                }
0624:                directivesByDestination.clear(); // maybe help gc a bit.
0625:                sendQueue.clear();
0626:            }
0627:
0628:            public void restart(MessageAddress cid) {
0629:                myDomainService.invokeRestartLogicProviders(cid);
0630:            }
0631:
0632:            private void applyMessageAgainstLogicProviders(DirectiveMessage m) {
0633:                myDomainService.invokeMessageLogicProviders(m);
0634:            }
0635:
0636:            /**
0637:             * called by receiveEnvelope (on behalf of a plugin) and
0638:             * consumeTuple (on behalf of an LP).
0639:             */
0640:            private void callLogicProviders(EnvelopeTuple obj,
0641:                    boolean isPersistenceEnvelope) {
0642:                if (!isPersistenceEnvelope) {
0643:                    handleActiveSubscriptionObjects(obj);
0644:                }
0645:                myDomainService.invokeEnvelopeLogicProviders(obj,
0646:                        isPersistenceEnvelope);
0647:            }
0648:
0649:            private void handleActiveSubscriptionObjects(EnvelopeTuple tup) {
0650:                if (ActiveSubscriptionObject.deferCommit) {
0651:                    Object o = tup.getObject();
0652:                    if (o instanceof  ActiveSubscriptionObject) {
0653:                        ActiveSubscriptionObject aso = (ActiveSubscriptionObject) o;
0654:                        try {
0655:                            if (tup.isAdd()) {
0656:                                aso.addingToBlackboard(this , true);
0657:                            } else if (tup.isChange()) {
0658:                                aso.changingInBlackboard(this , true);
0659:                            } else if (tup.isRemove()) {
0660:                                aso.removingFromBlackboard(this , true);
0661:                            } // else ignore: bulk and event are uneffected by ASOs
0662:                        } catch (BlackboardException be) {
0663:                            logger.error(
0664:                                    "Deferred ActiveSubscriptionObject action could"
0665:                                            + " not be vetoed", be);
0666:                        }
0667:                    }
0668:                }
0669:            }
0670:
0671:            /**
0672:             * If {@link ActiveSubscrptionObject#deferCommit} is enabled, this
0673:             * class tracks objects that would be modified by an {@link
0674:             * ActiveSubscriptionObject} at publish time and logs a warning
0675:             * if they are accessed before the transaction close time.
0676:             * <p>
0677:             * For details, see bug 3663.
0678:             */
0679:            private final static ObjectTracker tracker = new ObjectTracker();
0680:
0681:            public static final ObjectTracker getTracker() {
0682:                return tracker;
0683:            }
0684:
0685:            public static class ObjectTracker {
0686:                private final static Logger log = Logging
0687:                        .getLogger(ObjectTracker.class);
0688:                private static final Set globalSet = new HashSet(11);
0689:                private final ThreadLocal localSet = new ThreadLocal() {
0690:                    protected synchronized Object initialValue() {
0691:                        return new HashSet(11);
0692:                    }
0693:                };
0694:
0695:                public Set getLocalSet() {
0696:                    return (Set) (localSet.get());
0697:                }
0698:
0699:                public void checkpoint(boolean commit, Object ob, Object a) {
0700:                    if (ActiveSubscriptionObject.deferCommit) {
0701:                        // short circuit if we aren't actually tracking ASO gaps
0702:                        if (commit) {
0703:                            resolve(ob, a);
0704:                        } else {
0705:                            track(ob, a);
0706:                        }
0707:                    }
0708:                }
0709:
0710:                private void track(Object ob, Object a) {
0711:                    Object o = new Traversal(ob, a);
0712:                    if (log.isDebugEnabled())
0713:                        log.debug("Tracking " + o);
0714:                    synchronized (globalSet) {
0715:                        globalSet.add(o);
0716:                    }
0717:                    getLocalSet().add(o);
0718:                }
0719:
0720:                private void resolve(Object ob, Object a) {
0721:                    Object o = new Traversal(ob, a);
0722:                    if (log.isDebugEnabled())
0723:                        log.debug("Resolving " + o);
0724:                    synchronized (globalSet) {
0725:                        globalSet.remove(o);
0726:                    }
0727:                    getLocalSet().remove(o);
0728:                }
0729:
0730:                public void clearLocalSet() {
0731:                    if (ActiveSubscriptionObject.deferCommit) {
0732:                        // short circuit if we aren't actually tracking ASO gaps
0733:                        getLocalSet().clear();
0734:                    }
0735:                }
0736:
0737:                public void checkAccess(Object ob, Object a) {
0738:                    if (ActiveSubscriptionObject.deferCommit) {
0739:                        // short circuit if we aren't actually tracking ASO gaps
0740:                        Object o = new Traversal(ob, a);
0741:                        if (log.isDebugEnabled())
0742:                            log.debug("Checking " + o);
0743:                        boolean locP = getLocalSet().contains(o);
0744:                        if (locP) {
0745:                            log.warn(
0746:                                    "Local access of uncommitted ActiveSubscriptionObject"
0747:                                            + " data " + o, new Throwable());
0748:                        } else {
0749:                            if (log.isDebugEnabled()) {
0750:                                boolean gloP;
0751:                                synchronized (globalSet) {
0752:                                    gloP = globalSet.contains(o);
0753:                                }
0754:                                if (gloP) {
0755:                                    log.debug(
0756:                                            "Global access of uncommitted ActiveSubscriptionObject"
0757:                                                    + " data " + o,
0758:                                            new Throwable());
0759:                                }
0760:                            }
0761:                        }
0762:                    }
0763:                }
0764:            }
0765:
0766:            private static class Traversal {
0767:                private final Object o;
0768:                private final Object a;
0769:
0770:                public Traversal(Object o, Object a) {
0771:                    this .o = o;
0772:                    this .a = a;
0773:                }
0774:
0775:                public boolean equals(Object thing) {
0776:                    if (thing instanceof  Traversal) {
0777:                        return o.equals(((Traversal) thing).o)
0778:                                && a.equals(((Traversal) thing).a);
0779:                    } else {
0780:                        return false;
0781:                    }
0782:                }
0783:
0784:                public int hashCode() {
0785:                    return o.hashCode();
0786:                } /*don't bother to spread 'em out*/
0787:
0788:                public String toString() {
0789:                    return "Traversal(" + a + ") " + o;
0790:                }
0791:            }
0792:
0793:            public PublishHistory getHistory() {
0794:                return myDistributor.history;
0795:            }
0796:
0797:            protected Envelope executeDelayedLPActions() {
0798:                myDomainService.invokeDelayedLPActions();
0799:                return privateGetPublishedChanges();
0800:            }
0801:
0802:            public PersistenceObject getPersistenceObject()
0803:                    throws PersistenceNotEnabledException {
0804:                return myDistributor.getPersistenceObject();
0805:            }
0806:
0807:            /** Ensure that all the domains know that this is THE blackboard */
0808:            protected void connectDomains() {
0809:                myDomainService.setBlackboard(this );
0810:                setReadyToPersist();
0811:            }
0812:
0813:            //
0814:            // Distributor
0815:            //
0816:            private Distributor createDistributor(
0817:                    MessageSwitchService msgSwitch, Object state) {
0818:                Distributor d = new Distributor(this , myServiceBroker, self
0819:                        .getAddress());
0820:                Persistence persistence = createPersistence();
0821:                boolean lazyPersistence = SystemProperties.getBoolean(
0822:                        "org.cougaar.core.persistence.lazy", true);
0823:                d.setPersistence(persistence, lazyPersistence);
0824:                d.start(msgSwitch, state); // msgSwitch, state
0825:
0826:                return d;
0827:            }
0828:
0829:            public Distributor getDistributor() {
0830:                return myDistributor;
0831:            }
0832:
0833:            protected Persistence createPersistence() {
0834:                try {
0835:                    return BlackboardPersistence.find(myServiceBroker);
0836:                } catch (PersistenceException e) {
0837:                    e.printStackTrace();
0838:                }
0839:                return null;
0840:            }
0841:
0842:            // -------- Methods for ABA Handling Below --------  needs work //
0843:
0844:            // AttributeBasedAddress to ABATranslation cache
0845:            private Map cache = new HashMap(89);
0846:
0847:            private CacheClearer cacheClearer = new CacheClearer();
0848:            private Object cacheClearerLock = new Object();
0849:
0850:            private static class ABATranslationImpl implements  ABATranslation {
0851:                Collection old, current;
0852:
0853:                ABATranslationImpl(Collection current) {
0854:                    this .current = current;
0855:                }
0856:
0857:                public Collection getOldTranslation() {
0858:                    return old;
0859:                }
0860:
0861:                public Collection getCurrentTranslation() {
0862:                    return current;
0863:                }
0864:
0865:                void setCurrentTranslation(Collection newCurrentTranslation) {
0866:                    current = newCurrentTranslation;
0867:                }
0868:
0869:                void setOldTranslation(Collection newOldTranslation) {
0870:                    old = newOldTranslation;
0871:                }
0872:
0873:                boolean isEmpty() {
0874:                    return old == null && current == null;
0875:                }
0876:            }
0877:
0878:            private class MyCommunityChangeListener extends
0879:                    CommunityChangeAdapter {
0880:                public void communityChanged(CommunityChangeEvent e) {
0881:                    if (logger.isDebugEnabled())
0882:                        logger.debug(e.toString());
0883:                    clearCache(e.getCommunityName());
0884:                }
0885:            }
0886:
0887:            /*
0888:             * Loops through the cache of ABAs and returns MessageAddresss, 
0889:             * else it querries the nameserver for all agents with the ABA's role attribute, 
0890:             * and builds the cache.
0891:             * @return list (copy) of the addresses of the agents matching the ABA
0892:             */
0893:            public Collection getABAAddresses(AttributeBasedAddress aba) {
0894:                // first look in cache
0895:                Collection matches = null;
0896:                synchronized (cache) {
0897:                    ABATranslation abaTranslation = (ABATranslation) cache
0898:                            .get(aba);
0899:                    if (abaTranslation != null) {
0900:                        matches = abaTranslation.getCurrentTranslation();
0901:                    }
0902:                }
0903:                if (matches == null) {
0904:                    // Not in cache. Get it the hard way from community service
0905:                    matches = lookupABA(aba);
0906:                    if (logger.isDebugEnabled()) {
0907:                        logger.debug("lookupABA: " + aba + "->" + matches);
0908:                    }
0909:                    matches = Collections.unmodifiableCollection(matches);
0910:                    synchronized (cache) {
0911:                        ABATranslationImpl abaTranslation = (ABATranslationImpl) cache
0912:                                .get(aba);
0913:                        if (abaTranslation == null) {
0914:                            abaTranslation = new ABATranslationImpl(matches);
0915:                            cache.put(aba, abaTranslation);
0916:                        } else {
0917:                            abaTranslation.setCurrentTranslation(matches);
0918:                        }
0919:                    }
0920:                }
0921:                return matches; // matches is unmodifiable - no need to copy it
0922:            }
0923:
0924:            // get the CommunityService when possible 
0925:            private CommunityService _myCommunityService = null;
0926:
0927:            private CommunityService getCommunityService() {
0928:                if (_myCommunityService != null) {
0929:                    return _myCommunityService;
0930:                } else {
0931:                    _myCommunityService = (CommunityService) myServiceBroker
0932:                            .getService(this , CommunityService.class, null);
0933:                    if (_myCommunityService == null) {
0934:                        logger
0935:                                .warn(
0936:                                        "Warning: Blackboard had no CommunityService -"
0937:                                                + " will fall back to dynamic service lookup."
0938:                                                + "  Risk of Deadlock!",
0939:                                        new Throwable());
0940:                    }
0941:                    _myCommunityService
0942:                            .addListener(new MyCommunityChangeListener());
0943:                    return _myCommunityService;
0944:                }
0945:            }
0946:
0947:            /*
0948:             * Queries NameServer and gets a collection MessageAddresss of
0949:             * all agents having the attribute type and value specified by the
0950:             * ABA and stores the collection in the ABA cache. Returns a List
0951:             * (copy) of the addresses found.
0952:             * @return list (copy) of the addresses of the agents matching the ABA
0953:             */
0954:            private Collection lookupABA(AttributeBasedAddress aba) {
0955:                CommunityService cs = getCommunityService();
0956:                String communitySpec = getCommunitySpec(aba);
0957:                String roleValue = aba.getAttributeValue();
0958:                String roleName = aba.getAttributeType();
0959:                String filter = "(" + roleName + "=" + roleValue + ")";
0960:
0961:                if (cs == null) {
0962:                    return Collections.EMPTY_SET;
0963:                }
0964:                Collection matches = cs.search(communitySpec, filter);
0965:                // MIK - do we really need to copy this?.  No, but we're also filtering for MAs
0966:                List cis = new ArrayList(matches.size());
0967:                for (Iterator i = matches.iterator(); i.hasNext();) {
0968:                    Object o = i.next();
0969:                    if (o instanceof  MessageAddress) {
0970:                        cis.add(o);
0971:                    }
0972:                }
0973:                return cis;
0974:            }
0975:
0976:            private void clearCache(String communityName) {
0977:                synchronized (cacheClearerLock) {
0978:                    if (cacheClearer == null) {
0979:                        cacheClearer = new CacheClearer();
0980:                    }
0981:                }
0982:                cacheClearer.add(communityName);
0983:            }
0984:
0985:            private class CacheClearer implements  Runnable {
0986:                private Set changedCommunities = new HashSet();
0987:                private Schedulable thread;
0988:
0989:                // The delay time spent waiting for additional community
0990:                // change notifications to arrive before processing
0991:                // all such notifications. A trade-off between
0992:                // time for ABA translations to be updated
0993:                // and time for the local Comm Service to update
0994:                // its cache from the NameService
0995:                // This is in milliseconds
0996:                private long waitForNewCommChangeNotifications = SystemProperties
0997:                        .getLong(
0998:                                "org.cougaar.core.blackboard.waitForNewCommChangeNotifications",
0999:                                1000L);
1000:
1001:                private void reschedule() {
1002:                    if (thread != null) {
1003:                        thread.schedule(waitForNewCommChangeNotifications);
1004:                    }
1005:                }
1006:
1007:                public synchronized void add(String communityName) {
1008:                    changedCommunities.add(communityName);
1009:                    if (thread == null) {
1010:                        thread = threadS.getThread(this , this ,
1011:                                "ABA Cache Clearer");
1012:                    }
1013:                    reschedule();
1014:                }
1015:
1016:                public void run() {
1017:                    Set changes = new HashSet(11);
1018:                    synchronized (this ) {
1019:                        if (changedCommunities.size() == 0) {
1020:                            // exit without rescheduling
1021:                            return;
1022:                        } else {
1023:                            changes.addAll(changedCommunities);
1024:                            changedCommunities.clear();
1025:                        } // end of synch block
1026:                    }
1027:
1028:                    if (myDistributor == null) {
1029:                        // Blackboard was stopped?
1030:                        if (logger != null && logger.isInfoEnabled())
1031:                            logger
1032:                                    .info("ABA Cache clearer dropping received changes cause"
1033:                                            + " Distributor is null -- assuming Blackboard is stopping");
1034:                        thread = null;
1035:                        return;
1036:                    }
1037:
1038:                    // Process the community changes
1039:                    myDistributor.invokeABAChangeLPs(changes);
1040:                    changes.clear();
1041:                    reschedule(); // take another pass in a bit in case something else comes in
1042:                }
1043:            }
1044:
1045:            /**
1046:             * Tell all the ABA interested LPs about the new 
1047:             * community memberships, using the local cache of ABA translations.
1048:             */
1049:            public void invokeABAChangeLPs(Set communities) {
1050:                synchronized (cache) {
1051:                    for (Iterator i = cache.values().iterator(); i.hasNext();) {
1052:                        ABATranslationImpl at = (ABATranslationImpl) i.next();
1053:                        at.setOldTranslation(at.getCurrentTranslation());
1054:                        at.setCurrentTranslation(null); // Filled in when needed
1055:                    }
1056:                    myDomainService.invokeABAChangeLogicProviders(communities);
1057:                    for (Iterator i = cache.values().iterator(); i.hasNext();) {
1058:                        ABATranslationImpl at = (ABATranslationImpl) i.next();
1059:                        at.setOldTranslation(null);
1060:                        if (at.isEmpty())
1061:                            i.remove();
1062:                    }
1063:                }
1064:            }
1065:
1066:            public ABATranslation getABATranslation(AttributeBasedAddress aba) {
1067:                synchronized (cache) {
1068:                    ABATranslationImpl ret = (ABATranslationImpl) cache
1069:                            .get(aba);
1070:                    if (ret == null)
1071:                        return null;
1072:                    if (ret.getOldTranslation() == null)
1073:                        return null;
1074:                    if (ret.getCurrentTranslation() == null) {
1075:                        ret.setCurrentTranslation(lookupABA(aba));
1076:                    }
1077:                    return ret;
1078:                }
1079:            }
1080:
1081:            // Stub - should be replaced when we figure out semantics for
1082:            // community name spec in the aba.d
1083:            protected String getCommunitySpec(AttributeBasedAddress aba) {
1084:                String abaComm = aba.getCommunityName();
1085:
1086:                if ((abaComm == null) || (abaComm.equals(""))
1087:                        || (abaComm.equals("*"))) {
1088:                    return "";
1089:                } else {
1090:                    return abaComm;
1091:                }
1092:            }
1093:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.