0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 1997-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.blackboard;
0028:
0029: import java.lang.ref.Reference;
0030: import java.lang.ref.ReferenceQueue;
0031: import java.lang.ref.WeakReference;
0032: import java.util.AbstractList;
0033: import java.util.ArrayList;
0034: import java.util.Collection;
0035: import java.util.Collections;
0036: import java.util.IdentityHashMap;
0037: import java.util.Iterator;
0038: import java.util.List;
0039: import java.util.Map;
0040: import java.util.Set;
0041:
0042: import org.cougaar.bootstrap.SystemProperties;
0043: import org.cougaar.core.agent.service.MessageSwitchService;
0044: import org.cougaar.core.component.ServiceBroker;
0045: import org.cougaar.core.logging.LoggingServiceWithPrefix;
0046: import org.cougaar.core.mts.MessageAddress;
0047: import org.cougaar.core.node.NodeBusyService;
0048: import org.cougaar.core.persist.Persistence;
0049: import org.cougaar.core.persist.PersistenceObject;
0050: import org.cougaar.core.persist.PersistenceSubscriberState;
0051: import org.cougaar.core.persist.RehydrationResult;
0052: import org.cougaar.core.service.AgentIdentificationService;
0053: import org.cougaar.core.service.QuiescenceReportForDistributorService;
0054: import org.cougaar.core.service.ThreadService;
0055: import org.cougaar.core.thread.Schedulable;
0056: import org.cougaar.core.thread.SchedulableStatus;
0057: import org.cougaar.util.UnaryPredicate;
0058: import org.cougaar.util.log.Logger;
0059: import org.cougaar.util.log.Logging;
0060:
0061: /**
0062: * The Distributor coordinates blackboard transactions, subscriber
0063: * updates, and persistence.
0064: *
0065: * @property org.cougaar.core.agent.keepPublishHistory
0066: * if set to <em>true</em>, enables tracking of
0067: * all publishes. Extremely expensive.
0068: * @property org.cougaar.core.agent.singleTransactionModel
0069: * Enables a blackboard/agent run model where only one
0070: * transaction may be open at a given time.
0071: */
0072: final class Distributor {
0073:
0074: /*
0075: * Design summary:
0076: *
0077: * The distributor uses two locks:
0078: * distributorLock
0079: * transactionLock
0080: * The distributorLock guards against "distribute()" and
0081: * blackboard modification. The transactionLock guards against
0082: * start/finish transaction and persistence. Only one distribute
0083: * can occur at a time, and it can't occur at the same time as a
0084: * persist. Only one persist can occur at a time, and there must
0085: * be no active transactions (to guard against object
0086: * modifications during serialization), but we can be suspended.
0087: * Transactions are allowed when we're not persisting or
0088: * suspended. It's okay for transactions to start/finish while
0089: * a distribute is taking place, to provide parallelism.
0090: *
0091: * Persistence runs in either a lazy or non-lazy mode, where the
0092: * best-supported mode is lazy. Lazy persistence occurs
0093: * periodically (33 seconds) if the blackboard has been modified
0094: * during that time, and only if the persistence implementation
0095: * indicates that it is "getPersistenceTime()".
0096: *
0097: * Old notes, partially accurate:
0098: *
0099: * Synchronization methodology. The distributor distributes three
0100: * kinds of things: envelopes, messages, and timers. All three are
0101: * synchronized on the distributor object (this) so only one of
0102: * these can be in progress at a time. The distributor must have
0103: * unfettered access to the subscribers meaning that the subscribers
0104: * cannot themselves be locked while awaiting access to the
0105: * distributor. Each distribution may generate a persistence
0106: * delta. Persistence deltas are not generated unless there are no
0107: * open transactions. Normally, subscribers are allowed to open
0108: * transactions except if sufficient time has elapsed since the
0109: * previous persistence delta requiring that a persistence delta
0110: * must be generated. A persistence delta must also be generated if
0111: * there are no open transactions and nothing has been distributed
0112: * to any subscriber.
0113: * @property org.cougaar.core.blackboard.persistenceReservationTimeout
0114: * specifies the maximum delay allowed before using a persistence
0115: * reservation. An agent exceeding this delay will have to request a
0116: * new reservation. Default value is 120000 milliseconds. A value of
0117: * zero disables the reservation mechanism.
0118: */
0119:
/** The maximum interval between persistence deltas. */
private static final long MAX_PERSIST_INTERVAL = 37000L;
/**
 * Initial delay and repeat period handed to the persistence timer when
 * it is scheduled in createTimer().
 */
private static final long TIMER_PERSIST_INTERVAL = 33000L;

/**
 * Name of the system property holding the maximum delay allowed before
 * using a persistence reservation.
 */
private static final String PERSISTENCE_RESERVATION_TIMEOUT_PROP = "org.cougaar.core.blackboard.persistenceReservationTimeout";
/** Value of the reservation timeout property; defaults to 120000. */
private static final long PERSISTENCE_RESERVATION_TIMEOUT = SystemProperties
    .getLong(PERSISTENCE_RESERVATION_TIMEOUT_PROP, 120000L);

/** Name of the system property read into PERSIST_AT_STARTUP. */
private static final String PERSIST_AT_STARTUP_PROP = "org.cougaar.core.blackboard.persistAtStartup";
/** When true, start() performs a persist before creating the timer. */
private static final boolean PERSIST_AT_STARTUP = SystemProperties
    .getBoolean(PERSIST_AT_STARTUP_PROP);

/** Predicate that accepts every non-null object. */
private static final UnaryPredicate anythingP = new UnaryPredicate() {
  public boolean execute(Object o) {
    return (o != null);
  }
};

/** Name of the system property read into SINGLE_TRANSACTION. */
private static final String SINGLE_TRANSACTION_PROP = "org.cougaar.core.agent.singleTransactionModel";
/** The default setting for single transaction model */
public static final boolean DEFAULT_SINGLE_TRANSACTION = false;
/**
 * True when the single transaction model is enabled; gates the
 * transaction mutex acquire/release methods.
 */
private static final boolean SINGLE_TRANSACTION = SystemProperties
    .getBoolean(SINGLE_TRANSACTION_PROP,
        DEFAULT_SINGLE_TRANSACTION);
0145:
//
// these are set in the constructor and are final:
//

/**
 * The publish history is available for subscriber use.  Null unless the
 * "org.cougaar.core.agent.keepPublishHistory" system property is true.
 */
public final PublishHistory history = (SystemProperties
    .getBoolean("org.cougaar.core.agent.keepPublishHistory") ? new PublishHistory()
    : null);

/** the name of this distributor; "Anonymous" if none was supplied */
private final String name;

/** NodeBusyService so we can indicate when we are busy persisting */
private NodeBusyService nodeBusyService;

// blackboard, noted below.

/** the logger, which is thread safe; it prefixes messages with the name */
private final Logger logger;

//
// these are set immediately following the constructor, and
// are effectively final:
//

/** True if using lazy persistence */
private boolean lazyPersistence = true;

/** The object we use to persist ourselves. */
private Persistence persistence = null;

/** If persistence is non-null, is it a dummy? */
private boolean dummyPersistence;

/**
 * The reservation manager for persistence.  Static, so a single
 * instance is shared by every Distributor loaded by this class.
 */
private static final ReservationManager persistenceReservationManager = new ReservationManager(
    PERSISTENCE_RESERVATION_TIMEOUT);

//
// lock for open/close transaction and persistence:
//

private final Object transactionLock = new Object();

/**
 * when singleTransactionModel is enabled, the transactionMutex is
 * locked for the duration of the transaction.
 */
private final Object transactionMutexLock = new Object();
/** True while the mutex is held; guarded by transactionMutexLock. */
private boolean transactionMutexInUse = false;
0196:
0197: /**
0198: * Acquire the transaction mutex. No-op if not running in SINGLE_TRANSACTION mode.
0199: */
0200: protected final void acquireTransactionMutex() {
0201: if (SINGLE_TRANSACTION) {
0202: synchronized (transactionMutexLock) {
0203: try {
0204: while (transactionMutexInUse) {
0205: transactionMutexLock.wait();
0206: }
0207: transactionMutexInUse = true;
0208: } catch (InterruptedException ie) {
0209: transactionMutexLock.notify();
0210: logger
0211: .error(
0212: "Interrupted while acquiring transactionMutex",
0213: ie);
0214: }
0215: }
0216: }
0217: }
0218:
0219: /**
0220: * Release the transaction mutex. No-op if not running in SINGLE_TRANSACTION mode.
0221: */
0222: protected final void releaseTransactionMutex() {
0223: if (SINGLE_TRANSACTION) {
0224: synchronized (transactionMutexLock) {
0225: transactionMutexInUse = false;
0226: transactionMutexLock.notify();
0227: }
0228: }
0229: }
0230:
// the following are locked under the transactionLock:

// bit flags; presumably OR-ed together into persistFlags (the code that
// manipulates them is outside this section -- TODO confirm)
private static final int PERSIST_ACTIVE = (1 << 0);
private static final int PERSIST_PENDING = (1 << 1);
private static final int SUSPENDED = (1 << 2);
private int persistFlags = 0;
private int transactionCount = 0;

// temporary list for use within "doPersist":
private final List subscriberStates = new ArrayList();

//
// lock for distribute and blackboard access:
//

private final Object distributorLock = new Object();

// the following are locked under the distributorLock:

/** our blackboard */
private final Blackboard blackboard;

/**
 * True if rehydration occurred at startup; set by rehydrate() when the
 * rehydration result carries undistributed envelopes.
 */
private boolean didRehydrate = false;

/** Tuples that have been distributed during a persistence epoch */
private Map epochTuples;

/** The message manager for this agent; assigned in rehydrate() */
private MessageManager myMessageManager = null;

/**
 * Periodic persistence timer; created by createTimer() and cancelled
 * and cleared by stopTimer().
 */
private Schedulable distributorTimer = null;

/** The registered subscribers; see registerSubscriber(). */
private final Subscribers subscribers = new Subscribers();

/** The time that we last persisted */
private long lastPersist = System.currentTimeMillis();

/**
 * Do we need to persist sometime; changed state has not
 * been persisted
 */
private boolean needToPersist = false;

// temporary lists, for use within "distribute()":
private final List outboxes = new ArrayList();
private final List messagesToSend = new ArrayList();

// temporary list, for use within "receiveMessages()":
private final List directiveMessages = new ArrayList();

//
// periodic (lazy) persistence timer
//

private final Object timerLock = new Object();

// the following are locked under the timerLock:

/**
 * Set by enableTimer(), cleared by disableTimer(); the timer task only
 * calls timerPersist() while this is true.
 */
private boolean timerActive;

//
// These are partially locked, and may cause bugs in
// the future.  In practice they seem to be fine:
//

/** Envelopes distributed since the last rehydrated delta */
private List postRehydrationEnvelopes = null;

/** All objects published prior to the last rehydrated delta */
private PersistenceEnvelope rehydrationEnvelope = null;

private QuiescenceMonitor quiescenceMonitor;
private boolean quiescenceReportEnabled = false; // Not yet enabled

private ServiceBroker sb;
private QuiescenceReportForDistributorService quiescenceReportService;

// Subscribers who should rehydrate before enabling the QuiescenceService
// that is, the set of subscribers remaining to rehydrate for
// which quiescence is required.  Starts as the shared empty set and is
// replaced in rehydrate() with the keys reported by persistence.
private Set subscribersToRehydrate = Collections.EMPTY_SET;
0314:
0315: /** Isolated constructor */
0316: public Distributor(Blackboard blackboard, ServiceBroker sb,
0317: String name) {
0318: this .blackboard = blackboard;
0319: this .name = (name != null ? name : "Anonymous");
0320: Logger l = Logging.getLogger(getClass());
0321: this .logger = new LoggingServiceWithPrefix(l, this .name + ": ");
0322: if (logger.isInfoEnabled()) {
0323: logger.info("Distributor started");
0324: }
0325:
0326: if (logger.isDebugEnabled()) {
0327: logger.debug("transaction options: " + "deferCommit="
0328: + ActiveSubscriptionObject.deferCommit
0329: + ", singleTransaction=" + SINGLE_TRANSACTION);
0330: }
0331:
0332: this .sb = sb;
0333: nodeBusyService = (NodeBusyService) sb.getService(this ,
0334: NodeBusyService.class, null);
0335: AgentIdentificationService ais = (AgentIdentificationService) sb
0336: .getService(this , AgentIdentificationService.class,
0337: null);
0338: nodeBusyService.setAgentIdentificationService(ais);
0339: quiescenceReportService = (QuiescenceReportForDistributorService) sb
0340: .getService(this ,
0341: QuiescenceReportForDistributorService.class,
0342: null);
0343: quiescenceReportService.setAgentIdentificationService(ais);
0344: quiescenceMonitor = new QuiescenceMonitor(
0345: quiescenceReportService, logger);
0346: }
0347:
0348: /**
0349: * Called by the blackboard immediately after the constructor,
0350: * and only once.
0351: */
0352: void setPersistence(Persistence newPersistence, boolean lazy) {
0353: assert persistence == null : "persistence already set";
0354: persistence = newPersistence;
0355: lazyPersistence = lazy;
0356: dummyPersistence = (persistence != null && persistence
0357: .isDummyPersistence());
0358: initializeEpochEnvelopes();
0359: }
0360:
0361: /**
0362: * Called by Subscriber to link into Blackboard persistence
0363: * mechanism
0364: */
0365: Persistence getPersistence() {
0366: return persistence;
0367: }
0368:
0369: /**
0370: * Called by subscriber to discard rehydration info.
0371: */
0372: void discardRehydrationInfo(Subscriber subscriber) {
0373: synchronized (distributorLock) {
0374: if (rehydrationEnvelope != null) {
0375: // Only need to check subscriber count to enable quiescence
0376: if (!quiescenceReportEnabled) {
0377: // Remove this subscriber from those we want to rehydrate before
0378: // registering with the quiescence service.
0379: // Later, in distribute, we'll see if this was in fact the last
0380: // relevant subscriber to be rehydrated
0381: PersistenceSubscriberState pss = persistence
0382: .getSubscriberState(subscriber);
0383: if (pss != null) {
0384: String key = pss.getKey();
0385: boolean didRemove = subscribersToRehydrate
0386: .remove(key);
0387: if (didRemove && logger.isDebugEnabled())
0388: logger
0389: .debug(".discard: Rehydrated q-relevant subscriber "
0390: + key);
0391: }
0392: }
0393:
0394: // Now dump its subscriber state
0395: // (needs to be after above so we can get the key...)
0396: persistence.discardSubscriberState(subscriber);
0397:
0398: // If there are no more persistence subscriber states left, dump
0399: // the rehydration information -- we're done with it
0400: if (!persistence.hasSubscriberStates()) {
0401: if (logger.isDebugEnabled())
0402: logger
0403: .debug(".discard: No sub states left at all. discarding rehydration info");
0404: // discard rehydration info:
0405: rehydrationEnvelope = null;
0406: postRehydrationEnvelopes = null;
0407: }
0408: }
0409: }
0410: }
0411:
0412: public boolean didRehydrate(Subscriber subscriber) {
0413: if (!didRehydrate)
0414: return false;
0415: return (persistence.getSubscriberState(subscriber) != null);
0416: }
0417:
0418: /**
0419: * Pass thru to blackboard to safely return blackboard object
0420: * counts.
0421: * Used by BlackboardMetricsService
0422: * @param cl The class type
0423: */
0424: public int getBlackboardCount(Class cl) {
0425: assert !Thread.holdsLock(distributorLock);
0426: assert !Thread.holdsLock(transactionLock);
0427: synchronized (distributorLock) {
0428: return blackboard.countBlackboard(cl);
0429: }
0430: }
0431:
0432: /**
0433: * Pass thru to blackboard to safely return blackboard object
0434: * counts.
0435: * Used by BlackboardMetricsService
0436: * @param predicate The objects to count in the blackboard
0437: * @return int The count of objects that match the predicate
0438: * currently in the blackboard
0439: */
0440: public int getBlackboardCount(UnaryPredicate predicate) {
0441: assert !Thread.holdsLock(distributorLock);
0442: assert !Thread.holdsLock(transactionLock);
0443: synchronized (distributorLock) {
0444: return blackboard.countBlackboard(predicate);
0445: }
0446: }
0447:
0448: /**
0449: * Pass thru to blackboard to safely return the size of the
0450: * blackboard collection.
0451: */
0452: public int getBlackboardSize() {
0453: assert !Thread.holdsLock(distributorLock);
0454: assert !Thread.holdsLock(transactionLock);
0455: synchronized (distributorLock) {
0456: return blackboard.getBlackboardSize();
0457: }
0458: }
0459:
0460: /**
0461: * Rehydrate this blackboard. If persistence is off, just create a
0462: * MessageManager that does nothing. If persistence is on, try to
0463: * rehydrate from existing persistence deltas. The result of this is
0464: * a List of undistributed envelopes and a MessageManager. There
0465: * might be no MessageManager in the result signifying that either
0466: * there were no persistence deltas or that lazyPersistence was on
0467: * so the message manager did not need to be saved. In either case
0468: * the existence of an appropriate message manager is assured. The
0469: * undistributed envelopes might be null signifying that there was
0470: * no persistence deltas in existence. This is reflected in the
0471: * value of the didRehydrate flag.
0472: */
0473: private void rehydrate(Object state) {
0474: assert Thread.holdsLock(distributorLock);
0475: assert !Thread.holdsLock(transactionLock);
0476: if (persistence == null) {
0477: myMessageManager = new MessageManagerImpl(false);
0478: return;
0479: }
0480:
0481: // Get the rehydration data. Note that the passed "state" is
0482: // always null, so we can't use it to check for dummyPersistence.
0483: rehydrationEnvelope = new PersistenceEnvelope();
0484: RehydrationResult rr = persistence.rehydrate(
0485: rehydrationEnvelope, state);
0486: if (rr.quiescenceMonitorState != null) {
0487: quiescenceMonitor.setState(rr.quiescenceMonitorState);
0488: }
0489:
0490: // Distributor tracks the subscribers who have yet to rehydrate
0491: // However, we only really care about those for whom we require
0492: // quiescence
0493: subscribersToRehydrate = persistence.getSubscriberStateKeys();
0494: if (logger.isDebugEnabled())
0495: logger.debug("Initial number of subscribers: "
0496: + subscribersToRehydrate.size());
0497: // Synchronize so that a subscriber who already
0498: // is in discardRehydrationInfo doesnt cause a ConcurrentModExc
0499: synchronized (distributorLock) {
0500: Iterator iter = subscribersToRehydrate.iterator();
0501: List toRemove = new ArrayList();
0502: while (iter.hasNext()) {
0503: String key = (String) iter.next();
0504: if (!quiescenceMonitor.isQuiescenceRequired(key)) {
0505: if (logger.isDebugEnabled())
0506: logger.debug("Ignoring subscriber " + key);
0507: toRemove.add(key);
0508: } else {
0509: if (logger.isDebugEnabled())
0510: logger.debug("NOT Ignoring subscriber " + key);
0511: }
0512: }
0513: subscribersToRehydrate.removeAll(toRemove);
0514: if (logger.isDebugEnabled())
0515: logger.debug("Trimmed number of subscribers: "
0516: + subscribersToRehydrate.size());
0517: } // end synchronized(distributorLock)
0518:
0519: if (lazyPersistence) { // Ignore any rehydrated message manager
0520: myMessageManager = new MessageManagerImpl(false);
0521: } else {
0522: myMessageManager = rr.messageManager;
0523: if (myMessageManager == null) {
0524: myMessageManager = new MessageManagerImpl(true);
0525: }
0526: }
0527: if (rr.undistributedEnvelopes != null) {
0528: didRehydrate = true;
0529: postRehydrationEnvelopes = new ArrayList();
0530: postRehydrationEnvelopes.addAll(rr.undistributedEnvelopes);
0531: addEpochEnvelopes(rr.undistributedEnvelopes);
0532: }
0533: }
0534:
0535: private MessageManager getMessageManager() {
0536: return myMessageManager;
0537: }
0538:
0539: /**
0540: * Rehydrate a new subscription. New subscriptions that correspond
0541: * to persisted subscriptions are quietly fed the
0542: * rehydrationEnvelope which has all the objects that had been
0543: * distributed prior to the last rehydrated delta. Objects in the
0544: * rehydrationEnvelope do not waken the subscriber; they are simply
0545: * added to the subscription container. Next, the envelopes that
0546: * were pending in the inbox of the subscriber at the last delta are
0547: * fed to the subscription. The subscriber _will_ be notified of
0548: * these. Finally, all envelopes in postRehydrationEnvelopes are fed
0549: * to the subscription. The subscriber will be notified of these as
0550: * well.
0551: */
0552: private void rehydrateNewSubscription(Subscription s,
0553: List persistedTransactionEnvelopes,
0554: List persistedPendingEnvelopes) {
0555: assert Thread.holdsLock(distributorLock);
0556: assert !Thread.holdsLock(transactionLock);
0557: s.fill(rehydrationEnvelope);
0558: if (persistedTransactionEnvelopes != null) {
0559: for (Iterator iter = persistedTransactionEnvelopes
0560: .iterator(); iter.hasNext();) {
0561: s.fill((Envelope) iter.next());
0562: }
0563: }
0564: if (persistedPendingEnvelopes != null) {
0565: for (Iterator iter = persistedPendingEnvelopes.iterator(); iter
0566: .hasNext();) {
0567: s.fill((Envelope) iter.next());
0568: }
0569: }
0570: for (Iterator iter = postRehydrationEnvelopes.iterator(); iter
0571: .hasNext();) {
0572: s.fill((Envelope) iter.next());
0573: }
0574: }
0575:
0576: /**
0577: * Start the distribution thread.
0578: * Note that although Distributor is Runnable, it does not
0579: * extend Thread, rather, it maintains its own thread state
0580: * privately.
0581: */
0582: public void start(MessageSwitchService msgSwitch, Object state) {
0583: assert !Thread.holdsLock(distributorLock);
0584: assert !Thread.holdsLock(transactionLock);
0585: synchronized (distributorLock) {
0586: rehydrate(state);
0587: getMessageManager().start(msgSwitch, didRehydrate);
0588: needToPersist = true;
0589: }
0590: // persist now, to claim the persistence ownership
0591: // and save our component hierarchy
0592: if (PERSIST_AT_STARTUP) {
0593: persist(false, false);
0594: }
0595: createTimer();
0596: enableTimer();
0597: }
0598:
0599: private void createTimer() {
0600: assert !Thread.holdsLock(distributorLock);
0601: assert !Thread.holdsLock(transactionLock);
0602: // disable the timer if it already exists
0603: disableTimer();
0604: // start a disabled periodic timer
0605: synchronized (distributorLock) {
0606: if (lazyPersistence && !dummyPersistence
0607: && distributorTimer == null) {
0608: Runnable task = new Runnable() {
0609: public void run() {
0610: synchronized (timerLock) {
0611: if (timerActive) {
0612: timerPersist();
0613: }
0614: }
0615: }
0616: };
0617: ThreadService tsvc = (ThreadService) sb.getService(
0618: this , ThreadService.class, null);
0619: distributorTimer = tsvc.getThread(this , task,
0620: "Persistence Timer",
0621: ThreadService.WILL_BLOCK_LANE);
0622: sb.releaseService(this , ThreadService.class, tsvc);
0623: distributorTimer.schedule(TIMER_PERSIST_INTERVAL,
0624: TIMER_PERSIST_INTERVAL);
0625: }
0626: }
0627: }
0628:
0629: private void enableTimer() {
0630: assert !Thread.holdsLock(distributorLock);
0631: assert !Thread.holdsLock(transactionLock);
0632: synchronized (timerLock) {
0633: timerActive = true;
0634: }
0635: }
0636:
0637: private void disableTimer() {
0638: assert !Thread.holdsLock(distributorLock);
0639: assert !Thread.holdsLock(transactionLock);
0640: // if the timer is running then this will block
0641: synchronized (timerLock) {
0642: timerActive = false;
0643: }
0644: }
0645:
0646: private void stopTimer() {
0647: assert !Thread.holdsLock(distributorLock);
0648: assert !Thread.holdsLock(transactionLock);
0649: // disable the timer if it's running
0650: disableTimer();
0651: // stop the timer thread. This isn't necessary unless we're
0652: // really "stop()"ing the distributor, but it's useful to
0653: // hide this thread when the timer is disabled.
0654: synchronized (distributorLock) {
0655: if (lazyPersistence) {
0656: if (distributorTimer != null) {
0657: distributorTimer.cancelTimer();
0658: distributorTimer = null;
0659: }
0660: }
0661: }
0662: }
0663:
0664: /**
0665: * Complete any active persists, lockout transactions and
0666: * timer-based persists, and only allow external state
0667: * captures via "persistNow()" and "getPersistenceObject()".
0668: */
0669: public void suspend() {
0670: assert !Thread.holdsLock(distributorLock);
0671: assert !Thread.holdsLock(transactionLock);
0672: if (logger.isInfoEnabled()) {
0673: logger.info("Suspending");
0674: }
0675: // stop our timer
0676: disableTimer();
0677: // pretend that we're getting ready to persist to lockout
0678: // all transactions. Then set the SUSPENDED flag, which
0679: // acts as another block for "startTransaction".
0680: // Then we release our active persistence lock to allow
0681: // externally-signaled persistence while still preventing
0682: // transaction-based persistence.
0683: synchronized (transactionLock) {
0684: // wait for persistence to complete
0685: lockoutPersistence();
0686: // block transactions
0687: lockoutTransactions();
0688: // keep the transactions blocked
0689: setSuspended(true);
0690: // allow persistence
0691: resumePersistence();
0692: resumeTransactions();
0693: }
0694: if (logger.isInfoEnabled()) {
0695: logger.info("Suspended");
0696: }
0697: }
0698:
0699: public void resume() {
0700: assert !Thread.holdsLock(distributorLock);
0701: assert !Thread.holdsLock(transactionLock);
0702: // see suspend for comments
0703: if (logger.isInfoEnabled()) {
0704: logger.info("Resuming");
0705: }
0706: synchronized (transactionLock) {
0707: setSuspended(false);
0708: }
0709: enableTimer();
0710: if (logger.isInfoEnabled()) {
0711: logger.info("Resumed");
0712: }
0713: }
0714:
0715: /**
0716: * Stop the distribution thread.
0717: * @see #start
0718: */
0719: public void stop() {
0720: assert !Thread.holdsLock(distributorLock);
0721: assert !Thread.holdsLock(transactionLock);
0722: quiescenceReportService.setQuiescenceReportEnabled(false); // Finished here
0723: sb.releaseService(this ,
0724: QuiescenceReportForDistributorService.class,
0725: quiescenceReportService);
0726: stopTimer();
0727: synchronized (distributorLock) {
0728: getMessageManager().stop();
0729: }
0730: }
0731:
0732: //
0733: // Subscriber Services
0734: //
0735:
0736: /**
0737: * Register a Subscriber with the Distributor. Future envelopes are
0738: * distributed to all registered subscribers.
0739: */
0740: public void registerSubscriber(Subscriber subscriber) {
0741: assert !Thread.holdsLock(distributorLock);
0742: assert !Thread.holdsLock(transactionLock);
0743: synchronized (distributorLock) {
0744: subscribers.register(subscriber);
0745: }
0746: }
0747:
0748: /**
0749: * Unregister subscriber with the Distributor. Future envelopes are
0750: * not distributed to unregistered subscribers.
0751: */
0752: public void unregisterSubscriber(Subscriber subscriber) {
0753: assert !Thread.holdsLock(distributorLock);
0754: assert !Thread.holdsLock(transactionLock);
0755: synchronized (distributorLock) {
0756: subscribers.unregister(subscriber);
0757: }
0758: }
0759:
0760: /**
0761: * Provide a new subscription with its initial fill. If the
0762: * subscriber of the subscription was persisted, we fill from the
0763: * persisted information (see rehydrateNewSubscription) otherwise
0764: * we fill from the Blackboard (blackboard.fillSubscription).
0765: */
0766: public void fillSubscription(Subscription subscription) {
0767: assert !Thread.holdsLock(distributorLock);
0768: assert !Thread.holdsLock(transactionLock);
0769: synchronized (distributorLock) {
0770: Subscriber subscriber = subscription.getSubscriber();
0771: PersistenceSubscriberState subscriberState = null;
0772: if (didRehydrate) {
0773: if (subscriber.isReadyToPersist()) {
0774: if (logger.isInfoEnabled()) {
0775: logger
0776: .info("No subscriber state for late subscribe of "
0777: + subscriber.getName());
0778: }
0779: } else {
0780: subscriberState = persistence
0781: .getSubscriberState(subscriber);
0782: }
0783: }
0784: if (subscriberState != null
0785: && subscriberState.pendingEnvelopes != null) {
0786: rehydrateNewSubscription(subscription,
0787: subscriberState.transactionEnvelopes,
0788: subscriberState.pendingEnvelopes);
0789: } else {
0790: blackboard.fillSubscription(subscription);
0791: }
0792:
0793: // distribute the initialize envelope -- used for subs
0794: // to know when their first transaction is done
0795: /*
0796: {
0797: // option 1
0798: distribute(new InitializeSubscriptionEnvelope(subscription), null);
0799: }
0800: */
0801: // blackboard subscribes don't need an ISE to fill
0802: if (subscriber != blackboard) {
0803: // option 2
0804: Subscriber s = subscription.getSubscriber();
0805: List l = new ArrayList(1);
0806: l.add(new InitializeSubscriptionEnvelope(subscription));
0807: s.receiveEnvelopes(l, true); // queue in the right spot.
0808: // Assume at least one of the
0809: // sources of the objects that
0810: // filled the subscription
0811: // requires quiescence.
0812: }
0813: }
0814: }
0815:
0816: public void fillQuery(Subscription subscription) {
0817: assert !Thread.holdsLock(distributorLock);
0818: assert !Thread.holdsLock(transactionLock);
0819: synchronized (distributorLock) {
0820: blackboard.fillQuery(subscription);
0821: }
0822: }
0823:
0824: /**
0825: * The main workhorse of the distributor. Distributes the contents
0826: * of an outbox envelope to everybody.
0827: *
0828: * If needToPersist is true and it is time to persist, we set the
0829: * PERSIST_PENDING flag to prevent any further openTransactions from
0830: * happening. Then we distribute the outbox and consequent
0831: * envelopes. If anything is distributed, we set the needToPersist
0832: * flag. Any messages generated by the Blackboard are gathered and
0833: * given to the message manager for eventual transmission. Finally,
0834: * the generation of a persistence delta is considered.
0835: * @return true if a persistance snapshot should be taken
0836: */
0837: private boolean distribute(Envelope outbox, BlackboardClient client) {
0838: return distribute(outbox, client, quiescenceMonitor
0839: .isQuiescenceRequired(client));
0840: }
0841:
0842: private boolean distribute(Envelope outbox,
0843: BlackboardClient client, boolean clientQuiescenceRequired) {
0844: assert Thread.holdsLock(distributorLock);
0845: assert !Thread.holdsLock(transactionLock);
0846: boolean result = false;
0847: if (outbox != null && logger.isDetailEnabled()
0848: && client != null) {
0849: logEnvelope(outbox, client);
0850: }
0851: if (persistence != null) {
0852: if (needToPersist) {
0853: if (timeToPersist()) {
0854: result = true;
0855: }
0856: }
0857: }
0858: blackboard.prepareForEnvelopes();
0859: boolean haveSomethingToDistribute = false;
0860: // nest loops in case delayed actions cascade into more
0861: // lp actions.
0862: while (outbox != null && outbox.size() > 0) {
0863: while (outbox != null && outbox.size() > 0) {
0864: outboxes.add(outbox);
0865: outbox = blackboard.receiveEnvelope(outbox);
0866: haveSomethingToDistribute = true;
0867: }
0868:
0869: // outbox should be empty at this point.
0870: // execute any pending DelayedLPActions
0871: outbox = blackboard.executeDelayedLPActions();
0872: }
0873: Blackboard.getTracker().clearLocalSet();
0874:
0875: // while (outbox != null && outbox.size() > 0) {
0876: // outboxes.add(outbox);
0877: // outbox = blackboard.receiveEnvelope(outbox);
0878: // haveSomethingToDistribute = true;
0879: // }
0880:
0881: /**
0882: * busy indicates that we have found evidence that things are
0883: * still happening or are about to happen in this agent.
0884: */
0885: boolean busy = haveSomethingToDistribute;
0886: boolean newSubscribersAreQuiescent = true; // Until proven otherwise
0887: if (persistence != null) {
0888: if (!needToPersist && haveSomethingToDistribute) {
0889: needToPersist = true;
0890: }
0891: }
0892: if (logger.isDebugEnabled()) {
0893: if (haveSomethingToDistribute) {
0894: logger.debug("quiescence"
0895: + (clientQuiescenceRequired ? "" : " not")
0896: + " required for outbox of "
0897: + client.getBlackboardClientName());
0898: }
0899: }
0900: for (Iterator iter = subscribers.iterator(); iter.hasNext();) {
0901: Subscriber subscriber = (Subscriber) iter.next();
0902: if (subscriber == blackboard)
0903: continue;
0904: boolean subscriberBusy = false;
0905: if (haveSomethingToDistribute) {
0906: subscriber.receiveEnvelopes(outboxes,
0907: clientQuiescenceRequired);
0908: subscriberBusy = true;
0909: } else if (subscriber.isBusy()) {
0910: subscriberBusy = true;
0911: }
0912: if (subscriberBusy) {
0913: busy = true;
0914: }
0915:
0916: if (quiescenceReportEnabled) {
0917: // Check if at least one subscriber we care about is
0918: // now non-quiescent
0919: BlackboardClient inboxClient = subscriber.getClient();
0920: if (quiescenceMonitor.isQuiescenceRequired(inboxClient)) {
0921: if (newSubscribersAreQuiescent
0922: && !subscriber.isQuiescent()) {
0923: if (logger.isDebugEnabled()) {
0924: logger
0925: .debug("There is at least one q-relevant subscriber,"
0926: + " so this distribute prevents quiescence.");
0927: }
0928: if (logger.isDetailEnabled())
0929: logger
0930: .detail(" First such subscriber: "
0931: + inboxClient
0932: .getBlackboardClientName());
0933: newSubscribersAreQuiescent = false;
0934: }
0935: }
0936: }
0937: }
0938: // Fill messagesToSend
0939: blackboard.appendMessagesToSend(messagesToSend);
0940: if (messagesToSend.size() > 0) {
0941: for (Iterator i = messagesToSend.iterator(); i.hasNext();) {
0942: DirectiveMessage msg = (DirectiveMessage) i.next();
0943: // If the publisher is q-relevant, then we number the message
0944: // to require a succesful receipt for quiescence
0945: if (clientQuiescenceRequired)
0946: quiescenceMonitor.numberOutgoingMessage(msg);
0947: if (logger.isDetailEnabled()) {
0948: Directive[] dirs = msg.getDirectives();
0949: for (int j = 0; j < dirs.length; j++) {
0950: logger.detail("SEND " + dirs[j]);
0951: }
0952: }
0953: }
0954: getMessageManager().sendMessages(messagesToSend.iterator());
0955: }
0956: messagesToSend.clear();
0957: if (persistence != null) {
0958: if (postRehydrationEnvelopes != null) {
0959: postRehydrationEnvelopes.addAll(outboxes);
0960: }
0961: addEpochEnvelopes(outboxes);
0962: if (!needToPersist
0963: && getMessageManager().needAdvanceEpoch()) {
0964: needToPersist = true;
0965: }
0966: if (!busy && transactionCount > 1) {
0967: // This is not the last transaction, still busy
0968: busy = true;
0969: }
0970: if (needToPersist) {
0971: if (!busy) {
0972: result = true;
0973: }
0974: }
0975: }
0976: outboxes.clear();
0977:
0978: // Update the cumulative quiescence of all the subscriber inboxes
0979: // based on this distribute: Non-q if a Q-relevant comp published something
0980: // to this agent, and we have at least one q-relevant subscriber
0981: if (quiescenceReportEnabled)
0982: quiescenceMonitor
0983: .setSubscribersAreQuiescent(newSubscribersAreQuiescent);
0984:
0985: // If we have not yet enabled the QRS, check to see if this
0986: // call to distribute means the last relevant subscriber
0987: // has rehydrated. If so, enable the QuiescenceReportService.
0988: // Note that discardRehydrationInfo is where we remove the subscribers
0989: // from the list we want to rehydrate first.
0990:
0991: // If this distribute is happening at the close of the first transaction
0992: // of the last q-relevant subscriber, then we enable the QRS -- but _after_
0993: // we've updated the Quiescence state based on the results of
0994: // this distribute -- otherwise, the close of the first transaction
0995: // of the last q-relevant subscriber would always make you non-q on rehydrate
0996: if (!quiescenceReportEnabled) {
0997: int nLeft = subscribersToRehydrate.size();
0998: if (nLeft == 0) {
0999: if (logger.isDebugEnabled())
1000: logger
1001: .debug(".distribute: No more q-relevant subscribers to rehydrate. Enabling QRS.");
1002: quiescenceReportService
1003: .setQuiescenceReportEnabled(true); // All ready to go
1004: quiescenceReportEnabled = true;
1005: } else if (logger.isDebugEnabled()) {
1006: if (nLeft == 1)
1007: logger
1008: .debug(".distribute: sole remaining q-relevant subscriber"
1009: + " left to rehydrate: "
1010: + subscribersToRehydrate);
1011: else
1012: logger
1013: .debug(".distribute: "
1014: + nLeft
1015: + " q-relevant subscribers left to rehydrate");
1016: }
1017: } // end check to enable QRS
1018:
1019: return result;
1020: } // end of distribute()
1021:
1022: private void initializeEpochEnvelopes() {
1023: assert !Thread.holdsLock(distributorLock);
1024: assert !Thread.holdsLock(transactionLock);
1025: if (dummyPersistence) {
1026: return;
1027: }
1028: //epochEnvelopes = new ArrayList();
1029: // this could use a regular equals-based map, but for persistence
1030: // consistency we use an =='s based identity map.
1031: epochTuples = new IdentityHashMap();
1032: }
1033:
1034: private void addEpochEnvelopes(List envelopes) {
1035: assert Thread.holdsLock(distributorLock);
1036: assert !Thread.holdsLock(transactionLock);
1037: if (dummyPersistence) {
1038: return;
1039: }
1040: //epochEnvelopes.addAll(rr.undistributedEnvelopes);
1041: for (int i = 0, n = envelopes.size(); i < n; i++) {
1042: Envelope e = (Envelope) envelopes.get(i);
1043: List tuples = e.getRawDeltas();
1044: for (int j = 0, m = tuples.size(); j < m; j++) {
1045: EnvelopeTuple tuple = (EnvelopeTuple) tuples.get(j);
1046: addEpochTuple(tuple);
1047: }
1048: }
1049: }
1050:
1051: private void addEpochTuple(EnvelopeTuple tuple) {
1052: assert Thread.holdsLock(distributorLock);
1053: assert !Thread.holdsLock(transactionLock);
1054: assert dummyPersistence;
1055: Object o = tuple.getObject();
1056: if (tuple.isBulk()) {
1057: Collection c = (Collection) o;
1058: for (Iterator iter = c.iterator(); iter.hasNext();) {
1059: Object o2 = iter.next();
1060: EnvelopeTuple oldTuple = (EnvelopeTuple) epochTuples
1061: .get(o2);
1062: if (oldTuple == null) {
1063: // N + A => A (common)
1064: epochTuples.put(o2, new AddEnvelopeTuple(o2));
1065: } else {
1066: // Other theoretical cases - not handled
1067: // A + A => A (error?)
1068: // C + A => C (error?)
1069: // R + A => R (error?)
1070: }
1071: }
1072: } else {
1073: EnvelopeTuple oldTuple = (EnvelopeTuple) epochTuples.get(o);
1074: if (oldTuple == null) {
1075: // N + A => A (common, ~25%)
1076: // N + C => C (rare)
1077: // N + R => R (rare)
1078: epochTuples.put(o, tuple);
1079: } else if (oldTuple.isAdd()) {
1080: if (tuple.isRemove()) {
1081: // A + R => N (common, ~10%)
1082: epochTuples.remove(o);
1083: } else {
1084: // Other theoretical cases, not handled
1085: // A + A => A (error?)
1086: // A + C => A (common, ~60%)
1087: }
1088: } else if (oldTuple.isChange()) {
1089: if (tuple.isAdd()) {
1090: // C + A => C (error?)
1091: } else {
1092: // C + C => C (common, ~2%)
1093: // C + R => R (rare)
1094: epochTuples.put(o, tuple);
1095: }
1096: } else {
1097: // Other theoretical cases, not handled
1098: // R + A => R (error?)
1099: // R + C => R (error? probably a race)
1100: // R + R => R (error?)
1101: }
1102: }
1103: }
1104:
1105: private List getEpochEnvelopes() {
1106: assert Thread.holdsLock(distributorLock);
1107: assert !Thread.holdsLock(transactionLock);
1108: if (dummyPersistence) {
1109: // we haven't been collecting tuples, since persistence is
1110: // disabled, but we must capture the full blackboard for a
1111: // forced persist (e.g. mobility).
1112: QuerySubscription everything = new QuerySubscription(
1113: anythingP);
1114: blackboard.fillQuery(everything);
1115: Envelope envelope = new Envelope();
1116: envelope.bulkAddObject(everything.getCollection());
1117: List ret = new ArrayList(1);
1118: ret.add(envelope);
1119: return ret;
1120: }
1121: //return epochEnvelopes;
1122: // create a list with a single envelope that contains all our
1123: // epochTuples, where the list's "clear()" clears our tuple
1124: // map for early GC.
1125: final Envelope e = new Envelope();
1126: e.getRawDeltas().addAll(epochTuples.values());
1127: List ret = new AbstractList() {
1128: private Envelope envelope = e;
1129:
1130: public int size() {
1131: return (envelope == null ? 0 : 1);
1132: }
1133:
1134: public Object get(int index) {
1135: if (index != 0 || envelope == null) {
1136: throw new IndexOutOfBoundsException("Index: "
1137: + index + ", Size: " + size());
1138: }
1139: return envelope;
1140: }
1141:
1142: public void clear() {
1143: if (envelope != null) {
1144: envelope = null;
1145: epochTuples.clear();
1146: }
1147: }
1148: };
1149: return ret;
1150: }
1151:
1152: private void clearEpochEnvelopes() {
1153: assert Thread.holdsLock(distributorLock);
1154: assert !Thread.holdsLock(transactionLock);
1155: if (dummyPersistence) {
1156: return;
1157: }
1158: //epochEnvelopes.clear();
1159: epochTuples.clear();
1160: }
1161:
1162: public void restartAgent(MessageAddress cid) {
1163: assert !Thread.holdsLock(distributorLock);
1164: assert !Thread.holdsLock(transactionLock);
1165: boolean persistWanted = false;
1166: try {
1167: startTransaction();
1168: synchronized (distributorLock) {
1169: try {
1170: blackboard.startTransaction();
1171: blackboard.restart(cid);
1172: Envelope envelope = blackboard
1173: .receiveMessages(Collections.EMPTY_LIST);
1174: persistWanted = distribute(envelope, blackboard
1175: .getClient(), true);
1176: } finally {
1177: blackboard.stopTransaction();
1178: }
1179: }
1180: if (persistWanted)
1181: maybeSetPersistPending();
1182: } finally {
1183: finishTransaction();
1184: }
1185: }
1186:
1187: /**
1188: * Process directive and ack messages from other agents. Acks
1189: * are given to the message manager. Directive messages are passed
1190: * through the message manager for validation and then given to
1191: * the Blackboard for processing. Envelopes resulting from that
1192: * processing are distributed.
1193: */
1194: public void receiveMessages(List messages) {
1195: assert !Thread.holdsLock(distributorLock);
1196: assert !Thread.holdsLock(transactionLock);
1197: try {
1198: // Do one of the incoming messages come from a q-relevant component?
1199: boolean messagesRequireQuiescence = false;
1200: boolean persistWanted = false;
1201: startTransaction(); // Blocks if persistence active
1202:
1203: synchronized (distributorLock) {
1204: for (Iterator msgs = messages.iterator(); msgs
1205: .hasNext();) {
1206: Object m = msgs.next();
1207: if (m instanceof DirectiveMessage) {
1208: DirectiveMessage msg = (DirectiveMessage) m;
1209: int code = getMessageManager().receiveMessage(
1210: msg);
1211: if ((code & MessageManager.RESTART) != 0) {
1212: try {
1213: blackboard.startTransaction();
1214: blackboard.restart(msg.getSource());
1215: } finally {
1216: blackboard.stopTransaction();
1217: }
1218: }
1219: if ((code & MessageManager.IGNORE) == 0) {
1220: if (logger.isDetailEnabled()) {
1221: Directive[] dirs = msg.getDirectives();
1222: for (int i = 0; i < dirs.length; i++) {
1223: logger.detail("RECV " + dirs[i]);
1224: }
1225: }
1226: directiveMessages.add(msg);
1227: messagesRequireQuiescence |= quiescenceMonitor
1228: .numberIncomingMessage(msg);
1229: }
1230: } else if (m instanceof AckDirectiveMessage) {
1231: AckDirectiveMessage msg = (AckDirectiveMessage) m;
1232: int code = getMessageManager().receiveAck(msg);
1233: if ((code & MessageManager.RESTART) != 0) {
1234: // Remote agent has restarted
1235: blackboard.restart(msg.getSource());
1236: }
1237: }
1238: }
1239: // We nominally ack the messages here so the persisted
1240: // state will include the acks. The acks are not actually
1241: // sent until the persistence delta is concluded.
1242: getMessageManager().acknowledgeMessages(
1243: directiveMessages.iterator());
1244:
1245: try {
1246: blackboard.startTransaction();
1247: Envelope envelope = blackboard
1248: .receiveMessages(directiveMessages);
1249: persistWanted = distribute(envelope, blackboard
1250: .getClient(), messagesRequireQuiescence);
1251: } finally {
1252: blackboard.stopTransaction();
1253: }
1254: directiveMessages.clear();
1255: }
1256: if (persistWanted)
1257: maybeSetPersistPending();
1258: } finally {
1259: finishTransaction();
1260: }
1261: }
1262:
1263: public void invokeABAChangeLPs(Set communities) {
1264: assert !Thread.holdsLock(distributorLock);
1265: assert !Thread.holdsLock(transactionLock);
1266: if (logger.isDebugEnabled()) {
1267: if (Thread.holdsLock(distributorLock)) {
1268: logger
1269: .debug(
1270: "Distributor.invokeABAChangeLPs invoked inside distributorLock",
1271: new Throwable());
1272: }
1273: if (Thread.holdsLock(transactionLock)) {
1274: logger
1275: .debug(
1276: "Distributor.invokeABAChangeLPs invoked inside transactionLock",
1277: new Throwable());
1278: }
1279: }
1280: boolean persistWanted = false;
1281: synchronized (distributorLock) {
1282: try {
1283: blackboard.startTransaction();
1284: blackboard.invokeABAChangeLPs(communities);
1285: Envelope envelope = blackboard
1286: .receiveMessages(Collections.EMPTY_LIST);
1287: persistWanted = distribute(envelope, blackboard
1288: .getClient(), true);
1289: } finally {
1290: blackboard.stopTransaction();
1291: }
1292: }
1293: if (persistWanted)
1294: maybeSetPersistPending();
1295: }
1296:
1297: /**
1298: * Generate a persistence delta if possible and necessary. It is
1299: * possible if the transaction count is zero and necessary if either
1300: * PERSIST_PENDING is true or needToPersist is true and we are not
1301: * busy. This second clause is needed so we don't end up being idle
1302: * with needToPersist being true.
1303: */
1304: private PersistenceObject doPersistence(
1305: boolean persistedStateNeeded, boolean full) {
1306: assert !Thread.holdsLock(distributorLock);
1307: assert !Thread.holdsLock(transactionLock);
1308: assert transactionCount == 1 : transactionCount;
1309: assert persistFlags != 0 : persistFlags;
1310: nodeBusyService.setAgentBusy(true);
1311: List epochEnvelopes;
1312: synchronized (distributorLock) {
1313: epochEnvelopes = getEpochEnvelopes();
1314: for (Iterator iter = subscribers.iterator(); iter.hasNext();) {
1315: Subscriber subscriber = (Subscriber) iter.next();
1316: if (subscriber.isReadyToPersist()) {
1317: subscriberStates
1318: .add(new PersistenceSubscriberState(
1319: subscriber));
1320: }
1321: }
1322: }
1323: PersistenceObject result;
1324: synchronized (getMessageManager()) {
1325: getMessageManager().advanceEpoch();
1326: result = persistence.persist(epochEnvelopes,
1327: Collections.EMPTY_LIST, subscriberStates,
1328: persistedStateNeeded, full, lazyPersistence ? null
1329: : getMessageManager(), quiescenceMonitor
1330: .getState());
1331: }
1332: synchronized (distributorLock) {
1333: clearEpochEnvelopes();
1334: subscriberStates.clear();
1335: needToPersist = false;
1336: lastPersist = System.currentTimeMillis();
1337: }
1338: setPersistPending(false);
1339: nodeBusyService.setAgentBusy(false);
1340: return result;
1341: }
1342:
1343: private boolean timeToLazilyPersist() {
1344: if (dummyPersistence) {
1345: return false;
1346: }
1347: long overdue = System.currentTimeMillis()
1348: - persistence.getPersistenceTime();
1349: return overdue > 0L;
1350: }
1351:
1352: private boolean timeToPersist() {
1353: assert Thread.holdsLock(distributorLock);
1354: assert !Thread.holdsLock(transactionLock);
1355: if (dummyPersistence) {
1356: return false;
1357: }
1358: long nextPersistTime = Math.min(lastPersist
1359: + MAX_PERSIST_INTERVAL, persistence
1360: .getPersistenceTime());
1361: return (System.currentTimeMillis() >= nextPersistTime);
1362: }
1363:
1364: /**
1365: * Transaction control
1366: */
1367: private static final String START_EXCUSE = "Waiting to Start Transaction";
1368:
1369: public void startTransaction() {
1370: assert !Thread.holdsLock(distributorLock);
1371: assert !Thread.holdsLock(transactionLock);
1372:
1373: acquireTransactionMutex();
1374:
1375: synchronized (transactionLock) {
1376: while (persistFlags != 0) {
1377: try {
1378: SchedulableStatus.beginWait(START_EXCUSE);
1379: transactionLock.wait();
1380: } catch (InterruptedException ie) {
1381: } finally {
1382: SchedulableStatus.endBlocking();
1383: }
1384: }
1385: transactionCount++;
1386: }
1387: }
1388:
1389: public void finishTransaction(Envelope outbox,
1390: BlackboardClient client) {
1391: assert !Thread.holdsLock(distributorLock);
1392: assert !Thread.holdsLock(transactionLock);
1393: boolean persistWanted = false;
1394: synchronized (distributorLock) {
1395: persistWanted = distribute(outbox, client);
1396: }
1397: if (persistWanted)
1398: maybeSetPersistPending();
1399: finishTransaction();
1400: }
1401:
1402: private void finishTransaction() {
1403: assert !Thread.holdsLock(distributorLock);
1404: assert !Thread.holdsLock(transactionLock);
1405: boolean doIt = false;
1406: synchronized (transactionLock) {
1407: if ((persistFlags & PERSIST_PENDING) != 0) {
1408: if (transactionCount == 1) {
1409: // transactionCount == 1 implies ((persistFlags & PERSIST_ACTIVE) == 0)
1410: if (logger.isInfoEnabled()) {
1411: logger
1412: .info("Persist started (finish transaction)");
1413: }
1414: assert ((persistFlags & PERSIST_ACTIVE) == 0);
1415: doIt = true;
1416: } else {
1417: if (logger.isInfoEnabled()) {
1418: logger.info("Persist deferred, "
1419: + transactionCount
1420: + " transactions open");
1421: }
1422: }
1423: }
1424: }
1425:
1426: releaseTransactionMutex();
1427:
1428: if (doIt) {
1429: doPersistence(false, false);
1430: if (logger.isInfoEnabled()) {
1431: logger.info("Persist completed (finish transaction)");
1432: }
1433: if (logger.isInfoEnabled())
1434: logger.info("reservation release");
1435: persistenceReservationManager.release(persistence);
1436: }
1437: synchronized (transactionLock) {
1438: transactionCount--;
1439: assert transactionCount >= 0 : transactionCount;
1440: transactionLock.notifyAll();
1441: }
1442: }
1443:
1444: /**
1445: * Force a persistence delta to be generated.
1446: */
1447: public void persistNow() {
1448: persist(false, true);
1449: }
1450:
1451: /**
1452: * Force a (full) persistence delta to be generated and
1453: * return result
1454: */
1455: public PersistenceObject getPersistenceObject() {
1456: return persist(true, true);
1457: }
1458:
1459: /**
1460: * Generate a persistence delta and (maybe) return the data of
1461: * that delta.
1462: * <p>
1463: * This code parallels that of start/finish transaction except
1464: * that:<pre>
1465: * distribute() is not called
1466: * we wait for all transactions to close
1467: * </pre><p>
1468: * A "PERSIST_ACTIVE" flag is used to guarantee that only
1469: * one persist occurs at a time.
1470: *
1471: * @param isStateWanted true if the data of a full persistence
1472: * delta is wanted
1473: * @return a state Object including all the data from a full
1474: * persistence delta if isStateWanted is true, null if
1475: * isStateWanted is false.
1476: */
1477: private PersistenceObject persist(boolean isStateWanted,
1478: boolean full) {
1479: assert !Thread.holdsLock(distributorLock);
1480: assert !Thread.holdsLock(transactionLock);
1481: if (persistence == null || (dummyPersistence && !isStateWanted)) {
1482: return null;
1483: }
1484: while (true) { // Loop until we succeed and return a result
1485: synchronized (transactionLock) {
1486: // First we must wait for any other persistence activity to cease
1487: lockoutPersistence();
1488: }
1489: // Then we have to wait for our reservation to become ripe
1490: if (logger.isInfoEnabled()) {
1491: logger.info("reservation waitfor");
1492: }
1493: persistenceReservationManager.waitFor(persistence, logger);
1494: synchronized (transactionLock) {
1495: // Now we wait for all transactions to finish
1496: lockoutTransactions();
1497: }
1498: try {
1499: // We commit to doing persistence, but we may have lost our reservation
1500: if (persistenceReservationManager.commit(persistence)) {
1501: if (logger.isInfoEnabled()) {
1502: logger.info("Persist started (persist)");
1503: }
1504: PersistenceObject result = doPersistence(
1505: isStateWanted, (full || dummyPersistence));
1506: if (logger.isInfoEnabled())
1507: logger.info("reservation release");
1508: persistenceReservationManager.release(persistence);
1509: if (logger.isInfoEnabled()) {
1510: logger.info("Persist completed (persist)");
1511: }
1512: return result;
1513: } else {
1514: // Lost our reservation. Back out of transaction counting and start over
1515: if (logger.isInfoEnabled()) {
1516: logger.info("Reservation lost, starting over");
1517: }
1518: }
1519: } finally {
1520: synchronized (transactionLock) {
1521: resumePersistence();
1522: resumeTransactions();
1523: }
1524: }
1525: }
1526: }
1527:
1528: private static final String LOCKOUT_EXCUSE = "Waiting for active persist to complete";
1529:
1530: private void lockoutPersistence() {
1531: assert !Thread.holdsLock(distributorLock);
1532: assert Thread.holdsLock(transactionLock);
1533: if ((persistFlags & PERSIST_ACTIVE) != 0) {
1534: if (logger.isInfoEnabled()) {
1535: logger.info(LOCKOUT_EXCUSE);
1536: }
1537: do {
1538: try {
1539: SchedulableStatus.beginWait(LOCKOUT_EXCUSE);
1540: transactionLock.wait();
1541: } catch (InterruptedException ie) {
1542: } finally {
1543: SchedulableStatus.endBlocking();
1544: }
1545: } while ((persistFlags & PERSIST_ACTIVE) != 0);
1546: }
1547: persistFlags |= PERSIST_ACTIVE;
1548: // don't care about PERSIST_PENDING or SUSPENDED.
1549: //
1550: // To persist we still need to lockout transactions
1551: }
1552:
1553: private void resumePersistence() {
1554: assert !Thread.holdsLock(distributorLock);
1555: assert Thread.holdsLock(transactionLock);
1556: assert ((persistFlags & PERSIST_ACTIVE) != 0);
1557: persistFlags &= ~PERSIST_ACTIVE;
1558: // we need to notify the transaction lock.
1559: //
1560: // This is always called just before resuming transactions,
1561: // which will notify the lock, so we'll let that method
1562: // do the notification.
1563: }
1564:
1565: private static final String TRANSACTION_EXCUSE = "Waiting for transaction to close";
1566:
1567: private void lockoutTransactions() {
1568: assert !Thread.holdsLock(distributorLock);
1569: assert Thread.holdsLock(transactionLock);
1570: if (logger.isInfoEnabled()) {
1571: logger.info("Waiting for " + transactionCount
1572: + " transactions to close");
1573: }
1574: transactionCount++;
1575: assert transactionCount >= 1 : transactionCount;
1576: while (transactionCount > 1) {
1577: try {
1578: SchedulableStatus.beginWait(TRANSACTION_EXCUSE);
1579: transactionLock.wait();
1580: } catch (InterruptedException ie) {
1581: } finally {
1582: SchedulableStatus.endBlocking();
1583: }
1584: }
1585: assert transactionCount == 1 : transactionCount;
1586: // if we've locked out persistence then it's now save to persist
1587: }
1588:
1589: private void resumeTransactions() {
1590: assert !Thread.holdsLock(distributorLock);
1591: assert Thread.holdsLock(transactionLock);
1592: assert transactionCount >= 1 : transactionCount;
1593: transactionCount--;
1594: assert transactionCount == 0 : transactionCount;
1595: transactionLock.notifyAll();
1596: }
1597:
1598: private void maybeSetPersistPending() {
1599: assert !Thread.holdsLock(distributorLock);
1600: assert !Thread.holdsLock(transactionLock);
1601: if (!lazyPersistence || timeToLazilyPersist()) {
1602: if (persistenceReservationManager.request(persistence)) {
1603: if (logger.isInfoEnabled())
1604: logger.info("reservation request succeeded");
1605: setPersistPending(true);
1606: } else if ((persistFlags & PERSIST_PENDING) != 0) {
1607: // Whoops. We lost our reservation
1608: if (logger.isInfoEnabled())
1609: logger.info("reservation request failed");
1610: setPersistPending(false); // Need to start all over
1611: }
1612: }
1613: }
1614:
1615: private void setPersistPending(boolean on) {
1616: assert !Thread.holdsLock(distributorLock);
1617: assert !Thread.holdsLock(transactionLock);
1618: synchronized (transactionLock) {
1619: if (on) {
1620: persistFlags |= PERSIST_PENDING;
1621: } else if ((persistFlags & PERSIST_PENDING) != 0) {
1622: persistFlags &= ~PERSIST_PENDING;
1623: transactionLock.notifyAll();
1624: }
1625: }
1626: }
1627:
1628: private void setSuspended(boolean on) {
1629: assert !Thread.holdsLock(distributorLock);
1630: assert Thread.holdsLock(transactionLock);
1631: if (on) {
1632: persistFlags |= SUSPENDED;
1633: } else if ((persistFlags & SUSPENDED) != 0) {
1634: persistFlags &= ~SUSPENDED;
1635: transactionLock.notifyAll();
1636: }
1637: }
1638:
1639: private void timerPersist() {
1640: assert !Thread.holdsLock(distributorLock);
1641: assert !Thread.holdsLock(transactionLock);
1642: if (lazyPersistence && !timeToLazilyPersist()) {
1643: return;
1644: }
1645: synchronized (distributorLock) {
1646: if (!needToPersist) {
1647: return;
1648: }
1649: }
1650: synchronized (transactionLock) {
1651: if ((persistFlags & PERSIST_ACTIVE) != 0) {
1652: return;
1653: }
1654: if ((persistFlags & PERSIST_PENDING) != 0
1655: && transactionCount >= 1) {
1656: return;
1657: }
1658: }
1659: // there's a small window here where we may do an unnecessary
1660: // second persist, but that would be harmless and quick
1661: persist(false, false);
1662: }
1663:
1664: private void logEnvelope(Envelope envelope, BlackboardClient client) {
1665: if (!logger.isDetailEnabled())
1666: return;
1667: boolean first = true;
1668: for (Iterator tuples = envelope.getAllTuples(); tuples
1669: .hasNext();) {
1670: if (first) {
1671: logger.detail("Outbox of "
1672: + client.getBlackboardClientName());
1673: first = false;
1674: }
1675: EnvelopeTuple tuple = (EnvelopeTuple) tuples.next();
1676: if (tuple.isBulk()) {
1677: for (Iterator objects = ((BulkEnvelopeTuple) tuple)
1678: .getCollection().iterator(); objects.hasNext();) {
1679: logger.detail("BULK " + objects.next());
1680: }
1681: } else {
1682: String kind = "";
1683: if (tuple.isAdd()) {
1684: kind = "ADD ";
1685: } else if (tuple.isChange()) {
1686: kind = "CHANGE ";
1687: } else {
1688: kind = "REMOVE ";
1689: }
1690: logger.detail(kind + tuple.getObject());
1691: }
1692: }
1693: }
1694:
1695: public String getName() {
1696: return name;
1697: } // agent name
1698:
1699: public String toString() {
1700: return "<Distributor " + getName() + ">";
1701: }
1702:
1703: /**
1704: * Hold our set of registered Subscribers.
1705: * <p>
1706: * The Distributor must lock this object with its
1707: * "distributorLock".
1708: */
1709: private static class Subscribers {
1710: private List subscribers = new ArrayList();
1711: ReferenceQueue refQ = new ReferenceQueue();
1712:
1713: public void register(Subscriber subscriber) {
1714: checkRefQ();
1715: subscribers.add(new WeakReference(subscriber, refQ));
1716: }
1717:
1718: public void unregister(Subscriber subscriber) {
1719: for (Iterator iter = subscribers.iterator(); iter.hasNext();) {
1720: WeakReference ref = (WeakReference) iter.next();
1721: if (ref.get() == subscriber) {
1722: iter.remove();
1723: }
1724: }
1725: }
1726:
1727: public Iterator iterator() {
1728: checkRefQ();
1729:
1730: class MyIterator implements Iterator {
1731: private Object n = null;
1732: private Iterator iter;
1733:
1734: public MyIterator(Iterator it) {
1735: iter = it;
1736: advance();
1737: }
1738:
1739: /**
1740: * Advance to the next non-null element, dropping
1741: * nulls along the way
1742: */
1743: private void advance() {
1744: while (iter.hasNext()) {
1745: WeakReference ref = (WeakReference) iter.next();
1746: n = ref.get();
1747: if (n == null) {
1748: iter.remove();
1749: } else {
1750: return;
1751: }
1752: }
1753: // ran off the end,
1754: n = null;
1755: }
1756:
1757: public boolean hasNext() {
1758: return (n != null);
1759: }
1760:
1761: public Object next() {
1762: Object x = n;
1763: advance();
1764: return x;
1765: }
1766:
1767: public void remove() {
1768: iter.remove();
1769: }
1770: }
1771: ;
1772: return new MyIterator(subscribers.iterator());
1773: }
1774:
1775: private void checkRefQ() {
1776: Reference ref;
1777: while ((ref = refQ.poll()) != null) {
1778: subscribers.remove(ref);
1779: }
1780: }
1781: }
1782: }
|