Source Code Cross Referenced for TOTAL_OLD.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        // $Id: TOTAL_OLD.java,v 1.12 2006/01/19 09:53:37 belaban Exp $
0002:
0003:        package org.jgroups.protocols;
0004:
0005:        import org.apache.commons.logging.Log;
0006:        import org.apache.commons.logging.LogFactory;
0007:        import org.jgroups.*;
0008:        import org.jgroups.util.Util;
0009:        import org.jgroups.stack.Protocol;
0010:
0011:        import java.io.IOException;
0012:        import java.io.ObjectInput;
0013:        import java.io.ObjectOutput;
0014:        import java.util.Vector;
0015:
0016:        /**
0017:         * class SavedMessages
0018:         * <p/>
0019:         * Stores a set of messages along with their sequence id (assigned by the sequencer).
0020:         */
0021:        class SavedMessages {
0022:
0023:            final Log log = LogFactory.getLog(SavedMessages.class);
0024:
0025:            /**
0026:             * class Entry  (inner class)
0027:             * <p/>
0028:             * object type to store in the messages Vector (need to store sequence id in addition to message)
0029:             */
0030:            class Entry {
0031:                private final Message msg;
0032:                private final long seq;
0033:
0034:                Entry(Message msg, long seq) {
0035:                    this .msg = msg;
0036:                    this .seq = seq;
0037:                }
0038:
0039:                public Message getMsg() {
0040:                    return msg;
0041:                }
0042:
0043:                public long getSeq() {
0044:                    return seq;
0045:                }
0046:            } // class Entry
0047:
0048:            private final Vector messages; // vector of "Entry"s to store "Message"s, sorted by sequence id
0049:
0050:            /**
0051:             * Constructor - creates an empty space to store messages
0052:             */
0053:            SavedMessages() {
0054:                messages = new Vector();
0055:            }
0056:
0057:            /**
0058:             * inserts the specified message and sequence id into the "list" of stored messages
0059:             * if the sequence id given is already stored, then nothing is stored
0060:             */
0061:            public void insertMessage(Message msg, long seq) {
0062:                synchronized (messages) {
0063:                    int size = messages.size();
0064:                    int index = 0;
0065:                    long this _seq = -1; // used to prevent duplicate messages being stored
0066:
0067:                    // find the index where this message should be inserted
0068:                    try {
0069:                        while ((index < size)
0070:                                && ((this _seq = ((Entry) (messages
0071:                                        .elementAt(index))).getSeq()) < seq)) {
0072:                            index++;
0073:                        }
0074:                    } catch (java.lang.ClassCastException e) {
0075:                        log
0076:                                .error("Error: (TOTAL_OLD) SavedMessages.insertMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index "
0077:                                        + index + ')');
0078:                        return;
0079:                    }
0080:
0081:                    // check that the sequences aren't the same (don't want duplicates)
0082:                    if (this _seq == seq) {
0083:                        log
0084:                                .error("SavedMessages.insertMessage() - sequence "
0085:                                        + seq
0086:                                        + " already exists in saved messages. Message NOT saved.");
0087:                        return;
0088:                    }
0089:
0090:                    messages.insertElementAt(new Entry(msg, seq), index);
0091:                } // synchronized( messages )
0092:            }
0093:
0094:            /**
0095:             * returns a copy of the stored message with the given sequence id
0096:             * if delete_msg is true, then the message is removed from the
0097:             * the list of stored messages, otherwise the message is not
0098:             * removed from the list
0099:             * if no message is stored with this sequence id, null is returned
0100:             */
0101:            private Message getMessage(long seq, boolean delete_msg) {
0102:                synchronized (messages) {
0103:                    int size = messages.size();
0104:                    int index = 0;
0105:                    long this _seq = -1;
0106:                    try {
0107:                        while ((index < size)
0108:                                && ((this _seq = (((Entry) (messages
0109:                                        .elementAt(index))).getSeq())) < seq)) {
0110:                            index++;
0111:                        }
0112:                    } catch (java.lang.ClassCastException e) {
0113:                        log
0114:                                .error("Error: (TOTAL_OLD) SavedMessages.getMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index "
0115:                                        + index + ')');
0116:                        return null;
0117:                    }
0118:                    // determine if we found the specified sequence
0119:                    if (this _seq == seq) {
0120:                        // we found the message at index
0121:                        Object temp_obj = messages.elementAt(index);
0122:                        if (temp_obj instanceof  Entry) {
0123:                            Message ret_val = ((Entry) temp_obj).getMsg()
0124:                                    .copy();
0125:
0126:                            // should we delete
0127:                            if (delete_msg) {
0128:                                messages.removeElementAt(index);
0129:                            }
0130:
0131:                            return ret_val;
0132:                        } else {
0133:                            log
0134:                                    .error("Error: (TOTAL_OLD) SavedMessages.getMessage() - could not cast element of \"messages\" to an Entry (index "
0135:                                            + index + ')');
0136:                            return null;
0137:                        } // if ( temp_obj instanceof Entry )
0138:                    } else {
0139:                        // we didn't find this sequence number in the messages
0140:                        return null;
0141:                    }
0142:                } // synchronized( messages )
0143:            }
0144:
0145:            /**
0146:             * returns a stored message with the given sequence id
0147:             * the message is then removed from the list of stored messages
0148:             * if no message is stored with this sequence id, null is returned
0149:             */
0150:            public Message getMessage(long seq) {
0151:                return getMessage(seq, true);
0152:            }
0153:
0154:            /**
0155:             * similar to GetMessage, except a copy of the message is returned
0156:             * and the message is not removed from the list
0157:             */
0158:            public Message peekMessage(long seq) {
0159:                return getMessage(seq, false);
0160:            }
0161:
0162:            /**
0163:             * returns a copy of the stored message with the lowest sequence id
0164:             * if delete_msg is true, then the message is removed from the
0165:             * the list of stored messages, otherwise the message is not
0166:             * removed from the list
0167:             * if their are no messages stored, null is returned
0168:             */
0169:            private Message getFirstMessage(boolean delete_msg) {
0170:                synchronized (messages) {
0171:                    if (isEmpty()) {
0172:                        return null;
0173:                    } else {
0174:                        Object temp_obj = messages.firstElement();
0175:                        if (temp_obj instanceof  Entry) {
0176:                            Message ret_val = ((Entry) temp_obj).getMsg()
0177:                                    .copy();
0178:                            messages.removeElementAt(0);
0179:                            return ret_val;
0180:                        } else {
0181:                            log
0182:                                    .error("Error: (TOTAL_OLD) SavedMessages.getFirstMessage() - could not cast element of \"messages\" to an Entry");
0183:                            return null;
0184:                        } // if ( temp_obj instanceof Entry )
0185:                    }
0186:                } // synchronized( messages )
0187:            }
0188:
0189:            /**
0190:             * returns the stored message with the lowest sequence id;
0191:             * the message is then removed from the list of stored messages
0192:             * if their are no messages stored, null is returned
0193:             */
0194:            public synchronized Message getFirstMessage() {
0195:                return getFirstMessage(true);
0196:            }
0197:
0198:            /**
0199:             * similar to GetFirstMessage, except a copy of the message is returned
0200:             * and the message is not removed from the list
0201:             */
0202:            public Message peekFirstMessage() {
0203:                return getFirstMessage(false);
0204:            }
0205:
0206:            /**
0207:             * returns the lowest sequence id of the messages stored
0208:             * if no messages are stored, -1 is returned
0209:             */
0210:            public long getFirstSeq() {
0211:                synchronized (messages) {
0212:                    if (isEmpty()) {
0213:                        return -1;
0214:                    } else {
0215:                        Object temp_obj = messages.firstElement();
0216:                        if (temp_obj instanceof  Entry) {
0217:                            return ((Entry) temp_obj).getSeq();
0218:                        } else {
0219:                            log
0220:                                    .error("Error: (TOTAL_OLD) SavedMessages.getFirstSeq() - could not cast element of \"messages\" to an Entry ");
0221:                            return -1;
0222:                        }
0223:                    }
0224:                } // synchronized( messages )
0225:            }
0226:
0227:            /**
0228:             * returns true if there are messages stored
0229:             * returns false if there are no messages stored
0230:             */
0231:            public boolean isEmpty() {
0232:                return messages.isEmpty();
0233:            }
0234:
0235:            /**
0236:             * returns the number of messages stored
0237:             */
0238:            public int getSize() {
0239:                return messages.size();
0240:            }
0241:
0242:            /**
0243:             * clears all of the stored messages
0244:             */
0245:            public void clearMessages() {
0246:                synchronized (messages) {
0247:                    messages.removeAllElements();
0248:                }
0249:            }
0250:        } // class SavedMessages
0251:
0252:        /**
0253:         * class MessageAcks
0254:         * <p/>
0255:         * Used by sequencer to store cumulative acknowledgements of broadcast messages
0256:         * sent to the group in this view
0257:         */
0258:        class MessageAcks {
0259:
0260:            final Log log = LogFactory.getLog(MessageAcks.class);
0261:
0262:            // TODO: may also want to store some sort of timestamp in each Entry (maybe)
0263:            /**
0264:             * class Entry  (inner class)
0265:             * <p/>
0266:             * object type to store cumulative acknowledgements using a member's Address
0267:             * and the sequence id of a message
0268:             */
0269:            class Entry {
0270:                public final Address addr;
0271:                public long seq;
0272:
0273:                Entry(Address addr, long seq) {
0274:                    this .addr = addr;
0275:                    this .seq = seq;
0276:                }
0277:
0278:                Entry(Address addr) {
0279:                    this .addr = addr;
0280:                    this .seq = -1; // means that no acknowledgements have been made yet
0281:                }
0282:            } // class Entry
0283:
0284:            // Vector of "Entry"s representing cumulative acknowledgements for each member of the group
0285:            private final Vector acks;
0286:
0287:            private final SavedMessages message_history; // history of broadcast messages sent
0288:
0289:            /**
0290:             * Constructor - creates a Vector of "Entry"s given a Vector of "Address"es for the members
0291:             */
0292:            MessageAcks(Vector members) {
0293:                acks = new Vector();
0294:
0295:                // initialize the message history to contain no messages
0296:                message_history = new SavedMessages();
0297:
0298:                // insert slots for each member in the acknowledgement Vector
0299:                reset(members);
0300:            }
0301:
0302:            /**
0303:             * resets acknowledgement Vector with "Entry"s using the given Vector of "Address"es
0304:             * also clears the message history
0305:             */
0306:            public synchronized void reset(Vector members) {
0307:                clear();
0308:
0309:                // initialize Vector of acknowledgements (no acks for any member)
0310:                int num_members = members.size();
0311:                for (int i = 0; i < num_members; i++) {
0312:                    Object temp_obj = members.elementAt(i);
0313:                    if (temp_obj instanceof  Address) {
0314:                        acks.addElement(new Entry((Address) temp_obj));
0315:                    } else {
0316:                        log
0317:                                .error("Error: (TOTAL_OLD) MessageAcks.reset() - could not cast element of \"members\" to an Address object");
0318:                        return;
0319:                    }
0320:                }
0321:            }
0322:
0323:            /**
0324:             * clear all acknowledgements and the message history
0325:             */
0326:            private void clear() {
0327:                acks.removeAllElements();
0328:                message_history.clearMessages();
0329:            }
0330:
0331:            /**
0332:             * returns the Entry from the acknowledgement Vector with the given Address
0333:             * returns null if an Entry with the given Address is not found
0334:             */
0335:            private Entry getEntry(Address addr) {
0336:                synchronized (acks) {
0337:                    // look for this addreess in the acknowledgement Vector
0338:                    int size = acks.size();
0339:                    for (int i = 0; i < size; i++) {
0340:                        Object temp_obj = acks.elementAt(i);
0341:                        if (temp_obj instanceof  Entry) {
0342:                            Entry this _entry = (Entry) temp_obj;
0343:                            if ((this _entry.addr).equals(addr)) {
0344:                                // the given Address matches this entry
0345:                                return this _entry;
0346:                            }
0347:                        } else {
0348:                            log
0349:                                    .error("Error: (TOTAL_OLD) MessageAcks.getEntry() - could not cast element of \"acks\" to an Entry");
0350:                        } // if ( temp_obj instanceof Entry )
0351:                    }
0352:
0353:                    // if we get here, we didn't find this Address
0354:                    return null;
0355:                }
0356:            }
0357:
0358:            /**
0359:             * sets the sequence id for the given Address to the given value
0360:             * note: if the current sequence value for this host is greater than
0361:             * the given value, the sequence for this member is NOT changed
0362:             * (i.e. it will only set it to a larger value)
0363:             * if the given Address is not found in the member list,
0364:             * nothing is changed
0365:             */
0366:            public void setSeq(Address addr, long seq) {
0367:                Entry this _entry = getEntry(addr);
0368:                if ((this _entry != null) && (this _entry.seq < seq)) {
0369:                    this _entry.seq = seq;
0370:
0371:                    // try to remove any messages that we don't need anymore
0372:                    truncateHistory();
0373:                }
0374:            }
0375:
0376:            /**
0377:             * returns the sequence id of the "latest" cumulative acknowledgement
0378:             * for the specified Address
0379:             * if the Address is not found in the member list, a negative value
0380:             * is returned
0381:             * note: the value returned may also be negative if their have been
0382:             * no acknowledgements from the given address
0383:             */
0384:            public long getSeq(Address addr) {
0385:                Entry this _entry = getEntry(addr);
0386:                if (this _entry == null) {
0387:                    return -2; // TODO: change this to something else (e.g. constant) later  (maybe)
0388:                } else {
0389:                    return this _entry.seq;
0390:                }
0391:            }
0392:
0393:            /**
0394:             * returns the message in the history that matches the given sequence id
0395:             * returns null if no message exists in the history with this sequence id
0396:             */
0397:            public Message getMessage(long seq) {
0398:                return message_history.peekMessage(seq);
0399:            }
0400:
0401:            /**
0402:             * adds the given message (with the specified sequence id) to the
0403:             * message history
0404:             * if the given sequence id already exists in the message history,
0405:             * the message is NOT added
0406:             */
0407:            public void addMessage(Message msg, long seq) {
0408:                message_history.insertMessage(msg, seq);
0409:            }
0410:
0411:            /**
0412:             * returns the minimum cumulative acknowledged sequence id from all the members
0413:             * (i.e. the greatest sequence id cumulatively acknowledged by all members)
0414:             */
0415:            private long getLowestSeqAck() {
0416:                synchronized (acks) {
0417:                    long ret_val = -10; // start with a negative value
0418:
0419:                    int size = acks.size();
0420:                    for (int i = 0; i < size; i++) {
0421:                        Object temp_obj = acks.elementAt(i);
0422:                        if (temp_obj instanceof  Entry) {
0423:                            long this _seq = ((Entry) temp_obj).seq;
0424:                            if (this _seq < ret_val) {
0425:                                ret_val = this _seq;
0426:                            }
0427:                        } else {
0428:                            log
0429:                                    .error("Error: (TOTAL_OLD) MessageAcks.getLowestSeqAck() - could not cast element of \"acks\" to an Entry (index="
0430:                                            + i + ')');
0431:                            return -1;
0432:                        }
0433:                    }
0434:
0435:                    return ret_val;
0436:                }
0437:            }
0438:
0439:            /**
0440:             * removes messages from the history that have been acknowledged
0441:             * by all the members of the group
0442:             */
0443:            private synchronized void truncateHistory() {
0444:                long lowest_ack_seq = getLowestSeqAck();
0445:                if (lowest_ack_seq < 0) {
0446:                    // either no members, or someone has not received any messages yet
0447:                    //   either way, do nothing
0448:                    return;
0449:                }
0450:
0451:                // don't want message_history being altered during this operation
0452:                synchronized (message_history) {
0453:                    long lowest_stored_seq;
0454:                    // keep deleting the oldest stored message for as long as we can
0455:                    while (((lowest_stored_seq = message_history.getFirstSeq()) >= 0)
0456:                            && (lowest_stored_seq > lowest_ack_seq)) {
0457:                        // we can delete the oldest stored message
0458:                        message_history.getFirstMessage();
0459:                    }
0460:                } // synchronized( message_history )
0461:            }
0462:        } // class MessageAcks
0463:
0464:        /**
0465:         * **************************************************************************
0466:         * class TOTAL_OLD extends Protocol
0467:         * <p/>
0468:         * TODO: (more comments)
0469:         * Sequencer based total ordering protocol layer
0470:         * - requires the following layers "below" it in the stack
0471:         * (or layers with equivalent functionality):
0472:         * GMS, FD, PING, UDP, ...
0473:         *
0474:         * @author Manish Sambhu mms21@cornell.edu Spring 1999
0475:         *         **************************************************************************
0476:         */
0477:        public class TOTAL_OLD extends Protocol {
0478:            // the unique name of the protocol
0479:            private final static String PROTOCOL_NAME = "TOTAL_OLD";
0480:
0481:            private Address local_addr = null;
0482:            private Vector members = new Vector(); // note: members should never be null
0483:            //   (because of synchronized blocks)
0484:
0485:            /**
0486:             * next_seq_id
0487:             * the sequence id of the next message we expect to receive
0488:             * note: this value is only meaningful when non-negative
0489:             */
0490:            private long next_seq_id = -1;
0491:
0492:            /**
0493:             * next_seq_id_to_assign
0494:             * used only by the sequencer to assign sequence ids to requests
0495:             * and resend them to the group
0496:             * note: this value is only meaningful when non-negative
0497:             */
0498:            private long next_seq_id_to_assign = -1;
0499:
0500:            private final static long INIT_SEQ_ID = 10; // this value is pretty much arbitrary (should be positive though)
0501:
0502:            /**
0503:             * queued_messages
0504:             * broadcast messages that we received that we are storing so that we can
0505:             * deterministically order the messages based on their sequence ids
0506:             */
0507:            private final SavedMessages queued_messages = new SavedMessages();
0508:
0509:            /**
0510:             * ack_history
0511:             * used only by the sequencer
0512:             * stores the cumulative acks for each member of the group
0513:             * also stores messages that may be needed for resend requests
0514:             * (i.e. messages that have not been acked by all group members)
0515:             */
0516:            private MessageAcks ack_history = null;
0517:
0518:            /**
0519:             * retrans_thread
0520:             * thread that handles sending requests to the sequencer for messages
0521:             * that may not have been received but were expected to arrive
0522:             */
0523:            private final TotalRetransmissionThread retrans_thread = new TotalRetransmissionThread(
0524:                    this );
0525:
0526:            final Log log = LogFactory.getLog(TOTAL_OLD.class);
0527:
0528:            /**
0529:             * returns the unique name of this protocol
0530:             */
0531:            public String getName() {
0532:                return PROTOCOL_NAME;
0533:            }
0534:
0535:            public void start() throws Exception {
0536:                // Start work
0537:                retrans_thread.start();
0538:            }
0539:
0540:            public void stop() {
0541:                // stop the retransmission thread
0542:                retrans_thread.stopResendRequests();
0543:            }
0544:
0545:            /**
0546:             * Just remove if you don't need to reset any state
0547:             */
0548:            public void reset() {
0549:                // TODO: find out when this would be called, maybe do more here
0550:
0551:                // don't accept any messages until we receive a TOTAL_NEW_VIEW message from the sequencer
0552:                next_seq_id = -1;
0553:                // clear (i.e. delete) any messages that did not get propagated up
0554:                queued_messages.clearMessages();
0555:
0556:                // reset the retransmission thread state
0557:                retrans_thread.reset();
0558:            }
0559:
0560:            /**
0561:             * @return the next sequence id expected to be received in this view
0562:             */
0563:            protected long getNextSeqID() {
0564:                return next_seq_id;
0565:            }
0566:
0567:            /**
0568:             * Returns the sequence id of the "first" queued message
0569:             * (i.e., the lowest seq id queued).
0570:             * @return the sequence id of the queued message, or -1 if no messages are queued.
0571:             */
0572:            protected long getFirstQueuedSeqID() {
0573:                return queued_messages.getFirstSeq();
0574:            }
0575:
0576:            /**
0577:             * handles an Event coming up the Protocol Stack
0578:             */
0579:            public void up(Event evt) {
0580:                Message msg;
0581:
0582:                //System.out.println("UP: " + evt);
0583:
0584:                Object temp_obj; // used for type checking before performing casts
0585:                switch (evt.getType()) {
0586:
0587:                case Event.SET_LOCAL_ADDRESS:
0588:                    temp_obj = evt.getArg();
0589:                    if (temp_obj instanceof  Address) {
0590:                        local_addr = (Address) temp_obj;
0591:                    } else {
0592:                        log
0593:                                .error("Error: Total.up() - could not cast local address to an Address object");
0594:                    }
0595:                    break;
0596:
0597:                case Event.MSG:
0598:                    // get the message and the header for the TOTAL_OLD layer
0599:                    temp_obj = evt.getArg();
0600:                    if (temp_obj instanceof  Message) {
0601:                        msg = (Message) temp_obj;
0602:                        temp_obj = msg.removeHeader(getName());
0603:                        if (temp_obj instanceof  TotalHeader) {
0604:                            TotalHeader hdr = (TotalHeader) temp_obj;
0605:
0606:                            // switch on the "command" defined by the header
0607:                            switch (hdr.total_header_type) {
0608:
0609:                            case TotalHeader.TOTAL_UNICAST:
0610:                                // don't process this message, just pass it up (TotalHeader header already removed)
0611:                                passUp(evt);
0612:                                return;
0613:
0614:                            case TotalHeader.TOTAL_BCAST:
0615:                                handleBCastMessage(msg, hdr.seq_id);
0616:                                break;
0617:
0618:                            case TotalHeader.TOTAL_REQUEST:
0619:                                // if we are the sequencer, respond to this request
0620:                                if (isSequencer()) {
0621:                                    handleRequestMessage(msg);
0622:                                }
0623:                                break;
0624:
0625:                            case TotalHeader.TOTAL_NEW_VIEW:
0626:                                // store the sequence id that we should expect next
0627:                                next_seq_id = hdr.seq_id;
0628:
0629:                                // TODO: need to send some sort of ACK or something to the sequencer (maybe)
0630:                                break;
0631:
0632:                            case TotalHeader.TOTAL_CUM_SEQ_ACK:
0633:                                // if we are the sequencer, update state
0634:                                if (isSequencer()) {
0635:                                    temp_obj = msg.getSrc();
0636:                                    if (temp_obj instanceof  Address) {
0637:                                        ack_history.setSeq((Address) temp_obj,
0638:                                                hdr.seq_id);
0639:                                    } else {
0640:                                        log
0641:                                                .error("Error: TOTAL_OLD.Up() - could not cast source of message to an Address object (case TotalHeader.TOTAL_CUM_SEQ_ACK)");
0642:                                    }
0643:                                }
0644:                                break;
0645:
0646:                            case TotalHeader.TOTAL_RESEND:
0647:                                // if we are the sequencer, respond to this request
0648:                                if (isSequencer()) {
0649:                                    handleResendRequest(msg, hdr.seq_id);
0650:                                }
0651:                                break;
0652:
0653:                            default:
0654:                                // unrecognized header type - discard message
0655:                                log
0656:                                        .error("Error: TOTAL_OLD.up() - unrecognized TotalHeader in message - "
0657:                                                + hdr.toString());
0658:                                return; // don't let it call passUp()
0659:                            } // switch( hdr.total_header_type )
0660:                        } else {
0661:                            log
0662:                                    .error("Error: TOTAL_OLD.up() - could not cast message header to TotalHeader (case Event.MSG)");
0663:                        } // if ( temp_obj instanceof TotalHeader )
0664:                    } else {
0665:                        log
0666:                                .error("Error: TOTAL_OLD.up() - could not cast argument of Event to a Message (case Event.MSG)");
0667:                    } // if ( temp_obj instanceof Address )
0668:
0669:                    //System.out.println("The message is " + msg);
0670:                    return; // don't blindly pass up messages immediately (if at all)
0671:
0672:                    // begin mms21
0673:                    /*
0674:                    case Event.BECOME_SERVER:
0675:                    System.out.println( "Become Server event passed up to TOTAL_OLD (debug - mms21)" );
0676:                    break;
0677:                     */
0678:
0679:                case Event.TMP_VIEW: // TODO: this may be temporary
0680:                case Event.VIEW_CHANGE:
0681:                    System.out
0682:                            .println("View Change event passed up to TOTAL_OLD (debug - mms21)");
0683:                    View new_view = (View) evt.getArg();
0684:                    members = new_view.getMembers();
0685:                    // print the members of this new view
0686:                    System.out
0687:                            .println("New view members (printed in TOTAL_OLD):");
0688:                    int view_size = members.size();
0689:                    for (int i = 0; i < view_size; i++) {
0690:                        System.out.println("  "
0691:                                + members.elementAt(i).toString());
0692:                    }
0693:
0694:                    // reset the state for total ordering for this new view
0695:                    reset();
0696:
0697:                    // if we are the sequencer in this new view, send a new
0698:                    //   TOTAL_NEW_VIEW message to the group
0699:                    if (isSequencer()) {
0700:                        // we are the sequencer in this new view
0701:                        log
0702:                                .error("TOTAL_OLD.up() - I am the sequencer of this new view");
0703:
0704:                        // we need to keep track of acknowledgements messages
0705:                        ack_history = new MessageAcks(members);
0706:
0707:                        // start assigning messages with this sequence id
0708:                        next_seq_id_to_assign = INIT_SEQ_ID;
0709:
0710:                        // send a message to the group with the initial sequence id to expect
0711:                        Message new_view_msg = new Message(null, local_addr,
0712:                                null);
0713:                        new_view_msg.putHeader(getName(), new TotalHeader(
0714:                                TotalHeader.TOTAL_NEW_VIEW,
0715:                                next_seq_id_to_assign));
0716:                        passDown(new Event(Event.MSG, new_view_msg));
0717:                    }
0718:
0719:                    break;
0720:                // end mms21
0721:
0722:                default:
0723:                    break;
0724:                } // switch( evt.getType() )
0725:
0726:                passUp(evt); // Pass up to the layer above us
0727:            }
0728:
0729:            /**
0730:             * passes up (calling passUp()) any stored messages eligible according to
0731:             * the total ordering property
0732:             */
0733:            private synchronized int passUpMessages() {
0734:                if (next_seq_id < 0) {
0735:                    // don't know what to pass up so don't pass up anything
0736:                    return 0;
0737:                }
0738:
0739:                long lowest_seq_stored = queued_messages.getFirstSeq();
0740:                if (lowest_seq_stored < 0) {
0741:                    // there are no messages stored
0742:                    return 0;
0743:                }
0744:                if (lowest_seq_stored < next_seq_id) {
0745:                    // it is bad to have messages stored that have a lower sequence id than what
0746:                    //   we are expecting
0747:                    log
0748:                            .error("Error: TOTAL_OLD.passUpMessages() - next expected sequence id ("
0749:                                    + next_seq_id
0750:                                    + ") is greater than the sequence id of a stored message ("
0751:                                    + lowest_seq_stored + ')');
0752:                    return 0;
0753:                } else if (next_seq_id == lowest_seq_stored) {
0754:                    // we can pass this first message up the Protocol Stack
0755:                    Message msg = queued_messages.getFirstMessage();
0756:                    if (msg == null) {
0757:                        log
0758:                                .error("Error: TOTAL_OLD.passUpMessages() - unexpected null Message retrieved from stored messages");
0759:                        return 0;
0760:                    }
0761:                    passUp(new Event(Event.MSG, msg));
0762:
0763:                    // increment the next expected sequence id
0764:                    next_seq_id++;
0765:
0766:                    return (1 + passUpMessages());
0767:                } else {
0768:                    /* don't drop messages, it should be requesting resends
0769:                    // all messages stored have sequence ids greater than expected
0770:                    if ( queued_messages.getSize() > 10 ) {
0771:                     {
0772:                        log.error( "WARNING: TOTAL_OLD.passUpMessages() - more than 10 messages saved" );
0773:                        log.error( "Dropping sequence id: " + next_seq_id );
0774:                    }
0775:                    next_seq_id++;
0776:                    return passUpMessages();
0777:                    }
0778:                     */
0779:                    return 0;
0780:                }
0781:            }
0782:
0783:            private final long last_request_time = -1;
0784:
0785:            /**
0786:             * stores the message in the list of messages. also passes up any messages
0787:             * if it can (i.e. if it satisfies total ordering).
0788:             * if the sequence for the next expected message is unknown, the message is
0789:             * discarded without being stored
0790:             */
0791:            private synchronized void handleBCastMessage(Message msg, long seq) {
0792:                /* store the message anyway, hopefully we'll get a TOTAL_NEW_VIEW message later
0793:                if ( next_seq < 0 ) {
0794:                    // don't know what sequence id to expect
0795:                     log.error( "TOTAL_OLD.handleBCastMessage() - received broadcast message but don't know what sequence id to expect" );
0796:                    return;
0797:                }
0798:                 */
0799:
0800:                if (seq < next_seq_id) {
0801:                    // we're expecting a message with a greater sequence id
0802:                    //   hopefully, we've already seen this message so just ignore it
0803:                    return;
0804:                }
0805:
0806:                // save this message in the list of received broadcast messages
0807:                queued_messages.insertMessage(msg, seq);
0808:
0809:                // try to pass up any messages
0810:                int num_passed = passUpMessages();
0811:                // TODO: this if is temporary (debug)
0812:                if (num_passed > 1)
0813:                    log.error("TOTAL_OLD.handleBCastMessage() - " + num_passed
0814:                            + " message(s) passed up the Protocol Stack");
0815:
0816:                /* this is handles by the retransmission thread now
0817:                // see if we may need to issue any resend requests
0818:                if ( queued_messages.getSize() > 1 ) {  // TODO: magical constant N?
0819:                Address sequencer = getSequencer();
0820:                //Object sequencer = msg.makeReply().getSrc();  // test (debug)
0821:                if ( sequencer == null ) {
0822:                // couldn't get the sequencer of the group
0823:                log.error( "TOTAL_OLD.handleBCastMessage() - couldn't determine sequencer to send a TOTAL_RESEND request" );
0824:                return;
0825:                }
0826:
0827:                if ( local_addr == null ) {
0828:                // don't know local address, can't set source of message
0829:                log.error( "TOTAL_OLD.handleBCastMessage() - do not know local address so cannot send resend request for message " + seq );
0830:                return;
0831:                }
0832:
0833:                long time_now = System.currentTimeMillis();
0834:                if ( (last_request_time >= 0) && ((time_now - last_request_time) < 1000) ) {
0835:                return;
0836:                } else {
0837:                last_request_time = time_now;
0838:                }
0839:                // request a resend request for all missing sequence ids
0840:                //   from the next one expected up to the "earliest" queued one
0841:                // TODO: (works a little different now)
0842:                long first_queued_seq = queued_messages.getFirstSeq();
0843:                long max_resend_seq = ((next_seq_id + 10) > first_queued_seq) ? first_queued_seq : (next_seq_id + 10);
0844:                for( long resend_seq=next_seq_id; resend_seq<=max_resend_seq ; resend_seq++ ) {
0845:                Message resend_msg = new Message( sequencer, local_addr, null );
0846:                resend_msg.putHeader(getName(),  new TotalHeader( TotalHeader.TOTAL_RESEND, resend_seq ) );
0847:                passDown( new Event( Event.MSG, resend_msg ) );
0848:                 log.error( "TOTAL_OLD.handleBCastMessage() - resend requested for message " + resend_seq );
0849:                }
0850:                }
0851:                 */
0852:            }
0853:
0854:            /**
0855:             * respond to a request message by broadcasting a copy of the message to the group
0856:             * with the next sequence id assigned to it
0857:             * if we do not know what the next sequence id is to assign, discard the message
0858:             */
0859:            private synchronized void handleRequestMessage(Message msg) {
0860:                if (next_seq_id_to_assign < 0) {
0861:                    // we cannot assign a valid sequence id
0862:                    log
0863:                            .error("Error: TOTAL_OLD.handleRequestMessage() - cannot handle request... do not know what sequence id to assign");
0864:                    return;
0865:                }
0866:
0867:                // make the message a broadcast message to the group
0868:                msg.setDest(null);
0869:
0870:                // set the source of the message to be me
0871:                msg.setSrc(local_addr);
0872:
0873:                // add the sequence id to the message
0874:                msg.putHeader(getName(), new TotalHeader(
0875:                        TotalHeader.TOTAL_BCAST, next_seq_id_to_assign));
0876:
0877:                // store a copy of this message is the history
0878:                Message msg_copy = msg.copy();
0879:                ack_history.addMessage(msg_copy, next_seq_id_to_assign);
0880:
0881:                // begin debug
0882:                Object header = msg_copy.getHeader(getName());
0883:                if (!(header instanceof  TotalHeader)) {
0884:                    log
0885:                            .error("Error: TOTAL_OLD.handleRequestMessage() - BAD: stored message that did not contain a TotalHeader - "
0886:                                    + next_seq_id_to_assign);
0887:                }
0888:                // end debug
0889:
0890:                // increment the next sequence id to use
0891:                next_seq_id_to_assign++;
0892:
0893:                // pass this new Message (wrapped in an Event) down the Protocol Stack
0894:                passDown(new Event(Event.MSG, msg));
0895:            }
0896:
0897:            /**
0898:             * respond to a request to resend a message with the specified sequence id
0899:             */
0900:            private synchronized void handleResendRequest(Message msg, long seq) {
0901:                log
0902:                        .error("TOTAL_OLD.handleRequestMessage() - received resend request for message "
0903:                                + seq);
0904:
0905:                /* just rebroadcast for now because i can't get the source - this is bad (TODO: fix this)
0906:                Object requester = msg.makeReply().getSrc();  // Address? of requester - test (debug)
0907:                /*
0908:                Object temp_obj = msg.getSrc();
0909:                if ( temp_obj instanceof Address ) {
0910:                    Address requester = (Address) temp_obj;
0911:                } else {
0912:                    log.error( "Error: TOTAL_OLD.handleResendRequest() - could not cast source of message to an Address" );
0913:                    return;
0914:                }
0915:                 * /
0916:                if ( requester == null ) {
0917:                    // don't know who to send this back to
0918:                    log.error( "TOTAL_OLD.handleResendRequest() - do not know who requested this resend request for sequence " + seq );
0919:                    return;
0920:                }
0921:                 */
0922:                Address requester = null;
0923:                // log.error( "TOTAL_OLD: got here - 1" );
0924:                Message resend_msg = ack_history.getMessage(seq);
0925:                // log.error( "TOTAL_OLD: got here - 2" );
0926:                if (resend_msg == null) {
0927:                    // couldn't find this message in the history
0928:                    log
0929:                            .error("TOTAL_OLD.handleResendRequest() - could not find the message "
0930:                                    + seq + " in the history to resend");
0931:                    return;
0932:                }
0933:                resend_msg.setDest(requester);
0934:
0935:                // note: do not need to add a TotalHeader because it should already be a
0936:                //       TOTAL_BCAST message
0937:                // begin debug
0938:                Object header = resend_msg.getHeader(getName());
0939:                if (header instanceof  TotalHeader) {
0940:                    //log.error( "TOTAL_OLD: resend msg GOOD (header is TotalHeader) - " + seq );
0941:                } else {
0942:                    log
0943:                            .error("TOTAL_OLD: resend msg BAD (header is NOT a TotalHeader) - "
0944:                                    + seq);
0945:                }
0946:                // end debug
0947:
0948:                passDown(new Event(Event.MSG, resend_msg));
0949:                log
0950:                        .error("TOTAL_OLD.handleResendRequest() - responded to resend request for message "
0951:                                + seq);
0952:            }
0953:
0954:            /**
0955:             * handles an Event coming down the Protocol Stack
0956:             */
0957:            public void down(Event evt) {
0958:                Message msg;
0959:
0960:                //System.out.println("DOWN: " + evt);
0961:
0962:                switch (evt.getType()) {
0963:
0964:                case Event.VIEW_CHANGE:
0965:                    // this will probably never happen
0966:                    log.error("NOTE: VIEW_CHANGE Event going down through "
0967:                            + PROTOCOL_NAME);
0968:
0969:                    Vector new_members = ((View) evt.getArg()).getMembers();
0970:                    synchronized (members) {
0971:                        members.removeAllElements();
0972:                        if (new_members != null && new_members.size() > 0)
0973:                            for (int i = 0; i < new_members.size(); i++)
0974:                                members.addElement(new_members.elementAt(i));
0975:                    }
0976:                    break;
0977:
0978:                case Event.MSG:
0979:                    Object temp_obj = evt.getArg();
0980:                    if (temp_obj instanceof  Message) {
0981:                        msg = (Message) temp_obj;
0982:
0983:                        // note: a TotalHeader is added to every message (Event.MSG)
0984:                        //   that is sent
0985:
0986:                        // check if this is a broadcast message
0987:                        if (msg.getDest() == null) {
0988:                            // yes, this is a broadcast message
0989:
0990:                            // send out a request for a message to be broadcast
0991:                            //   (the sequencer will handle this)
0992:                            Address sequencer = getSequencer();
0993:                            if (sequencer != null) {
0994:                                // we only need to send the request to the sequencer (who will broadcast it)
0995:                                msg.setDest(sequencer);
0996:                            } else {
0997:                                // couldn't find sequencer of the group
0998:                                // for now, just send it to the original destination
0999:                                //   (don't need to do anything here)
1000:                            }
1001:
1002:                            //msg.putHeader(getName(),  TotalHeader.getRequestHeader() );
1003:                            msg.putHeader(getName(), new TotalHeader(
1004:                                    TotalHeader.TOTAL_REQUEST, -1));
1005:
1006:                        } else {
1007:                            // this is a point to point unicast message so just send it to its original destination
1008:                            msg.putHeader(getName(), new TotalHeader(
1009:                                    TotalHeader.TOTAL_UNICAST, -1)); // sequence id in header is irrelevant
1010:                        }
1011:                    } else {
1012:                        log
1013:                                .error("Error: TOTAL_OLD.down() - could not cast argument of Event to a Message (case Event.MSG)");
1014:                    } // if ( temp_obj instanceof Message )
1015:                    break;
1016:
1017:                default:
1018:                    break;
1019:                } // switch( evt.getType() )
1020:
1021:                passDown(evt); // Pass on to the layer below us
1022:
1023:            }
1024:
1025:            /**
1026:             * returns true if we are currently the sequencer of the group;
1027:             * returns false otherwise
1028:             * note: returns false if our local address is unknown, or the list of members is
1029:             * empty
1030:             */
1031:            private boolean isSequencer() {
1032:                if (local_addr == null) {
1033:                    // don't know my own local address
1034:                    log
1035:                            .error("TOTAL_OLD.isSequencer() - local address unknown!");
1036:                    return false;
1037:                }
1038:
1039:                synchronized (members) {
1040:                    if (members.size() == 0) {
1041:                        // there are no members listed for the group (not even myself)
1042:                        log.error("TOTAL_OLD.isSequencer() - no members!");
1043:                        return false;
1044:                    }
1045:
1046:                    Object temp_obj = members.elementAt(0);
1047:                    if (temp_obj instanceof  Address) {
1048:                        Address seq_addr = (Address) temp_obj;
1049:                        return local_addr.equals(seq_addr);
1050:                    } else {
1051:                        log
1052:                                .error("Error: TOTAL_OLD.isSequencer() - could not cast element of \"members\" to an Address");
1053:                        return false;
1054:                    } // if ( temp_obj instanceof Address )
1055:                }
1056:            }
1057:
1058:            /**
1059:             * returns the Address of the local machine
1060:             * returns null if it is not known yet
1061:             */
1062:            protected Address getLocalAddr() {
1063:                return local_addr;
1064:            }
1065:
1066:            /**
1067:             * returns the address of the current sequencer of the group
1068:             * returns null if the list of members is empty
1069:             */
1070:            protected Address getSequencer() {
1071:                synchronized (members) {
1072:                    if (members.size() == 0) {
1073:                        log.error("TOTAL_OLD.getSequencer() - no members");
1074:                        return null;
1075:                    } else {
1076:                        Object temp_obj = members.elementAt(0);
1077:                        if (temp_obj instanceof  Address) {
1078:                            return (Address) temp_obj;
1079:                        } else {
1080:                            log
1081:                                    .error("Error: TOTAL_OLD.getSequencer() - could not cast first element of \"members\" to an Address");
1082:                            return null;
1083:                        }
1084:                    }
1085:                }
1086:            }
1087:
1088:            /**
1089:             * class TotalHeader
1090:             * <p/>
1091:             * The header that is prepended to every message passed down through the TOTAL_OLD layer
1092:             * and removed (and processed) from every message passed up through the TOTAL_OLD layer
1093:             */
1094:            public static class TotalHeader extends Header {
1095:                // Total message types
1096:                public final static int TOTAL_UNICAST = 0; // a point to point unicast message that should not be processed by TOTAL_OLD
1097:                public final static int TOTAL_BCAST = 1; // message broadcast by the sequencer
1098:                public final static int TOTAL_REQUEST = 2; // request for a message to be broadcast
1099:                public final static int TOTAL_NEW_VIEW = 3; // reset with a view change, sequence number also reset
1100:                public final static int TOTAL_NEW_VIEW_ACK = 4; // acknowledgement of new view and sequence id
1101:                public final static int TOTAL_CUM_SEQ_ACK = 5; // cumulatively acknowledge the reception of messages up to a sequence id
1102:                public final static int TOTAL_SEQ_ACK = 6; // acknowledge the reception of a message with a certain sequence id  (probably won't be used)
1103:                public final static int TOTAL_RESEND = 7; // request the message with a certain sequence id
1104:
1105:                public int total_header_type;
1106:
1107:                final Log log = LogFactory.getLog(TotalHeader.class);
1108:
1109:                // TODO: finish commenting meaning of seq_id for different header types
1110:                /**
1111:                 * For TOTAL_BCAST messages, seq_id is used to determine the order of messages
1112:                 * in the view. The seq_id is expected to increment by one for each new message
1113:                 * sent in the current view. this sequence id is reset with each new view.
1114:                 * the GMS layer should make sure that messages sent in one view are not
1115:                 * received in another view.
1116:                 * For TOTAL_REQUEST messages, seq_id is not used.
1117:                 * For TOTAL_NEW_VIEW, seq_id is the sequence id that the sequencer of this
1118:                 * view will use for the first message broadcast to the group
1119:                 * (i.e. the expected sequence id is "reset" to this value).
1120:                 * For TOTAL_NEW_VIEW_ACK, ..
1121:                 * For TOTAL_CUM_SEQ_ACK messages, the seq_id is the cumulative sequence id
1122:                 * that the sender has received.
1123:                 * For TOTAL_SEQ_ACK messages, seq_id is the sequence id that is being acknowledged.
1124:                 * For TOTAL_RESEND, seq_id is the sequence id to be sent again.
1125:                 */
1126:                public long seq_id; // see use above (varies between types of headers)
1127:
1128:                public TotalHeader() {
1129:                } // used for externalization
1130:
1131:                public TotalHeader(int type, long seq) {
1132:                    switch (type) {
1133:                    case TOTAL_UNICAST:
1134:                    case TOTAL_BCAST:
1135:                    case TOTAL_REQUEST:
1136:                    case TOTAL_NEW_VIEW:
1137:                    case TOTAL_NEW_VIEW_ACK:
1138:                    case TOTAL_CUM_SEQ_ACK:
1139:                    case TOTAL_SEQ_ACK:
1140:                    case TOTAL_RESEND:
1141:                        // the given type is a known one
1142:                        total_header_type = type;
1143:                        break;
1144:
1145:                    default:
1146:                        // this type is unknown
1147:                        log
1148:                                .error("Error: TotalHeader.TotalHeader() - unknown TotalHeader type given: "
1149:                                        + type);
1150:                        total_header_type = -1;
1151:                        break;
1152:                    }
1153:
1154:                    seq_id = seq;
1155:                }
1156:
1157:                //static TotalHeader getRequestHeader() {
1158:                //return new TotalHeader( TOTAL_REQUEST, -1 );  // sequence id is irrelevant
1159:                //}
1160:
1161:                public String toString() {
1162:                    String type = "";
1163:                    switch (total_header_type) {
1164:                    case TOTAL_UNICAST:
1165:                        type = "TOTAL_UNICAST";
1166:                        break;
1167:
1168:                    case TOTAL_BCAST:
1169:                        type = "TOTAL_BCAST";
1170:                        break;
1171:
1172:                    case TOTAL_REQUEST:
1173:                        type = "TOTAL_REQUEST";
1174:                        break;
1175:
1176:                    case TOTAL_NEW_VIEW:
1177:                        type = "NEW_VIEW";
1178:                        break;
1179:
1180:                    case TOTAL_NEW_VIEW_ACK:
1181:                        type = "NEW_VIEW_ACK";
1182:                        break;
1183:
1184:                    case TOTAL_CUM_SEQ_ACK:
1185:                        type = "TOTAL_CUM_SEQ_ACK";
1186:                        break;
1187:
1188:                    case TOTAL_SEQ_ACK:
1189:                        type = "TOTAL_SEQ_ACK";
1190:                        break;
1191:
1192:                    case TOTAL_RESEND:
1193:                        type = "TOTAL_RESEND";
1194:                        break;
1195:
1196:                    default:
1197:                        type = "UNKNOWN TYPE (" + total_header_type + ')';
1198:                        break;
1199:                    }
1200:
1201:                    return "[ TOTAL_OLD: type=" + type + ", seq=" + seq_id
1202:                            + " ]";
1203:                }
1204:
1205:                public void writeExternal(ObjectOutput out) throws IOException {
1206:                    out.writeInt(total_header_type);
1207:                    out.writeLong(seq_id);
1208:                }
1209:
1210:                public void readExternal(ObjectInput in) throws IOException,
1211:                        ClassNotFoundException {
1212:                    total_header_type = in.readInt();
1213:                    seq_id = in.readLong();
1214:                }
1215:
1216:            } // class TotalHeader
1217:
1218:        } // class TOTAL_OLD
1219:
1220:        /**
1221:         * **************************************************************************
1222:         * class TotalRetransmissionThread
1223:         * <p/>
1224:         * thread that handles retransmission for the TOTAL_OLD protocol
1225:         * **************************************************************************
1226:         */
1227:        class TotalRetransmissionThread extends Thread {
1228:            // state variables to determine when and what to request
1229:            private long last_retrans_request_time; // last time (in milliseconds) that we sent a resend request
1230:            private long last_requested_seq; // latest sequence id that we have requested
1231:
1232:            // retransmission constants
1233:            final private static long polling_delay = 1000; // how long (in milliseconds) to sleep before rechecking for resend
1234:            final private static long resend_timeout = 2000; // amount of time (in milliseconds) to wait on a resend request before resending another request
1235:            final private static int max_request = 10; // maximum number of resend request to send out in one iteration
1236:
1237:            // reference to the parent TOTAL_OLD protocol instance
1238:            private TOTAL_OLD prot_ptr;
1239:
1240:            // flag to specify if the thread should continue running
1241:            private boolean is_running;
1242:
1243:            final Log log = LogFactory.getLog(TotalRetransmissionThread.class);
1244:
1245:            /**
1246:             * constructor
1247:             * <p/>
1248:             * creates and initializes a retransmission thread for the
1249:             * specified instance of a TOTAL_OLD protocol
1250:             */
1251:            TotalRetransmissionThread(TOTAL_OLD parent_prot) {
1252:                super (Util.getGlobalThreadGroup(), "retransmission thread");
1253:                if (parent_prot != null) {
1254:                    prot_ptr = parent_prot;
1255:                } else {
1256:                    // parent thread not specified
1257:                    log
1258:                            .fatal("given parent protocol reference is null\n  (FATAL ERROR - TOTAL_OLD protocol will not function properly)");
1259:
1260:                    // prevent the run method from doing any work
1261:                    is_running = false;
1262:                }
1263:
1264:                // initialize the state variables
1265:                reset();
1266:
1267:                // let the thread make resend requests
1268:                is_running = true;
1269:            }
1270:
1271:            /**
1272:             * resets the state of the thread as if it was just started
1273:             * the thread will assume that there were no resend requests make
1274:             */
1275:            public void reset() {
1276:                // we have not made any resend requests for any messages
1277:                last_retrans_request_time = -1;
1278:                last_requested_seq = -1;
1279:            }
1280:
1281:            /**
1282:             * send a resend request to the given sequencer (from the given local_addr)
1283:             * for the given sequence id
1284:             */
1285:            private void sendResendRequest(Address sequencer,
1286:                    Address local_addr, long seq_id) {
1287:                Message resend_msg = new Message(sequencer, local_addr, null);
1288:                resend_msg.putHeader(getName(), new TOTAL_OLD.TotalHeader(
1289:                        TOTAL_OLD.TotalHeader.TOTAL_RESEND, seq_id));
1290:                prot_ptr.passDown(new Event(Event.MSG, resend_msg));
1291:
1292:                // debug
1293:                log
1294:                        .error("TotalRetransmissionThread.resend() - resend requested for message "
1295:                                + seq_id);
1296:            }
1297:
1298:            /**
1299:             * checks if a resend request should be made to the sequencer. if a request needs
1300:             * to be made, it makes the appropriate requests with the parameters specified
1301:             * by the constants in this class
1302:             */
1303:            private void checkForResend() {
1304:                long first_seq_id = prot_ptr.getFirstQueuedSeqID(); // sequence id of first queued message
1305:                /*
1306:                // begin debug
1307:                System.out.println( "DEBUG (TotalRetransmissionThread) - first_seq_id = " + first_seq_id );
1308:                // end debug
1309:                 */
1310:                if (first_seq_id >= 0) {
1311:                    // there is at least one message in the queue
1312:
1313:                    long next_seq_id = prot_ptr.getNextSeqID(); // next sequence id expected from the group
1314:                    if ((next_seq_id < first_seq_id)) { // TODO: handle case to resend TOTAL_NEW_VIEW message
1315:                        // there are messages that we received out of order
1316:                        //log.error( "DEBUG (TotalRetransmissionThread) - there are messages queued" ); // debug
1317:
1318:                        // see if it is time to send a request
1319:                        long time_now = System.currentTimeMillis();
1320:                        if ((next_seq_id > last_requested_seq)
1321:                                || (time_now > (last_retrans_request_time + resend_timeout))
1322:                                || (last_retrans_request_time < 0)) {
1323:                            // send a resend request to the sequencer
1324:                            //log.error( "DEBUG (TotalRetransmissionThread) - sending resend requests" ); // debug
1325:                            Address sequencer = prot_ptr.getSequencer();
1326:                            if (sequencer == null) {
1327:                                System.out
1328:                                        .println("Error: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - could not determine sequencer to send a TOTAL_RESEND request");
1329:
1330:                                return;
1331:                            }
1332:
1333:                            Address local_addr = prot_ptr.getLocalAddr();
1334:                            if (local_addr == null) {
1335:                                System.out
1336:                                        .println("Warning: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - local address not specified in TOTAL_RESEND request... attempting to send requests anyway");
1337:                            }
1338:
1339:                            long temp_long = (next_seq_id + max_request); // potential max seq id to request (exclusive)
1340:                            long last_resend_seq_id = (temp_long > first_seq_id) ? first_seq_id
1341:                                    : temp_long;
1342:                            for (long resend_seq = next_seq_id; resend_seq < last_resend_seq_id; resend_seq++) {
1343:                                sendResendRequest(sequencer, local_addr,
1344:                                        resend_seq);
1345:                            }
1346:                            // update state for this set of resend requests
1347:                            last_retrans_request_time = time_now;
1348:                            last_requested_seq = last_resend_seq_id;
1349:                        }
1350:                    } // if ( (next_seq_id < first_seq_id) )
1351:                } // if ( first_seq_id >= 0 )
1352:                // else there are no messages to request
1353:            }
1354:
1355:            /**
1356:             * overloaded from Thread
1357:             * method that executes when the thread is started
1358:             */
1359:            public void run() {
1360:                while (is_running) {
1361:                    // resend any requests if necessary
1362:                    //log.error( "DEBUG (TotalRetransmissionThread) - heartbeat" ); // debug
1363:                    checkForResend();
1364:
1365:                    // wait before check again
1366:                    try {
1367:                        sleep(polling_delay);
1368:                    } catch (InterruptedException e) {
1369:                    } // do nothing if interrupted
1370:                }
1371:            }
1372:
1373:            /**
1374:             * stops the thread from making any further resend requests
1375:             * note: the thread may not die immediately
1376:             */
1377:            public void stopResendRequests() {
1378:                is_running = false;
1379:            }
1380:        } // class TotalRetransmissionThread
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.