Source Code Cross Referenced for NAKACK.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        // $Id: NAKACK.java,v 1.15.10.1 2007/04/27 08:03:52 belaban Exp $
0002:
0003:        package org.jgroups.protocols;
0004:
0005:        import org.jgroups.*;
0006:        import org.jgroups.stack.*;
0007:        import org.jgroups.util.List;
0008:        import org.jgroups.util.TimeScheduler;
0009:        import org.jgroups.util.Util;
0010:
0011:        import java.util.Enumeration;
0012:        import java.util.Hashtable;
0013:        import java.util.Properties;
0014:        import java.util.Vector;
0015:
0016:        /**
0017:         * Negative AcKnowledgement layer (NAKs), paired with positive ACKs. The default is to send a message
0018:         * using NAKs: the sender sends messages with monotonically increasing seqnos, receiver requests
0019:         * retransmissions of missing messages (gaps). When a SWITCH_NAK_ACK event is received, the mode
0020:         * is switched to using NAK_ACKS: the sender still uses monotonically increasing seqnos, but the receiver
0021:         * acknowledges every message. NAK and NAK_ACK seqnos are the same, when switching the mode, the current
0022:         * seqno is reused. Both NAK and NAK_ACK messages use the current view ID in which the message is sent to
0023:         * queue messages destined for an upcoming view, or discard messages sent in a previous view. Both modes
0024:         * reset their seqnos to 0 when receiving a view change. The NAK_ACK scheme is used for broadcasting
0025:         * view changes.
0026:         * <p/>
0027:         * The third mode is for out-of-band control messages (activated by SWITCH_OUT_OF_BAND): this mode does
0028:         * neither employ view IDs, nor does it use the same seqnos as NAK and NAK_ACK. It uses its own seqnos,
0029:         * unrelated to the ones used by NAK and NAK_ACK, and never resets them. In combination with the sender's
0030:         * address, this makes every out-of-band message unique. Out-of-band messages are used for example for
0031:         * broadcasting FLUSH messages.<p>
0032:         * Once a mode is set, it remains in effect until exactly 1 message has been sent, afterwards the default
0033:         * mode NAK is used again.
0034:         * <p/>
0035:         * The following communication between 2 peers exists (left side is initiator,
0036:         * right side receiver): <pre>
0037:         * <p/>
0038:         * <p/>
0039:         * send_out_of_band
0040:         * -------------->       synchronous     (1)
0041:         * <-------------
0042:         * ack
0043:         * <p/>
0044:         * <p/>
0045:         * send_nak
0046:         * -------------->       asynchronous    (2)
0047:         * <p/>
0048:         * <p/>
0049:         * send_nak_ack
0050:         * -------------->       synchronous     (3)
0051:         * <--------------
0052:         * ack
0053:         * <p/>
0054:         * <p/>
0055:         * retransmit
0056:         * <--------------       asynchronous    (4)
0057:         * <p/>
0058:         * <p/>
0059:         * </pre>
0060:         * <p/>
0061:         * When a message is sent, it will contain a header describing the type of the
0062:         * message, and containing additional data, such as sequence number etc. When a
0063:         * message is received, it is fed into either the OutOfBander or NAKer, depending on the
0064:         * header's type.<p>
0065:         * Note that in the synchronous modes, ACKs are sent for each request. If a reliable unicast protocol layer
0066:         * exists somewhere underneath this layer, then even the ACKs are transmitted reliably, thus increasing
0067:         * the number of messages exchanged. However, since it is envisaged that ACK/OUT_OF_BAND are not used
0068:         * frequently, this problem is currently not addressed.
0069:         * 
0070:         * @author Bela Ban
0071:         */
0072:        public class NAKACK extends Protocol {
0073:            long[] retransmit_timeout = { 2000, 3000, 5000, 8000 }; // time(s) to wait before requesting xmit
0074:            NAKer naker = null;
0075:            OutOfBander out_of_bander = null;
0076:            ViewId vid = null;
0077:            View view = null;
0078:            boolean is_server = false;
0079:            Address local_addr = null;
0080:            final List queued_msgs = new List(); // msgs for next view (vid > current vid)
0081:            Vector members = null; // for OutOfBander: this is the destination set to
0082:            // send messages to
0083:            boolean send_next_msg_out_of_band = false;
0084:            boolean send_next_msg_acking = false;
0085:            long rebroadcast_timeout = 0; // until all outstanding ACKs recvd (rebcasting)
0086:            TimeScheduler timer = null;
0087:            static final String WRAPPED_MSG_KEY = "NAKACK.WRAPPED_HDR";
0088:
0089:            /**
0090:             * Do some initial tasks
0091:             */
0092:            public void init() throws Exception {
0093:                timer = stack != null ? stack.timer : null;
0094:                if (timer == null)
0095:                    if (log.isErrorEnabled())
0096:                        log.error("timer is null");
0097:                naker = new NAKer();
0098:                out_of_bander = new OutOfBander();
0099:            }
0100:
0101:            public void stop() {
0102:                out_of_bander.stop();
0103:                naker.stop();
0104:            }
0105:
0106:            public String getName() {
0107:                return "NAKACK";
0108:            }
0109:
0110:            public Vector providedUpServices() {
0111:                Vector retval = new Vector(3);
0112:                retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
0113:                retval.addElement(new Integer(Event.GET_MSG_DIGEST));
0114:                retval.addElement(new Integer(Event.GET_MSGS));
0115:                return retval;
0116:            }
0117:
0118:            public Vector providedDownServices() {
0119:                Vector retval = new Vector(1);
0120:                retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
0121:                return retval;
0122:            }
0123:
0124:            /**
0125:             * <b>Callback</b>. Called by superclass when event may be handled.<p>
0126:             * <b>Do not use <code>passUp()</code> in this method as the event is passed up
0127:             * by default by the superclass after this method returns !</b>
0128:             */
0129:            public void up(Event evt) {
0130:                NakAckHeader hdr;
0131:                Message msg, msg_copy;
0132:                int rc;
0133:
0134:                switch (evt.getType()) {
0135:
0136:                case Event.SUSPECT:
0137:
0138:                    if (log.isInfoEnabled())
0139:                        log.info("received SUSPECT event (suspected member="
0140:                                + evt.getArg() + ')');
0141:                    naker.suspect((Address) evt.getArg());
0142:                    out_of_bander.suspect((Address) evt.getArg());
0143:                    break;
0144:
0145:                case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
0146:                    naker.stable((long[]) evt.getArg());
0147:                    return; // don't pass up further (Bela Aug 7 2001)
0148:
0149:                case Event.SET_LOCAL_ADDRESS:
0150:                    local_addr = (Address) evt.getArg();
0151:                    break;
0152:
0153:                case Event.GET_MSGS_RECEIVED: // returns the highest seqnos delivered to the appl. (used by STABLE)
0154:                    long[] highest = naker.getHighestSeqnosDelivered();
0155:                    passDown(new Event(Event.GET_MSGS_RECEIVED_OK, highest));
0156:                    return; // don't pass up further (bela Aug 7 2001)
0157:
0158:                case Event.MSG:
0159:                    synchronized (this ) {
0160:                        msg = (Message) evt.getArg();
0161:
0162:                        // check to see if this is a wrapped msg. If yes, send an ACK
0163:                        hdr = (NakAckHeader) msg.removeHeader(WRAPPED_MSG_KEY); // see whether it is a wrapped message
0164:                        if (hdr != null && hdr.type == NakAckHeader.WRAPPED_MSG) { // send back an ACK to hdr.sender
0165:                            Message ack_msg = new Message(hdr.sender, null,
0166:                                    null);
0167:                            NakAckHeader h = new NakAckHeader(
0168:                                    NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);
0169:                            if (hdr.sender == null)
0170:                                if (log.isWarnEnabled())
0171:                                    log
0172:                                            .warn("WRAPPED: header's 'sender' field is null; "
0173:                                                    + "cannot send ACK !");
0174:                            ack_msg.putHeader(getName(), h);
0175:                            passDown(new Event(Event.MSG, ack_msg));
0176:                        }
0177:
0178:                        hdr = (NakAckHeader) msg.removeHeader(getName());
0179:                        if (hdr == null)
0180:                            break; // pass up
0181:
0182:                        switch (hdr.type) {
0183:
0184:                        case NakAckHeader.NAK_ACK_MSG:
0185:                        case NakAckHeader.NAK_MSG:
0186:                            if (hdr.type == NakAckHeader.NAK_ACK_MSG) { // first thing: send ACK back to sender
0187:                                Message ack_msg = new Message(msg.getSrc(),
0188:                                        null, null);
0189:                                NakAckHeader h = new NakAckHeader(
0190:                                        NakAckHeader.NAK_ACK_RSP, hdr.seqno,
0191:                                        null);
0192:                                ack_msg.putHeader(getName(), h);
0193:                                passDown(new Event(Event.MSG, ack_msg));
0194:                            }
0195:
0196:                            // while still a client, we just pass up all messages, without checking for message
0197:                            // view IDs or seqnos: other layers further up will discard messages not destined
0198:                            // for us (e.g. based on view IDs).
0199:                            // Also: store msg in queue, when view change is received, replay messages with the same
0200:                            // vid as the new view
0201:                            if (!is_server) {
0202:                                msg_copy = msg.copy(); // msg without header
0203:                                msg_copy.putHeader(getName(), hdr); // put header back on as we removed it above
0204:                                queued_msgs.add(msg_copy); // need a copy since passUp() will modify msg
0205:                                passUp(new Event(Event.MSG, msg));
0206:                                return;
0207:                            }
0208:
0209:                            // check for VIDs: is the message's VID the same as ours ?
0210:                            if (vid != null && hdr.vid != null) { // only check if our vid and message's vid available
0211:                                Address my_addr = vid.getCoordAddress(), other_addr = hdr.vid
0212:                                        .getCoordAddress();
0213:
0214:                                if (my_addr == null || other_addr == null) {
0215:                                    if (log.isWarnEnabled())
0216:                                        log
0217:                                                .warn("my vid or message's vid does not contain "
0218:                                                        + "a coordinator; discarding message !");
0219:                                    return;
0220:                                }
0221:                                if (!my_addr.equals(other_addr)) {
0222:                                    if (log.isWarnEnabled())
0223:                                        log.warn("creator of own vid ("
0224:                                                + my_addr
0225:                                                + ")is different from "
0226:                                                + "creator of message's vid ("
0227:                                                + other_addr
0228:                                                + "); discarding message !");
0229:                                    return;
0230:                                }
0231:
0232:                                rc = hdr.vid.compareTo(vid);
0233:                                if (rc > 0) { // message is sent in next view -> store !
0234:
0235:                                    if (log.isInfoEnabled())
0236:                                        log
0237:                                                .info("message's vid ("
0238:                                                        + hdr.vid
0239:                                                        + '#'
0240:                                                        + hdr.seqno
0241:                                                        + ") is bigger than current vid: ("
0242:                                                        + vid
0243:                                                        + ") message is queued !");
0244:                                    msg.putHeader(getName(), hdr); // put header back on as we removed it above
0245:                                    queued_msgs.add(msg);
0246:                                    return;
0247:                                }
0248:                                if (rc < 0) { // message sent in prev. view -> discard !
0249:
0250:                                    if (log.isWarnEnabled())
0251:                                        log.warn("message's vid (" + hdr.vid
0252:                                                + ") is smaller than "
0253:                                                + "current vid (" + vid
0254:                                                + "): message <" + msg.getSrc()
0255:                                                + ":#" + hdr.seqno
0256:                                                + "> is discarded ! Hdr is "
0257:                                                + hdr);
0258:                                    return;
0259:                                }
0260:                                // If we made it down here, the vids are the same --> OK
0261:                            }
0262:
0263:                            msg.putHeader(getName(), hdr); // stored in received_msgs, re-sent later that's why hdr is added !
0264:                            naker.receive(hdr.seqno, msg, null);
0265:                            return; // naker passes message up for us !
0266:
0267:                        case NakAckHeader.RETRANSMIT_MSG:
0268:                            naker.retransmit(msg.getSrc(), hdr.seqno,
0269:                                    hdr.last_seqno);
0270:                            return;
0271:
0272:                        case NakAckHeader.NAK_ACK_RSP:
0273:                            naker.receiveAck(hdr.seqno, msg.getSrc());
0274:                            return; // discard, no need to pass up
0275:
0276:                        case NakAckHeader.OUT_OF_BAND_MSG:
0277:                            out_of_bander.receive(hdr.seqno, msg,
0278:                                    hdr.stable_msgs);
0279:                            return; // naker passes message up for us !
0280:
0281:                        case NakAckHeader.OUT_OF_BAND_RSP:
0282:                            out_of_bander.receiveAck(hdr.seqno, msg.getSrc());
0283:                            return;
0284:
0285:                        default:
0286:                            if (log.isErrorEnabled())
0287:                                log.error("NakAck header type " + hdr.type
0288:                                        + " not known !");
0289:                            break;
0290:                        }
0291:                    } //end synchronized
0292:
0293:                }
0294:
0295:                passUp(evt);
0296:            }
0297:
0298:            /**
0299:             * <b>Callback</b>. Called by superclass when event may be handled.<p>
0300:             * <b>Do not use <code>passDown</code> in this method as the event is passed down
0301:             * by default by the superclass after this method returns !</b>
0302:             */
0303:            public void down(Event evt) {
0304:                Message msg;
0305:
0306:                if (log.isTraceEnabled())
0307:                    log.trace("queued_msgs has " + queued_msgs.size()
0308:                            + " messages " + "\n\nnaker:\n"
0309:                            + naker.dumpContents() + "\n\nout_of_bander: "
0310:                            + out_of_bander.dumpContents()
0311:                            + "\n-----------------------------\n");
0312:
0313:                switch (evt.getType()) {
0314:
0315:                case Event.MSG:
0316:                    msg = (Message) evt.getArg();
0317:
0318:                    // unicast address: not null and not mcast, pass down unchanged
0319:                    if (vid == null
0320:                            || (msg.getDest() != null && !msg.getDest()
0321:                                    .isMulticastAddress()))
0322:                        break;
0323:
0324:                    if (send_next_msg_out_of_band) {
0325:                        out_of_bander.send(msg);
0326:                        send_next_msg_out_of_band = false;
0327:                    } else if (send_next_msg_acking) {
0328:                        naker.setAcks(true); // require acks when sending a msg
0329:                        naker.send(msg);
0330:                        naker.setAcks(false); // don't require acks when sending a msg
0331:                        send_next_msg_acking = false;
0332:                    } else
0333:                        naker.send(msg);
0334:
0335:                    return; // don't pass down the stack, naker does this for us !
0336:
0337:                case Event.GET_MSG_DIGEST:
0338:                    long[] highest_seqnos = (long[]) evt.getArg();
0339:                    Digest digest = naker.computeMessageDigest(highest_seqnos);
0340:                    passUp(new Event(Event.GET_MSG_DIGEST_OK, digest));
0341:                    return;
0342:
0343:                case Event.GET_MSGS:
0344:                    List lower_seqnos = naker.getMessagesInRange((long[][]) evt
0345:                            .getArg());
0346:                    passUp(new Event(Event.GET_MSGS_OK, lower_seqnos));
0347:                    return;
0348:
0349:                case Event.REBROADCAST_MSGS:
0350:                    rebroadcastMsgs((Vector) evt.getArg());
0351:                    break;
0352:
0353:                case Event.TMP_VIEW:
0354:                    Vector mbrs = ((View) evt.getArg()).getMembers();
0355:                    members = mbrs != null ? (Vector) mbrs.clone()
0356:                            : new Vector(11);
0357:                    break;
0358:
0359:                case Event.VIEW_CHANGE:
0360:                    synchronized (this ) {
0361:                        view = ((View) ((View) evt.getArg()).clone());
0362:                        vid = view.getVid();
0363:
0364:                        members = (Vector) view.getMembers().clone();
0365:
0366:                        naker.reset();
0367:                        out_of_bander.reset();
0368:
0369:                        is_server = true; // check vids from now on
0370:
0371:                        // deliver messages received previously for this view
0372:                        if (queued_msgs.size() > 0)
0373:                            deliverQueuedMessages();
0374:                    }
0375:                    break;
0376:
0377:                case Event.BECOME_SERVER:
0378:                    is_server = true;
0379:                    break;
0380:
0381:                case Event.SWITCH_NAK:
0382:                    naker.setAcks(false); // don't require acks when sending a msg
0383:                    return; // don't pass down any further
0384:
0385:                case Event.SWITCH_NAK_ACK:
0386:                    send_next_msg_acking = true;
0387:                    return; // don't pass down any further
0388:
0389:                case Event.SWITCH_OUT_OF_BAND:
0390:                    send_next_msg_out_of_band = true;
0391:                    return;
0392:
0393:                case Event.GET_MSGS_RECEIVED: // return the highest seqnos delivered (=consumed by the application)
0394:                    long[] h = naker.getHighestSeqnosDelivered();
0395:                    passUp(new Event(Event.GET_MSGS_RECEIVED_OK, h));
0396:                    break;
0397:                }
0398:
0399:                passDown(evt);
0400:            }
0401:
0402:            boolean coordinator() {
0403:                if (members == null || members.size() < 1 || local_addr == null)
0404:                    return false;
0405:                return local_addr.equals(members.elementAt(0));
0406:            }
0407:
0408:            /**
0409:             * Rebroadcasts the messages given as arguments
0410:             */
0411:            void rebroadcastMsgs(Vector v) {
0412:                Vector final_v;
0413:                Message m1, m2;
0414:                NakAckHeader h1, h2;
0415:
0416:                if (v == null)
0417:                    return;
0418:                final_v = new Vector(v.size());
0419:
0420:                // weed out duplicates
0421:                //TODO Check!!!!!
0422:                for (int i = 0; i < v.size(); i++) {
0423:                    boolean present = false;
0424:                    m1 = (Message) v.elementAt(i);
0425:                    h1 = m1 != null ? (NakAckHeader) m1.getHeader(getName())
0426:                            : null;
0427:                    if (m1 == null || h1 == null) { // +++ remove
0428:                        if (log.isErrorEnabled())
0429:                            log.error("message is null");
0430:                        continue;
0431:                    }
0432:
0433:                    for (int j = 0; j < final_v.size(); j++) {
0434:                        m2 = (Message) final_v.elementAt(j);
0435:                        h2 = m2 != null ? (NakAckHeader) m2
0436:                                .getHeader(getName()) : null;
0437:                        if (m2 == null || h2 == null) { // +++ remove
0438:                            if (log.isErrorEnabled())
0439:                                log.error("message m2 is null");
0440:                            continue;
0441:                        }
0442:                        if (h1.seqno == h2.seqno && m1.getSrc() != null
0443:                                && m2.getSrc() != null
0444:                                && m1.getSrc().equals(m2.getSrc())) {
0445:                            present = true;
0446:                        }
0447:                    }
0448:                    if (!present)
0449:                        final_v.addElement(m1);
0450:                }
0451:
0452:                if (log.isWarnEnabled())
0453:                    log.warn("rebroadcasting " + final_v.size() + " messages");
0454:
0455:                /* Now re-broadcast messages using original NakAckHeader (same seqnos, same sender !) */
0456:                for (int i = 0; i < final_v.size(); i++) {
0457:                    m1 = (Message) final_v.elementAt(i);
0458:                    naker.resend(m1);
0459:                }
0460:
0461:                // Wait until all members have acked reception of outstanding msgs. This will empty our
0462:                // retransmission table (AckMcastSenderWindow)
0463:                naker.waitUntilAllAcksReceived(rebroadcast_timeout);
0464:                passUp(new Event(Event.REBROADCAST_MSGS_OK));
0465:            }
0466:
0467:            /**
0468:             * Deliver all messages in the queue where <code>msg.vid == vid</code> holds. Messages were stored
0469:             * in the queue because their vid was greater than the current view.
0470:             */
0471:            void deliverQueuedMessages() {
0472:                NakAckHeader hdr;
0473:                Message tmpmsg;
0474:                int rc;
0475:
0476:                while (queued_msgs.size() > 0) {
0477:                    tmpmsg = (Message) queued_msgs.removeFromHead();
0478:                    hdr = (NakAckHeader) tmpmsg.getHeader(getName());
0479:                    rc = hdr.vid.compareTo(vid);
0480:                    if (rc == 0) { // same vid -> OK
0481:                        up(new Event(Event.MSG, tmpmsg));
0482:                    } else if (rc > 0) {
0483:                    } else
0484:                        /** todo Maybe messages from previous vids are stored while client */
0485:                        ; // can't be the case; only messages for future views are stored !
0486:                }
0487:            }
0488:
0489:            public boolean setProperties(Properties props) {
0490:                String str;
0491:                long[] tmp;
0492:
0493:                super .setProperties(props);
0494:                str = props.getProperty("retransmit_timeout");
0495:                if (str != null) {
0496:                    tmp = Util.parseCommaDelimitedLongs(str);
0497:                    props.remove("retransmit_timeout");
0498:                    if (tmp != null && tmp.length > 0)
0499:                        retransmit_timeout = tmp;
0500:                }
0501:
0502:                str = props.getProperty("rebroadcast_timeout");
0503:                if (str != null) {
0504:                    rebroadcast_timeout = Long.parseLong(str);
0505:                    props.remove("rebroadcast_timeout");
0506:                }
0507:
0508:                if (props.size() > 0) {
0509:                    log
0510:                            .error("NAKACK.setProperties(): these properties are not recognized: "
0511:                                    + props);
0512:
0513:                    return false;
0514:                }
0515:                return true;
0516:            }
0517:
0518:            class NAKer implements  Retransmitter.RetransmitCommand,
0519:                    AckMcastSenderWindow.RetransmitCommand {
0520:                long seqno = 0; // current message sequence number
0521:                final Hashtable received_msgs = new Hashtable(); // ordered by sender -> NakReceiverWindow
0522:                final Hashtable sent_msgs = new Hashtable(); // ordered by seqno (sent by me !) - Messages
0523:                final AckMcastSenderWindow sender_win = new AckMcastSenderWindow(
0524:                        this , timer);
0525:                boolean acking = false; // require acks when sending msgs
0526:                long deleted_up_to = 0;
0527:
0528:                // Used to periodically retransmit the last message
0529:                final LastMessageRetransmitter last_msg_xmitter = new LastMessageRetransmitter();
0530:
0531:                private class LastMessageRetransmitter implements 
0532:                        TimeScheduler.Task {
0533:                    boolean stopped = false;
0534:                    int num_times = 2; // number of times a message is retransmitted
0535:                    long last_xmitted_seqno = 0;
0536:
0537:                    public void stop() {
0538:                        stopped = true;
0539:                    }
0540:
0541:                    public boolean cancelled() {
0542:                        return stopped;
0543:                    }
0544:
0545:                    public long nextInterval() {
0546:                        return retransmit_timeout[0];
0547:                    }
0548:
0549:                    /**
0550:                     * Periodically retransmits the last seqno to all members. If the seqno doesn't change (ie. there
0551:                     * were no new messages sent) then the retransmitter task doesn't retransmit after 'num_times' times.
0552:                     */
0553:                    public void run() {
0554:                        synchronized (sent_msgs) {
0555:                            long prevSeqno = seqno - 1;
0556:
0557:                            if (prevSeqno == last_xmitted_seqno) {
0558:
0559:                                if (log.isInfoEnabled())
0560:                                    log.info("prevSeqno=" + prevSeqno
0561:                                            + ", last_xmitted_seqno="
0562:                                            + last_xmitted_seqno
0563:                                            + ", num_times=" + num_times);
0564:                                if (--num_times <= 0)
0565:                                    return;
0566:                            } else {
0567:                                num_times = 3;
0568:                                last_xmitted_seqno = prevSeqno;
0569:                            }
0570:
0571:                            if ((prevSeqno >= 0) && (prevSeqno > deleted_up_to)) {
0572:
0573:                                if (log.isInfoEnabled())
0574:                                    log.info("retransmitting last message "
0575:                                            + prevSeqno);
0576:                                retransmit(null, prevSeqno, prevSeqno);
0577:                            }
0578:                        }
0579:                    }
0580:
0581:                }
0582:
0583:                NAKer() {
0584:                    if (timer != null)
0585:                        timer.add(last_msg_xmitter, true); // fixed-rate scheduling
0586:                    else if (log.isErrorEnabled())
0587:                        log.error("timer is null");
0588:                }
0589:
0590:                long getNextSeqno() {
0591:                    return seqno++;
0592:                }
0593:
0594:                long getHighestSeqnoSent() {
0595:                    long highest_sent = -1;
0596:                    for (Enumeration e = sent_msgs.keys(); e.hasMoreElements();)
0597:                        highest_sent = Math.max(highest_sent, ((Long) e
0598:                                .nextElement()).longValue());
0599:                    return highest_sent;
0600:                }
0601:
0602:                /**
0603:                 * Returns an array of the highest sequence numbers consumed by the application so far,
0604:                 * its order corresponding with <code>mbrs</code>. Used by coordinator as argument when
0605:                 * sending initial FLUSH request to members
0606:                 */
0607:                long[] getHighestSeqnosDelivered() {
0608:                    long[] highest_deliv = members != null ? new long[members
0609:                            .size()] : null;
0610:                    Address mbr;
0611:                    NakReceiverWindow win;
0612:
0613:                    if (highest_deliv == null)
0614:                        return null;
0615:
0616:                    for (int i = 0; i < highest_deliv.length; i++)
0617:                        highest_deliv[i] = -1;
0618:
0619:                    synchronized (members) {
0620:                        for (int i = 0; i < members.size(); i++) {
0621:                            mbr = (Address) members.elementAt(i);
0622:                            win = (NakReceiverWindow) received_msgs.get(mbr);
0623:                            if (win != null)
0624:                                highest_deliv[i] = win.getHighestDelivered();
0625:                        }
0626:                    }
0627:                    return highest_deliv;
0628:                }
0629:
0630:                /**
0631:                 * Return all messages sent by us that are higher than <code>seqno</code>
0632:                 */
0633:                List getSentMessagesHigherThan(long seqno) {
0634:                    List retval = new List();
0635:                    Long key;
0636:
0637:                    for (Enumeration e = sent_msgs.keys(); e.hasMoreElements();) {
0638:                        key = (Long) e.nextElement();
0639:                        if (key.longValue() > seqno)
0640:                            retval.add(sent_msgs.get(key));
0641:                    }
0642:                    return retval;
0643:                }
0644:
0645:                /**
0646:                 * Returns a message digest: for each member P in <code>highest_seqnos</code>, the highest seqno
0647:                 * received from P is added to the digest's array. If P == this, then the highest seqno
0648:                 * <em>sent</em> is added: this makes sure that messages sent but not yet received are also
0649:                 * re-broadcast (because they are also unstable).<p>If my highest seqno for a member P is
0650:                 * higher than the one in <code>highest_seqnos</code>, then all messages from P (received or sent)
0651:                 * whose seqno is higher are added to the digest's messages. The coordinator will use all digests
0652:                 * to compute a set of messages than need to be re-broadcast to the members before installing
0653:                 * a new view.
0654:                 */
0655:                Digest computeMessageDigest(long[] highest_seqnos) {
0656:                    Digest digest = highest_seqnos != null ? new Digest(
0657:                            highest_seqnos.length) : null;
0658:                    Address sender;
0659:                    NakReceiverWindow win;
0660:                    List unstable_msgs;
0661:                    int own_index;
0662:                    long highest_seqno_sent = -1, highest_seqno_received = -1;
0663:
0664:                    if (digest == null) {
0665:
0666:                        if (log.isWarnEnabled())
0667:                            log
0668:                                    .warn("highest_seqnos is null, cannot compute digest !");
0669:                        return null;
0670:                    }
0671:
0672:                    if (highest_seqnos.length != members.size()) {
0673:
0674:                        if (log.isWarnEnabled())
0675:                            log
0676:                                    .warn("the mbrship size and the size "
0677:                                            + "of the highest_seqnos array are not equal, cannot compute digest !");
0678:                        return null;
0679:                    }
0680:
0681:                    System.arraycopy(highest_seqnos, 0, digest.highest_seqnos,
0682:                            0, digest.highest_seqnos.length);
0683:
0684:                    for (int i = 0; i < highest_seqnos.length; i++) {
0685:                        sender = (Address) members.elementAt(i);
0686:                        if (sender == null)
0687:                            continue;
0688:                        win = (NakReceiverWindow) received_msgs.get(sender);
0689:                        if (win == null)
0690:                            continue;
0691:                        digest.highest_seqnos[i] = win.getHighestReceived();
0692:                        unstable_msgs = win
0693:                                .getMessagesHigherThan(highest_seqnos[i]);
0694:                        for (Enumeration e = unstable_msgs.elements(); e
0695:                                .hasMoreElements();)
0696:                            digest.msgs.add(e.nextElement());
0697:                    }
0698:
0699:                    /** If our highest seqno <em>sent</em> is higher than the one <em>received</em>, we have to
0700:                     (a) set it in the digest and (b) add the corresponding messages **/
0701:
0702:                    own_index = members.indexOf(local_addr);
0703:                    if (own_index == -1) {
0704:
0705:                        if (log.isWarnEnabled())
0706:                            log.warn("no own address in highest_seqnos");
0707:                        return digest;
0708:                    }
0709:                    highest_seqno_received = digest.highest_seqnos[own_index];
0710:                    highest_seqno_sent = getHighestSeqnoSent();
0711:
0712:                    if (highest_seqno_sent > highest_seqno_received) {
0713:                        // (a) Set highest seqno sent in digest
0714:                        digest.highest_seqnos[own_index] = highest_seqno_sent;
0715:
0716:                        // (b) Add messages between highest_seqno_received and highest_seqno_sent
0717:                        unstable_msgs = getSentMessagesHigherThan(highest_seqno_received);
0718:                        for (Enumeration e = unstable_msgs.elements(); e
0719:                                .hasMoreElements();)
0720:                            digest.msgs.add(e.nextElement());
0721:                    }
0722:
0723:                    return digest;
0724:                }
0725:
0726:                /**
0727:                 * For each non-null member m in <code>range</code>, get messages with sequence numbers between
0728:                 * range[m][0] and range[m][1], excluding range[m][0] and including range[m][1].
0729:                 */
0730:                List getMessagesInRange(long[][] range) {
0731:                    List retval = new List();
0732:                    List tmp;
0733:                    NakReceiverWindow win;
0734:                    Address sender;
0735:
0736:                    for (int i = 0; i < range.length; i++) {
0737:                        if (range[i] != null) {
0738:                            sender = (Address) members.elementAt(i);
0739:                            if (sender == null)
0740:                                continue;
0741:                            win = (NakReceiverWindow) received_msgs.get(sender);
0742:                            if (win == null)
0743:                                continue;
0744:                            tmp = win.getMessagesInRange(range[i][0],
0745:                                    range[i][1]);
0746:                            if (tmp == null || tmp.size() < 1)
0747:                                continue;
0748:                            for (Enumeration e = tmp.elements(); e
0749:                                    .hasMoreElements();)
0750:                                retval.add(e.nextElement());
0751:                        }
0752:                    }
0753:                    return retval;
0754:                }
0755:
0756:                void setAcks(boolean f) {
0757:                    acking = f;
0758:                }
0759:
0760:                /**
0761:                 * Vector with messages (ordered by sender) that are stable and can be discarded.
0762:                 * This applies to NAK-based sender and receivers.
0763:                 */
0764:                void stable(long[] seqnos) {
0765:                    int index;
0766:                    long seqno;
0767:                    NakReceiverWindow recv_win;
0768:                    Address sender;
0769:
0770:                    if (members == null || local_addr == null) {
0771:                        if (log.isWarnEnabled())
0772:                            log.warn("members or local_addr are null !");
0773:                        return;
0774:                    }
0775:                    index = members.indexOf(local_addr);
0776:
0777:                    if (index < 0) {
0778:
0779:                        if (log.isWarnEnabled())
0780:                            log.warn("member " + local_addr + " not found in "
0781:                                    + members);
0782:                        return;
0783:                    }
0784:                    seqno = seqnos[index];
0785:
0786:                    if (log.isInfoEnabled())
0787:                        log.info("deleting stable messages [" + deleted_up_to
0788:                                + " - " + seqno + ']');
0789:
0790:                    // delete sent messages that are stable (kept for retransmission requests from receivers)
0791:                    synchronized (sent_msgs) {
0792:                        for (long i = deleted_up_to; i <= seqno; i++) {
0793:                            sent_msgs.remove(new Long(i));
0794:                        }
0795:                        deleted_up_to = seqno;
0796:                    }
0797:                    // delete received msgs that are stable
0798:                    for (int i = 0; i < members.size(); i++) {
0799:                        sender = (Address) members.elementAt(i);
0800:                        recv_win = (NakReceiverWindow) received_msgs
0801:                                .get(sender);
0802:                        if (recv_win != null)
0803:                            recv_win.stable(seqnos[i]); // delete all messages with seqnos <= seqnos[i]
0804:                    }
0805:                }
0806:
0807:                void send(Message msg) {
0808:                    long id = getNextSeqno();
0809:                    ViewId vid_copy;
0810:
0811:                    if (vid == null)
0812:                        return;
0813:                    vid_copy = (ViewId) vid.clone();
0814:                    /** todo No needs to copy vid */
0815:
0816:                    if (acking) {
0817:                        msg.putHeader(getName(), new NakAckHeader(
0818:                                NakAckHeader.NAK_ACK_MSG, id, vid_copy));
0819:                        sender_win
0820:                                .add(id, msg.copy(), (Vector) members.clone()); // msg must be copied !
0821:                    } else
0822:                        msg.putHeader(getName(), new NakAckHeader(
0823:                                NakAckHeader.NAK_MSG, id, vid_copy));
0824:
0825:                    if (log.isInfoEnabled())
0826:                        log.info("sending msg #" + id);
0827:
0828:                    sent_msgs.put(new Long(id), msg.copy());
0829:                    passDown(new Event(Event.MSG, msg));
0830:                }
0831:
0832:                /** Re-broadcast message. Message already contains NakAckHeader (therefore also seqno).
0833:                 Wrap message (including header) in a new message and bcasts using ACKS. Every receiver
0834:                 acks the message, unwraps it to get the original message and delivers the original message
0835:                 (if not yet delivered).
0836:                 // send msgs in Vector arg again (they already have a NakAckHeader !)
0837:                 // -> use the same seq numbers
0838:                 // -> destination has to be set to null (broadcast), e.g.:
0839:                 //      dst=p (me !), src=q --> dst=null, src=q
0840:
0841:                 TODO:
0842:                 -----
0843:                 resend() has to wait until it received all ACKs from all recipients (for all msgs), or until
0844:                 members were suspected. Thus we can ensure that all members received outstanding msgs before
0845:                 we switch to a new view. Otherwise, because the switch to a new view resets NAK and ACK msg
0846:                 transmission, slow members might never receive all outstanding messages.
0847:                 */
0848:
0849:                /**
0850:                 * 1. Set the destination address of the original msg to null
0851:                 * 2. Add a new header WRAPPED_MSG and send msg. The receiver will ACK the msg,
0852:                 * remove the header and deliver the msg
0853:                 */
0854:                void resend(Message msg) {
0855:                    Message copy = msg.copy();
0856:                    NakAckHeader hdr = (NakAckHeader) copy.getHeader(getName());
0857:                    NakAckHeader wrapped_hdr;
0858:                    long id = hdr.seqno;
0859:
0860:                    if (vid == null)
0861:                        return;
0862:                    copy.setDest(null); // broadcast, e.g. dst(p), src(q) --> dst(null), src(q)
0863:                    wrapped_hdr = new NakAckHeader(NakAckHeader.WRAPPED_MSG,
0864:                            hdr.seqno, hdr.vid);
0865:                    wrapped_hdr.sender = local_addr;
0866:                    copy.putHeader(WRAPPED_MSG_KEY, wrapped_hdr);
0867:                    sender_win.add(id, copy.copy(), (Vector) members.clone());
0868:                    if (log.isInfoEnabled())
0869:                        log.info("resending " + copy.getHeader(getName()));
0870:                    passDown(new Event(Event.MSG, copy));
0871:                }
0872:
0873:                void waitUntilAllAcksReceived(long timeout) {
0874:                    sender_win.waitUntilAllAcksReceived(timeout);
0875:                }
0876:
0877:                void receive(long id, Message msg, Vector stable_msgs) {
0878:                    /** todo Vector stable_msgs is not used in NAKer.receive() */
0879:                    Address sender = msg.getSrc();
0880:                    NakReceiverWindow win = (NakReceiverWindow) received_msgs
0881:                            .get(sender);
0882:                    Message msg_to_deliver;
0883:
0884:                    if (win == null) {
0885:                        win = new NakReceiverWindow(sender, this , 0);
0886:                        win.setRetransmitTimeouts(retransmit_timeout);
0887:                        received_msgs.put(sender, win);
0888:                    }
0889:
0890:                    if (log.isInfoEnabled())
0891:                        log.info("received <" + sender + '#' + id + '>');
0892:
0893:                    win.add(id, msg); // add in order, then remove and pass up as many msgs as possible
0894:                    while (true) {
0895:                        msg_to_deliver = win.remove();
0896:                        if (msg_to_deliver == null)
0897:                            break;
0898:
0899:                        if (msg_to_deliver.getHeader(getName()) instanceof  NakAckHeader)
0900:                            msg_to_deliver.removeHeader(getName());
0901:                        passUp(new Event(Event.MSG, msg_to_deliver));
0902:                    }
0903:                }
0904:
0905:                void receiveAck(long id, Address sender) {
0906:
0907:                    if (log.isInfoEnabled())
0908:                        log.info("received ack <-- ACK <" + sender + '#' + id
0909:                                + '>');
0910:                    sender_win.ack(id, sender);
0911:                }
0912:
0913:                /**
0914:                 * Implementation of interface AckMcastSenderWindow.RetransmitCommand.<p>
0915:                 * Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already
0916:                 * a copy, so does not need to be copied again.
0917:                 */
0918:                public void retransmit(long seqno, Message msg, Address dest) {
0919:
0920:                    if (log.isInfoEnabled())
0921:                        log.info("retransmitting message " + seqno + " to "
0922:                                + dest + ", header is "
0923:                                + msg.getHeader(getName()));
0924:
0925:                    // check whether dest is member of group. If not, discard retransmission message and
0926:                    // also remove it from sender_win (AckMcastSenderWindow)
0927:                    if (members != null) {
0928:                        if (!members.contains(dest)) {
0929:
0930:                            if (log.isInfoEnabled())
0931:                                log
0932:                                        .info("retransmitting "
0933:                                                + seqno
0934:                                                + ") to "
0935:                                                + dest
0936:                                                + ": "
0937:                                                + dest
0938:                                                + " is not a member; discarding retransmission and removing "
0939:                                                + dest + " from sender_win");
0940:                            sender_win.remove(dest);
0941:                            return;
0942:                        }
0943:                    }
0944:
0945:                    msg.setDest(dest);
0946:                    passDown(new Event(Event.MSG, msg));
0947:                }
0948:
0949:                /**
0950:                 * Implementation of Retransmitter.RetransmitCommand.<p>
0951:                 * Called by retransmission thread when gap is detected. Sends retr. request
0952:                 * to originator of msg
0953:                 */
0954:                public void retransmit(long first_seqno, long last_seqno,
0955:                        Address sender) {
0956:
0957:                    if (log.isInfoEnabled())
0958:                        log.info("retransmit([" + first_seqno + ", "
0959:                                + last_seqno + "]) to " + sender + ", vid="
0960:                                + vid);
0961:
0962:                    NakAckHeader hdr = new NakAckHeader(
0963:                            NakAckHeader.RETRANSMIT_MSG, first_seqno,
0964:                            (ViewId) vid.clone());
0965:                    /** todo Not necessary to clone vid */
0966:                    Message retransmit_msg = new Message(sender, null, null);
0967:
0968:                    hdr.last_seqno = last_seqno;
0969:                    retransmit_msg.putHeader(getName(), hdr);
0970:                    passDown(new Event(Event.MSG, retransmit_msg));
0971:                }
0972:
0973:                // Retransmit from sent-table, called when RETRANSMIT message is received
0974:                void retransmit(Address dest, long first_seqno, long last_seqno) {
0975:                    Message m, retr_msg;
0976:
0977:                    for (long i = first_seqno; i <= last_seqno; i++) {
0978:                        m = (Message) sent_msgs.get(new Long(i));
0979:                        if (m == null) {
0980:                            if (log.isWarnEnabled())
0981:                                log.warn("(to " + dest + "): message with "
0982:                                        + "seqno=" + i + " not found !");
0983:                            continue;
0984:                        }
0985:
0986:                        retr_msg = m.copy();
0987:                        retr_msg.setDest(dest);
0988:
0989:                        try {
0990:                            passDown(new Event(Event.MSG, retr_msg));
0991:                        } catch (Exception e) {
0992:                            if (log.isDebugEnabled())
0993:                                log.debug("exception is " + e);
0994:                        }
0995:                    }
0996:                }
0997:
0998:                void stop() {
0999:                    if (sender_win != null)
1000:                        sender_win.stop();
1001:                }
1002:
1003:                void reset() {
1004:                    NakReceiverWindow win;
1005:
1006:                    // Only reset if not coord: coord may have to retransmit the VIEW_CHANGE msg to slow members,
1007:                    // since VIEW_CHANGE results in retransmitter resetting, retransmission would be killed, and the
1008:                    // slow mbr would never receive a new view (see ./design/ViewChangeRetransmission.txt)
1009:                    if (!coordinator())
1010:                        sender_win.reset();
1011:
1012:                    sent_msgs.clear();
1013:                    for (Enumeration e = received_msgs.elements(); e
1014:                            .hasMoreElements();) {
1015:                        win = (NakReceiverWindow) e.nextElement();
1016:                        win.reset();
1017:                    }
1018:                    received_msgs.clear();
1019:                    seqno = 0;
1020:                    deleted_up_to = 0;
1021:                }
1022:
1023:                public void suspect(Address mbr) {
1024:                    NakReceiverWindow w;
1025:
1026:                    w = (NakReceiverWindow) received_msgs.get(mbr);
1027:                    if (w != null) {
1028:                        w.reset();
1029:                        received_msgs.remove(mbr);
1030:                    }
1031:
1032:                    sender_win.suspect(mbr); // don't keep retransmitting messages to mbr
1033:                }
1034:
1035:                String dumpContents() {
1036:                    StringBuffer ret = new StringBuffer();
1037:
1038:                    ret.append("\nsent_msgs: " + sent_msgs.size());
1039:
1040:                    ret.append("\nreceived_msgs: ");
1041:                    for (Enumeration e = received_msgs.keys(); e
1042:                            .hasMoreElements();) {
1043:                        Address key = (Address) e.nextElement();
1044:                        NakReceiverWindow w = (NakReceiverWindow) received_msgs
1045:                                .get(key);
1046:                        ret.append('\n' + w.toString());
1047:                    }
1048:
1049:                    ret.append("\nsender_win: " + sender_win.toString());
1050:
1051:                    return ret.toString();
1052:                }
1053:
1054:            }
1055:
1056:            class OutOfBander implements  AckMcastSenderWindow.RetransmitCommand {
1057:                final AckMcastSenderWindow sender_win = new AckMcastSenderWindow(
1058:                        this , timer);
1059:                final AckMcastReceiverWindow receiver_win = new AckMcastReceiverWindow();
1060:                long seqno = 0;
1061:
1062:                void send(Message msg) {
1063:                    long id = seqno++;
1064:                    Vector stable_msgs = sender_win.getStableMessages();
1065:                    NakAckHeader hdr;
1066:
1067:                    if (log.isInfoEnabled())
1068:                        log.info("sending msg #=" + id);
1069:
1070:                    hdr = new NakAckHeader(NakAckHeader.OUT_OF_BAND_MSG, id,
1071:                            null);
1072:                    hdr.stable_msgs = stable_msgs;
1073:                    msg.putHeader(getName(), hdr);
1074:
1075:                    // msg needs to be copied, otherwise it will be modified by the code below
1076:                    sender_win.add(id, msg.copy(), (Vector) members.clone());
1077:
1078:                    passDown(new Event(Event.MSG, msg));
1079:                }
1080:
1081:                void receive(long id, Message msg, Vector stable_msgs) {
1082:                    Address sender = msg.getSrc();
1083:
1084:                    // first thing: send ACK back to sender
1085:                    Message ack_msg = new Message(msg.getSrc(), null, null);
1086:                    NakAckHeader hdr = new NakAckHeader(
1087:                            NakAckHeader.OUT_OF_BAND_RSP, id, null);
1088:                    ack_msg.putHeader(getName(), hdr);
1089:
1090:                    if (log.isInfoEnabled())
1091:                        log.info("received <" + sender + '#' + id + ">\n");
1092:
1093:                    if (receiver_win.add(sender, id)) // not received previously
1094:                        passUp(new Event(Event.MSG, msg));
1095:
1096:                    passDown(new Event(Event.MSG, ack_msg)); // send ACK
1097:                    if (log.isInfoEnabled())
1098:                        log.info("sending ack <" + sender + '#' + id + ">\n");
1099:
1100:                    if (stable_msgs != null)
1101:                        receiver_win.remove(sender, stable_msgs);
1102:                }
1103:
1104:                void receiveAck(long id, Address sender) {
1105:                    if (log.isInfoEnabled())
1106:                        log.info("received ack <" + sender + '#' + id + '>');
1107:                    sender_win.ack(id, sender);
1108:                }
1109:
1110:                /**
1111:                 * Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already
1112:                 * a copy, so does not need to be copied again. All the necessary header are already present;
1113:                 * no header needs to be added ! The message is retransmitted as <em>unicast</em> !
1114:                 */
1115:                public void retransmit(long seqno, Message msg, Address dest) {
1116:                    if (log.isInfoEnabled())
1117:                        log.info("dest=" + dest + ", msg #" + seqno);
1118:                    msg.setDest(dest);
1119:                    passDown(new Event(Event.MSG, msg));
1120:                }
1121:
1122:                void reset() {
1123:                    sender_win.reset(); // +++ ?
1124:                    receiver_win.reset(); // +++ ?
1125:                }
1126:
1127:                void suspect(Address mbr) {
1128:                    sender_win.suspect(mbr);
1129:                    receiver_win.suspect(mbr);
1130:                }
1131:
1132:                void start() {
1133:                    sender_win.start();
1134:                }
1135:
1136:                void stop() {
1137:                    if (sender_win != null)
1138:                        sender_win.stop();
1139:                }
1140:
1141:                String dumpContents() {
1142:                    StringBuffer ret = new StringBuffer();
1143:                    ret.append("\nsender_win:\n" + sender_win.toString()
1144:                            + "\nreceiver_win:\n" + receiver_win.toString());
1145:                    return ret.toString();
1146:                }
1147:
1148:            }
1149:
1150:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.