Source code for UNICAST.java (JGroups 2.4.1-sp3, package org.jgroups.protocols)



// $Id: UNICAST.java,v 1.63.2.4 2007/04/27 08:03:52 belaban Exp $

package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.AckReceiverWindow;
import org.jgroups.stack.AckSenderWindow;
import org.jgroups.stack.Protocol;
import org.jgroups.util.BoundedList;
import org.jgroups.util.Streamable;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

import java.io.*;
import java.util.*;

/**
 * Reliable unicast layer. Uses an acknowledgement scheme similar to TCP to provide lossless transmission
 * of unicast messages (for reliable multicast see the NAKACK layer). When a message is sent to a peer for
 * the first time, we add the pair (peer_addr, Entry) to the hashtable (the peer address is the key). All
 * messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a
 * message from a peer for the first time, another entry will be created and added to the hashtable
 * (unless it already exists). Messages will then be added to hashtable.peer_addr.received_msgs.<p> This
 * layer is used to reliably transmit point-to-point messages, that is, either messages sent to a
 * single receiver (vs. messages multicast to a group) or, for example, replies to a multicast message. The
 * sender uses an <code>AckSenderWindow</code>, which retransmits messages for which it hasn't received
 * an ACK; the receiver uses an <code>AckReceiverWindow</code>, which keeps track of the lowest seqno
 * received so far and keeps messages in order.<p>
 * Messages are eventually removed from both windows. A message is removed from the
 * AckSenderWindow when an ACK has been received for it. Messages are removed from the AckReceiverWindow
 * whenever a message is received: the new message is added and then we try to remove as many messages as
 * possible (until we stop at a gap, or there are no more messages).
 * @author Bela Ban
 */
public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand {
    private final Vector members = new Vector(11);
    private final HashMap connections = new HashMap(11); // Object (sender or receiver) -- Entries
    private long[] timeout = { 400, 800, 1600, 3200 }; // for AckSenderWindow: max time to wait for missing acks
    private Address local_addr = null;
    private TimeScheduler timer = null; // used for retransmissions (passed to AckSenderWindow)

    // if UNICAST is used without GMS, don't consult the membership on retransmit() if use_gms=false
    // default is true
    private boolean use_gms = true;
    private boolean started = false;

    /** A list of members who left, used to determine when to prevent sending messages to left mbrs */
    private final BoundedList previous_members = new BoundedList(50);

    private static final String name = "UNICAST";
    private static final long DEFAULT_FIRST_SEQNO = 1;

    private long num_msgs_sent = 0, num_msgs_received = 0,
            num_bytes_sent = 0, num_bytes_received = 0;
    private long num_acks_sent = 0, num_acks_received = 0,
            num_xmit_requests_received = 0;

    /** All protocol names have to be unique ! */
    public String getName() {
        return name;
    }

    public String getLocalAddress() {
        return local_addr != null ? local_addr.toString() : "null";
    }

    public String getMembers() {
        return members != null ? members.toString() : "[]";
    }

    public String printConnections() {
        StringBuffer sb = new StringBuffer();
        Map.Entry entry;
        // lock the table while iterating, as the other accessors do, so a concurrent
        // put/remove cannot cause a ConcurrentModificationException
        synchronized (connections) {
            for (Iterator it = connections.entrySet().iterator(); it.hasNext();) {
                entry = (Map.Entry) it.next();
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
            }
        }
        return sb.toString();
    }

    public long getNumMessagesSent() {
        return num_msgs_sent;
    }

    public long getNumMessagesReceived() {
        return num_msgs_received;
    }

    public long getNumBytesSent() {
        return num_bytes_sent;
    }

    public long getNumBytesReceived() {
        return num_bytes_received;
    }

    public long getNumAcksSent() {
        return num_acks_sent;
    }

    public long getNumAcksReceived() {
        return num_acks_received;
    }

    public long getNumberOfRetransmitRequestsReceived() {
        return num_xmit_requests_received;
    }

    /** The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) */
    public int getNumberOfUnackedMessages() {
        int num = 0;
        Entry entry;
        synchronized (connections) {
            for (Iterator it = connections.values().iterator(); it.hasNext();) {
                entry = (Entry) it.next();
                if (entry.sent_msgs != null)
                    num += entry.sent_msgs.size();
            }
        }
        return num;
    }

    public String getUnackedMessages() {
        StringBuffer sb = new StringBuffer();
        Map.Entry entry;
        Entry e;
        Object member;
        synchronized (connections) {
            for (Iterator it = connections.entrySet().iterator(); it.hasNext();) {
                entry = (Map.Entry) it.next();
                member = entry.getKey();
                e = (Entry) entry.getValue();
                sb.append(member).append(": ");
                if (e.sent_msgs != null)
                    sb.append(e.sent_msgs.toString());
                sb.append("\n"); // always end the line, even if nothing has been sent to this member
            }
        }
        return sb.toString();
    }

    public int getNumberOfMessagesInReceiveWindows() {
        int num = 0;
        Entry entry;
        synchronized (connections) {
            for (Iterator it = connections.values().iterator(); it.hasNext();) {
                entry = (Entry) it.next();
                if (entry.received_msgs != null)
                    num += entry.received_msgs.size();
            }
        }
        return num;
    }

    public void resetStats() {
        num_msgs_sent = num_msgs_received = num_bytes_sent = num_bytes_received = 0;
        num_acks_sent = num_acks_received = 0;
        num_xmit_requests_received = 0;
    }

    public Map dumpStats() {
        Map m = new HashMap();
        m.put("num_msgs_sent", new Long(num_msgs_sent));
        m.put("num_msgs_received", new Long(num_msgs_received));
        m.put("num_bytes_sent", new Long(num_bytes_sent));
        m.put("num_bytes_received", new Long(num_bytes_received));
        m.put("num_acks_sent", new Long(num_acks_sent));
        m.put("num_acks_received", new Long(num_acks_received));
        m.put("num_xmit_requests_received", new Long(num_xmit_requests_received));
        m.put("num_unacked_msgs", new Long(getNumberOfUnackedMessages()));
        m.put("unacked_msgs", getUnackedMessages());
        m.put("num_msgs_in_recv_windows", new Long(getNumberOfMessagesInReceiveWindows()));
        return m;
    }

    public boolean setProperties(Properties props) {
        String str;
        long[] tmp;

        super.setProperties(props);
        str = props.getProperty("timeout");
        if (str != null) {
            tmp = Util.parseCommaDelimitedLongs(str);
            if (tmp != null && tmp.length > 0)
                timeout = tmp;
            props.remove("timeout");
        }

        str = props.getProperty("window_size");
        if (str != null) {
            props.remove("window_size");
            log.warn("window_size is deprecated and will be ignored");
        }

        str = props.getProperty("min_threshold");
        if (str != null) {
            props.remove("min_threshold");
            log.warn("min_threshold is deprecated and will be ignored");
        }

        str = props.getProperty("use_gms");
        if (str != null) {
            use_gms = Boolean.valueOf(str).booleanValue();
            props.remove("use_gms");
        }

        if (props.size() > 0) {
            log.error("these properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    public void start() throws Exception {
        timer = stack != null ? stack.timer : null;
        if (timer == null)
            throw new Exception("timer is null");
        started = true;
    }

    public void stop() {
        started = false;
        removeAllConnections();
    }

    public void up(Event evt) {
        Message msg;
        Address dst, src;
        UnicastHeader hdr;

        switch (evt.getType()) {

        case Event.MSG:
            msg = (Message) evt.getArg();
            dst = msg.getDest();

            if (dst == null || dst.isMulticastAddress()) // only handle unicast messages
                break; // pass up

            // changed from removeHeader(): we cannot remove the header because if we do loopback=true at the
            // transport level, we will not have the header on retransmit ! (bela Aug 22 2006)
            hdr = (UnicastHeader) msg.getHeader(name);
            if (hdr == null)
                break;
            src = msg.getSrc();
            switch (hdr.type) {
            case UnicastHeader.DATA: // received regular message
                if (handleDataReceived(src, hdr.seqno, msg))
                    sendAck(src, hdr.seqno); // only send an ACK if added to the received_msgs table (bela Aug 2006)
                return; // we pass the deliverable message up in handleDataReceived()
            case UnicastHeader.ACK: // received ACK for previously sent message
                handleAckReceived(src, hdr.seqno);
                break;
            default:
                log.error("UnicastHeader type " + hdr.type + " not known !");
                break;
            }
            return;

        case Event.SET_LOCAL_ADDRESS:
            local_addr = (Address) evt.getArg();
            break;
        }

        passUp(evt); // Pass up to the layer above us
    }

    public void down(Event evt) {
        switch (evt.getType()) {

        case Event.MSG: // Add UnicastHeader, add to AckSenderWindow and pass down
            Message msg = (Message) evt.getArg();
            Object dst = msg.getDest();

            /* only handle unicast messages */
            if (dst == null || ((Address) dst).isMulticastAddress()) {
                break;
            }

            if (previous_members.contains(dst)) {
                if (log.isTraceEnabled())
                    log.trace("discarding message to " + dst + " as this member left the group,"
                            + " previous_members=" + previous_members);
                return;
            }

            if (!started) {
                if (log.isWarnEnabled())
                    log.warn("discarded message as start() has not yet been called, message: " + msg);
                return;
            }

            Entry entry;
            synchronized (connections) {
                entry = (Entry) connections.get(dst);
                if (entry == null) {
                    entry = new Entry();
                    connections.put(dst, entry);
                    if (log.isTraceEnabled())
                        log.trace(local_addr + ": created new connection for dst " + dst);
                }
            }

            Message tmp;
            synchronized (entry) { // threads will only sync if they access the same entry
                long seqno = -2;

                try {
                    seqno = entry.sent_msgs_seqno;
                    UnicastHeader hdr = new UnicastHeader(UnicastHeader.DATA, seqno);
                    if (entry.sent_msgs == null) { // first msg to peer 'dst'
                        // use the protocol stack's timer
                        entry.sent_msgs = new AckSenderWindow(this, timeout, timer, this.local_addr);
                    }
                    msg.putHeader(name, hdr);
                    if (log.isTraceEnabled())
                        log.trace(new StringBuffer().append(local_addr).append(" --> DATA(").append(dst)
                                .append(": #").append(seqno).append(')'));
                    tmp = Global.copy ? msg.copy() : msg;
                    entry.sent_msgs.add(seqno, tmp); // add *including* UnicastHeader, adds to retransmitter
                    entry.sent_msgs_seqno++;
                } catch (Throwable t) {
                    // remove seqno again, so it is not transmitted; sent_msgs may still be null
                    // if the failure happened while creating the AckSenderWindow
                    if (entry.sent_msgs != null)
                        entry.sent_msgs.ack(seqno);
                    if (t instanceof Error)
                        throw (Error) t;
                    if (t instanceof RuntimeException)
                        throw (RuntimeException) t;
                    else {
                        throw new RuntimeException("failure adding msg " + msg + " to the retransmit table", t);
                    }
                }
            }
            // moved passing down of message out of the synchronized block: similar to NAKACK, we do *not* need
            // to send unicast messages in order of sequence numbers because they will be sorted into the correct
            // order at the receiver anyway. Of course, most of the time, the order will be correct (FIFO), so
            // the cost of reordering is minimal. This is part of http://jira.jboss.com/jira/browse/JGRP-303
            try {
                passDown(new Event(Event.MSG, tmp));
                num_msgs_sent++;
                num_bytes_sent += msg.getLength();
            } catch (Throwable t) { // eat the exception, don't pass it up the stack
                if (log.isWarnEnabled()) {
                    log.warn("failure passing message down", t);
                }
            }

            msg = null;
            return; // we already passed the msg down

        case Event.VIEW_CHANGE: // remove connections to peers that are not members anymore !
            Vector new_members = ((View) evt.getArg()).getMembers();
            Vector left_members;
            synchronized (members) {
                left_members = Util.determineLeftMembers(members, new_members);
                members.clear();
                if (new_members != null)
                    members.addAll(new_members);
            }

            // Remove all connections for members that left between the current view and the new view
            // See DESIGN for details
            boolean rc;
            if (use_gms && left_members.size() > 0) {
                Object mbr;
                for (int i = 0; i < left_members.size(); i++) {
                    mbr = left_members.elementAt(i);
                    rc = removeConnection(mbr); // adds to previous_members
                    if (rc && log.isTraceEnabled())
                        log.trace("removed " + mbr + " from connection table, member(s) "
                                + left_members + " left");
                }
            }
            // code by Matthias Weber May 23 2006
            for (Enumeration e = previous_members.elements(); e.hasMoreElements();) {
                Object mbr = e.nextElement();
                if (members.contains(mbr)) {
                    if (previous_members.removeElement(mbr) != null) {
                        if (log.isTraceEnabled())
                            log.trace("removing " + mbr
                                    + " from previous_members as result of VIEW_CHANGE event, "
                                    + "previous_members=" + previous_members);
                    }
                }
            }
            break;

        case Event.ENABLE_UNICASTS_TO:
            Object member = evt.getArg();
            previous_members.removeElement(member);
            if (log.isTraceEnabled())
                log.trace("removing " + member
                        + " from previous_members as result of ENABLE_UNICASTS_TO event, "
                        + "previous_members=" + previous_members);
            break;
        }

        passDown(evt); // Pass on to the layer below us
    }

    /**
     * Removes the entry for <code>mbr</code> from the connection table (locking the table itself), resets it
     * and adds <code>mbr</code> to previous_members. Returns true if the member was found, otherwise false.
     */
    private boolean removeConnection(Object mbr) {
        Entry entry;

        synchronized (connections) {
            entry = (Entry) connections.remove(mbr);
            if (!previous_members.contains(mbr))
                previous_members.add(mbr);
        }
        if (entry != null) {
            entry.reset();
            if (log.isTraceEnabled())
                log.trace(local_addr + ": removed connection for dst " + mbr);
            return true;
        } else
            return false;
    }

    private void removeAllConnections() {
        Entry entry;

        synchronized (connections) {
            for (Iterator it = connections.values().iterator(); it.hasNext();) {
                entry = (Entry) it.next();
                entry.reset();
            }
            connections.clear();
        }
    }

    /** Called by AckSenderWindow to resend messages for which no ACK has been received yet */
    public void retransmit(long seqno, Message msg) {
        Object dst = msg.getDest();

        // bela Dec 23 2002:
        // this will remove a member on a MERGE request, e.g. A and B merge: when A sends the unicast
        // request to B and there's a retransmit(), B will be removed !

        // if(use_gms && !members.contains(dst) && !prev_members.contains(dst)) {
        //
        //     if(log.isWarnEnabled()) log.warn("UNICAST.retransmit()", "seqno=" + seqno + ":  dest " + dst +
        //             " is not member any longer; removing entry !");
        //
        //     synchronized(connections) {
        //         removeConnection(dst);
        //     }
        //     return;
        // }

        if (log.isTraceEnabled())
            log.trace("[" + local_addr + "] --> XMIT(" + dst + ": #" + seqno + ')');

        if (Global.copy)
            passDown(new Event(Event.MSG, msg.copy()));
        else
            passDown(new Event(Event.MSG, msg));
        num_xmit_requests_received++;
    }

    /**
     * Checks whether the connection table contains an entry e for <code>sender</code> (creates one if not).
     * If e.received_msgs is null, creates a new AckReceiverWindow starting at DEFAULT_FIRST_SEQNO and sets
     * e.received_msgs to it. The message is then added to the window and all deliverable messages are
     * passed up.
     * @return boolean True if we can send an ack, false otherwise
     */
    private boolean handleDataReceived(Object sender, long seqno, Message msg) {
        if (log.isTraceEnabled())
            log.trace(new StringBuffer().append(local_addr).append(" <-- DATA(").append(sender)
                    .append(": #").append(seqno).append(')'));

        if (previous_members.contains(sender)) {
            // we don't want to see messages from departed members
            if (seqno > DEFAULT_FIRST_SEQNO) {
                if (log.isTraceEnabled())
                    log.trace("discarding message " + seqno + " from previous member " + sender);
                return false; // don't ack this message so the sender keeps resending it !
            }
            if (log.isTraceEnabled())
                log.trace("removed " + sender + " from previous_members as we received a message from it");
            previous_members.removeElement(sender);
        }

        Entry entry;
        AckReceiverWindow win;
        synchronized (connections) {
            entry = (Entry) connections.get(sender);
            if (entry == null) {
                entry = new Entry();
                connections.put(sender, entry);
                if (log.isTraceEnabled())
                    log.trace(local_addr + ": created new connection for dst " + sender);
            }
            win = entry.received_msgs;
            if (win == null) {
                win = new AckReceiverWindow(DEFAULT_FIRST_SEQNO);
                entry.received_msgs = win;
            }
        }

        win.add(seqno, msg); // entry.received_msgs is guaranteed to be non-null if we get here
        num_msgs_received++;
        num_bytes_received += msg.getLength();

        // Try to remove (from the AckReceiverWindow) as many messages as possible and pass them up
        Message m;

        // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
        // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
        // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time.
        // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
        // delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered only in the
        // order in which they were sent by their senders
        synchronized (win) {
            while ((m = win.remove()) != null)
                passUp(new Event(Event.MSG, m));
        }
        return true; // msg was successfully received - send an ack back to the sender
    }

    /** Add the ACK to hashtable.sender.sent_msgs */
    private void handleAckReceived(Object sender, long seqno) {
        Entry entry;
        AckSenderWindow win;

        if (log.isTraceEnabled())
            log.trace(new StringBuffer().append(local_addr).append(" <-- ACK(").append(sender)
                    .append(": #").append(seqno).append(')'));
        synchronized (connections) {
            entry = (Entry) connections.get(sender);
        }
        if (entry == null || entry.sent_msgs == null)
            return;
        win = entry.sent_msgs;
        win.ack(seqno); // removes message from retransmission
        num_acks_received++;
    }

    private void sendAck(Address dst, long seqno) {
        Message ack = new Message(dst);
        ack.putHeader(name, new UnicastHeader(UnicastHeader.ACK, seqno));
        if (log.isTraceEnabled())
            log.trace(new StringBuffer().append(local_addr).append(" --> ACK(").append(dst)
                    .append(": #").append(seqno).append(')'));
        passDown(new Event(Event.MSG, ack));
        num_acks_sent++;
    }

588:            public static class UnicastHeader extends Header implements 
589:                    Streamable {
590:                public static final byte DATA = 0;
591:                public static final byte ACK = 1;
592:
593:                byte type = DATA;
594:                long seqno = 0;
595:
596:                static final long serialized_size = Global.BYTE_SIZE
597:                        + Global.LONG_SIZE;
598:
599:                public UnicastHeader() {
600:                } // used for externalization
601:
602:                public UnicastHeader(byte type, long seqno) {
603:                    this .type = type;
604:                    this .seqno = seqno;
605:                }
606:
607:                public String toString() {
608:                    return "[UNICAST: " + type2Str(type) + ", seqno=" + seqno
609:                            + ']';
610:                }
611:
612:                public static String type2Str(byte t) {
613:                    switch (t) {
614:                    case DATA:
615:                        return "DATA";
616:                    case ACK:
617:                        return "ACK";
618:                    default:
619:                        return "<unknown>";
620:                    }
621:                }
622:
623:                public final long size() {
624:                    return serialized_size;
625:                }
626:
627:                public void writeExternal(ObjectOutput out) throws IOException {
628:                    out.writeByte(type);
629:                    out.writeLong(seqno);
630:                }
631:
632:                public void readExternal(ObjectInput in) throws IOException,
633:                        ClassNotFoundException {
634:                    type = in.readByte();
635:                    seqno = in.readLong();
636:                }
637:
638:                public void writeTo(DataOutputStream out) throws IOException {
639:                    out.writeByte(type);
640:                    out.writeLong(seqno);
641:                }
642:
643:                public void readFrom(DataInputStream in) throws IOException,
644:                        IllegalAccessException, InstantiationException {
645:                    type = in.readByte();
646:                    seqno = in.readLong();
647:                }
648:            }
649:
650:            private static final class Entry {
651:                AckReceiverWindow received_msgs = null; // stores all msgs rcvd by a certain peer in seqno-order
652:                AckSenderWindow sent_msgs = null; // stores (and retransmits) msgs sent by us to a certain peer
653:                long sent_msgs_seqno = DEFAULT_FIRST_SEQNO; // seqno for msgs sent by us
654:
655:                void reset() {
656:                    if (sent_msgs != null)
657:                        sent_msgs.reset();
658:                    if (received_msgs != null)
659:                        received_msgs.reset();
660:                    sent_msgs_seqno = DEFAULT_FIRST_SEQNO;
661:                }
662:
663:                public String toString() {
664:                    StringBuffer sb = new StringBuffer();
665:                    if (sent_msgs != null)
666:                        sb.append("sent_msgs=").append(sent_msgs).append('\n');
667:                    if (received_msgs != null)
668:                        sb.append("received_msgs=").append(received_msgs)
669:                                .append('\n');
670:                    return sb.toString();
671:                }
672:            }
673:
674:        }
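
The header's stream format in the listing is one type byte followed by one long seqno. The following is a minimal, self-contained sketch of that round trip using only java.io. The class HeaderWireSketch and its encode/decode methods are hypothetical stand-ins written for illustration, not part of JGroups, and the 9-byte total assumes that Global.BYTE_SIZE and Global.LONG_SIZE are 1 and 8.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for UnicastHeader; illustrates the field order only. */
class HeaderWireSketch {
    static final byte DATA = 0;
    static final byte ACK = 1;
    // Assumption: 1 byte for the type plus 8 bytes for the seqno.
    static final int SERIALIZED_SIZE = 1 + 8;

    final byte type;
    final long seqno;

    HeaderWireSketch(byte type, long seqno) {
        this.type = type;
        this.seqno = seqno;
    }

    /** Writes type, then seqno, in the same order as writeTo() in the listing. */
    byte[] encode() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(SERIALIZED_SIZE);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(type);
            out.writeLong(seqno);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads type, then seqno, in the same order as readFrom() in the listing. */
    static HeaderWireSketch decode(byte[] buf) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf))) {
            byte type = in.readByte();
            long seqno = in.readLong();
            return new HeaderWireSketch(type, seqno);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HeaderWireSketch ack = new HeaderWireSketch(ACK, 42L);
        byte[] wire = ack.encode();
        HeaderWireSketch copy = decode(wire);
        System.out.println(wire.length + " bytes, type=" + copy.type + ", seqno=" + copy.seqno);
    }
}
```

Because the reader consumes fields in exactly the order the writer produced them, no field tags or length prefixes are needed, which is why size() in the listing can return a constant.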