Source Code Cross Referenced for NAKACK.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » pbcast » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols.pbcast 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        // $Id: NAKACK.java,v 1.81.2.1 2007/04/27 08:03:55 belaban Exp $
0002:
0003:        package org.jgroups.protocols.pbcast;
0004:
0005:        import org.jgroups.*;
0006:        import org.jgroups.stack.NakReceiverWindow;
0007:        import org.jgroups.stack.Protocol;
0008:        import org.jgroups.stack.Retransmitter;
0009:        import org.jgroups.util.*;
0010:
0011:        import java.io.IOException;
0012:        import java.util.*;
0013:
0014:        /**
0015:         * Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno).
0016:         * Receivers deliver messages ordered according to seqno and request retransmission of missing messages. Retransmitted
0017:         * messages are bundled into bigger ones, e.g. when getting an xmit request for messages 1-10, instead of sending 10
0018:         * unicast messages, we bundle all 10 messages into 1 and send it. However, since this protocol typically sits below
0019:         * FRAG, we cannot count on FRAG to fragement/defragment the (possibly) large message into smaller ones. Therefore we
0020:         * only bundle messages up to max_xmit_size bytes to prevent too large messages. For example, if the bundled message
0021:         * size was a total of 34000 bytes, and max_xmit_size=16000, we'd send 3 messages: 2 16K and a 2K message. <em>Note that
0022:         * max_xmit_size should be the same value as FRAG.frag_size (or smaller).</em><br/> Retransmit requests are always sent
0023:         * to the sender. If the sender dies, and not everyone has received its messages, they will be lost. In the future, this
0024:         * may be changed to have receivers store all messages, so that retransmit requests can be answered by any member.
0025:         * Trivial to implement, but not done yet. For most apps, the default retransmit properties are sufficient, if not use
0026:         * vsync.
0027:         *
0028:         * @author Bela Ban
0029:         */
0030:        public class NAKACK extends Protocol implements 
0031:                Retransmitter.RetransmitCommand, NakReceiverWindow.Listener {
0032:            private long[] retransmit_timeout = { 600, 1200, 2400, 4800 }; // time(s) to wait before requesting retransmission
0033:            private boolean is_server = false;
0034:            private Address local_addr = null;
0035:            private final Vector members = new Vector(11);
0036:            private View view;
0037:            private long seqno = -1; // current message sequence number (starts with 0)
0038:            private long max_xmit_size = 8192; // max size of a retransmit message (otherwise send multiple)
0039:            private int gc_lag = 20; // number of msgs garbage collection lags behind
0040:
0041:            /**
0042:             * Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a
0043:             * message, the sender only retransmits once.
0044:             */
0045:            private boolean use_mcast_xmit = true;
0046:
0047:            /**
0048:             * Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be
0049:             * set to false
0050:             */
0051:            private boolean xmit_from_random_member = false;
0052:
0053:            /**
0054:             * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
0055:             * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
0056:             * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
0057:             * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
0058:             * around, and don't need to wait for garbage collection to remove them.
0059:             */
0060:            private boolean discard_delivered_msgs = false;
0061:
0062:            /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
0063:             * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
0064:             */
0065:            private int max_xmit_buf_size = 0;
0066:
0067:            /**
0068:             * Hashtable<Address,NakReceiverWindow>. Stores received messages (keyed by sender). Note that this is no long term
0069:             * storage; messages are just stored until they can be delivered (ie., until the correct FIFO order is established)
0070:             */
0071:            private final HashMap received_msgs = new HashMap(11);
0072:
0073:            /** TreeMap<Long,Message>. Map of messages sent by me (keyed and sorted on sequence number) */
0074:            private final TreeMap sent_msgs = new TreeMap();
0075:
0076:            private boolean leaving = false;
0077:            private boolean started = false;
0078:            private TimeScheduler timer = null;
0079:            private static final String name = "NAKACK";
0080:
0081:            private long xmit_reqs_received;
0082:            private long xmit_reqs_sent;
0083:            private long xmit_rsps_received;
0084:            private long xmit_rsps_sent;
0085:            private long missing_msgs_received;
0086:
0087:            /** Captures stats on XMIT_REQS, XMIT_RSPS per sender */
0088:            private HashMap sent = new HashMap();
0089:
0090:            /** Captures stats on XMIT_REQS, XMIT_RSPS per receiver */
0091:            private HashMap received = new HashMap();
0092:
0093:            private int stats_list_size = 20;
0094:
0095:            /** BoundedList<XmitRequest>. Keeps track of the last stats_list_size XMIT requests */
0096:            private BoundedList receive_history;
0097:
0098:            /** BoundedList<MissingMessage>. Keeps track of the last stats_list_size missing messages received */
0099:            private BoundedList send_history;
0100:
0101:            public NAKACK() {
0102:            }
0103:
0104:            public String getName() {
0105:                return name;
0106:            }
0107:
0108:            public long getXmitRequestsReceived() {
0109:                return xmit_reqs_received;
0110:            }
0111:
0112:            public long getXmitRequestsSent() {
0113:                return xmit_reqs_sent;
0114:            }
0115:
0116:            public long getXmitResponsesReceived() {
0117:                return xmit_rsps_received;
0118:            }
0119:
0120:            public long getXmitResponsesSent() {
0121:                return xmit_rsps_sent;
0122:            }
0123:
0124:            public long getMissingMessagesReceived() {
0125:                return missing_msgs_received;
0126:            }
0127:
0128:            public int getPendingRetransmissionRequests() {
0129:                int num = 0;
0130:                NakReceiverWindow win;
0131:                synchronized (received_msgs) {
0132:                    for (Iterator it = received_msgs.values().iterator(); it
0133:                            .hasNext();) {
0134:                        win = (NakReceiverWindow) it.next();
0135:                        num += win.size();
0136:                    }
0137:                }
0138:                return num;
0139:            }
0140:
0141:            public int getSentTableSize() {
0142:                int size;
0143:                synchronized (sent_msgs) {
0144:                    size = sent_msgs.size();
0145:                }
0146:                return size;
0147:            }
0148:
0149:            public int getReceivedTableSize() {
0150:                int ret = 0;
0151:                NakReceiverWindow win;
0152:                Set s = new LinkedHashSet(received_msgs.values());
0153:                for (Iterator it = s.iterator(); it.hasNext();) {
0154:                    win = (NakReceiverWindow) it.next();
0155:                    ret += win.size();
0156:                }
0157:                return ret;
0158:            }
0159:
0160:            public void resetStats() {
0161:                xmit_reqs_received = xmit_reqs_sent = xmit_rsps_received = xmit_rsps_sent = missing_msgs_received = 0;
0162:                sent.clear();
0163:                received.clear();
0164:                if (receive_history != null)
0165:                    receive_history.removeAll();
0166:                if (send_history != null)
0167:                    send_history.removeAll();
0168:            }
0169:
0170:            public void init() throws Exception {
0171:                if (stats) {
0172:                    send_history = new BoundedList(stats_list_size);
0173:                    receive_history = new BoundedList(stats_list_size);
0174:                }
0175:            }
0176:
0177:            public int getGcLag() {
0178:                return gc_lag;
0179:            }
0180:
0181:            public void setGcLag(int gc_lag) {
0182:                this .gc_lag = gc_lag;
0183:            }
0184:
0185:            public boolean isUseMcastXmit() {
0186:                return use_mcast_xmit;
0187:            }
0188:
0189:            public void setUseMcastXmit(boolean use_mcast_xmit) {
0190:                this .use_mcast_xmit = use_mcast_xmit;
0191:            }
0192:
0193:            public boolean isXmitFromRandomMember() {
0194:                return xmit_from_random_member;
0195:            }
0196:
0197:            public void setXmitFromRandomMember(boolean xmit_from_random_member) {
0198:                this .xmit_from_random_member = xmit_from_random_member;
0199:            }
0200:
0201:            public boolean isDiscardDeliveredMsgs() {
0202:                return discard_delivered_msgs;
0203:            }
0204:
0205:            public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {
0206:                this .discard_delivered_msgs = discard_delivered_msgs;
0207:            }
0208:
0209:            public int getMaxXmitBufSize() {
0210:                return max_xmit_buf_size;
0211:            }
0212:
0213:            public void setMaxXmitBufSize(int max_xmit_buf_size) {
0214:                this .max_xmit_buf_size = max_xmit_buf_size;
0215:            }
0216:
0217:            public long getMaxXmitSize() {
0218:                return max_xmit_size;
0219:            }
0220:
0221:            public void setMaxXmitSize(long max_xmit_size) {
0222:                this .max_xmit_size = max_xmit_size;
0223:            }
0224:
0225:            public boolean setProperties(Properties props) {
0226:                String str;
0227:                long[] tmp;
0228:
0229:                super .setProperties(props);
0230:                str = props.getProperty("retransmit_timeout");
0231:                if (str != null) {
0232:                    tmp = Util.parseCommaDelimitedLongs(str);
0233:                    props.remove("retransmit_timeout");
0234:                    if (tmp != null && tmp.length > 0) {
0235:                        retransmit_timeout = tmp;
0236:                    }
0237:                }
0238:
0239:                str = props.getProperty("gc_lag");
0240:                if (str != null) {
0241:                    gc_lag = Integer.parseInt(str);
0242:                    if (gc_lag < 0) {
0243:                        log
0244:                                .error("NAKACK.setProperties(): gc_lag cannot be negative, setting it to 0");
0245:                    }
0246:                    props.remove("gc_lag");
0247:                }
0248:
0249:                str = props.getProperty("max_xmit_size");
0250:                if (str != null) {
0251:                    max_xmit_size = Long.parseLong(str);
0252:                    props.remove("max_xmit_size");
0253:                }
0254:
0255:                str = props.getProperty("use_mcast_xmit");
0256:                if (str != null) {
0257:                    use_mcast_xmit = Boolean.valueOf(str).booleanValue();
0258:                    props.remove("use_mcast_xmit");
0259:                }
0260:
0261:                str = props.getProperty("discard_delivered_msgs");
0262:                if (str != null) {
0263:                    discard_delivered_msgs = Boolean.valueOf(str)
0264:                            .booleanValue();
0265:                    props.remove("discard_delivered_msgs");
0266:                }
0267:
0268:                str = props.getProperty("xmit_from_random_member");
0269:                if (str != null) {
0270:                    xmit_from_random_member = Boolean.valueOf(str)
0271:                            .booleanValue();
0272:                    props.remove("xmit_from_random_member");
0273:                }
0274:
0275:                str = props.getProperty("max_xmit_buf_size");
0276:                if (str != null) {
0277:                    max_xmit_buf_size = Integer.parseInt(str);
0278:                    props.remove("max_xmit_buf_size");
0279:                }
0280:
0281:                str = props.getProperty("stats_list_size");
0282:                if (str != null) {
0283:                    stats_list_size = Integer.parseInt(str);
0284:                    props.remove("stats_list_size");
0285:                }
0286:
0287:                if (xmit_from_random_member) {
0288:                    if (discard_delivered_msgs) {
0289:                        discard_delivered_msgs = false;
0290:                        log
0291:                                .warn("xmit_from_random_member set to true: changed discard_delivered_msgs to false");
0292:                    }
0293:                }
0294:
0295:                if (props.size() > 0) {
0296:                    log
0297:                            .error("NAKACK.setProperties(): these properties are not recognized: "
0298:                                    + props);
0299:
0300:                    return false;
0301:                }
0302:                return true;
0303:            }
0304:
0305:            public Map dumpStats() {
0306:                Map retval = super .dumpStats();
0307:                if (retval == null)
0308:                    retval = new HashMap();
0309:
0310:                retval.put("xmit_reqs_received", new Long(xmit_reqs_received));
0311:                retval.put("xmit_reqs_sent", new Long(xmit_reqs_sent));
0312:                retval.put("xmit_rsps_received", new Long(xmit_rsps_received));
0313:                retval.put("xmit_rsps_sent", new Long(xmit_rsps_sent));
0314:                retval.put("missing_msgs_received", new Long(
0315:                        missing_msgs_received));
0316:
0317:                retval.put("sent_msgs", printSentMsgs());
0318:
0319:                StringBuffer sb = new StringBuffer();
0320:                Map.Entry entry;
0321:                Address addr;
0322:                Object w;
0323:                synchronized (received_msgs) {
0324:                    for (Iterator it = received_msgs.entrySet().iterator(); it
0325:                            .hasNext();) {
0326:                        entry = (Map.Entry) it.next();
0327:                        addr = (Address) entry.getKey();
0328:                        w = entry.getValue();
0329:                        sb.append(addr).append(": ").append(w.toString())
0330:                                .append('\n');
0331:                    }
0332:                }
0333:
0334:                retval.put("received_msgs", sb.toString());
0335:                return retval;
0336:            }
0337:
0338:            public String printStats() {
0339:                Map.Entry entry;
0340:                Object key, val;
0341:                StringBuffer sb = new StringBuffer();
0342:                sb.append("sent:\n");
0343:                for (Iterator it = sent.entrySet().iterator(); it.hasNext();) {
0344:                    entry = (Map.Entry) it.next();
0345:                    key = entry.getKey();
0346:                    if (key == null)
0347:                        key = "<mcast dest>";
0348:                    val = entry.getValue();
0349:                    sb.append(key).append(": ").append(val).append("\n");
0350:                }
0351:                sb.append("\nreceived:\n");
0352:                for (Iterator it = received.entrySet().iterator(); it.hasNext();) {
0353:                    entry = (Map.Entry) it.next();
0354:                    key = entry.getKey();
0355:                    val = entry.getValue();
0356:                    sb.append(key).append(": ").append(val).append("\n");
0357:                }
0358:
0359:                sb.append("\nXMIT_REQS sent:\n");
0360:                XmitRequest tmp;
0361:                for (Enumeration en = send_history.elements(); en
0362:                        .hasMoreElements();) {
0363:                    tmp = (XmitRequest) en.nextElement();
0364:                    sb.append(tmp).append("\n");
0365:                }
0366:
0367:                sb.append("\nMissing messages received\n");
0368:                MissingMessage missing;
0369:                for (Enumeration en = receive_history.elements(); en
0370:                        .hasMoreElements();) {
0371:                    missing = (MissingMessage) en.nextElement();
0372:                    sb.append(missing).append("\n");
0373:                }
0374:
0375:                return sb.toString();
0376:            }
0377:
0378:            public Vector providedUpServices() {
0379:                Vector retval = new Vector(5);
0380:                retval.addElement(new Integer(Event.GET_DIGEST));
0381:                retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
0382:                retval.addElement(new Integer(Event.GET_DIGEST_STATE));
0383:                retval.addElement(new Integer(Event.SET_DIGEST));
0384:                retval.addElement(new Integer(Event.MERGE_DIGEST));
0385:                return retval;
0386:            }
0387:
0388:            public Vector providedDownServices() {
0389:                Vector retval = new Vector(2);
0390:                retval.addElement(new Integer(Event.GET_DIGEST));
0391:                retval.addElement(new Integer(Event.GET_DIGEST_STABLE));
0392:                return retval;
0393:            }
0394:
0395:            public void start() throws Exception {
0396:                timer = stack != null ? stack.timer : null;
0397:                if (timer == null)
0398:                    throw new Exception("timer is null");
0399:                started = true;
0400:            }
0401:
0402:            public void stop() {
0403:                started = false;
0404:                reset(); // clears sent_msgs and destroys all NakReceiverWindows
0405:            }
0406:
0407:            /**
0408:             * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>passDown()</code> in this
0409:             * method as the event is passed down by default by the superclass after this method returns !</b>
0410:             */
0411:            public void down(Event evt) {
0412:                Digest digest;
0413:                Vector mbrs;
0414:
0415:                switch (evt.getType()) {
0416:
0417:                case Event.MSG:
0418:                    Message msg = (Message) evt.getArg();
0419:                    Address dest = msg.getDest();
0420:                    if (dest != null && !dest.isMulticastAddress()) {
0421:                        break; // unicast address: not null and not mcast, pass down unchanged
0422:                    }
0423:                    send(evt, msg);
0424:                    return; // don't pass down the stack
0425:
0426:                case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
0427:                    stable((Digest) evt.getArg());
0428:                    return; // do not pass down further (Bela Aug 7 2001)
0429:
0430:                case Event.GET_DIGEST:
0431:                    digest = getDigest();
0432:                    passUp(new Event(Event.GET_DIGEST_OK,
0433:                            digest != null ? digest.copy() : null));
0434:                    return;
0435:
0436:                case Event.GET_DIGEST_STABLE:
0437:                    digest = getDigestHighestDeliveredMsgs();
0438:                    passUp(new Event(Event.GET_DIGEST_STABLE_OK,
0439:                            digest != null ? digest.copy() : null));
0440:                    return;
0441:
0442:                case Event.GET_DIGEST_STATE:
0443:                    digest = getDigest();
0444:                    passUp(new Event(Event.GET_DIGEST_STATE_OK,
0445:                            digest != null ? digest.copy() : null));
0446:                    return;
0447:
0448:                case Event.SET_DIGEST:
0449:                    setDigest((Digest) evt.getArg());
0450:                    return;
0451:
0452:                case Event.MERGE_DIGEST:
0453:                    mergeDigest((Digest) evt.getArg());
0454:                    return;
0455:
0456:                case Event.CONFIG:
0457:                    passDown(evt);
0458:                    if (log.isDebugEnabled()) {
0459:                        log.debug("received CONFIG event: " + evt.getArg());
0460:                    }
0461:                    handleConfigEvent((HashMap) evt.getArg());
0462:                    return;
0463:
0464:                case Event.TMP_VIEW:
0465:                    View tmp_view = (View) evt.getArg();
0466:                    mbrs = tmp_view.getMembers();
0467:                    members.clear();
0468:                    members.addAll(mbrs);
0469:                    adjustReceivers(false);
0470:                    break;
0471:
0472:                case Event.VIEW_CHANGE:
0473:                    tmp_view = (View) evt.getArg();
0474:                    mbrs = tmp_view.getMembers();
0475:                    members.clear();
0476:                    members.addAll(mbrs);
0477:                    adjustReceivers(true);
0478:                    is_server = true; // check vids from now on
0479:
0480:                    Set tmp = new LinkedHashSet(members);
0481:                    tmp.add(null); // for null destination (= mcast)
0482:                    sent.keySet().retainAll(tmp);
0483:                    received.keySet().retainAll(tmp);
0484:                    view = tmp_view;
0485:                    break;
0486:
0487:                case Event.BECOME_SERVER:
0488:                    is_server = true;
0489:                    break;
0490:
0491:                case Event.DISCONNECT:
0492:                    leaving = true;
0493:                    reset();
0494:                    break;
0495:                }
0496:
0497:                passDown(evt);
0498:            }
0499:
0500:            /**
0501:             * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this
0502:             * method as the event is passed up by default by the superclass after this method returns !</b>
0503:             */
0504:            public void up(Event evt) {
0505:                NakAckHeader hdr;
0506:                Message msg;
0507:                Digest digest;
0508:
0509:                switch (evt.getType()) {
0510:
0511:                case Event.MSG:
0512:                    msg = (Message) evt.getArg();
0513:                    hdr = (NakAckHeader) msg.getHeader(name);
0514:                    if (hdr == null)
0515:                        break; // pass up (e.g. unicast msg)
0516:
0517:                    // discard messages while not yet server (i.e., until JOIN has returned)
0518:                    if (!is_server) {
0519:                        if (log.isTraceEnabled())
0520:                            log.trace("message was discarded (not yet server)");
0521:                        return;
0522:                    }
0523:
0524:                    // Changed by bela Jan 29 2003: we must not remove the header, otherwise
0525:                    // further xmit requests will fail !
0526:                    //hdr=(NakAckHeader)msg.removeHeader(getName());
0527:
0528:                    switch (hdr.type) {
0529:
0530:                    case NakAckHeader.MSG:
0531:                        handleMessage(msg, hdr);
0532:                        return; // transmitter passes message up for us !
0533:
0534:                    case NakAckHeader.XMIT_REQ:
0535:                        if (hdr.range == null) {
0536:                            if (log.isErrorEnabled()) {
0537:                                log
0538:                                        .error("XMIT_REQ: range of xmit msg is null; discarding request from "
0539:                                                + msg.getSrc());
0540:                            }
0541:                            return;
0542:                        }
0543:                        handleXmitReq(msg.getSrc(), hdr.range.low,
0544:                                hdr.range.high, hdr.sender);
0545:                        return;
0546:
0547:                    case NakAckHeader.XMIT_RSP:
0548:                        if (log.isTraceEnabled())
0549:                            log.trace("received missing messages " + hdr.range);
0550:                        handleXmitRsp(msg);
0551:                        return;
0552:
0553:                    default:
0554:                        if (log.isErrorEnabled()) {
0555:                            log.error("NakAck header type " + hdr.type
0556:                                    + " not known !");
0557:                        }
0558:                        return;
0559:                    }
0560:
0561:                case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
0562:                    stable((Digest) evt.getArg());
0563:                    return; // do not pass up further (Bela Aug 7 2001)
0564:
0565:                case Event.GET_DIGEST:
0566:                    digest = getDigestHighestDeliveredMsgs();
0567:                    passDown(new Event(Event.GET_DIGEST_OK, digest));
0568:                    return;
0569:
0570:                case Event.GET_DIGEST_STABLE:
0571:                    digest = getDigestHighestDeliveredMsgs();
0572:                    passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest));
0573:                    return;
0574:
0575:                case Event.SET_LOCAL_ADDRESS:
0576:                    local_addr = (Address) evt.getArg();
0577:                    break;
0578:
0579:                case Event.CONFIG:
0580:                    passUp(evt);
0581:                    if (log.isDebugEnabled()) {
0582:                        log.debug("received CONFIG event: " + evt.getArg());
0583:                    }
0584:                    handleConfigEvent((HashMap) evt.getArg());
0585:                    return;
0586:                }
0587:                passUp(evt);
0588:            }
0589:
0590:            /* --------------------------------- Private Methods --------------------------------------- */
0591:
0592:            /**
0593:             * Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't
0594:             * store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a
0595:             * message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details.
0596:             * Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to
0597:             * sent_msgs fails e.g. due to an OOM (see http://jira.jboss.com/jira/browse/JGRP-179). bela Jan 13 2006
0598:             */
0599:            private void send(Event evt, Message msg) {
0600:                if (msg == null)
0601:                    throw new NullPointerException("msg is null; event is "
0602:                            + evt);
0603:
0604:                if (!started) {
0605:                    if (log.isWarnEnabled())
0606:                        log
0607:                                .warn("["
0608:                                        + local_addr
0609:                                        + "] discarded message as start() has not been called, message: "
0610:                                        + msg);
0611:                    return;
0612:                }
0613:
0614:                long msg_id;
0615:                synchronized (sent_msgs) {
0616:                    try { // incrementing seqno and adding the msg to sent_msgs needs to be atomic
0617:                        msg_id = seqno + 1;
0618:                        msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG,
0619:                                msg_id));
0620:                        if (Global.copy) {
0621:                            sent_msgs.put(new Long(msg_id), msg.copy());
0622:                        } else {
0623:                            sent_msgs.put(new Long(msg_id), msg);
0624:                        }
0625:                        seqno = msg_id;
0626:                    } catch (Throwable t) {
0627:                        if (t instanceof  Error)
0628:                            throw (Error) t;
0629:                        if (t instanceof  RuntimeException)
0630:                            throw (RuntimeException) t;
0631:                        else {
0632:                            throw new RuntimeException("failure adding msg "
0633:                                    + msg + " to the retransmit table", t);
0634:                        }
0635:                    }
0636:                }
0637:
0638:                try { // moved passDown() out of synchronized clause (bela Sept 7 2006) http://jira.jboss.com/jira/browse/JGRP-300
0639:                    if (log.isTraceEnabled())
0640:                        log.trace("sending " + local_addr + "#" + msg_id);
0641:                    passDown(evt); // if this fails, since msg is in sent_msgs, it can be retransmitted
0642:                } catch (Throwable t) { // eat the exception, don't pass it up the stack
0643:                    if (log.isWarnEnabled()) {
0644:                        log.warn("failure passing message down", t);
0645:                    }
0646:                }
0647:            }
0648:
0649:            /**
0650:             * Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many
0651:             * messages as possible from the NRW and passes them up the stack. Discards messages from non-members.
0652:             */
0653:            private void handleMessage(Message msg, NakAckHeader hdr) {
0654:                NakReceiverWindow win;
0655:                Message msg_to_deliver;
0656:                Address sender = msg.getSrc();
0657:
0658:                if (sender == null) {
0659:                    if (log.isErrorEnabled())
0660:                        log.error("sender of message is null");
0661:                    return;
0662:                }
0663:
0664:                if (log.isTraceEnabled()) {
0665:                    StringBuffer sb = new StringBuffer('[');
0666:                    sb.append(local_addr).append(": received ").append(sender)
0667:                            .append('#').append(hdr.seqno);
0668:                    log.trace(sb.toString());
0669:                }
0670:
0671:                // msg is potentially re-sent later as result of XMIT_REQ reception; that's why hdr is added !
0672:
0673:                // Changed by bela Jan 29 2003: we currently don't resend from received msgs, just from sent_msgs !
0674:                // msg.putHeader(getName(), hdr);
0675:
0676:                synchronized (received_msgs) {
0677:                    win = (NakReceiverWindow) received_msgs.get(sender);
0678:                }
0679:                if (win == null) { // discard message if there is no entry for sender
0680:                    if (leaving)
0681:                        return;
0682:                    if (log.isWarnEnabled()) {
0683:                        StringBuffer sb = new StringBuffer('[');
0684:                        sb.append(local_addr).append(
0685:                                "] discarded message from non-member ").append(
0686:                                sender).append(", my view is ").append(
0687:                                this .view);
0688:                        log.warn(sb);
0689:                    }
0690:                    return;
0691:                }
0692:                win.add(hdr.seqno, msg); // add in order, then remove and pass up as many msgs as possible
0693:
0694:                // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
0695:                // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
0696:                // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time
0697:                // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
0698:                // delivery of P1, Q1, Q2, P2: FIFO (implemented by NAKACK) says messages need to be delivered only in the
0699:                // order in which they were sent by the sender
0700:                synchronized (win) {
0701:                    while ((msg_to_deliver = win.remove()) != null) {
0702:
0703:                        // Changed by bela Jan 29 2003: not needed (see above)
0704:                        //msg_to_deliver.removeHeader(getName());
0705:                        passUp(new Event(Event.MSG, msg_to_deliver));
0706:                    }
0707:                }
0708:            }
0709:
0710:            /**
0711:             * Retransmit from sent-table, called when XMIT_REQ is received. Bundles all messages to be xmitted into one large
0712:             * message and sends them back with an XMIT_RSP header. Note that since we cannot count on a fragmentation layer
0713:             * below us, we have to make sure the message doesn't exceed max_xmit_size bytes. If this is the case, we split the
0714:             * message into multiple, smaller-chunked messages. But in most cases this still yields fewer messages than if each
0715:             * requested message was retransmitted separately.
0716:             *
0717:             * @param xmit_requester        The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
0718:             * @param first_seqno The first sequence number to be retransmitted (<= last_seqno)
0719:             * @param last_seqno  The last sequence number to be retransmitted (>= first_seqno)
0720:             * @param original_sender The member who originally sent the messsage. Guaranteed to be non-null
0721:             */
0722:            private void handleXmitReq(Address xmit_requester,
0723:                    long first_seqno, long last_seqno, Address original_sender) {
0724:                Message m, tmp;
0725:                LinkedList list;
0726:                long size = 0, marker = first_seqno, len;
0727:                NakReceiverWindow win = null;
0728:                boolean amISender; // am I the original sender ?
0729:
0730:                if (log.isTraceEnabled()) {
0731:                    StringBuffer sb = new StringBuffer();
0732:                    sb.append(local_addr).append(
0733:                            ": received xmit request from ").append(
0734:                            xmit_requester).append(" for ");
0735:                    sb.append(original_sender).append(" [").append(first_seqno)
0736:                            .append(" - ").append(last_seqno).append("]");
0737:                    log.trace(sb.toString());
0738:                }
0739:
0740:                if (first_seqno > last_seqno) {
0741:                    if (log.isErrorEnabled())
0742:                        log.error("first_seqno (" + first_seqno
0743:                                + ") > last_seqno (" + last_seqno
0744:                                + "): not able to retransmit");
0745:                    return;
0746:                }
0747:
0748:                if (stats) {
0749:                    xmit_reqs_received += last_seqno - first_seqno + 1;
0750:                    updateStats(received, xmit_requester, 1, 0, 0);
0751:                }
0752:
0753:                amISender = local_addr.equals(original_sender);
0754:                if (!amISender)
0755:                    win = (NakReceiverWindow) received_msgs
0756:                            .get(original_sender);
0757:
0758:                list = new LinkedList();
0759:                for (long i = first_seqno; i <= last_seqno; i++) {
0760:                    if (amISender) {
0761:                        m = (Message) sent_msgs.get(new Long(i)); // no need to synchronize
0762:                    } else {
0763:                        m = win != null ? win.get(i) : null;
0764:                    }
0765:                    if (m == null) {
0766:                        if (log.isErrorEnabled()) {
0767:                            StringBuffer sb = new StringBuffer();
0768:                            sb.append("(requester=").append(xmit_requester)
0769:                                    .append(", local_addr=").append(
0770:                                            this .local_addr);
0771:                            sb.append(") message ").append(original_sender)
0772:                                    .append("::").append(i);
0773:                            sb.append(" not found in ").append(
0774:                                    (amISender ? "sent" : "received")).append(
0775:                                    " msgs. ");
0776:                            if (win != null) {
0777:                                sb.append("Received messages from ").append(
0778:                                        original_sender).append(": ").append(
0779:                                        win.toString());
0780:                            } else {
0781:                                sb.append("\nSent messages: ").append(
0782:                                        printSentMsgs());
0783:                            }
0784:                            log.error(sb);
0785:                        }
0786:                        continue;
0787:                    }
0788:                    len = m.size();
0789:                    size += len;
0790:                    if (size > max_xmit_size && list.size() > 0) { // changed from >= to > (yaron-r, bug #943709)
0791:                        // yaronr: added &&listSize()>0 since protocols between FRAG and NAKACK add headers, and message exceeds size.
0792:
0793:                        // size has reached max_xmit_size. go ahead and send message (excluding the current message)
0794:                        if (log.isTraceEnabled())
0795:                            log.trace("xmitting msgs [" + marker + '-'
0796:                                    + (i - 1) + "] to " + xmit_requester);
0797:                        sendXmitRsp(xmit_requester, (LinkedList) list.clone(),
0798:                                marker, i - 1);
0799:                        marker = i;
0800:                        list.clear();
0801:                        // fixed Dec 15 2003 (bela, patch from Joel Dice (dicej)), see explanantion under
0802:                        // bug report #854887
0803:                        size = len;
0804:                    }
0805:                    if (Global.copy) {
0806:                        tmp = m.copy();
0807:                    } else {
0808:                        tmp = m;
0809:                    }
0810:                    // tmp.setDest(xmit_requester);
0811:                    // tmp.setSrc(local_addr);
0812:                    if (tmp.getSrc() == null)
0813:                        tmp.setSrc(local_addr);
0814:                    list.add(tmp);
0815:                }
0816:
0817:                if (list.size() > 0) {
0818:                    if (log.isTraceEnabled())
0819:                        log.trace("xmitting msgs [" + marker + '-' + last_seqno
0820:                                + "] to " + xmit_requester);
0821:                    sendXmitRsp(xmit_requester, (LinkedList) list.clone(),
0822:                            marker, last_seqno);
0823:                    list.clear();
0824:                }
0825:            }
0826:
0827:            private static void updateStats(HashMap map, Address key, int req,
0828:                    int rsp, int missing) {
0829:                Entry entry = (Entry) map.get(key);
0830:                if (entry == null) {
0831:                    entry = new Entry();
0832:                    map.put(key, entry);
0833:                }
0834:                entry.xmit_reqs += req;
0835:                entry.xmit_rsps += rsp;
0836:                entry.missing_msgs_rcvd += missing;
0837:            }
0838:
0839:            private void sendXmitRsp(Address dest, LinkedList xmit_list,
0840:                    long first_seqno, long last_seqno) {
0841:                Buffer buf;
0842:                if (xmit_list == null || xmit_list.size() == 0) {
0843:                    if (log.isErrorEnabled())
0844:                        log.error("xmit_list is empty");
0845:                    return;
0846:                }
0847:                if (use_mcast_xmit)
0848:                    dest = null;
0849:
0850:                if (stats) {
0851:                    xmit_rsps_sent += xmit_list.size();
0852:                    updateStats(sent, dest, 0, 1, 0);
0853:                }
0854:
0855:                try {
0856:                    buf = Util.msgListToByteBuffer(xmit_list);
0857:                    Message msg = new Message(dest, null, buf.getBuf(), buf
0858:                            .getOffset(), buf.getLength());
0859:                    msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP,
0860:                            first_seqno, last_seqno));
0861:                    passDown(new Event(Event.MSG, msg));
0862:                } catch (IOException ex) {
0863:                    log.error("failed marshalling xmit list", ex);
0864:                }
0865:            }
0866:
0867:            private void handleXmitRsp(Message msg) {
0868:                LinkedList list;
0869:                Message m;
0870:
0871:                if (msg == null) {
0872:                    if (log.isWarnEnabled())
0873:                        log.warn("message is null");
0874:                    return;
0875:                }
0876:                try {
0877:                    list = Util.byteBufferToMessageList(msg.getRawBuffer(), msg
0878:                            .getOffset(), msg.getLength());
0879:                    if (list != null) {
0880:                        if (stats) {
0881:                            xmit_rsps_received += list.size();
0882:                            updateStats(received, msg.getSrc(), 0, 1, 0);
0883:                        }
0884:                        for (Iterator it = list.iterator(); it.hasNext();) {
0885:                            m = (Message) it.next();
0886:                            up(new Event(Event.MSG, m));
0887:                        }
0888:                        list.clear();
0889:                    }
0890:                } catch (Exception ex) {
0891:                    if (log.isErrorEnabled()) {
0892:                        log
0893:                                .error(
0894:                                        "failed reading list of retransmitted messages",
0895:                                        ex);
0896:                    }
0897:                }
0898:            }
0899:
0900:            /**
0901:             * Remove old members from NakReceiverWindows and add new members (starting seqno=0). Essentially removes all
0902:             * entries from received_msgs that are not in <code>members</code>
0903:             */
0904:            private void adjustReceivers(boolean remove) {
0905:                Address sender;
0906:                NakReceiverWindow win;
0907:
0908:                synchronized (received_msgs) {
0909:                    if (remove) {
0910:                        // 1. Remove all senders in received_msgs that are not members anymore
0911:                        for (Iterator it = received_msgs.keySet().iterator(); it
0912:                                .hasNext();) {
0913:                            sender = (Address) it.next();
0914:                            if (!members.contains(sender)) {
0915:                                win = (NakReceiverWindow) received_msgs
0916:                                        .get(sender);
0917:                                win.reset();
0918:                                if (log.isDebugEnabled()) {
0919:                                    log
0920:                                            .debug("removing "
0921:                                                    + sender
0922:                                                    + " from received_msgs (not member anymore)");
0923:                                }
0924:                                it.remove();
0925:                            }
0926:                        }
0927:                    }
0928:
0929:                    // 2. Add newly joined members to received_msgs (starting seqno=0)
0930:                    for (int i = 0; i < members.size(); i++) {
0931:                        sender = (Address) members.elementAt(i);
0932:                        if (!received_msgs.containsKey(sender)) {
0933:                            win = createNakReceiverWindow(sender, 0);
0934:                            received_msgs.put(sender, win);
0935:                        }
0936:                    }
0937:                }
0938:            }
0939:
0940:            /**
0941:             * Returns a message digest: for each member P the highest seqno received from P is added to the digest.
0942:             */
0943:            private Digest getDigest() {
0944:                Digest digest;
0945:                Address sender;
0946:                Range range;
0947:
0948:                digest = new Digest(members.size());
0949:                for (int i = 0; i < members.size(); i++) {
0950:                    sender = (Address) members.elementAt(i);
0951:                    range = getLowestAndHighestSeqno(sender, false); // get the highest received seqno
0952:                    if (range == null) {
0953:                        if (log.isErrorEnabled()) {
0954:                            log.error("range is null");
0955:                        }
0956:                        continue;
0957:                    }
0958:                    digest.add(sender, range.low, range.high); // add another entry to the digest
0959:                }
0960:                return digest;
0961:            }
0962:
0963:            /**
0964:             * Returns a message digest: for each member P the highest seqno received from P <em>without a gap</em> is added to
0965:             * the digest. E.g. if the seqnos received from P are [+3 +4 +5 -6 +7 +8], then 5 will be returned. Also, the
0966:             * highest seqno <em>seen</em> is added. The max of all highest seqnos seen will be used (in STABLE) to determine
0967:             * whether the last seqno from a sender was received (see "Last Message Dropped" topic in DESIGN).
0968:             */
0969:            private Digest getDigestHighestDeliveredMsgs() {
0970:                Digest digest;
0971:                Address sender;
0972:                Range range;
0973:                long high_seqno_seen;
0974:
0975:                digest = new Digest(members.size());
0976:                for (int i = 0; i < members.size(); i++) {
0977:                    sender = (Address) members.elementAt(i);
0978:                    range = getLowestAndHighestSeqno(sender, true); // get the highest deliverable seqno
0979:                    if (range == null) {
0980:                        if (log.isErrorEnabled()) {
0981:                            log.error("range is null");
0982:                        }
0983:                        continue;
0984:                    }
0985:                    high_seqno_seen = getHighSeqnoSeen(sender);
0986:                    digest.add(sender, range.low, range.high, high_seqno_seen); // add another entry to the digest
0987:                }
0988:                return digest;
0989:            }
0990:
0991:            /**
0992:             * Creates a NakReceiverWindow for each sender in the digest according to the sender's seqno. If NRW already exists,
0993:             * reset it.
0994:             */
0995:            private void setDigest(Digest d) {
0996:                if (d == null || d.senders == null) {
0997:                    if (log.isErrorEnabled()) {
0998:                        log.error("digest or digest.senders is null");
0999:                    }
1000:                    return;
1001:                }
1002:
1003:                clear();
1004:
1005:                Map.Entry entry;
1006:                Address sender;
1007:                org.jgroups.protocols.pbcast.Digest.Entry val;
1008:                long initial_seqno;
1009:                NakReceiverWindow win;
1010:
1011:                for (Iterator it = d.senders.entrySet().iterator(); it
1012:                        .hasNext();) {
1013:                    entry = (Map.Entry) it.next();
1014:                    sender = (Address) entry.getKey();
1015:                    val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
1016:                            .getValue();
1017:
1018:                    if (sender == null || val == null) {
1019:                        if (log.isWarnEnabled()) {
1020:                            log.warn("sender or value is null");
1021:                        }
1022:                        continue;
1023:                    }
1024:                    initial_seqno = val.high_seqno;
1025:                    win = createNakReceiverWindow(sender, initial_seqno);
1026:                    synchronized (received_msgs) {
1027:                        received_msgs.put(sender, win);
1028:                    }
1029:                }
1030:            }
1031:
1032:            /**
1033:             * For all members of the digest, adjust the NakReceiverWindows in the received_msgs hashtable. If the member
1034:             * already exists, sets its seqno to be the max of the seqno and the seqno of the member in the digest. If no entry
1035:             * exists, create one with the initial seqno set to the seqno of the member in the digest.
1036:             */
1037:            private void mergeDigest(Digest d) {
1038:                if (d == null || d.senders == null) {
1039:                    if (log.isErrorEnabled()) {
1040:                        log.error("digest or digest.senders is null");
1041:                    }
1042:                    return;
1043:                }
1044:
1045:                Map.Entry entry;
1046:                Address sender;
1047:                org.jgroups.protocols.pbcast.Digest.Entry val;
1048:                NakReceiverWindow win;
1049:                long initial_seqno;
1050:
1051:                for (Iterator it = d.senders.entrySet().iterator(); it
1052:                        .hasNext();) {
1053:                    entry = (Map.Entry) it.next();
1054:                    sender = (Address) entry.getKey();
1055:                    val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
1056:                            .getValue();
1057:
1058:                    if (sender == null || val == null) {
1059:                        if (log.isWarnEnabled()) {
1060:                            log.warn("sender or value is null");
1061:                        }
1062:                        continue;
1063:                    }
1064:                    initial_seqno = val.high_seqno;
1065:                    synchronized (received_msgs) {
1066:                        win = (NakReceiverWindow) received_msgs.get(sender);
1067:                        if (win == null) {
1068:                            win = createNakReceiverWindow(sender, initial_seqno);
1069:                            received_msgs.put(sender, win);
1070:                        } else {
1071:                            if (win.getHighestReceived() < initial_seqno) {
1072:                                win.reset();
1073:                                received_msgs.remove(sender);
1074:                                win = createNakReceiverWindow(sender,
1075:                                        initial_seqno);
1076:                                received_msgs.put(sender, win);
1077:                            }
1078:                        }
1079:                    }
1080:                }
1081:            }
1082:
1083:            private NakReceiverWindow createNakReceiverWindow(Address sender,
1084:                    long initial_seqno) {
1085:                NakReceiverWindow win = new NakReceiverWindow(sender, this ,
1086:                        initial_seqno, timer);
1087:                win.setRetransmitTimeouts(retransmit_timeout);
1088:                win.setDiscardDeliveredMessages(discard_delivered_msgs);
1089:                win.setMaxXmitBufSize(this .max_xmit_buf_size);
1090:                if (stats)
1091:                    win.setListener(this );
1092:                return win;
1093:            }
1094:
1095:            /**
1096:             * Returns the lowest seqno still in cache (so it can be retransmitted) and the highest seqno received so far.
1097:             *
1098:             * @param sender       The address for which the highest and lowest seqnos are to be retrieved
1099:             * @param stop_at_gaps If true, the highest seqno *deliverable* will be returned. If false, the highest seqno
1100:             *                     *received* will be returned. E.g. for [+3 +4 +5 -6 +7 +8], the highest_seqno_received is 8,
1101:             *                     whereas the higheset_seqno_seen (deliverable) is 5.
1102:             */
1103:            private Range getLowestAndHighestSeqno(Address sender,
1104:                    boolean stop_at_gaps) {
1105:                Range r = null;
1106:                NakReceiverWindow win;
1107:
1108:                if (sender == null) {
1109:                    if (log.isErrorEnabled()) {
1110:                        log.error("sender is null");
1111:                    }
1112:                    return r;
1113:                }
1114:                synchronized (received_msgs) {
1115:                    win = (NakReceiverWindow) received_msgs.get(sender);
1116:                }
1117:                if (win == null) {
1118:                    if (log.isErrorEnabled()) {
1119:                        log.error("sender " + sender
1120:                                + " not found in received_msgs");
1121:                    }
1122:                    return r;
1123:                }
1124:                if (stop_at_gaps) {
1125:                    r = new Range(win.getLowestSeen(), win.getHighestSeen()); // deliverable messages (no gaps)
1126:                } else {
1127:                    r = new Range(win.getLowestSeen(),
1128:                            win.getHighestReceived() + 1); // received messages
1129:                }
1130:                return r;
1131:            }
1132:
1133:            /**
1134:             * Returns the highest seqno seen from sender. E.g. if we received 1, 2, 4, 5 from P, then 5 will be returned
1135:             * (doesn't take gaps into account). If we are the sender, we will return the highest seqno <em>sent</em> rather
1136:             * then <em>received</em>
1137:             */
1138:            private long getHighSeqnoSeen(Address sender) {
1139:                NakReceiverWindow win;
1140:                long ret = 0;
1141:
1142:                if (sender == null) {
1143:                    if (log.isErrorEnabled()) {
1144:                        log.error("sender is null");
1145:                    }
1146:                    return ret;
1147:                }
1148:                if (sender.equals(local_addr)) {
1149:                    return seqno - 1;
1150:                }
1151:
1152:                synchronized (received_msgs) {
1153:                    win = (NakReceiverWindow) received_msgs.get(sender);
1154:                }
1155:                if (win == null) {
1156:                    if (log.isErrorEnabled()) {
1157:                        log.error("sender " + sender
1158:                                + " not found in received_msgs");
1159:                    }
1160:                    return ret;
1161:                }
1162:                ret = win.getHighestReceived();
1163:                return ret;
1164:            }
1165:
1166:            /**
1167:             * Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest
1168:             * which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update received_msgs:
1169:             * for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the
1170:             * NakReceiverWindow corresponding to P which are <= seqno at digest[P].
1171:             */
1172:            private void stable(Digest d) {
1173:                NakReceiverWindow recv_win;
1174:                long my_highest_rcvd; // highest seqno received in my digest for a sender P
1175:                long stability_highest_rcvd; // highest seqno received in the stability vector for a sender P
1176:
1177:                if (members == null || local_addr == null || d == null) {
1178:                    if (log.isWarnEnabled())
1179:                        log.warn("members, local_addr or digest are null !");
1180:                    return;
1181:                }
1182:
1183:                if (log.isTraceEnabled()) {
1184:                    log.trace("received stable digest " + d);
1185:                }
1186:
1187:                Map.Entry entry;
1188:                Address sender;
1189:                org.jgroups.protocols.pbcast.Digest.Entry val;
1190:                long high_seqno_delivered, high_seqno_received;
1191:
1192:                for (Iterator it = d.senders.entrySet().iterator(); it
1193:                        .hasNext();) {
1194:                    entry = (Map.Entry) it.next();
1195:                    sender = (Address) entry.getKey();
1196:                    if (sender == null)
1197:                        continue;
1198:                    val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
1199:                            .getValue();
1200:                    high_seqno_delivered = val.high_seqno;
1201:                    high_seqno_received = val.high_seqno_seen;
1202:
1203:                    // check whether the last seqno received for a sender P in the stability vector is > last seqno
1204:                    // received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic
1205:                    // in DESIGN)
1206:                    synchronized (received_msgs) {
1207:                        recv_win = (NakReceiverWindow) received_msgs
1208:                                .get(sender);
1209:                    }
1210:                    if (recv_win != null) {
1211:                        my_highest_rcvd = recv_win.getHighestReceived();
1212:                        stability_highest_rcvd = high_seqno_received;
1213:
1214:                        if (stability_highest_rcvd >= 0
1215:                                && stability_highest_rcvd > my_highest_rcvd) {
1216:                            if (log.isTraceEnabled()) {
1217:                                log
1218:                                        .trace("my_highest_rcvd ("
1219:                                                + my_highest_rcvd
1220:                                                + ") < stability_highest_rcvd ("
1221:                                                + stability_highest_rcvd
1222:                                                + "): requesting retransmission of "
1223:                                                + sender + '#'
1224:                                                + stability_highest_rcvd);
1225:                            }
1226:                            retransmit(stability_highest_rcvd,
1227:                                    stability_highest_rcvd, sender);
1228:                        }
1229:                    }
1230:
1231:                    high_seqno_delivered -= gc_lag;
1232:                    if (high_seqno_delivered < 0) {
1233:                        continue;
1234:                    }
1235:
1236:                    if (log.isTraceEnabled())
1237:                        log.trace("deleting msgs <= " + high_seqno_delivered
1238:                                + " from " + sender);
1239:
1240:                    // garbage collect from sent_msgs if sender was myself
1241:                    if (sender.equals(local_addr)) {
1242:                        synchronized (sent_msgs) {
1243:                            // gets us a subset from [lowest seqno - seqno]
1244:                            SortedMap stable_keys = sent_msgs.headMap(new Long(
1245:                                    high_seqno_delivered));
1246:                            if (stable_keys != null) {
1247:                                stable_keys.clear(); // this will modify sent_msgs directly
1248:                            }
1249:                        }
1250:                    }
1251:
1252:                    // delete *delivered* msgs that are stable
1253:                    // recv_win=(NakReceiverWindow)received_msgs.get(sender);
1254:                    if (recv_win != null)
1255:                        recv_win.stable(high_seqno_delivered); // delete all messages with seqnos <= seqno
1256:                }
1257:            }
1258:
1259:            /* ---------------------- Interface Retransmitter.RetransmitCommand ---------------------- */
1260:
1261:            /**
1262:             * Implementation of Retransmitter.RetransmitCommand. Called by retransmission thread when gap is detected.
1263:             */
1264:            public void retransmit(long first_seqno, long last_seqno,
1265:                    Address sender) {
1266:                NakAckHeader hdr;
1267:                Message retransmit_msg;
1268:                Address dest = sender; // to whom do we send the XMIT request ?
1269:
1270:                if (xmit_from_random_member && !local_addr.equals(sender)) {
1271:                    Address random_member = (Address) Util
1272:                            .pickRandomElement(members);
1273:                    if (random_member != null
1274:                            && !local_addr.equals(random_member)) {
1275:                        dest = random_member;
1276:                        if (log.isTraceEnabled())
1277:                            log.trace("picked random member " + dest
1278:                                    + " to send XMIT request to");
1279:                    }
1280:                }
1281:
1282:                hdr = new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno,
1283:                        last_seqno, sender);
1284:                retransmit_msg = new Message(dest, null, null);
1285:                if (log.isTraceEnabled())
1286:                    log
1287:                            .trace(local_addr + ": sending XMIT_REQ (["
1288:                                    + first_seqno + ", " + last_seqno
1289:                                    + "]) to " + dest);
1290:                retransmit_msg.putHeader(name, hdr);
1291:                passDown(new Event(Event.MSG, retransmit_msg));
1292:                if (stats) {
1293:                    xmit_reqs_sent += last_seqno - first_seqno + 1;
1294:                    updateStats(sent, dest, 1, 0, 0);
1295:                    for (long i = first_seqno; i <= last_seqno; i++) {
1296:                        XmitRequest req = new XmitRequest(sender, i, dest);
1297:                        send_history.add(req);
1298:                    }
1299:                }
1300:            }
1301:
1302:            /* ------------------- End of Interface Retransmitter.RetransmitCommand -------------------- */
1303:
1304:            /* ----------------------- Interface NakReceiverWindow.Listener ---------------------- */
1305:            public void missingMessageReceived(long seqno, Message msg) {
1306:                if (stats) {
1307:                    missing_msgs_received++;
1308:                    updateStats(received, msg.getSrc(), 0, 0, 1);
1309:                    MissingMessage missing = new MissingMessage(msg.getSrc(),
1310:                            seqno);
1311:                    receive_history.add(missing);
1312:                }
1313:            }
1314:
1315:            /* ------------------- End of Interface NakReceiverWindow.Listener ------------------- */
1316:
1317:            private void clear() {
1318:                NakReceiverWindow win;
1319:
1320:                // changed April 21 2004 (bela): SourceForge bug# 938584. We cannot delete our own messages sent between
1321:                // a join() and a getState(). Otherwise retransmission requests from members who missed those msgs might
1322:                // fail. Not to worry though: those msgs will be cleared by STABLE (message garbage collection)
1323:
1324:                // sent_msgs.clear();
1325:
1326:                synchronized (received_msgs) {
1327:                    for (Iterator it = received_msgs.values().iterator(); it
1328:                            .hasNext();) {
1329:                        win = (NakReceiverWindow) it.next();
1330:                        win.reset();
1331:                    }
1332:                    received_msgs.clear();
1333:                }
1334:            }
1335:
1336:            private void reset() {
1337:                NakReceiverWindow win;
1338:
1339:                synchronized (sent_msgs) {
1340:                    sent_msgs.clear();
1341:                    seqno = -1;
1342:                }
1343:
1344:                synchronized (received_msgs) {
1345:                    for (Iterator it = received_msgs.values().iterator(); it
1346:                            .hasNext();) {
1347:                        win = (NakReceiverWindow) it.next();
1348:                        win.destroy();
1349:                    }
1350:                    received_msgs.clear();
1351:                }
1352:            }
1353:
1354:            public String printMessages() {
1355:                StringBuffer ret = new StringBuffer();
1356:                Map.Entry entry;
1357:                Address addr;
1358:                Object w;
1359:
1360:                ret.append("\nsent_msgs: ").append(printSentMsgs());
1361:                ret.append("\nreceived_msgs:\n");
1362:                synchronized (received_msgs) {
1363:                    for (Iterator it = received_msgs.entrySet().iterator(); it
1364:                            .hasNext();) {
1365:                        entry = (Map.Entry) it.next();
1366:                        addr = (Address) entry.getKey();
1367:                        w = entry.getValue();
1368:                        ret.append(addr).append(": ").append(w.toString())
1369:                                .append('\n');
1370:                    }
1371:                }
1372:                return ret.toString();
1373:            }
1374:
1375:            public String printSentMsgs() {
1376:                StringBuffer sb = new StringBuffer();
1377:                Long min_seqno, max_seqno;
1378:                synchronized (sent_msgs) {
1379:                    min_seqno = sent_msgs.size() > 0 ? (Long) sent_msgs
1380:                            .firstKey() : new Long(0);
1381:                    max_seqno = sent_msgs.size() > 0 ? (Long) sent_msgs
1382:                            .lastKey() : new Long(0);
1383:                }
1384:                sb.append('[').append(min_seqno).append(" - ")
1385:                        .append(max_seqno).append("] (").append(
1386:                                sent_msgs.size()).append(")");
1387:                return sb.toString();
1388:            }
1389:
1390:            private void handleConfigEvent(HashMap map) {
1391:                if (map == null) {
1392:                    return;
1393:                }
1394:                if (map.containsKey("frag_size")) {
1395:                    max_xmit_size = ((Integer) map.get("frag_size")).intValue();
1396:                    if (log.isInfoEnabled()) {
1397:                        log.info("max_xmit_size=" + max_xmit_size);
1398:                    }
1399:                }
1400:            }
1401:
1402:            static class Entry {
1403:                long xmit_reqs, xmit_rsps, missing_msgs_rcvd;
1404:
1405:                public String toString() {
1406:                    StringBuffer sb = new StringBuffer();
1407:                    sb.append(xmit_reqs).append(" xmit_reqs").append(", ")
1408:                            .append(xmit_rsps).append(" xmit_rsps");
1409:                    sb.append(", ").append(missing_msgs_rcvd).append(
1410:                            " missing msgs");
1411:                    return sb.toString();
1412:                }
1413:            }
1414:
1415:            static class XmitRequest {
1416:                Address original_sender; // original sender of message
1417:                long seq, timestamp = System.currentTimeMillis();
1418:                Address xmit_dest; // destination to which XMIT_REQ is sent, usually the original sender
1419:
1420:                XmitRequest(Address original_sender, long seqno,
1421:                        Address xmit_dest) {
1422:                    this .original_sender = original_sender;
1423:                    this .xmit_dest = xmit_dest;
1424:                    this .seq = seqno;
1425:                }
1426:
1427:                public String toString() {
1428:                    StringBuffer sb = new StringBuffer();
1429:                    sb.append(new Date(timestamp)).append(": ").append(
1430:                            original_sender).append(" #").append(seq);
1431:                    sb.append(" (XMIT_REQ sent to ").append(xmit_dest).append(
1432:                            ")");
1433:                    return sb.toString();
1434:                }
1435:            }
1436:
1437:            static class MissingMessage {
1438:                Address original_sender;
1439:                long seq, timestamp = System.currentTimeMillis();
1440:
1441:                MissingMessage(Address original_sender, long seqno) {
1442:                    this .original_sender = original_sender;
1443:                    this .seq = seqno;
1444:                }
1445:
1446:                public String toString() {
1447:                    StringBuffer sb = new StringBuffer();
1448:                    sb.append(new Date(timestamp)).append(": ").append(
1449:                            original_sender).append(" #").append(seq);
1450:                    return sb.toString();
1451:                }
1452:            }
1453:
1454:            /* ----------------------------- End of Private Methods ------------------------------------ */
1455:
1456:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.