Source Code Cross Referenced for MessageManagerImpl.java in  » Science » Cougaar12_4 » org » cougaar » core » blackboard » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.blackboard 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 1997-2004 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:
027:        package org.cougaar.core.blackboard;
028:
029:        import java.io.FileWriter;
030:        import java.io.IOException;
031:        import java.io.ObjectInputStream;
032:        import java.io.ObjectOutputStream;
033:        import java.io.PrintWriter;
034:        import java.io.Serializable;
035:        import java.text.DateFormat;
036:        import java.text.SimpleDateFormat;
037:        import java.util.ArrayList;
038:        import java.util.Date;
039:        import java.util.HashMap;
040:        import java.util.Iterator;
041:        import java.util.TreeSet;
042:
043:        import org.cougaar.bootstrap.SystemProperties;
044:        import org.cougaar.core.agent.ClusterMessage;
045:        import org.cougaar.core.agent.service.MessageSwitchService;
046:        import org.cougaar.core.mts.MessageAddress;
047:        import org.cougaar.util.StringUtility;
048:
049:        /**
050:         * A message acknowledgement manager used by the {@link Distributor}'s
051:         * non-lazy persistence mode to ensure that unacknowledged messages
052:         * are persisted.
053:         */
054:        class MessageManagerImpl implements  MessageManager, Serializable {
055:
056:            public static final long serialVersionUID = -8662117243114391926L;
057:
058:            private static final boolean debug = SystemProperties
059:                    .getBoolean("org.cougaar.core.blackboard.MessageManager.debug"); // when true, start() opens the per-agent log file
060:
061:            private static final long KEEP_ALIVE_INTERVAL = 55000L; // compared against System.currentTimeMillis() deltas in sendKeepAlive(); presumably milliseconds
062:
063:            private boolean USE_MESSAGE_MANAGER = false; // set by the constructor; when false sendMessages() forwards directly and receiveMessage() returns OK
064:
065:            /** This agent's own address, read from the message switch in start(). */
066:            private transient MessageAddress self;
067:            private transient MessageSwitchService msgSwitch; // service used to send messages; assigned in start()
068:
069:            private transient String agentNameForLog; // padded agent name written on every log line by printLog()
070:
071:            /** Messages we need to send at the end of this epoch. */
072:            private transient ArrayList stuffToSend = new ArrayList();
073:
074:            /** Maps a remote agent's MessageAddress to its AgentInfo (sequence and incarnation state). */
075:            private HashMap agentInfo = new HashMap(13);
076:
077:            /** Something has happened during this epoch. */
078:            private transient boolean needAdvanceEpoch = false;
079:
080:            /** The retransmitter thread */
081:            private transient Retransmitter retransmitter;
082:
083:            /** The acknowledgement sender thread */
084:            private transient AcknowledgementSender ackSender;
085:
086:            /** The keep alive sender thread */
087:            private transient KeepAliveSender keepAliveSender;
088:
089:            /** Debug log destination; stays null unless debug is on and the log file opened successfully. */
090:            private transient PrintWriter logWriter = null;
091:
092:            /** The format of timestamps in the log */
093:            private static DateFormat logTimeFormat = new SimpleDateFormat(
094:                    "yyyy/MM/dd HH:mm:ss.SSS"); // NOTE(review): static, so shared by every instance; SimpleDateFormat is not thread-safe
095:
096:            /**
097:             * Inner static class to track the state of communication with
098:             * another agent.
099:             */
100:            private class AgentInfo implements  java.io.Serializable {
101:                /** The MessageAddress of the remote agent */
102:                private MessageAddress agentIdentifier;
103:
104:                private long remoteIncarnationNumber = 0L;
105:
106:                private long localIncarnationNumber = System
107:                        .currentTimeMillis();
108:
109:                private transient boolean restarted = false;
110:
111:                public AgentInfo(MessageAddress cid) {
112:                    agentIdentifier = cid;
113:                }
114:
115:                public MessageAddress getMessageAddress() {
116:                    return agentIdentifier;
117:                }
118:
119:                /**
120:                 * The last sequence number we transmited to the agent described
121:                 * by this AgentInfo
122:                 */
123:                private int currentTransmitSequenceNumber = 0;
124:
125:                /**
126:                 * The next sequence number we expect to receive from the agent
127:                 * described by this AgentInfo
128:                 */
129:                private int currentReceiveSequenceNumber = 0;
130:
131:                /**
132:                 * The record of messages that have been acknowledged. We acknowledge the highest
133:
134:                /**
135:                 * The queue of messages that are outstanding.
136:                 */
137:                private TreeSet outstandingMessages = new TreeSet();
138:
139:                public void addOutstandingMessage(TimestampedMessage tsm) {
140:                    outstandingMessages.add(tsm);
141:                    needAdvanceEpoch = true;
142:                }
143:
144:                public TimestampedMessage[] getOutstandingMessages() {
145:                    return (TimestampedMessage[]) outstandingMessages
146:                            .toArray(new TimestampedMessage[outstandingMessages
147:                                    .size()]);
148:                }
149:
150:                public synchronized TimestampedMessage getFirstOutstandingMessage() {
151:                    if (outstandingMessages.isEmpty())
152:                        return null;
153:                    return (TimestampedMessage) outstandingMessages.first();
154:                }
155:
156:                /** Which messages have we actually processed (need acks) */
157:                private AckSet ackSet = new AckSet(1);
158:
159:                private boolean needSendAcknowledgement = false;
160:
161:                private transient long transmissionTime = 0;
162:
163:                public synchronized long getTransmissionTime() {
164:                    return transmissionTime;
165:                }
166:
167:                public synchronized void setTransmissionTime(long now) {
168:                    transmissionTime = now;
169:                }
170:
171:                public void acknowledgeMessage(ClusterMessage aMessage) {
172:                    ackSet.set(aMessage.getContentsId());
173:                    needAdvanceEpoch = true;
174:                }
175:
176:                public boolean needSendAcknowledgement() {
177:                    return needSendAcknowledgement;
178:                }
179:
180:                public void setNeedSendAcknowledgment() {
181:                    needSendAcknowledgement = true;
182:                    ackSender.poke();
183:                }
184:
185:                public boolean getRestarted() {
186:                    return restarted;
187:                }
188:
189:                public void setRestarted(boolean newRestarted) {
190:                    restarted = newRestarted;
191:                }
192:
193:                public void advance() {
194:                    int oldMin = ackSet.getMinSequence();
195:                    if (ackSet.advance() > oldMin) {
196:                        needAdvanceEpoch = true; // State has change, need to persist
197:                        setNeedSendAcknowledgment(); // Also need to send an ack
198:                    }
199:                }
200:
201:                /**
202:                 * Create an acknowledgement for the current min sequence of the
203:                 * ackset.
204:                 */
205:                public AckDirectiveMessage getAcknowledgement() {
206:                    int firstZero = ackSet.getMinSequence();
207:                    AckDirectiveMessage ack = new AckDirectiveMessage(
208:                            getMessageAddress(), self, firstZero - 1,
209:                            remoteIncarnationNumber);
210:                    needSendAcknowledgement = false;
211:                    return ack;
212:                }
213:
214:                public void receiveAck(MessageManagerImpl mm, int sequence,
215:                        boolean isRestart) {
216:                    long now = System.currentTimeMillis();
217:                    for (Iterator messages = outstandingMessages.iterator(); messages
218:                            .hasNext();) {
219:                        TimestampedMessage tsm = (TimestampedMessage) messages
220:                                .next();
221:                        if (tsm.getSequenceNumber() <= sequence) {
222:                            mm.printMessage("Remv", tsm);
223:                            messages.remove();
224:                        } else if (isRestart) {
225:                            tsm.setTimestamp(now); // Retransmit this ASAP
226:                        } else {
227:                            break; // Nothing left to do
228:                        }
229:                    }
230:                }
231:
232:                /**
233:                 * Check that the given message has the right sequence number.
234:                 * @return a code indicating whether the message is old, current,
235:                 * or future.
236:                 */
237:                public int checkReceiveSequenceNumber(DirectiveMessage aMessage) {
238:                    int seq = aMessage.getContentsId();
239:                    if (remoteIncarnationNumber == 0L)
240:                        return FUTURE;
241:                    if (seq <= currentReceiveSequenceNumber)
242:                        return DUPLICATE;
243:                    if (seq > currentReceiveSequenceNumber + 1)
244:                        return FUTURE;
245:                    return PRESENT;
246:                }
247:
248:                public long getLocalIncarnationNumber() {
249:                    return localIncarnationNumber;
250:                }
251:
252:                public long getRemoteIncarnationNumber() {
253:                    return remoteIncarnationNumber;
254:                }
255:
256:                public void setRemoteIncarnationNumber(long incarnationNumber) {
257:                    remoteIncarnationNumber = incarnationNumber;
258:                }
259:
260:                public int getCurrentTransmitSequenceNumber() {
261:                    return currentTransmitSequenceNumber;
262:                }
263:
264:                public int getNextTransmitSequenceNumber() {
265:                    return ++currentTransmitSequenceNumber;
266:                }
267:
268:                public int getCurrentReceiveSequenceNumber() {
269:                    return currentReceiveSequenceNumber;
270:                }
271:
272:                /**
273:                 * Update the current receive sequence number of this other agent.
274:                 * @param seqno the new current sequence number of this other
275:                 * agent.
276:                 */
277:                public void updateReceiveSequenceNumber(int seqno) {
278:                    currentReceiveSequenceNumber = seqno;
279:                    needAdvanceEpoch = true; // Our state changed need to persist
280:                }
281:
282:                public String toString() {
283:                    return "AgentInfo " + agentIdentifier + " "
284:                            + incarnationToString(localIncarnationNumber)
285:                            + "->"
286:                            + incarnationToString(remoteIncarnationNumber);
287:                }
288:            }
289:
290:            /**
291:             * Tag a message on a retransmit queue with the time at which it
292:             * should next be sent.
293:             */
294:            private class TimestampedMessage implements  Comparable,
295:                    java.io.Serializable {
296:                protected transient long timestamp = System.currentTimeMillis();
297:                private transient int nTries = 0;
298:
299:                protected transient DirectiveMessage theMessage;
300:
301:                private MessageAddress theDestination;
302:                private int theSequenceNumber;
303:                private long theIncarnationNumber;
304:                private Directive[] theDirectives;
305:                private AgentInfo info;
306:
307:                TimestampedMessage(AgentInfo info, DirectiveMessage aMsg) {
308:                    this .info = info;
309:                    theMessage = aMsg;
310:                    theDestination = aMsg.getDestination();
311:                    theSequenceNumber = aMsg.getContentsId();
312:                    theIncarnationNumber = aMsg.getIncarnationNumber();
313:                    theDirectives = aMsg.getDirectives();
314:                }
315:
316:                public void send(long now) {
317:                    msgSwitch.sendMessage(getMessage());
318:                    long nextRetransmission = now
319:                            + retransmitSchedule[Math.min(nTries++,
320:                                    retransmitSchedule.length - 1)];
321:                    setTimestamp(nextRetransmission);
322:                }
323:
324:                /**
325:                 * Get the DirectiveMessage. If theMessage is null, create a new
326:                 * one. theMessage will be null only after rehydration of the
327:                 * message manager.
328:                 */
329:                public DirectiveMessage getMessage() {
330:                    if (theMessage == null) {
331:                        theMessage = new DirectiveMessage(getSource(),
332:                                getDestination(), theIncarnationNumber,
333:                                getDirectives());
334:                        theMessage.setContentsId(theSequenceNumber);
335:                    }
336:                    theMessage.setAllMessagesAcknowledged(info
337:                            .getFirstOutstandingMessage() == this );
338:                    return theMessage;
339:                }
340:
341:                public MessageAddress getDestination() {
342:                    return theDestination;
343:                }
344:
345:                public MessageAddress getSource() {
346:                    return self;
347:                }
348:
349:                public int getSequenceNumber() {
350:                    return theSequenceNumber;
351:                }
352:
353:                public long getIncarnationNumber() {
354:                    return theIncarnationNumber;
355:                }
356:
357:                public Directive[] getDirectives() {
358:                    return theDirectives;
359:                }
360:
361:                public void setTimestamp(long ts) {
362:                    timestamp = ts;
363:                }
364:
365:                public int compareTo(Object other) {
366:                    TimestampedMessage otherMsg = (TimestampedMessage) other;
367:                    return this .theSequenceNumber - otherMsg.theSequenceNumber;
368:                }
369:
370:                public String toString() {
371:                    StringBuffer buf = new StringBuffer();
372:                    buf.append("seq(");
373:                    buf.append(theIncarnationNumber);
374:                    buf.append("/");
375:                    buf.append(theSequenceNumber);
376:                    buf.append(") ");
377:                    StringUtility.appendArray(buf, theDirectives);
378:                    return buf.substring(0);
379:                }
380:            }
381:
382:            private static long[] retransmitSchedule = { 20000L, 20000L,
383:                    60000L, 120000L, 300000L }; // delays added to "now" in TimestampedMessage.send(), indexed by try count and capped at the last entry; presumably milliseconds
384:
385:            public MessageManagerImpl(boolean enable) {
386:                USE_MESSAGE_MANAGER = enable;
387:            }
388:
389:            public void start(MessageSwitchService msgSwitch,
390:                    boolean didRehydrate) {
391:                self = msgSwitch.getMessageAddress();
392:                this .msgSwitch = msgSwitch;
393:                String agentName = self.getAddress();
394:                agentNameForLog = "               ".substring(Math.min(14,
395:                        agentName.length()))
396:                        + agentName + " ";
397:                if (debug) {
398:                    try {
399:                        logWriter = new PrintWriter(new FileWriter(
400:                                "MessageManager_" + agentName + ".log",
401:                                true || didRehydrate));
402:                        printLog("MessageManager Started");
403:                    } catch (IOException e) {
404:                        System.err
405:                                .println("Can't open MessageManager log file: "
406:                                        + e);
407:                    }
408:                }
409:
410:                if (USE_MESSAGE_MANAGER) {
411:                    retransmitter = new Retransmitter(agentName);
412:                    retransmitter.start();
413:                    ackSender = new AcknowledgementSender(agentName);
414:                    ackSender.start();
415:                    keepAliveSender = new KeepAliveSender(agentName);
416:                    keepAliveSender.start();
417:                }
418:            }
419:
420:            public void stop() {
421:                if (USE_MESSAGE_MANAGER) {
422:                    // TODO postponed until needed
423:                    System.err
424:                            .println("\nFIXME MessageManager \"stop()\" for (USE_MESSAGE_MANAGER == true) "
425:                                    + "should halt internal threads");
426:                }
427:            }
428:
429:            private synchronized void sendKeepAlive() {
430:                ArrayList messages = new ArrayList(agentInfo.size());
431:                Directive[] directives = new Directive[0];
432:                long now = System.currentTimeMillis();
433:                for (Iterator agents = agentInfo.values().iterator(); agents
434:                        .hasNext();) {
435:                    AgentInfo info = (AgentInfo) agents.next();
436:                    if (info.getFirstOutstandingMessage() == null) {
437:                        if (now > info.getTransmissionTime()
438:                                + KEEP_ALIVE_INTERVAL) {
439:                            DirectiveMessage ndm = new DirectiveMessage(self,
440:                                    info.getMessageAddress(), info
441:                                            .getLocalIncarnationNumber(),
442:                                    directives);
443:                            messages.add(ndm);
444:                        }
445:                    }
446:                }
447:                sendMessages(messages.iterator());
448:            }
449:
450:            private void printMessage(String prefix, DirectiveMessage aMessage) {
451:                printMessage(prefix, aMessage.getIncarnationNumber(), aMessage
452:                        .getContentsId(), aMessage.getSource().getAddress(),
453:                        aMessage.getDestination().getAddress(), (aMessage
454:                                .areAllMessagesAcknowledged() ? " yes" : " no")
455:                                + StringUtility.arrayToString(aMessage
456:                                        .getDirectives()));
457:            }
458:
459:            private void printMessage(String prefix,
460:                    AckDirectiveMessage aMessage) {
461:                printMessage(prefix, aMessage.getIncarnationNumber(), aMessage
462:                        .getContentsId(), aMessage.getSource().getAddress(),
463:                        aMessage.getDestination().getAddress(), "");
464:            }
465:
466:            private void printMessage(String prefix, TimestampedMessage tsm) {
467:                printMessage(prefix, tsm.getIncarnationNumber(), tsm
468:                        .getSequenceNumber(), tsm.getSource().getAddress(), tsm
469:                        .getDestination().getAddress(), " ???"
470:                        + StringUtility.arrayToString(tsm.getDirectives()));
471:            }
472:
473:            private Date tDate = new Date(); // scratch Date reused by incarnationToString() and the six-argument printMessage()
474:            private SimpleDateFormat incarnationFormat = new SimpleDateFormat(
475:                    "yyyy/MM/dd/hh:mm:ss.SSS"); // NOTE(review): "hh" is the 12-hour field and the pattern has no am/pm marker, while logTimeFormat uses "HH" - confirm this is intended
476:
477:            private String incarnationToString(long l) {
478:                if (l == 0L)
479:                    return "<none>";
480:                tDate.setTime(l);
481:                return incarnationFormat.format(tDate);
482:            }
483:
484:            private void printMessage(String prefix, long incarnationNumber,
485:                    int sequence, String from, String to, String contents) {
486:                tDate.setTime(incarnationNumber);
487:                String msg = prefix + " " + sequence + " " + from + "->" + to
488:                        + " (" + incarnationFormat.format(tDate) + "): "
489:                        + contents;
490:                //      System.out.println(msg);
491:                if (logWriter != null) {
492:                    printLog(msg);
493:                }
494:            }
495:
496:            private void printLog(String msg) {
497:                logWriter.print(logTimeFormat.format(new Date(System
498:                        .currentTimeMillis())));
499:                logWriter.print(agentNameForLog);
500:                logWriter.println(msg);
501:                logWriter.flush();
502:            }
503:
504:            /**
505:             * Submit a DirectiveMessage for transmission from this agent. The
506:             * message is added to the set of message to be transmitted at the
507:             * end of the current epoch.
508:             */
509:            public void sendMessages(Iterator messages) {
510:                if (USE_MESSAGE_MANAGER) {
511:                    synchronized (this ) {
512:                        while (messages.hasNext()) {
513:                            DirectiveMessage aMessage = (DirectiveMessage) messages
514:                                    .next();
515:                            AgentInfo info = getAgentInfo(aMessage
516:                                    .getDestination());
517:                            if (info == null) {
518:                                if (debug)
519:                                    printLog("sendMessage createNewConnection");
520:                                info = createNewConnection(aMessage
521:                                        .getDestination(), 0L);
522:                            }
523:                            aMessage.setIncarnationNumber(info
524:                                    .getLocalIncarnationNumber());
525:                            aMessage.setContentsId(info
526:                                    .getNextTransmitSequenceNumber());
527:                            stuffToSend.add(new TimestampedMessage(info,
528:                                    aMessage));
529:                            if (debug)
530:                                printMessage("QSnd", aMessage);
531:                        }
532:                        needAdvanceEpoch = true;
533:                    }
534:                } else {
535:                    while (messages.hasNext()) {
536:                        msgSwitch.sendMessage((DirectiveMessage) messages
537:                                .next());
538:                    }
539:                }
540:            }
541:
542:            private Directive[] emptyDirectives = new Directive[0]; // empty payload passed to the DirectiveMessage built in sendInitializeMessage()
543:
544:            private void sendInitializeMessage(AgentInfo info) {
545:                DirectiveMessage msg = new DirectiveMessage(self, info
546:                        .getMessageAddress(), info.getLocalIncarnationNumber(),
547:                        emptyDirectives);
548:                msg.setContentsId(info.getNextTransmitSequenceNumber());
549:                stuffToSend.add(new TimestampedMessage(info, msg));
550:                if (debug)
551:                    printMessage("QSnd", msg);
552:                needAdvanceEpoch = true;
553:            }
554:
555:            private AgentInfo getAgentInfo(MessageAddress agentIdentifier) {
556:                return (AgentInfo) agentInfo.get(agentIdentifier);
557:            }
558:
559:            private AgentInfo createAgentInfo(MessageAddress agentIdentifier) {
560:                AgentInfo info = new AgentInfo(agentIdentifier);
561:                agentInfo.put(agentIdentifier, info);
562:                return info;
563:            }
564:
565:            /**
566:             * Check a received DirectiveMessage for being a duplicate.
567:             * @param aMessage The received DirectiveMessage
568:             * @return DUPLICATE, FUTURE, RESTART, IGNORE, or OK
569:             */
570:            public int receiveMessage(DirectiveMessage directiveMessage) {
571:                if (!USE_MESSAGE_MANAGER)
572:                    return OK;
573:                synchronized (this ) {
574:                    boolean restarted = false;
575:                    MessageAddress sourceIdentifier = directiveMessage
576:                            .getSource();
577:                    AgentInfo info = getAgentInfo(sourceIdentifier);
578:                    boolean isFirst = directiveMessage.getContentsId() == 1;
579:                    if (info != null) {
580:                        if (info.getRestarted()) {
581:                            restarted = true;
582:                            info.setRestarted(false);
583:                        }
584:                        long infoIncarnation = info
585:                                .getRemoteIncarnationNumber();
586:                        long messageIncarnation = directiveMessage
587:                                .getIncarnationNumber();
588:                        if (infoIncarnation != messageIncarnation) {
589:                            if (infoIncarnation == 0L) {
590:                                if (isFirst) {
591:                                    info
592:                                            .setRemoteIncarnationNumber(messageIncarnation);
593:                                } else {
594:                                    if (debug)
595:                                        printMessage("Nnz1", directiveMessage);
596:                                    info.setNeedSendAcknowledgment();
597:                                    return restarted ? (IGNORE | RESTART)
598:                                            : IGNORE; // Stray message
599:                                }
600:                            } else if (messageIncarnation < infoIncarnation) {
601:                                if (debug)
602:                                    printMessage("Prev", directiveMessage);
603:                                info.setNeedSendAcknowledgment();
604:                                // Message from previous incarnation of remote agent
605:                                return restarted ? (IGNORE | RESTART) : IGNORE;
606:                            } else if (messageIncarnation > infoIncarnation) {
607:                                // Message from new incarnation
608:                                if (isFirst) { // Synchronize to new incarnation
609:                                    if (debug)
610:                                        printLog("receiveMessage messageIncarnation > infoIncarnation");
611:                                    info = createNewConnection(
612:                                            sourceIdentifier, directiveMessage
613:                                                    .getIncarnationNumber());
614:                                    restarted = true;
615:                                } else {
616:                                    if (debug)
617:                                        printMessage("Nnz2", directiveMessage);
618:                                    info.setNeedSendAcknowledgment();
619:                                    // Apparently new incarnation, but not sequence 0
620:                                    return restarted ? (IGNORE | RESTART)
621:                                            : IGNORE;
622:                                }
623:                            }
624:                        }
625:                    } else {
626:                        if (isFirst) {
627:                            if (debug)
628:                                printLog("receiveMessage null info is first");
629:                            info = createNewConnection(sourceIdentifier,
630:                                    directiveMessage.getIncarnationNumber());
631:                        } else {
632:                            if (debug)
633:                                printMessage(
634:                                        "receiveMessage null info not first",
635:                                        directiveMessage);
636:                            info = createNewConnection(sourceIdentifier, 0L);
637:                            return IGNORE; // Must have sequence zero to synchronize
638:                        }
639:                    }
640:                    switch (info.checkReceiveSequenceNumber(directiveMessage)) {
641:                    case DUPLICATE:
642:                        if (debug)
643:                            printMessage("Dupl", directiveMessage);
644:                        info.setNeedSendAcknowledgment();
645:                        return IGNORE;
646:                    default:
647:                    case FUTURE:
648:                        if (directiveMessage.areAllMessagesAcknowledged()) {
649:                            // We are out of sync
650:                            if (debug)
651:                                printLog("receiveMessage from future all acked");
652:                            info = createNewConnection(sourceIdentifier, 0L);
653:                            return IGNORE | RESTART;
654:                        }
655:                        if (debug)
656:                            printMessage("Futr", directiveMessage);
657:                        return IGNORE; // Message out of order; ignore it
658:                    case OK:
659:                        if (debug)
660:                            printMessage("Rcvd", directiveMessage);
661:                        info.updateReceiveSequenceNumber(directiveMessage
662:                                .getContentsId());
663:                        needAdvanceEpoch = true;
664:                        return restarted ? (RESTART | OK) : OK;
665:                    }
666:                }
667:            }
668:
669:            private AgentInfo createNewConnection(
670:                    MessageAddress sourceIdentifier,
671:                    long remoteIncarnationNumber) {
672:                AgentInfo info = createAgentInfo(sourceIdentifier); // New connection
673:                info.setRemoteIncarnationNumber(remoteIncarnationNumber);
674:                if (debug)
675:                    printLog("New Connection: " + info.toString());
676:                sendInitializeMessage(info);
677:                return info;
678:            }
679:
680:            public void acknowledgeMessages(Iterator messages) {
681:                if (!USE_MESSAGE_MANAGER)
682:                    return;
683:                synchronized (this ) {
684:                    while (messages.hasNext()) {
685:                        DirectiveMessage aMessage = (DirectiveMessage) messages
686:                                .next();
687:                        if (aMessage.getContentsId() == 0)
688:                            return; // Not reliably sent
689:                        AgentInfo info = getAgentInfo(aMessage.getSource());
690:                        info.acknowledgeMessage(aMessage);
691:                        needAdvanceEpoch = true;
692:                        if (debug)
693:                            printMessage("QAck", aMessage);
694:                    }
695:                }
696:            }
697:
698:            /**
699:             * Process a directive acknowledgement. The acknowledged messages
700:             * are removed from the retransmission queues. If the ack is marked
701:             * as having been sent during a agent restart, we speed up the
702:             * retransmission process to hasten the recovery process.
703:             */
704:            public int receiveAck(AckDirectiveMessage theAck) {
705:                synchronized (this ) {
706:                    if (debug)
707:                        printMessage("RAck", theAck);
708:                    AgentInfo info = getAgentInfo(theAck.getSource());
709:                    if (info != null) {
710:                        boolean restarted = false;
711:                        if (info.getRestarted()) {
712:                            info.setRestarted(false);
713:                            restarted = true;
714:                        }
715:                        long localIncarnationNumber = info
716:                                .getLocalIncarnationNumber();
717:                        long ackIncarnationNumber = theAck
718:                                .getIncarnationNumber();
719:                        if (localIncarnationNumber == ackIncarnationNumber) {
720:                            int seq = theAck.getContentsId();
721:                            if (info.getCurrentTransmitSequenceNumber() < seq) {
722:                                if (debug)
723:                                    printLog("receiveAck from future same incarnation");
724:                                createNewConnection(info.getMessageAddress(),
725:                                        0L);
726:                                return RESTART;
727:                            }
728:                            info.receiveAck(this , seq, false);
729:                            return restarted ? (RESTART | OK) : OK;
730:                        } else if (localIncarnationNumber < ackIncarnationNumber) {
731:                            // We are living in the past. We must have rehydrated with
732:                            // an old set of connections.
733:                            if (debug)
734:                                printLog("receiveAck from future incarnation");
735:                            createNewConnection(info.getMessageAddress(), 0L);
736:                            return RESTART;
737:                        } else {
738:                            // The other end is living in the past. Hopefully, he will
739:                            // eventually get with the program.
740:                            if (debug)
741:                                printLog("receiveAck from past incarnation");
742:                            return restarted ? (IGNORE | RESTART) : IGNORE;
743:                        }
744:                    } else {
745:                        return IGNORE;
746:                    }
747:                }
748:            }
749:
750:            /**
751:             * Determine if anything has happened during this epoch.
752:             * @return true if anything has changed.
753:             */
754:            public boolean needAdvanceEpoch() {
755:                return needAdvanceEpoch;
756:            }
757:
758:            /**
759:             * Wrap up the current epoch and get into the correct state to be
760:             * persisted. Every message that has been queued for transmission is
761:             * sent. Acknowledgement numbers are advanced so we begin
762:             * acknowledging messages we have received and processed. This
763:             * method must be called while this MessageManager is
764:             * synchronized. We purposely omit the "synchronized" here because
765:             * proper operation is precluded unless the synchronization is
766:             * performed externally.
767:             */
768:            public void advanceEpoch() {
769:                if (!USE_MESSAGE_MANAGER)
770:                    return;
771:                // Advance the information about every other agent
772:                for (Iterator agents = agentInfo.values().iterator(); agents
773:                        .hasNext();) {
774:                    AgentInfo info = (AgentInfo) agents.next();
775:                    info.advance();
776:                }
777:                needAdvanceEpoch = false;
778:                for (Iterator iter = stuffToSend.iterator(); iter.hasNext();) {
779:                    TimestampedMessage tsm = (TimestampedMessage) iter.next();
780:                    getAgentInfo(tsm.getDestination()).addOutstandingMessage(
781:                            tsm);
782:                    retransmitter.poke();
783:                }
784:                stuffToSend.clear();
785:                if (logWriter != null) {
786:                    printLog("Advanced epoch");
787:                }
788:            }
789:
790:            private class KeepAliveSender extends Thread {
791:                public KeepAliveSender(String agentName) {
792:                    super ("Keep Alive Sender/" + agentName);
793:                }
794:
795:                public void run() {
796:                    while (true) {
797:                        sendKeepAlive();
798:                        try {
799:                            sleep(KEEP_ALIVE_INTERVAL);
800:                        } catch (InterruptedException ie) {
801:                        }
802:                    }
803:                }
804:            }
805:
806:            private class AcknowledgementSender extends Thread {
807:                private boolean poked = false;
808:                ArrayList acksToSend = new ArrayList();
809:
810:                public AcknowledgementSender(String agentName) {
811:                    super ("Ack Sender/" + agentName);
812:                }
813:
814:                public synchronized void poke() {
815:                    poked = true;
816:                    AcknowledgementSender.this .notify();
817:                }
818:
819:                public void run() {
820:                    while (true) {
821:                        synchronized (AcknowledgementSender.this ) {
822:                            while (!poked) {
823:                                try {
824:                                    AcknowledgementSender.this .wait();
825:                                } catch (InterruptedException ie) {
826:                                }
827:                            }
828:                            poked = false;
829:                        }
830:                        synchronized (MessageManagerImpl.this ) {
831:                            for (Iterator agents = agentInfo.values()
832:                                    .iterator(); agents.hasNext();) {
833:                                AgentInfo info = (AgentInfo) agents.next();
834:                                if (info.needSendAcknowledgement()) {
835:                                    acksToSend.add(info.getAcknowledgement());
836:                                }
837:                            }
838:                        }
839:                        for (Iterator iter = acksToSend.iterator(); iter
840:                                .hasNext();) {
841:                            AckDirectiveMessage ack = (AckDirectiveMessage) iter
842:                                    .next();
843:                            if (debug)
844:                                printMessage("SAck", ack);
845:                            msgSwitch.sendMessage(ack);
846:                        }
847:                        acksToSend.clear();
848:                    }
849:                }
850:            }
851:
852:            private class Retransmitter extends Thread {
853:                private boolean poked = false;
854:                private ArrayList messagesToRetransmit = new ArrayList();
855:
856:                public Retransmitter(String agentName) {
857:                    super (agentName + "/Message Manager");
858:                }
859:
860:                public synchronized void poke() {
861:                    poked = true;
862:                    Retransmitter.this .notify();
863:                }
864:
865:                /**
866:                 * Retransmit messages that have not been acknowledged. Iterate
867:                 * through all the agents for which we have AgentInfo and
868:                 * interate through all the outstandmessage that have been sent to
869:                 * that agent. Check the time to retransmit of the message and if
870:                 * the current time has passed that time, then retransmit the
871:                 * message. Keep the earliest time of any message that is not ready
872:                 * to be retransmitted and sleep long enough so that there could be
873:                 * at least one message to retransmit when we awaken.
874:                 */
875:                public void run() {
876:                    while (true) {
877:                        try {
878:                            long now = System.currentTimeMillis();
879:                            long earliestTime = now + retransmitSchedule[0];
880:                            synchronized (MessageManagerImpl.this ) {
881:                                for (Iterator agents = agentInfo.values()
882:                                        .iterator(); agents.hasNext();) {
883:                                    AgentInfo info = (AgentInfo) agents.next();
884:                                    TimestampedMessage tsm = info
885:                                            .getFirstOutstandingMessage();
886:                                    if (tsm == null)
887:                                        continue;
888:                                    if (tsm.timestamp <= now) {
889:                                        TimestampedMessage[] messages = info
890:                                                .getOutstandingMessages();
891:                                        info.setTransmissionTime(now);
892:                                        messagesToRetransmit
893:                                                .addAll(java.util.Arrays
894:                                                        .asList(messages));
895:                                    } else if (tsm.timestamp < earliestTime) {
896:                                        earliestTime = tsm.timestamp;
897:                                    }
898:                                }
899:                            }
900:                            if (!messagesToRetransmit.isEmpty()) {
901:                                for (Iterator iter = messagesToRetransmit
902:                                        .iterator(); iter.hasNext();) {
903:                                    TimestampedMessage tsm = (TimestampedMessage) iter
904:                                            .next();
905:                                    tsm.send(now);
906:                                    if (tsm.timestamp < earliestTime) {
907:                                        earliestTime = tsm.timestamp;
908:                                    }
909:                                    if (debug)
910:                                        printMessage(tsm.nTries == 1 ? "Send"
911:                                                : ("Rxm" + tsm.nTries), tsm);
912:                                }
913:                                messagesToRetransmit.clear();
914:                            }
915:                            synchronized (Retransmitter.this ) {
916:                                if (!poked) {
917:                                    long sleepTime = 5000L + earliestTime - now;
918:                                    if (sleepTime > 30000L)
919:                                        sleepTime = 30000L;
920:                                    Retransmitter.this .wait(sleepTime);
921:                                }
922:                                poked = false;
923:                            }
924:                        } catch (Exception e) {
925:                            e.printStackTrace();
926:                        }
927:                    }
928:                }
929:            }
930:
931:            /** Serialize ourselves. Used for persistence. */
932:            private void writeObject(ObjectOutputStream os) throws IOException {
933:                synchronized (this ) {
934:                    if (stuffToSend.size() > 0) {
935:                        throw new IOException("Non-empty stuffToSend");
936:                    }
937:                    os.defaultWriteObject();
938:                }
939:            }
940:
941:            private void readObject(ObjectInputStream is) throws IOException,
942:                    ClassNotFoundException {
943:                is.defaultReadObject();
944:                stuffToSend = new ArrayList();
945:                needAdvanceEpoch = false;
946:                //      for (Iterator agents = agentInfo.values().iterator(); agents.hasNext(); ) {
947:                //        AgentInfo info = (AgentInfo) agents.next();
948:                //        info.setRestarted(true);
949:                //      }
950:            }
951:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.