Source Code Cross Referenced for Distributor.java in  » Science » Cougaar12_4 » org » cougaar » core » blackboard » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.blackboard 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 1997-2004 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:
0027:        package org.cougaar.core.blackboard;
0028:
0029:        import java.lang.ref.Reference;
0030:        import java.lang.ref.ReferenceQueue;
0031:        import java.lang.ref.WeakReference;
0032:        import java.util.AbstractList;
0033:        import java.util.ArrayList;
0034:        import java.util.Collection;
0035:        import java.util.Collections;
0036:        import java.util.IdentityHashMap;
0037:        import java.util.Iterator;
0038:        import java.util.List;
0039:        import java.util.Map;
0040:        import java.util.Set;
0041:
0042:        import org.cougaar.bootstrap.SystemProperties;
0043:        import org.cougaar.core.agent.service.MessageSwitchService;
0044:        import org.cougaar.core.component.ServiceBroker;
0045:        import org.cougaar.core.logging.LoggingServiceWithPrefix;
0046:        import org.cougaar.core.mts.MessageAddress;
0047:        import org.cougaar.core.node.NodeBusyService;
0048:        import org.cougaar.core.persist.Persistence;
0049:        import org.cougaar.core.persist.PersistenceObject;
0050:        import org.cougaar.core.persist.PersistenceSubscriberState;
0051:        import org.cougaar.core.persist.RehydrationResult;
0052:        import org.cougaar.core.service.AgentIdentificationService;
0053:        import org.cougaar.core.service.QuiescenceReportForDistributorService;
0054:        import org.cougaar.core.service.ThreadService;
0055:        import org.cougaar.core.thread.Schedulable;
0056:        import org.cougaar.core.thread.SchedulableStatus;
0057:        import org.cougaar.util.UnaryPredicate;
0058:        import org.cougaar.util.log.Logger;
0059:        import org.cougaar.util.log.Logging;
0060:
0061:        /**
0062:         * The Distributor coordinates blackboard transactions, subscriber
0063:         * updates, and persistence.
0064:         *
0065:         * @property org.cougaar.core.agent.keepPublishHistory
0066:         *   if set to <em>true</em>, enables tracking of
0067:         *   all publishes.  Extremely expensive.
0068:         * @property org.cougaar.core.agent.singleTransactionModel
0069:         *   Enables a blackboard/agent run model where only one
0070:         *   transaction may be open at a given time.
0071:         */
0072:        final class Distributor {
0073:
0074:            /*
0075:             * Design summary:
0076:             *
0077:             * The distributor uses two locks:
0078:             *   distributorLock
0079:             *   transactionLock
0080:             * The distributorLock guards against "distribute()" and
0081:             * blackboard modification.  The transactionLock guards against
0082:             * start/finish transaction and persistence.  Only one distribute
0083:             * can occur at a time, and it can't occur at the same time as a
0084:             * persist.  Only one persist can occur at a time, and there must
0085:             * be no active transactions (to guard against object
0086:             * modifications during serialization), but we can be suspended.
0087:             * Transactions are allowed when we're not persisting or
0088:             * suspended.  It's okay for transactions to start/finish while
0089:             * a distribute is taking place, to provide parallelism.
0090:             *
0091:             * Persistence runs in either a lazy or non-lazy mode, where the
0092:             * best-supported mode is lazy.  Lazy persistence occurs
0093:             * periodically (33 seconds) if the blackboard has been modified
0094:             * during that time, and only if the persistence implementation
0095:             * indicates that it is "getPersistenceTime()".
0096:             *
0097:             * Old notes, partially accurate:
0098:             *
0099:             * Synchronization methodology. The distributor distributes three
0100:             * kinds of things: envelopes, messages, and timers. All three are
0101:             * synchronized on the distributor object (this) so only one of
0102:             * these can be in progress at a time. The distributor must have
0103:             * unfettered access to the subscribers meaning that the subscribers
0104:             * cannot themselves be locked while awaiting access to the
0105:             * distributor. Each distribution may generate a persistence
0106:             * delta. Persistence deltas are not generated unless there are no
0107:             * open transactions. Normally, subscribers are allowed to open
0108:             * transactions except if sufficient time has elapsed since the
0109:             * previous persistence delta requiring that a persistence delta
0110:             * must be generated. A persistence delta must also be generated if
0111:             * there are no open transactions and nothing has been distributed
0112:             * to any subscriber.
0113:             * @property org.cougaar.core.blackboard.persistenceReservationTimeout
0114:             * specifies the maximum delay allowed before using a persistence
0115:             * reservation. An agent exceeding this delay will have to request a
0116:             * new reservation. Default value is 120000 milliseconds. A value of
0117:             * zero disables the reservation mechanism.
0118:             */
0119:
0120:            /** The maximum interval between persistence deltas. */
0121:            private static final long MAX_PERSIST_INTERVAL = 37000L;
0122:            private static final long TIMER_PERSIST_INTERVAL = 33000L;
0123:
0124:            /** The maximum delay allowed before using a persistence reservation */
0125:            private static final String PERSISTENCE_RESERVATION_TIMEOUT_PROP = "org.cougaar.core.blackboard.persistenceReservationTimeout";
0126:            private static final long PERSISTENCE_RESERVATION_TIMEOUT = SystemProperties
0127:                    .getLong(PERSISTENCE_RESERVATION_TIMEOUT_PROP, 120000L);
0128:
0129:            private static final String PERSIST_AT_STARTUP_PROP = "org.cougaar.core.blackboard.persistAtStartup";
0130:            private static final boolean PERSIST_AT_STARTUP = SystemProperties
0131:                    .getBoolean(PERSIST_AT_STARTUP_PROP);
0132:
0133:            private static final UnaryPredicate anythingP = new UnaryPredicate() {
0134:                public boolean execute(Object o) {
0135:                    return (o != null);
0136:                }
0137:            };
0138:
0139:            private static final String SINGLE_TRANSACTION_PROP = "org.cougaar.core.agent.singleTransactionModel";
0140:            /** The default setting for single transaction model */
0141:            public static final boolean DEFAULT_SINGLE_TRANSACTION = false;
0142:            private static final boolean SINGLE_TRANSACTION = SystemProperties
0143:                    .getBoolean(SINGLE_TRANSACTION_PROP,
0144:                            DEFAULT_SINGLE_TRANSACTION);
0145:
0146:            //
0147:            // these are set in the constructor and are final:
0148:            //
0149:
0150:            /** The publish history is available for subscriber use. */
0151:            public final PublishHistory history = (SystemProperties
0152:                    .getBoolean("org.cougaar.core.agent.keepPublishHistory") ? new PublishHistory()
0153:                    : null);
0154:
0155:            /** the name of this distributor */
0156:            private final String name;
0157:
0158:            /** NodeBusyService so we can indicate when we are busy persisting */
0159:            private NodeBusyService nodeBusyService;
0160:
0161:            // blackboard, noted below.
0162:
0163:            /** the logger, which is thread safe */
0164:            private final Logger logger;
0165:
0166:            //
0167:            // these are set immediately following the constructor, and
0168:            // are effectively final:
0169:            //
0170:
0171:            /** True if using lazy persistence */
0172:            private boolean lazyPersistence = true;
0173:
0174:            /** The object we use to persist ourselves. */
0175:            private Persistence persistence = null;
0176:
0177:            /** If persistence is non-null, is it a dummy? */
0178:            private boolean dummyPersistence;
0179:
0180:            /** The reservation manager for persistence */
0181:            private static final ReservationManager persistenceReservationManager = new ReservationManager(
0182:                    PERSISTENCE_RESERVATION_TIMEOUT);
0183:
0184:            //
0185:            // lock for open/close transaction and persistence:
0186:            //
0187:
0188:            private final Object transactionLock = new Object();
0189:
0190:            /**
0191:             * when singleTransactionModel is enabled, the transactionMutex is
0192:             * locked for the duration of the transaction.  
0193:             */
0194:            private final Object transactionMutexLock = new Object();
0195:            private boolean transactionMutexInUse = false;
0196:
0197:            /**
0198:             * Acquire the transaction mutex.  No-op if not running in SINGLE_TRANSACTION mode.
0199:             */
0200:            protected final void acquireTransactionMutex() {
0201:                if (SINGLE_TRANSACTION) {
0202:                    synchronized (transactionMutexLock) {
0203:                        try {
0204:                            while (transactionMutexInUse) {
0205:                                transactionMutexLock.wait();
0206:                            }
0207:                            transactionMutexInUse = true;
0208:                        } catch (InterruptedException ie) {
0209:                            transactionMutexLock.notify();
0210:                            logger
0211:                                    .error(
0212:                                            "Interrupted while acquiring transactionMutex",
0213:                                            ie);
0214:                        }
0215:                    }
0216:                }
0217:            }
0218:
0219:            /**
0220:             * Release the transaction mutex. No-op if not running in SINGLE_TRANSACTION mode.
0221:             */
0222:            protected final void releaseTransactionMutex() {
0223:                if (SINGLE_TRANSACTION) {
0224:                    synchronized (transactionMutexLock) {
0225:                        transactionMutexInUse = false;
0226:                        transactionMutexLock.notify();
0227:                    }
0228:                }
0229:            }
0230:
0231:            // the following are locked under the transactionLock:
0232:
0233:            private static final int PERSIST_ACTIVE = (1 << 0);
0234:            private static final int PERSIST_PENDING = (1 << 1);
0235:            private static final int SUSPENDED = (1 << 2);
0236:            private int persistFlags = 0;
0237:            private int transactionCount = 0;
0238:
0239:            // temporary list for use within "doPersist":
0240:            private final List subscriberStates = new ArrayList();
0241:
0242:            //
0243:            // lock for distribute and blackboard access:
0244:            //
0245:
0246:            private final Object distributorLock = new Object();
0247:
0248:            // the following are locked under the distributorLock:
0249:
0250:            /** our blackboard */
0251:            private final Blackboard blackboard;
0252:
0253:            /** True if rehydration occurred at startup */
0254:            private boolean didRehydrate = false;
0255:
0256:            /** Tuples that have been distributed during a persistence epoch */
0257:            private Map epochTuples;
0258:
0259:            /** The message manager for this agent */
0260:            private MessageManager myMessageManager = null;
0261:
0262:            /** Periodic persistence timer */
0263:            private Schedulable distributorTimer = null;
0264:
0265:            private final Subscribers subscribers = new Subscribers();
0266:
0267:            /** The time that we last persisted */
0268:            private long lastPersist = System.currentTimeMillis();
0269:
0270:            /**
0271:             * Do we need to persist sometime; changed state has not
0272:             * been persisted
0273:             */
0274:            private boolean needToPersist = false;
0275:
0276:            // temporary lists, for use within "distribute()":
0277:            private final List outboxes = new ArrayList();
0278:            private final List messagesToSend = new ArrayList();
0279:
0280:            // temporary list, for use within "receiveMessages()":
0281:            private final List directiveMessages = new ArrayList();
0282:
0283:            //
0284:            // periodic (lazy) persistence timer
0285:            //
0286:
0287:            private final Object timerLock = new Object();
0288:
0289:            // the following are locked under the timerLock:
0290:
0291:            private boolean timerActive;
0292:
0293:            //
0294:            // These are partially locked, and may cause bugs in
0295:            // the future.  In practice they seem to be fine:
0296:            //
0297:
0298:            /** Envelopes distributed since the last rehydrated delta */
0299:            private List postRehydrationEnvelopes = null;
0300:
0301:            /** All objects published prior to the last rehydrated delta */
0302:            private PersistenceEnvelope rehydrationEnvelope = null;
0303:
0304:            private QuiescenceMonitor quiescenceMonitor;
0305:            private boolean quiescenceReportEnabled = false; // Not yet enabled
0306:
0307:            private ServiceBroker sb;
0308:            private QuiescenceReportForDistributorService quiescenceReportService;
0309:
0310:            // Subscribers who should rehydrate before enabling the QuiescenceService
0311:            // that is, the set of subscribers remaining to rehydrate for
0312:            // which quiescence is required
0313:            private Set subscribersToRehydrate = Collections.EMPTY_SET;
0314:
0315:            /** Isolated constructor */
0316:            public Distributor(Blackboard blackboard, ServiceBroker sb,
0317:                    String name) {
0318:                this .blackboard = blackboard;
0319:                this .name = (name != null ? name : "Anonymous");
0320:                Logger l = Logging.getLogger(getClass());
0321:                this .logger = new LoggingServiceWithPrefix(l, this .name + ": ");
0322:                if (logger.isInfoEnabled()) {
0323:                    logger.info("Distributor started");
0324:                }
0325:
0326:                if (logger.isDebugEnabled()) {
0327:                    logger.debug("transaction options: " + "deferCommit="
0328:                            + ActiveSubscriptionObject.deferCommit
0329:                            + ", singleTransaction=" + SINGLE_TRANSACTION);
0330:                }
0331:
0332:                this .sb = sb;
0333:                nodeBusyService = (NodeBusyService) sb.getService(this ,
0334:                        NodeBusyService.class, null);
0335:                AgentIdentificationService ais = (AgentIdentificationService) sb
0336:                        .getService(this , AgentIdentificationService.class,
0337:                                null);
0338:                nodeBusyService.setAgentIdentificationService(ais);
0339:                quiescenceReportService = (QuiescenceReportForDistributorService) sb
0340:                        .getService(this ,
0341:                                QuiescenceReportForDistributorService.class,
0342:                                null);
0343:                quiescenceReportService.setAgentIdentificationService(ais);
0344:                quiescenceMonitor = new QuiescenceMonitor(
0345:                        quiescenceReportService, logger);
0346:            }
0347:
0348:            /**
0349:             * Called by the blackboard immediately after the constructor,
0350:             * and only once.
0351:             */
0352:            void setPersistence(Persistence newPersistence, boolean lazy) {
0353:                assert persistence == null : "persistence already set";
0354:                persistence = newPersistence;
0355:                lazyPersistence = lazy;
0356:                dummyPersistence = (persistence != null && persistence
0357:                        .isDummyPersistence());
0358:                initializeEpochEnvelopes();
0359:            }
0360:
0361:            /**
0362:             * Called by Subscriber to link into Blackboard persistence
0363:             * mechanism
0364:             */
0365:            Persistence getPersistence() {
0366:                return persistence;
0367:            }
0368:
0369:            /**
0370:             * Called by subscriber to discard rehydration info.
0371:             */
0372:            void discardRehydrationInfo(Subscriber subscriber) {
0373:                synchronized (distributorLock) {
0374:                    if (rehydrationEnvelope != null) {
0375:                        // Only need to check subscriber count to enable quiescence
0376:                        if (!quiescenceReportEnabled) {
0377:                            // Remove this subscriber from those we want to rehydrate before
0378:                            // registering with the quiescence service.
0379:                            // Later, in distribute, we'll see if this was in fact the last
0380:                            // relevant subscriber to be rehydrated
0381:                            PersistenceSubscriberState pss = persistence
0382:                                    .getSubscriberState(subscriber);
0383:                            if (pss != null) {
0384:                                String key = pss.getKey();
0385:                                boolean didRemove = subscribersToRehydrate
0386:                                        .remove(key);
0387:                                if (didRemove && logger.isDebugEnabled())
0388:                                    logger
0389:                                            .debug(".discard: Rehydrated q-relevant subscriber "
0390:                                                    + key);
0391:                            }
0392:                        }
0393:
0394:                        // Now dump its subscriber state
0395:                        // (needs to be after above so we can get the key...)
0396:                        persistence.discardSubscriberState(subscriber);
0397:
0398:                        // If there are no more persistence subscriber states left, dump
0399:                        // the rehydration information -- we're done with it
0400:                        if (!persistence.hasSubscriberStates()) {
0401:                            if (logger.isDebugEnabled())
0402:                                logger
0403:                                        .debug(".discard: No sub states left at all. discarding rehydration info");
0404:                            // discard rehydration info:
0405:                            rehydrationEnvelope = null;
0406:                            postRehydrationEnvelopes = null;
0407:                        }
0408:                    }
0409:                }
0410:            }
0411:
0412:            public boolean didRehydrate(Subscriber subscriber) {
0413:                if (!didRehydrate)
0414:                    return false;
0415:                return (persistence.getSubscriberState(subscriber) != null);
0416:            }
0417:
0418:            /**
0419:             * Pass thru to blackboard to safely return blackboard object
0420:             * counts.
0421:             * Used by BlackboardMetricsService
0422:             * @param cl The class type
0423:             */
0424:            public int getBlackboardCount(Class cl) {
0425:                assert !Thread.holdsLock(distributorLock);
0426:                assert !Thread.holdsLock(transactionLock);
0427:                synchronized (distributorLock) {
0428:                    return blackboard.countBlackboard(cl);
0429:                }
0430:            }
0431:
0432:            /**
0433:             * Pass thru to blackboard to safely return blackboard object
0434:             * counts.
0435:             * Used by BlackboardMetricsService
0436:             * @param predicate The objects to count in the blackboard
0437:             * @return int The count of objects that match the predicate
0438:             *   currently in the blackboard
0439:             */
0440:            public int getBlackboardCount(UnaryPredicate predicate) {
0441:                assert !Thread.holdsLock(distributorLock);
0442:                assert !Thread.holdsLock(transactionLock);
0443:                synchronized (distributorLock) {
0444:                    return blackboard.countBlackboard(predicate);
0445:                }
0446:            }
0447:
0448:            /**
0449:             * Pass thru to blackboard to safely return the size of the
0450:             * blackboard collection.
0451:             */
0452:            public int getBlackboardSize() {
0453:                assert !Thread.holdsLock(distributorLock);
0454:                assert !Thread.holdsLock(transactionLock);
0455:                synchronized (distributorLock) {
0456:                    return blackboard.getBlackboardSize();
0457:                }
0458:            }
0459:
0460:            /**
0461:             * Rehydrate this blackboard. If persistence is off, just create a
0462:             * MessageManager that does nothing. If persistence is on, try to
0463:             * rehydrate from existing persistence deltas. The result of this is
0464:             * a List of undistributed envelopes and a MessageManager. There
0465:             * might be no MessageManager in the result signifying that either
0466:             * there were no persistence deltas or that lazyPersistence was on
0467:             * so the message manager did not need to be saved. In either case
0468:             * the existence of an appropriate message manager is assured. The
0469:             * undistributed envelopes might be null signifying that there was
0470:             * no persistence deltas in existence. This is reflected in the
0471:             * value of the didRehydrate flag.
0472:             */
0473:            private void rehydrate(Object state) {
0474:                assert Thread.holdsLock(distributorLock);
0475:                assert !Thread.holdsLock(transactionLock);
0476:                if (persistence == null) {
0477:                    myMessageManager = new MessageManagerImpl(false);
0478:                    return;
0479:                }
0480:
0481:                // Get the rehydration data.  Note that the passed "state" is
0482:                // always null, so we can't use it to check for dummyPersistence.
0483:                rehydrationEnvelope = new PersistenceEnvelope();
0484:                RehydrationResult rr = persistence.rehydrate(
0485:                        rehydrationEnvelope, state);
0486:                if (rr.quiescenceMonitorState != null) {
0487:                    quiescenceMonitor.setState(rr.quiescenceMonitorState);
0488:                }
0489:
0490:                // Distributor tracks the subscribers who have yet to rehydrate
0491:                // However, we only really care about those for whom we require
0492:                // quiescence
0493:                subscribersToRehydrate = persistence.getSubscriberStateKeys();
0494:                if (logger.isDebugEnabled())
0495:                    logger.debug("Initial number of subscribers: "
0496:                            + subscribersToRehydrate.size());
0497:                // Synchronize so that a subscriber who already 
0498:                // is in discardRehydrationInfo doesnt cause a ConcurrentModExc
0499:                synchronized (distributorLock) {
0500:                    Iterator iter = subscribersToRehydrate.iterator();
0501:                    List toRemove = new ArrayList();
0502:                    while (iter.hasNext()) {
0503:                        String key = (String) iter.next();
0504:                        if (!quiescenceMonitor.isQuiescenceRequired(key)) {
0505:                            if (logger.isDebugEnabled())
0506:                                logger.debug("Ignoring subscriber " + key);
0507:                            toRemove.add(key);
0508:                        } else {
0509:                            if (logger.isDebugEnabled())
0510:                                logger.debug("NOT Ignoring subscriber " + key);
0511:                        }
0512:                    }
0513:                    subscribersToRehydrate.removeAll(toRemove);
0514:                    if (logger.isDebugEnabled())
0515:                        logger.debug("Trimmed number of subscribers: "
0516:                                + subscribersToRehydrate.size());
0517:                } // end synchronized(distributorLock)
0518:
0519:                if (lazyPersistence) { // Ignore any rehydrated message manager
0520:                    myMessageManager = new MessageManagerImpl(false);
0521:                } else {
0522:                    myMessageManager = rr.messageManager;
0523:                    if (myMessageManager == null) {
0524:                        myMessageManager = new MessageManagerImpl(true);
0525:                    }
0526:                }
0527:                if (rr.undistributedEnvelopes != null) {
0528:                    didRehydrate = true;
0529:                    postRehydrationEnvelopes = new ArrayList();
0530:                    postRehydrationEnvelopes.addAll(rr.undistributedEnvelopes);
0531:                    addEpochEnvelopes(rr.undistributedEnvelopes);
0532:                }
0533:            }
0534:
0535:            private MessageManager getMessageManager() {
0536:                return myMessageManager;
0537:            }
0538:
0539:            /**
0540:             * Rehydrate a new subscription. New subscriptions that correspond
0541:             * to persisted subscriptions are quietly fed the
0542:             * rehydrationEnvelope which has all the objects that had been
0543:             * distributed prior to the last rehydrated delta. Objects in the
0544:             * rehydrationEnvelope do not waken the subscriber; they are simply
0545:             * added to the subscription container. Next, the envelopes that
0546:             * were pending in the inbox of the subscriber at the last delta are
0547:             * fed to the subscription. The subscriber _will_ be notified of
0548:             * these. Finally, all envelopes in postRehydrationEnvelopes are fed
0549:             * to the subscription. The subscriber will be notified of these as
0550:             * well.
0551:             */
0552:            private void rehydrateNewSubscription(Subscription s,
0553:                    List persistedTransactionEnvelopes,
0554:                    List persistedPendingEnvelopes) {
0555:                assert Thread.holdsLock(distributorLock);
0556:                assert !Thread.holdsLock(transactionLock);
0557:                s.fill(rehydrationEnvelope);
0558:                if (persistedTransactionEnvelopes != null) {
0559:                    for (Iterator iter = persistedTransactionEnvelopes
0560:                            .iterator(); iter.hasNext();) {
0561:                        s.fill((Envelope) iter.next());
0562:                    }
0563:                }
0564:                if (persistedPendingEnvelopes != null) {
0565:                    for (Iterator iter = persistedPendingEnvelopes.iterator(); iter
0566:                            .hasNext();) {
0567:                        s.fill((Envelope) iter.next());
0568:                    }
0569:                }
0570:                for (Iterator iter = postRehydrationEnvelopes.iterator(); iter
0571:                        .hasNext();) {
0572:                    s.fill((Envelope) iter.next());
0573:                }
0574:            }
0575:
0576:            /**
0577:             * Start the distribution thread.
0578:             * Note that although Distributor is Runnable, it does not
0579:             * extend Thread, rather, it maintains its own thread state
0580:             * privately.
0581:             */
0582:            public void start(MessageSwitchService msgSwitch, Object state) {
0583:                assert !Thread.holdsLock(distributorLock);
0584:                assert !Thread.holdsLock(transactionLock);
0585:                synchronized (distributorLock) {
0586:                    rehydrate(state);
0587:                    getMessageManager().start(msgSwitch, didRehydrate);
0588:                    needToPersist = true;
0589:                }
0590:                // persist now, to claim the persistence ownership
0591:                // and save our component hierarchy
0592:                if (PERSIST_AT_STARTUP) {
0593:                    persist(false, false);
0594:                }
0595:                createTimer();
0596:                enableTimer();
0597:            }
0598:
0599:            private void createTimer() {
0600:                assert !Thread.holdsLock(distributorLock);
0601:                assert !Thread.holdsLock(transactionLock);
0602:                // disable the timer if it already exists
0603:                disableTimer();
0604:                // start a disabled periodic timer
0605:                synchronized (distributorLock) {
0606:                    if (lazyPersistence && !dummyPersistence
0607:                            && distributorTimer == null) {
0608:                        Runnable task = new Runnable() {
0609:                            public void run() {
0610:                                synchronized (timerLock) {
0611:                                    if (timerActive) {
0612:                                        timerPersist();
0613:                                    }
0614:                                }
0615:                            }
0616:                        };
0617:                        ThreadService tsvc = (ThreadService) sb.getService(
0618:                                this , ThreadService.class, null);
0619:                        distributorTimer = tsvc.getThread(this , task,
0620:                                "Persistence Timer",
0621:                                ThreadService.WILL_BLOCK_LANE);
0622:                        sb.releaseService(this , ThreadService.class, tsvc);
0623:                        distributorTimer.schedule(TIMER_PERSIST_INTERVAL,
0624:                                TIMER_PERSIST_INTERVAL);
0625:                    }
0626:                }
0627:            }
0628:
0629:            private void enableTimer() {
0630:                assert !Thread.holdsLock(distributorLock);
0631:                assert !Thread.holdsLock(transactionLock);
0632:                synchronized (timerLock) {
0633:                    timerActive = true;
0634:                }
0635:            }
0636:
0637:            private void disableTimer() {
0638:                assert !Thread.holdsLock(distributorLock);
0639:                assert !Thread.holdsLock(transactionLock);
0640:                // if the timer is running then this will block
0641:                synchronized (timerLock) {
0642:                    timerActive = false;
0643:                }
0644:            }
0645:
0646:            private void stopTimer() {
0647:                assert !Thread.holdsLock(distributorLock);
0648:                assert !Thread.holdsLock(transactionLock);
0649:                // disable the timer if it's running
0650:                disableTimer();
0651:                // stop the timer thread.  This isn't necessary unless we're
0652:                // really "stop()"ing the distributor, but it's useful to
0653:                // hide this thread when the timer is disabled.
0654:                synchronized (distributorLock) {
0655:                    if (lazyPersistence) {
0656:                        if (distributorTimer != null) {
0657:                            distributorTimer.cancelTimer();
0658:                            distributorTimer = null;
0659:                        }
0660:                    }
0661:                }
0662:            }
0663:
0664:            /**
0665:             * Complete any active persists, lockout transactions and
0666:             * timer-based persists, and only allow external state
0667:             * captures via "persistNow()" and "getPersistenceObject()".
0668:             */
0669:            public void suspend() {
0670:                assert !Thread.holdsLock(distributorLock);
0671:                assert !Thread.holdsLock(transactionLock);
0672:                if (logger.isInfoEnabled()) {
0673:                    logger.info("Suspending");
0674:                }
0675:                // stop our timer
0676:                disableTimer();
0677:                // pretend that we're getting ready to persist to lockout
0678:                // all transactions.  Then set the SUSPENDED flag, which
0679:                // acts as another block for "startTransaction".
0680:                // Then we release our active persistence lock to allow
0681:                // externally-signaled persistence while still preventing
0682:                // transaction-based persistence.
0683:                synchronized (transactionLock) {
0684:                    // wait for persistence to complete
0685:                    lockoutPersistence();
0686:                    // block transactions
0687:                    lockoutTransactions();
0688:                    // keep the transactions blocked
0689:                    setSuspended(true);
0690:                    // allow persistence
0691:                    resumePersistence();
0692:                    resumeTransactions();
0693:                }
0694:                if (logger.isInfoEnabled()) {
0695:                    logger.info("Suspended");
0696:                }
0697:            }
0698:
0699:            public void resume() {
0700:                assert !Thread.holdsLock(distributorLock);
0701:                assert !Thread.holdsLock(transactionLock);
0702:                // see suspend for comments
0703:                if (logger.isInfoEnabled()) {
0704:                    logger.info("Resuming");
0705:                }
0706:                synchronized (transactionLock) {
0707:                    setSuspended(false);
0708:                }
0709:                enableTimer();
0710:                if (logger.isInfoEnabled()) {
0711:                    logger.info("Resumed");
0712:                }
0713:            }
0714:
0715:            /**
0716:             * Stop the distribution thread.
0717:             * @see #start
0718:             */
0719:            public void stop() {
0720:                assert !Thread.holdsLock(distributorLock);
0721:                assert !Thread.holdsLock(transactionLock);
0722:                quiescenceReportService.setQuiescenceReportEnabled(false); // Finished here
0723:                sb.releaseService(this ,
0724:                        QuiescenceReportForDistributorService.class,
0725:                        quiescenceReportService);
0726:                stopTimer();
0727:                synchronized (distributorLock) {
0728:                    getMessageManager().stop();
0729:                }
0730:            }
0731:
0732:            //
0733:            // Subscriber Services
0734:            //
0735:
0736:            /**
0737:             * Register a Subscriber with the Distributor.  Future envelopes are
0738:             * distributed to all registered subscribers.
0739:             */
0740:            public void registerSubscriber(Subscriber subscriber) {
0741:                assert !Thread.holdsLock(distributorLock);
0742:                assert !Thread.holdsLock(transactionLock);
0743:                synchronized (distributorLock) {
0744:                    subscribers.register(subscriber);
0745:                }
0746:            }
0747:
0748:            /**
0749:             * Unregister subscriber with the Distributor. Future envelopes are
0750:             * not distributed to unregistered subscribers.
0751:             */
0752:            public void unregisterSubscriber(Subscriber subscriber) {
0753:                assert !Thread.holdsLock(distributorLock);
0754:                assert !Thread.holdsLock(transactionLock);
0755:                synchronized (distributorLock) {
0756:                    subscribers.unregister(subscriber);
0757:                }
0758:            }
0759:
0760:            /**
0761:             * Provide a new subscription with its initial fill. If the
0762:             * subscriber of the subscription was persisted, we fill from the
0763:             * persisted information (see rehydrateNewSubscription) otherwise
0764:             * we fill from the Blackboard (blackboard.fillSubscription).
0765:             */
0766:            public void fillSubscription(Subscription subscription) {
0767:                assert !Thread.holdsLock(distributorLock);
0768:                assert !Thread.holdsLock(transactionLock);
0769:                synchronized (distributorLock) {
0770:                    Subscriber subscriber = subscription.getSubscriber();
0771:                    PersistenceSubscriberState subscriberState = null;
0772:                    if (didRehydrate) {
0773:                        if (subscriber.isReadyToPersist()) {
0774:                            if (logger.isInfoEnabled()) {
0775:                                logger
0776:                                        .info("No subscriber state for late subscribe of "
0777:                                                + subscriber.getName());
0778:                            }
0779:                        } else {
0780:                            subscriberState = persistence
0781:                                    .getSubscriberState(subscriber);
0782:                        }
0783:                    }
0784:                    if (subscriberState != null
0785:                            && subscriberState.pendingEnvelopes != null) {
0786:                        rehydrateNewSubscription(subscription,
0787:                                subscriberState.transactionEnvelopes,
0788:                                subscriberState.pendingEnvelopes);
0789:                    } else {
0790:                        blackboard.fillSubscription(subscription);
0791:                    }
0792:
0793:                    // distribute the initialize envelope -- used for subs
0794:                    // to know when their first transaction is done
0795:                    /*
0796:                       {
0797:                    // option 1
0798:                    distribute(new InitializeSubscriptionEnvelope(subscription), null);
0799:                    }
0800:                     */
0801:                    // blackboard subscribes don't need an ISE to fill
0802:                    if (subscriber != blackboard) {
0803:                        // option 2
0804:                        Subscriber s = subscription.getSubscriber();
0805:                        List l = new ArrayList(1);
0806:                        l.add(new InitializeSubscriptionEnvelope(subscription));
0807:                        s.receiveEnvelopes(l, true); // queue in the right spot.
0808:                        // Assume at least one of the
0809:                        // sources of the objects that
0810:                        // filled the subscription
0811:                        // requires quiescence.
0812:                    }
0813:                }
0814:            }
0815:
0816:            public void fillQuery(Subscription subscription) {
0817:                assert !Thread.holdsLock(distributorLock);
0818:                assert !Thread.holdsLock(transactionLock);
0819:                synchronized (distributorLock) {
0820:                    blackboard.fillQuery(subscription);
0821:                }
0822:            }
0823:
0824:            /**
0825:             * The main workhorse of the distributor. Distributes the contents
0826:             * of an outbox envelope to everybody.
0827:             *
0828:             * If needToPersist is true and it is time to persist, we set the
0829:             * PERSIST_PENDING flag to prevent any further openTransactions from
0830:             * happening. Then we distribute the outbox and consequent
0831:             * envelopes. If anything is distributed, we set the needToPersist
0832:             * flag. Any messages generated by the Blackboard are gathered and
0833:             * given to the message manager for eventual transmission. Finally,
0834:             * the generation of a persistence delta is considered.
0835:             * @return true if a persistance snapshot should be taken
0836:             */
0837:            private boolean distribute(Envelope outbox, BlackboardClient client) {
0838:                return distribute(outbox, client, quiescenceMonitor
0839:                        .isQuiescenceRequired(client));
0840:            }
0841:
0842:            private boolean distribute(Envelope outbox,
0843:                    BlackboardClient client, boolean clientQuiescenceRequired) {
0844:                assert Thread.holdsLock(distributorLock);
0845:                assert !Thread.holdsLock(transactionLock);
0846:                boolean result = false;
0847:                if (outbox != null && logger.isDetailEnabled()
0848:                        && client != null) {
0849:                    logEnvelope(outbox, client);
0850:                }
0851:                if (persistence != null) {
0852:                    if (needToPersist) {
0853:                        if (timeToPersist()) {
0854:                            result = true;
0855:                        }
0856:                    }
0857:                }
0858:                blackboard.prepareForEnvelopes();
0859:                boolean haveSomethingToDistribute = false;
0860:                // nest loops in case delayed actions cascade into more
0861:                // lp actions.
0862:                while (outbox != null && outbox.size() > 0) {
0863:                    while (outbox != null && outbox.size() > 0) {
0864:                        outboxes.add(outbox);
0865:                        outbox = blackboard.receiveEnvelope(outbox);
0866:                        haveSomethingToDistribute = true;
0867:                    }
0868:
0869:                    // outbox should be empty at this point.
0870:                    // execute any pending DelayedLPActions
0871:                    outbox = blackboard.executeDelayedLPActions();
0872:                }
0873:                Blackboard.getTracker().clearLocalSet();
0874:
0875:                //      while (outbox != null && outbox.size() > 0) {
0876:                //        outboxes.add(outbox);
0877:                //        outbox = blackboard.receiveEnvelope(outbox);
0878:                //        haveSomethingToDistribute = true;
0879:                //      }
0880:
0881:                /**
0882:                 * busy indicates that we have found evidence that things are
0883:                 * still happening or are about to happen in this agent.
0884:                 */
0885:                boolean busy = haveSomethingToDistribute;
0886:                boolean newSubscribersAreQuiescent = true; // Until proven otherwise
0887:                if (persistence != null) {
0888:                    if (!needToPersist && haveSomethingToDistribute) {
0889:                        needToPersist = true;
0890:                    }
0891:                }
0892:                if (logger.isDebugEnabled()) {
0893:                    if (haveSomethingToDistribute) {
0894:                        logger.debug("quiescence"
0895:                                + (clientQuiescenceRequired ? "" : " not")
0896:                                + " required for outbox of "
0897:                                + client.getBlackboardClientName());
0898:                    }
0899:                }
0900:                for (Iterator iter = subscribers.iterator(); iter.hasNext();) {
0901:                    Subscriber subscriber = (Subscriber) iter.next();
0902:                    if (subscriber == blackboard)
0903:                        continue;
0904:                    boolean subscriberBusy = false;
0905:                    if (haveSomethingToDistribute) {
0906:                        subscriber.receiveEnvelopes(outboxes,
0907:                                clientQuiescenceRequired);
0908:                        subscriberBusy = true;
0909:                    } else if (subscriber.isBusy()) {
0910:                        subscriberBusy = true;
0911:                    }
0912:                    if (subscriberBusy) {
0913:                        busy = true;
0914:                    }
0915:
0916:                    if (quiescenceReportEnabled) {
0917:                        // Check if at least one subscriber we care about is
0918:                        // now non-quiescent
0919:                        BlackboardClient inboxClient = subscriber.getClient();
0920:                        if (quiescenceMonitor.isQuiescenceRequired(inboxClient)) {
0921:                            if (newSubscribersAreQuiescent
0922:                                    && !subscriber.isQuiescent()) {
0923:                                if (logger.isDebugEnabled()) {
0924:                                    logger
0925:                                            .debug("There is at least one q-relevant subscriber,"
0926:                                                    + " so this distribute prevents quiescence.");
0927:                                }
0928:                                if (logger.isDetailEnabled())
0929:                                    logger
0930:                                            .detail("       First such subscriber: "
0931:                                                    + inboxClient
0932:                                                            .getBlackboardClientName());
0933:                                newSubscribersAreQuiescent = false;
0934:                            }
0935:                        }
0936:                    }
0937:                }
0938:                // Fill messagesToSend
0939:                blackboard.appendMessagesToSend(messagesToSend);
0940:                if (messagesToSend.size() > 0) {
0941:                    for (Iterator i = messagesToSend.iterator(); i.hasNext();) {
0942:                        DirectiveMessage msg = (DirectiveMessage) i.next();
0943:                        // If the publisher is q-relevant, then we number the message
0944:                        // to require a succesful receipt for quiescence
0945:                        if (clientQuiescenceRequired)
0946:                            quiescenceMonitor.numberOutgoingMessage(msg);
0947:                        if (logger.isDetailEnabled()) {
0948:                            Directive[] dirs = msg.getDirectives();
0949:                            for (int j = 0; j < dirs.length; j++) {
0950:                                logger.detail("SEND   " + dirs[j]);
0951:                            }
0952:                        }
0953:                    }
0954:                    getMessageManager().sendMessages(messagesToSend.iterator());
0955:                }
0956:                messagesToSend.clear();
0957:                if (persistence != null) {
0958:                    if (postRehydrationEnvelopes != null) {
0959:                        postRehydrationEnvelopes.addAll(outboxes);
0960:                    }
0961:                    addEpochEnvelopes(outboxes);
0962:                    if (!needToPersist
0963:                            && getMessageManager().needAdvanceEpoch()) {
0964:                        needToPersist = true;
0965:                    }
0966:                    if (!busy && transactionCount > 1) {
0967:                        // This is not the last transaction, still busy
0968:                        busy = true;
0969:                    }
0970:                    if (needToPersist) {
0971:                        if (!busy) {
0972:                            result = true;
0973:                        }
0974:                    }
0975:                }
0976:                outboxes.clear();
0977:
0978:                // Update the cumulative quiescence of all the subscriber inboxes
0979:                // based on this distribute: Non-q if a Q-relevant comp published something
0980:                // to this agent, and we have at least one q-relevant subscriber
0981:                if (quiescenceReportEnabled)
0982:                    quiescenceMonitor
0983:                            .setSubscribersAreQuiescent(newSubscribersAreQuiescent);
0984:
0985:                // If we have not yet enabled the QRS, check to see if this 
0986:                // call to distribute means the last relevant subscriber
0987:                // has rehydrated. If so, enable the QuiescenceReportService.
0988:                // Note that discardRehydrationInfo is where we remove the subscribers
0989:                // from the list we want to rehydrate first.
0990:
0991:                // If this distribute is happening at the close of the first transaction
0992:                // of the last q-relevant subscriber, then we enable the QRS -- but _after_
0993:                // we've updated the Quiescence state based on the results of
0994:                // this distribute -- otherwise, the close of the first transaction
0995:                // of the last q-relevant subscriber would always make you non-q on rehydrate
0996:                if (!quiescenceReportEnabled) {
0997:                    int nLeft = subscribersToRehydrate.size();
0998:                    if (nLeft == 0) {
0999:                        if (logger.isDebugEnabled())
1000:                            logger
1001:                                    .debug(".distribute: No more q-relevant subscribers to rehydrate. Enabling QRS.");
1002:                        quiescenceReportService
1003:                                .setQuiescenceReportEnabled(true); // All ready to go
1004:                        quiescenceReportEnabled = true;
1005:                    } else if (logger.isDebugEnabled()) {
1006:                        if (nLeft == 1)
1007:                            logger
1008:                                    .debug(".distribute: sole remaining q-relevant subscriber"
1009:                                            + " left to rehydrate: "
1010:                                            + subscribersToRehydrate);
1011:                        else
1012:                            logger
1013:                                    .debug(".distribute: "
1014:                                            + nLeft
1015:                                            + " q-relevant subscribers left to rehydrate");
1016:                    }
1017:                } // end check to enable QRS
1018:
1019:                return result;
1020:            } // end of distribute()
1021:
1022:            private void initializeEpochEnvelopes() {
1023:                assert !Thread.holdsLock(distributorLock);
1024:                assert !Thread.holdsLock(transactionLock);
1025:                if (dummyPersistence) {
1026:                    return;
1027:                }
1028:                //epochEnvelopes = new ArrayList();
1029:                // this could use a regular equals-based map, but for persistence
1030:                // consistency we use an =='s based identity map.
1031:                epochTuples = new IdentityHashMap();
1032:            }
1033:
1034:            private void addEpochEnvelopes(List envelopes) {
1035:                assert Thread.holdsLock(distributorLock);
1036:                assert !Thread.holdsLock(transactionLock);
1037:                if (dummyPersistence) {
1038:                    return;
1039:                }
1040:                //epochEnvelopes.addAll(rr.undistributedEnvelopes);
1041:                for (int i = 0, n = envelopes.size(); i < n; i++) {
1042:                    Envelope e = (Envelope) envelopes.get(i);
1043:                    List tuples = e.getRawDeltas();
1044:                    for (int j = 0, m = tuples.size(); j < m; j++) {
1045:                        EnvelopeTuple tuple = (EnvelopeTuple) tuples.get(j);
1046:                        addEpochTuple(tuple);
1047:                    }
1048:                }
1049:            }
1050:
1051:            private void addEpochTuple(EnvelopeTuple tuple) {
1052:                assert Thread.holdsLock(distributorLock);
1053:                assert !Thread.holdsLock(transactionLock);
1054:                assert dummyPersistence;
1055:                Object o = tuple.getObject();
1056:                if (tuple.isBulk()) {
1057:                    Collection c = (Collection) o;
1058:                    for (Iterator iter = c.iterator(); iter.hasNext();) {
1059:                        Object o2 = iter.next();
1060:                        EnvelopeTuple oldTuple = (EnvelopeTuple) epochTuples
1061:                                .get(o2);
1062:                        if (oldTuple == null) {
1063:                            // N + A => A (common)
1064:                            epochTuples.put(o2, new AddEnvelopeTuple(o2));
1065:                        } else {
1066:                            // Other theoretical cases - not handled
1067:                            // A + A => A (error?)
1068:                            // C + A => C (error?)
1069:                            // R + A => R (error?)
1070:                        }
1071:                    }
1072:                } else {
1073:                    EnvelopeTuple oldTuple = (EnvelopeTuple) epochTuples.get(o);
1074:                    if (oldTuple == null) {
1075:                        // N + A => A  (common, ~25%)
1076:                        // N + C => C  (rare)
1077:                        // N + R => R  (rare)
1078:                        epochTuples.put(o, tuple);
1079:                    } else if (oldTuple.isAdd()) {
1080:                        if (tuple.isRemove()) {
1081:                            // A + R => N  (common, ~10%)
1082:                            epochTuples.remove(o);
1083:                        } else {
1084:                            // Other theoretical cases, not handled
1085:                            // A + A => A  (error?)
1086:                            // A + C => A  (common, ~60%)
1087:                        }
1088:                    } else if (oldTuple.isChange()) {
1089:                        if (tuple.isAdd()) {
1090:                            // C + A => C  (error?)
1091:                        } else {
1092:                            // C + C => C  (common, ~2%)
1093:                            // C + R => R  (rare)
1094:                            epochTuples.put(o, tuple);
1095:                        }
1096:                    } else {
1097:                        // Other theoretical cases, not handled
1098:                        // R + A => R  (error?)
1099:                        // R + C => R  (error?  probably a race)
1100:                        // R + R => R  (error?)
1101:                    }
1102:                }
1103:            }
1104:
1105:            private List getEpochEnvelopes() {
1106:                assert Thread.holdsLock(distributorLock);
1107:                assert !Thread.holdsLock(transactionLock);
1108:                if (dummyPersistence) {
1109:                    // we haven't been collecting tuples, since persistence is
1110:                    // disabled, but we must capture the full blackboard for a
1111:                    // forced persist (e.g. mobility).
1112:                    QuerySubscription everything = new QuerySubscription(
1113:                            anythingP);
1114:                    blackboard.fillQuery(everything);
1115:                    Envelope envelope = new Envelope();
1116:                    envelope.bulkAddObject(everything.getCollection());
1117:                    List ret = new ArrayList(1);
1118:                    ret.add(envelope);
1119:                    return ret;
1120:                }
1121:                //return epochEnvelopes;
1122:                // create a list with a single envelope that contains all our
1123:                // epochTuples, where the list's "clear()" clears our tuple
1124:                // map for early GC.
1125:                final Envelope e = new Envelope();
1126:                e.getRawDeltas().addAll(epochTuples.values());
1127:                List ret = new AbstractList() {
1128:                    private Envelope envelope = e;
1129:
1130:                    public int size() {
1131:                        return (envelope == null ? 0 : 1);
1132:                    }
1133:
1134:                    public Object get(int index) {
1135:                        if (index != 0 || envelope == null) {
1136:                            throw new IndexOutOfBoundsException("Index: "
1137:                                    + index + ", Size: " + size());
1138:                        }
1139:                        return envelope;
1140:                    }
1141:
1142:                    public void clear() {
1143:                        if (envelope != null) {
1144:                            envelope = null;
1145:                            epochTuples.clear();
1146:                        }
1147:                    }
1148:                };
1149:                return ret;
1150:            }
1151:
1152:            private void clearEpochEnvelopes() {
1153:                assert Thread.holdsLock(distributorLock);
1154:                assert !Thread.holdsLock(transactionLock);
1155:                if (dummyPersistence) {
1156:                    return;
1157:                }
1158:                //epochEnvelopes.clear();
1159:                epochTuples.clear();
1160:            }
1161:
1162:            public void restartAgent(MessageAddress cid) {
1163:                assert !Thread.holdsLock(distributorLock);
1164:                assert !Thread.holdsLock(transactionLock);
1165:                boolean persistWanted = false;
1166:                try {
1167:                    startTransaction();
1168:                    synchronized (distributorLock) {
1169:                        try {
1170:                            blackboard.startTransaction();
1171:                            blackboard.restart(cid);
1172:                            Envelope envelope = blackboard
1173:                                    .receiveMessages(Collections.EMPTY_LIST);
1174:                            persistWanted = distribute(envelope, blackboard
1175:                                    .getClient(), true);
1176:                        } finally {
1177:                            blackboard.stopTransaction();
1178:                        }
1179:                    }
1180:                    if (persistWanted)
1181:                        maybeSetPersistPending();
1182:                } finally {
1183:                    finishTransaction();
1184:                }
1185:            }
1186:
1187:            /**
1188:             * Process directive and ack messages from other agents. Acks
1189:             * are given to the message manager. Directive messages are passed
1190:             * through the message manager for validation and then given to
1191:             * the Blackboard for processing. Envelopes resulting from that
1192:             * processing are distributed.
1193:             */
1194:            public void receiveMessages(List messages) {
1195:                assert !Thread.holdsLock(distributorLock);
1196:                assert !Thread.holdsLock(transactionLock);
1197:                try {
1198:                    // Do one of the incoming messages come from a q-relevant component?
1199:                    boolean messagesRequireQuiescence = false;
1200:                    boolean persistWanted = false;
1201:                    startTransaction(); // Blocks if persistence active
1202:
1203:                    synchronized (distributorLock) {
1204:                        for (Iterator msgs = messages.iterator(); msgs
1205:                                .hasNext();) {
1206:                            Object m = msgs.next();
1207:                            if (m instanceof  DirectiveMessage) {
1208:                                DirectiveMessage msg = (DirectiveMessage) m;
1209:                                int code = getMessageManager().receiveMessage(
1210:                                        msg);
1211:                                if ((code & MessageManager.RESTART) != 0) {
1212:                                    try {
1213:                                        blackboard.startTransaction();
1214:                                        blackboard.restart(msg.getSource());
1215:                                    } finally {
1216:                                        blackboard.stopTransaction();
1217:                                    }
1218:                                }
1219:                                if ((code & MessageManager.IGNORE) == 0) {
1220:                                    if (logger.isDetailEnabled()) {
1221:                                        Directive[] dirs = msg.getDirectives();
1222:                                        for (int i = 0; i < dirs.length; i++) {
1223:                                            logger.detail("RECV   " + dirs[i]);
1224:                                        }
1225:                                    }
1226:                                    directiveMessages.add(msg);
1227:                                    messagesRequireQuiescence |= quiescenceMonitor
1228:                                            .numberIncomingMessage(msg);
1229:                                }
1230:                            } else if (m instanceof  AckDirectiveMessage) {
1231:                                AckDirectiveMessage msg = (AckDirectiveMessage) m;
1232:                                int code = getMessageManager().receiveAck(msg);
1233:                                if ((code & MessageManager.RESTART) != 0) {
1234:                                    // Remote agent has restarted
1235:                                    blackboard.restart(msg.getSource());
1236:                                }
1237:                            }
1238:                        }
1239:                        // We nominally ack the messages here so the persisted
1240:                        // state will include the acks. The acks are not actually
1241:                        // sent until the persistence delta is concluded.
1242:                        getMessageManager().acknowledgeMessages(
1243:                                directiveMessages.iterator());
1244:
1245:                        try {
1246:                            blackboard.startTransaction();
1247:                            Envelope envelope = blackboard
1248:                                    .receiveMessages(directiveMessages);
1249:                            persistWanted = distribute(envelope, blackboard
1250:                                    .getClient(), messagesRequireQuiescence);
1251:                        } finally {
1252:                            blackboard.stopTransaction();
1253:                        }
1254:                        directiveMessages.clear();
1255:                    }
1256:                    if (persistWanted)
1257:                        maybeSetPersistPending();
1258:                } finally {
1259:                    finishTransaction();
1260:                }
1261:            }
1262:
1263:            public void invokeABAChangeLPs(Set communities) {
1264:                assert !Thread.holdsLock(distributorLock);
1265:                assert !Thread.holdsLock(transactionLock);
1266:                if (logger.isDebugEnabled()) {
1267:                    if (Thread.holdsLock(distributorLock)) {
1268:                        logger
1269:                                .debug(
1270:                                        "Distributor.invokeABAChangeLPs invoked inside distributorLock",
1271:                                        new Throwable());
1272:                    }
1273:                    if (Thread.holdsLock(transactionLock)) {
1274:                        logger
1275:                                .debug(
1276:                                        "Distributor.invokeABAChangeLPs invoked inside transactionLock",
1277:                                        new Throwable());
1278:                    }
1279:                }
1280:                boolean persistWanted = false;
1281:                synchronized (distributorLock) {
1282:                    try {
1283:                        blackboard.startTransaction();
1284:                        blackboard.invokeABAChangeLPs(communities);
1285:                        Envelope envelope = blackboard
1286:                                .receiveMessages(Collections.EMPTY_LIST);
1287:                        persistWanted = distribute(envelope, blackboard
1288:                                .getClient(), true);
1289:                    } finally {
1290:                        blackboard.stopTransaction();
1291:                    }
1292:                }
1293:                if (persistWanted)
1294:                    maybeSetPersistPending();
1295:            }
1296:
1297:            /**
1298:             * Generate a persistence delta if possible and necessary. It is
1299:             * possible if the transaction count is zero and necessary if either
1300:             * PERSIST_PENDING is true or needToPersist is true and we are not
1301:             * busy. This second clause is needed so we don't end up being idle
1302:             * with needToPersist being true.
1303:             */
1304:            private PersistenceObject doPersistence(
1305:                    boolean persistedStateNeeded, boolean full) {
1306:                assert !Thread.holdsLock(distributorLock);
1307:                assert !Thread.holdsLock(transactionLock);
1308:                assert transactionCount == 1 : transactionCount;
1309:                assert persistFlags != 0 : persistFlags;
1310:                nodeBusyService.setAgentBusy(true);
1311:                List epochEnvelopes;
1312:                synchronized (distributorLock) {
1313:                    epochEnvelopes = getEpochEnvelopes();
1314:                    for (Iterator iter = subscribers.iterator(); iter.hasNext();) {
1315:                        Subscriber subscriber = (Subscriber) iter.next();
1316:                        if (subscriber.isReadyToPersist()) {
1317:                            subscriberStates
1318:                                    .add(new PersistenceSubscriberState(
1319:                                            subscriber));
1320:                        }
1321:                    }
1322:                }
1323:                PersistenceObject result;
1324:                synchronized (getMessageManager()) {
1325:                    getMessageManager().advanceEpoch();
1326:                    result = persistence.persist(epochEnvelopes,
1327:                            Collections.EMPTY_LIST, subscriberStates,
1328:                            persistedStateNeeded, full, lazyPersistence ? null
1329:                                    : getMessageManager(), quiescenceMonitor
1330:                                    .getState());
1331:                }
1332:                synchronized (distributorLock) {
1333:                    clearEpochEnvelopes();
1334:                    subscriberStates.clear();
1335:                    needToPersist = false;
1336:                    lastPersist = System.currentTimeMillis();
1337:                }
1338:                setPersistPending(false);
1339:                nodeBusyService.setAgentBusy(false);
1340:                return result;
1341:            }
1342:
1343:            private boolean timeToLazilyPersist() {
1344:                if (dummyPersistence) {
1345:                    return false;
1346:                }
1347:                long overdue = System.currentTimeMillis()
1348:                        - persistence.getPersistenceTime();
1349:                return overdue > 0L;
1350:            }
1351:
1352:            private boolean timeToPersist() {
1353:                assert Thread.holdsLock(distributorLock);
1354:                assert !Thread.holdsLock(transactionLock);
1355:                if (dummyPersistence) {
1356:                    return false;
1357:                }
1358:                long nextPersistTime = Math.min(lastPersist
1359:                        + MAX_PERSIST_INTERVAL, persistence
1360:                        .getPersistenceTime());
1361:                return (System.currentTimeMillis() >= nextPersistTime);
1362:            }
1363:
1364:            /**
1365:             * Transaction control
1366:             */
1367:            private static final String START_EXCUSE = "Waiting to Start Transaction";
1368:
1369:            public void startTransaction() {
1370:                assert !Thread.holdsLock(distributorLock);
1371:                assert !Thread.holdsLock(transactionLock);
1372:
1373:                acquireTransactionMutex();
1374:
1375:                synchronized (transactionLock) {
1376:                    while (persistFlags != 0) {
1377:                        try {
1378:                            SchedulableStatus.beginWait(START_EXCUSE);
1379:                            transactionLock.wait();
1380:                        } catch (InterruptedException ie) {
1381:                        } finally {
1382:                            SchedulableStatus.endBlocking();
1383:                        }
1384:                    }
1385:                    transactionCount++;
1386:                }
1387:            }
1388:
1389:            public void finishTransaction(Envelope outbox,
1390:                    BlackboardClient client) {
1391:                assert !Thread.holdsLock(distributorLock);
1392:                assert !Thread.holdsLock(transactionLock);
1393:                boolean persistWanted = false;
1394:                synchronized (distributorLock) {
1395:                    persistWanted = distribute(outbox, client);
1396:                }
1397:                if (persistWanted)
1398:                    maybeSetPersistPending();
1399:                finishTransaction();
1400:            }
1401:
1402:            private void finishTransaction() {
1403:                assert !Thread.holdsLock(distributorLock);
1404:                assert !Thread.holdsLock(transactionLock);
1405:                boolean doIt = false;
1406:                synchronized (transactionLock) {
1407:                    if ((persistFlags & PERSIST_PENDING) != 0) {
1408:                        if (transactionCount == 1) {
1409:                            // transactionCount == 1 implies ((persistFlags & PERSIST_ACTIVE) == 0)
1410:                            if (logger.isInfoEnabled()) {
1411:                                logger
1412:                                        .info("Persist started (finish transaction)");
1413:                            }
1414:                            assert ((persistFlags & PERSIST_ACTIVE) == 0);
1415:                            doIt = true;
1416:                        } else {
1417:                            if (logger.isInfoEnabled()) {
1418:                                logger.info("Persist deferred, "
1419:                                        + transactionCount
1420:                                        + " transactions open");
1421:                            }
1422:                        }
1423:                    }
1424:                }
1425:
1426:                releaseTransactionMutex();
1427:
1428:                if (doIt) {
1429:                    doPersistence(false, false);
1430:                    if (logger.isInfoEnabled()) {
1431:                        logger.info("Persist completed (finish transaction)");
1432:                    }
1433:                    if (logger.isInfoEnabled())
1434:                        logger.info("reservation release");
1435:                    persistenceReservationManager.release(persistence);
1436:                }
1437:                synchronized (transactionLock) {
1438:                    transactionCount--;
1439:                    assert transactionCount >= 0 : transactionCount;
1440:                    transactionLock.notifyAll();
1441:                }
1442:            }
1443:
1444:            /**
1445:             * Force a persistence delta to be generated.
1446:             */
1447:            public void persistNow() {
1448:                persist(false, true);
1449:            }
1450:
1451:            /**
1452:             * Force a (full) persistence delta to be generated and
1453:             * return result
1454:             */
1455:            public PersistenceObject getPersistenceObject() {
1456:                return persist(true, true);
1457:            }
1458:
1459:            /**
1460:             * Generate a persistence delta and (maybe) return the data of
1461:             * that delta.
1462:             * <p>
1463:             * This code parallels that of start/finish transaction except
1464:             * that:<pre>
1465:             *   distribute() is not called
1466:             *   we wait for all transactions to close
1467:             * </pre><p>
1468:             * A "PERSIST_ACTIVE" flag is used to guarantee that only
1469:             * one persist occurs at a time.
1470:             *
1471:             * @param isStateWanted true if the data of a full persistence
1472:             *   delta is wanted
1473:             * @return a state Object including all the data from a full
1474:             * persistence delta if isStateWanted is true, null if
1475:             *   isStateWanted is false.
1476:             */
1477:            private PersistenceObject persist(boolean isStateWanted,
1478:                    boolean full) {
1479:                assert !Thread.holdsLock(distributorLock);
1480:                assert !Thread.holdsLock(transactionLock);
1481:                if (persistence == null || (dummyPersistence && !isStateWanted)) {
1482:                    return null;
1483:                }
1484:                while (true) { // Loop until we succeed and return a result
1485:                    synchronized (transactionLock) {
1486:                        // First we must wait for any other persistence activity to cease
1487:                        lockoutPersistence();
1488:                    }
1489:                    // Then we have to wait for our reservation to become ripe
1490:                    if (logger.isInfoEnabled()) {
1491:                        logger.info("reservation waitfor");
1492:                    }
1493:                    persistenceReservationManager.waitFor(persistence, logger);
1494:                    synchronized (transactionLock) {
1495:                        // Now we wait for all transactions to finish
1496:                        lockoutTransactions();
1497:                    }
1498:                    try {
1499:                        // We commit to doing persistence, but we may have lost our reservation
1500:                        if (persistenceReservationManager.commit(persistence)) {
1501:                            if (logger.isInfoEnabled()) {
1502:                                logger.info("Persist started (persist)");
1503:                            }
1504:                            PersistenceObject result = doPersistence(
1505:                                    isStateWanted, (full || dummyPersistence));
1506:                            if (logger.isInfoEnabled())
1507:                                logger.info("reservation release");
1508:                            persistenceReservationManager.release(persistence);
1509:                            if (logger.isInfoEnabled()) {
1510:                                logger.info("Persist completed (persist)");
1511:                            }
1512:                            return result;
1513:                        } else {
1514:                            // Lost our reservation. Back out of transaction counting and start over
1515:                            if (logger.isInfoEnabled()) {
1516:                                logger.info("Reservation lost, starting over");
1517:                            }
1518:                        }
1519:                    } finally {
1520:                        synchronized (transactionLock) {
1521:                            resumePersistence();
1522:                            resumeTransactions();
1523:                        }
1524:                    }
1525:                }
1526:            }
1527:
1528:            private static final String LOCKOUT_EXCUSE = "Waiting for active persist to complete";
1529:
1530:            private void lockoutPersistence() {
1531:                assert !Thread.holdsLock(distributorLock);
1532:                assert Thread.holdsLock(transactionLock);
1533:                if ((persistFlags & PERSIST_ACTIVE) != 0) {
1534:                    if (logger.isInfoEnabled()) {
1535:                        logger.info(LOCKOUT_EXCUSE);
1536:                    }
1537:                    do {
1538:                        try {
1539:                            SchedulableStatus.beginWait(LOCKOUT_EXCUSE);
1540:                            transactionLock.wait();
1541:                        } catch (InterruptedException ie) {
1542:                        } finally {
1543:                            SchedulableStatus.endBlocking();
1544:                        }
1545:                    } while ((persistFlags & PERSIST_ACTIVE) != 0);
1546:                }
1547:                persistFlags |= PERSIST_ACTIVE;
1548:                // don't care about PERSIST_PENDING or SUSPENDED.
1549:                //
1550:                // To persist we still need to lockout transactions
1551:            }
1552:
1553:            private void resumePersistence() {
1554:                assert !Thread.holdsLock(distributorLock);
1555:                assert Thread.holdsLock(transactionLock);
1556:                assert ((persistFlags & PERSIST_ACTIVE) != 0);
1557:                persistFlags &= ~PERSIST_ACTIVE;
1558:                // we need to notify the transaction lock.  
1559:                //
1560:                // This is always called just before resuming transactions,
1561:                // which will notify the lock, so we'll let that method
1562:                // do the notification.
1563:            }
1564:
1565:            private static final String TRANSACTION_EXCUSE = "Waiting for transaction to close";
1566:
1567:            private void lockoutTransactions() {
1568:                assert !Thread.holdsLock(distributorLock);
1569:                assert Thread.holdsLock(transactionLock);
1570:                if (logger.isInfoEnabled()) {
1571:                    logger.info("Waiting for " + transactionCount
1572:                            + " transactions to close");
1573:                }
1574:                transactionCount++;
1575:                assert transactionCount >= 1 : transactionCount;
1576:                while (transactionCount > 1) {
1577:                    try {
1578:                        SchedulableStatus.beginWait(TRANSACTION_EXCUSE);
1579:                        transactionLock.wait();
1580:                    } catch (InterruptedException ie) {
1581:                    } finally {
1582:                        SchedulableStatus.endBlocking();
1583:                    }
1584:                }
1585:                assert transactionCount == 1 : transactionCount;
1586:                // if we've locked out persistence then it's now save to persist
1587:            }
1588:
1589:            private void resumeTransactions() {
1590:                assert !Thread.holdsLock(distributorLock);
1591:                assert Thread.holdsLock(transactionLock);
1592:                assert transactionCount >= 1 : transactionCount;
1593:                transactionCount--;
1594:                assert transactionCount == 0 : transactionCount;
1595:                transactionLock.notifyAll();
1596:            }
1597:
1598:            private void maybeSetPersistPending() {
1599:                assert !Thread.holdsLock(distributorLock);
1600:                assert !Thread.holdsLock(transactionLock);
1601:                if (!lazyPersistence || timeToLazilyPersist()) {
1602:                    if (persistenceReservationManager.request(persistence)) {
1603:                        if (logger.isInfoEnabled())
1604:                            logger.info("reservation request succeeded");
1605:                        setPersistPending(true);
1606:                    } else if ((persistFlags & PERSIST_PENDING) != 0) {
1607:                        // Whoops. We lost our reservation
1608:                        if (logger.isInfoEnabled())
1609:                            logger.info("reservation request failed");
1610:                        setPersistPending(false); // Need to start all over
1611:                    }
1612:                }
1613:            }
1614:
1615:            private void setPersistPending(boolean on) {
1616:                assert !Thread.holdsLock(distributorLock);
1617:                assert !Thread.holdsLock(transactionLock);
1618:                synchronized (transactionLock) {
1619:                    if (on) {
1620:                        persistFlags |= PERSIST_PENDING;
1621:                    } else if ((persistFlags & PERSIST_PENDING) != 0) {
1622:                        persistFlags &= ~PERSIST_PENDING;
1623:                        transactionLock.notifyAll();
1624:                    }
1625:                }
1626:            }
1627:
1628:            private void setSuspended(boolean on) {
1629:                assert !Thread.holdsLock(distributorLock);
1630:                assert Thread.holdsLock(transactionLock);
1631:                if (on) {
1632:                    persistFlags |= SUSPENDED;
1633:                } else if ((persistFlags & SUSPENDED) != 0) {
1634:                    persistFlags &= ~SUSPENDED;
1635:                    transactionLock.notifyAll();
1636:                }
1637:            }
1638:
1639:            private void timerPersist() {
1640:                assert !Thread.holdsLock(distributorLock);
1641:                assert !Thread.holdsLock(transactionLock);
1642:                if (lazyPersistence && !timeToLazilyPersist()) {
1643:                    return;
1644:                }
1645:                synchronized (distributorLock) {
1646:                    if (!needToPersist) {
1647:                        return;
1648:                    }
1649:                }
1650:                synchronized (transactionLock) {
1651:                    if ((persistFlags & PERSIST_ACTIVE) != 0) {
1652:                        return;
1653:                    }
1654:                    if ((persistFlags & PERSIST_PENDING) != 0
1655:                            && transactionCount >= 1) {
1656:                        return;
1657:                    }
1658:                }
1659:                // there's a small window here where we may do an unnecessary
1660:                // second persist, but that would be harmless and quick
1661:                persist(false, false);
1662:            }
1663:
1664:            private void logEnvelope(Envelope envelope, BlackboardClient client) {
1665:                if (!logger.isDetailEnabled())
1666:                    return;
1667:                boolean first = true;
1668:                for (Iterator tuples = envelope.getAllTuples(); tuples
1669:                        .hasNext();) {
1670:                    if (first) {
1671:                        logger.detail("Outbox of "
1672:                                + client.getBlackboardClientName());
1673:                        first = false;
1674:                    }
1675:                    EnvelopeTuple tuple = (EnvelopeTuple) tuples.next();
1676:                    if (tuple.isBulk()) {
1677:                        for (Iterator objects = ((BulkEnvelopeTuple) tuple)
1678:                                .getCollection().iterator(); objects.hasNext();) {
1679:                            logger.detail("BULK   " + objects.next());
1680:                        }
1681:                    } else {
1682:                        String kind = "";
1683:                        if (tuple.isAdd()) {
1684:                            kind = "ADD    ";
1685:                        } else if (tuple.isChange()) {
1686:                            kind = "CHANGE ";
1687:                        } else {
1688:                            kind = "REMOVE ";
1689:                        }
1690:                        logger.detail(kind + tuple.getObject());
1691:                    }
1692:                }
1693:            }
1694:
1695:            public String getName() {
1696:                return name;
1697:            } // agent name
1698:
1699:            public String toString() {
1700:                return "<Distributor " + getName() + ">";
1701:            }
1702:
1703:            /**
1704:             * Hold our set of registered Subscribers.
1705:             * <p>
1706:             * The Distributor must lock this object with its
1707:             * "distributorLock".
1708:             */
1709:            private static class Subscribers {
1710:                private List subscribers = new ArrayList();
1711:                ReferenceQueue refQ = new ReferenceQueue();
1712:
1713:                public void register(Subscriber subscriber) {
1714:                    checkRefQ();
1715:                    subscribers.add(new WeakReference(subscriber, refQ));
1716:                }
1717:
1718:                public void unregister(Subscriber subscriber) {
1719:                    for (Iterator iter = subscribers.iterator(); iter.hasNext();) {
1720:                        WeakReference ref = (WeakReference) iter.next();
1721:                        if (ref.get() == subscriber) {
1722:                            iter.remove();
1723:                        }
1724:                    }
1725:                }
1726:
1727:                public Iterator iterator() {
1728:                    checkRefQ();
1729:
1730:                    class MyIterator implements  Iterator {
1731:                        private Object n = null;
1732:                        private Iterator iter;
1733:
1734:                        public MyIterator(Iterator it) {
1735:                            iter = it;
1736:                            advance();
1737:                        }
1738:
1739:                        /**
1740:                         * Advance to the next non-null element, dropping
1741:                         * nulls along the way
1742:                         */
1743:                        private void advance() {
1744:                            while (iter.hasNext()) {
1745:                                WeakReference ref = (WeakReference) iter.next();
1746:                                n = ref.get();
1747:                                if (n == null) {
1748:                                    iter.remove();
1749:                                } else {
1750:                                    return;
1751:                                }
1752:                            }
1753:                            // ran off the end,
1754:                            n = null;
1755:                        }
1756:
1757:                        public boolean hasNext() {
1758:                            return (n != null);
1759:                        }
1760:
1761:                        public Object next() {
1762:                            Object x = n;
1763:                            advance();
1764:                            return x;
1765:                        }
1766:
1767:                        public void remove() {
1768:                            iter.remove();
1769:                        }
1770:                    }
1771:                    ;
1772:                    return new MyIterator(subscribers.iterator());
1773:                }
1774:
1775:                private void checkRefQ() {
1776:                    Reference ref;
1777:                    while ((ref = refQ.poll()) != null) {
1778:                        subscribers.remove(ref);
1779:                    }
1780:                }
1781:            }
1782:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.