Source Code Cross Referenced for TOTAL_TOKEN.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        //$Id: TOTAL_TOKEN.java,v 1.14 2006/04/28 15:25:00 belaban Exp $
0002:
0003:        package org.jgroups.protocols;
0004:
0005:        import org.jgroups.*;
0006:        import org.jgroups.blocks.GroupRequest;
0007:        import org.jgroups.protocols.pbcast.Digest;
0008:        import org.jgroups.protocols.ring.RingNodeFlowControl;
0009:        import org.jgroups.protocols.ring.RingToken;
0010:        import org.jgroups.protocols.ring.TokenLostException;
0011:        import org.jgroups.protocols.ring.UdpRingNode;
0012:        import org.jgroups.stack.IpAddress;
0013:        import org.jgroups.stack.RpcProtocol;
0014:        import org.jgroups.util.RspList;
0015:        import org.jgroups.util.Util;
0016:
0017:        import java.io.IOException;
0018:        import java.io.ObjectInput;
0019:        import java.io.ObjectOutput;
0020:        import java.util.*;
0021:
0022:        /**
0023:         * <p>
0024:         * Total order implementation based on <a href="http://citeseer.nj.nec.com/amir95totem.html">
0025:         * The Totem Single-Ring Ordering and Membership Protocol</a>.
0026:         * <p>
0027:         *
0028:         * <p>
0029:         * However, this is an adaption of algorithm mentioned in the research paper above since we reuse
0030:         * our own membership protocol and failure detectors. Somewhat different flow control mechanism is
0031:         * also implemented.
0032:         *
0033:         * <p>
0034:         * Token passing is done through reliable point-to-point udp channels provided by UNICAST layer.
0035:         * Process groups nodes members are organized in a logical ring.
0036:         * </p>
0037:         *
0038:         * <p>
0039:         * Total token layer doesn't need NAKACK nor STABLE layer beneath it since it implements it's own
0040:         * retransmission and tracks stability of the messages from the information piggybacked on the
0041:         * token itself.
0042:         * </p>
0043:         *
0044:         * <p>
0045:         * For the typical protocol stack configuration used, see org.jgroups.demos.TotalTokenDemo and
0046:         * total-token.xml configuration file provided with this distribution of JGroups.
0047:         * </p>
0048:         *
0049:         *
0050:         *
0051:         *@author Vladimir Blagojevic vladimir@cs.yorku.ca
0052:         *@version $Revision: 1.14 $
0053:         *
0054:         *@see org.jgroups.protocols.ring.RingNodeFlowControl
0055:         *@see org.jgroups.protocols.ring.RingNode
0056:         *@see org.jgroups.protocols.ring.TcpRingNode
0057:         *@see org.jgroups.protocols.ring.UdpRingNode
0058:         *
0059:         **/
0060:
0061:        public class TOTAL_TOKEN extends RpcProtocol {
0062:            private static final Object[] NULL_OBJ = new Object[] {};
0063:            private static final Class[] NULL_TYPES = new Class[] {};
0064:
0065:            public static class TotalTokenHeader extends Header {
0066:
0067:                /**
0068:                 * sequence number of the message
0069:                 */
0070:                private long seq;
0071:
0072:                /**
0073:                 *used for externalization
0074:                 */
0075:                public TotalTokenHeader() {
0076:                }
0077:
0078:                public TotalTokenHeader(long seq) {
0079:                    this .seq = seq;
0080:                }
0081:
0082:                public TotalTokenHeader(Long seq) {
0083:                    this .seq = seq.longValue();
0084:                }
0085:
0086:                /**
0087:                 *Returns sequence number of the message that owns this header
0088:                 *@return sequence number
0089:                 */
0090:                public long getSeq() {
0091:                    return seq;
0092:                }
0093:
0094:                /**
0095:                 *Returns size of the header
0096:                 * @return headersize in bytes
0097:                 */
0098:                public long size() {
0099:                    //calculated using Util.SizeOf(Object)
0100:                    return 121;
0101:                }
0102:
0103:                /**
0104:                 * Manual serialization
0105:                 *
0106:                 *
0107:                 */
0108:                public void writeExternal(ObjectOutput out) throws IOException {
0109:                    out.writeLong(seq);
0110:                }
0111:
0112:                /**
0113:                 * Manual deserialization
0114:                 *
0115:                 */
0116:                public void readExternal(ObjectInput in) throws IOException,
0117:                        ClassNotFoundException {
0118:                    seq = in.readLong();
0119:                }
0120:
0121:                public String toString() {
0122:                    return "[TotalTokenHeader=" + seq + ']';
0123:                }
0124:            }
0125:
0126:            public static class RingTokenHeader extends Header {
0127:                public RingTokenHeader() {
0128:                }
0129:
0130:                public void writeExternal(ObjectOutput out) throws IOException {
0131:                }
0132:
0133:                public void readExternal(ObjectInput in) throws IOException,
0134:                        ClassNotFoundException {
0135:                }
0136:
0137:                public long size() {
0138:                    //calculated using Util.SizeOf(Object)
0139:                    return 110;
0140:                }
0141:            }
0142:
0143:            private static final int OPERATIONAL_STATE = 0;
0144:            private static final int RECOVERY_STATE = 1;
0145:
0146:            UdpRingNode node;
0147:            RingNodeFlowControl flowControl;
0148:            Address localAddress;
0149:            private final TokenTransmitter tokenRetransmitter = new TokenTransmitter();
0150:            final List newMessagesQueue = Collections
0151:                    .synchronizedList(new ArrayList());
0152:            SortedSet liveMembersInRecovery, suspects;
0153:
0154:            final Object mutex = new Object();
0155:            TreeMap receivedMessagesQueue;
0156:            long myAru = 0;
0157:
0158:            final Object threadCoordinationMutex = new Object();
0159:            final boolean tokenInStack = false;
0160:            final boolean threadDeliveringMessage = false;
0161:            boolean tokenSeen = false;
0162:
0163:            volatile boolean isRecoveryLeader = false;
0164:            volatile int state;
0165:            volatile int sleepTime = 10;
0166:
0167:            long highestSeenSeq = 0;
0168:            long lastRoundTokensAru = 0;
0169:            int lastRoundTransmitCount, lastRoundRebroadcastCount = 0;
0170:            int blockSendingBacklogThreshold = Integer.MAX_VALUE;
0171:            int unblockSendingBacklogThreshold = Integer.MIN_VALUE;
0172:            boolean tokenCirculating = false;
0173:            boolean senderBlocked = false;
0174:            final Object block_sending = new Object();
0175:            public static final String prot_name = "TOTAL_TOKEN";
0176:
0177:            public String getName() {
0178:                return prot_name;
0179:            }
0180:
0181:            private String getState() {
0182:                if (state == OPERATIONAL_STATE) {
0183:                    return "OPERATIONAL";
0184:                } else
0185:                    return "RECOVERY";
0186:            }
0187:
0188:            public void start() throws Exception {
0189:                super .start();
0190:                receivedMessagesQueue = new TreeMap();
0191:                tokenRetransmitter.start();
0192:            }
0193:
0194:            /**
0195:             * Overrides @org.jgroups.stack.MessageProtocol#stop().
0196:             */
0197:            public void stop() {
0198:                super .stop();
0199:                tokenRetransmitter.shutDown();
0200:            }
0201:
0202:            /**
0203:             * Setup the Protocol instance acording to the configuration string
0204:             *
0205:             */
0206:            public boolean setProperties(Properties props) {
0207:                String str;
0208:
0209:                super .setProperties(props);
0210:                str = props.getProperty("block_sending");
0211:                if (str != null) {
0212:                    blockSendingBacklogThreshold = Integer.parseInt(str);
0213:                    props.remove("block_sending");
0214:                }
0215:
0216:                str = props.getProperty("unblock_sending");
0217:                if (str != null) {
0218:                    unblockSendingBacklogThreshold = Integer.parseInt(str);
0219:                    props.remove("unblock_sending");
0220:                }
0221:
0222:                if (props.size() > 0) {
0223:                    log
0224:                            .error("UDP.setProperties(): the following properties are not recognized: "
0225:                                    + props);
0226:
0227:                    return false;
0228:                }
0229:                return true;
0230:            }
0231:
0232:            public IpAddress getTokenReceiverAddress() {
0233:                return node != null ? node.getTokenReceiverAddress() : null;
0234:            }
0235:
0236:            public Vector providedUpServices() {
0237:                Vector retval = new Vector();
0238:                retval.addElement(new Integer(Event.GET_DIGEST));
0239:                retval.addElement(new Integer(Event.GET_DIGEST_STATE));
0240:                retval.addElement(new Integer(Event.SET_DIGEST));
0241:                return retval;
0242:            }
0243:
0244:            public boolean handleUpEvent(Event evt) {
0245:                Message msg;
0246:                Header h;
0247:                switch (evt.getType()) {
0248:
0249:                case Event.SET_LOCAL_ADDRESS:
0250:                    localAddress = (Address) evt.getArg();
0251:                    node = new UdpRingNode(this , localAddress);
0252:                    flowControl = new RingNodeFlowControl();
0253:                    break;
0254:
0255:                case Event.SUSPECT:
0256:                    Address suspect = (Address) evt.getArg();
0257:                    onSuspectMessage(suspect);
0258:                    break;
0259:
0260:                case Event.MSG:
0261:                    msg = (Message) evt.getArg();
0262:                    h = msg.getHeader(getName());
0263:                    if (h instanceof  TotalTokenHeader) {
0264:                        messageArrived(msg);
0265:                        return false;
0266:                    } else if (h instanceof  RingTokenHeader) {
0267:                        if (node != null) {
0268:                            Object tmp = msg.getObject();
0269:                            node.tokenArrived(tmp);
0270:                        }
0271:                        return false;
0272:                    }
0273:                }
0274:                return true;
0275:            }
0276:
0277:            public boolean handleDownEvent(Event evt) {
0278:                switch (evt.getType()) {
0279:                case Event.GET_DIGEST:
0280:                case Event.GET_DIGEST_STATE:
0281:
0282:                    Digest d = new Digest(members.size());
0283:                    Address sender = null;
0284:                    //all members have same digest :)
0285:                    for (int j = 0; j < members.size(); j++) {
0286:                        sender = (Address) members.elementAt(j);
0287:                        d.add(sender, highestSeenSeq, highestSeenSeq);
0288:                    }
0289:                    passUp(new Event(Event.GET_DIGEST_OK, d));
0290:                    return false;
0291:                case Event.SET_DIGEST:
0292:                    Digest receivedDigest = (Digest) evt.getArg();
0293:
0294:                    // changed by bela July 12 2005, not sure if this is correct, don't know what the original author
0295:                    // intended to do here
0296:                    // myAru = receivedDigest.highSeqnoAt(0);
0297:                    myAru = receivedDigest.highSeqnoAt(localAddress);
0298:                    return false;
0299:
0300:                case Event.VIEW_CHANGE:
0301:                    onViewChange();
0302:                    return true;
0303:
0304:                    /*
0305:                    case Event.CLEANUP:
0306:                    // do not pass cleanup event
0307:                    //further down. This is a hack to enable
0308:                    // sucessfull leave from group when using pbcast.GMS.
0309:                    // It just buys us 5 seconds to imminent STOP
0310:                    // event following CLEANUP. We hope that the moment
0311:                    // this node disconnect up until new view is installed
0312:                    // at other members is less than 5 seconds.
0313:
0314:                    //The proper way would be to:
0315:                    //trap DISCONNECT event on the way down, do not pass it further.
0316:                    //wait for the new view to be installed (effectively excluding this node out of
0317:                    //ring) , wait for one token roundtrip time, and then send that trapped
0318:                    //DISCONNECT event down furhter to generate DISCONNECT_OK on the way up.
0319:                    // CLEANUP and STOP are generated after DISCONNECT.
0320:
0321:                    //However, as the things stand right now pbcast.GMS stops working immediately
0322:                    //when it receives DISCONNECT thus the new view is never generated in node that is
0323:                    //leaving the group.
0324:
0325:                    //pbcsat.GMS should still generate new view and stop working when
0326:                    //it receives STOP event.
0327:
0328:                    //In timeline DISCONNECT < CLEANUP < STOP
0329:                    return false;
0330:                     */
0331:
0332:                case Event.MSG:
0333:                    Message msg = (Message) evt.getArg();
0334:                    //handle only multicasts
0335:                    if (msg == null)
0336:                        return false;
0337:                    if (msg.getDest() == null
0338:                            || msg.getDest().isMulticastAddress()) {
0339:                        newMessagesQueue.add(msg);
0340:                        return false;
0341:                    }
0342:                }
0343:                return true;
0344:            }
0345:
0346:            private void onViewChange() {
0347:                isRecoveryLeader = false;
0348:
0349:                if (suspects != null) {
0350:                    suspects.clear();
0351:                    suspects = null;
0352:                }
0353:                if (liveMembersInRecovery != null) {
0354:                    liveMembersInRecovery.clear();
0355:                    liveMembersInRecovery = null;
0356:                }
0357:            }
0358:
0359:            private void onSuspectMessage(Address suspect) {
0360:                state = RECOVERY_STATE;
0361:                if (suspects == null || suspects.size() == 0) {
0362:                    suspects = Collections.synchronizedSortedSet(new TreeSet());
0363:                    liveMembersInRecovery = Collections
0364:                            .synchronizedSortedSet(new TreeSet(members));
0365:                }
0366:                suspects.add(suspect);
0367:                liveMembersInRecovery.removeAll(suspects);
0368:                isRecoveryLeader = isRecoveryLeader(liveMembersInRecovery);
0369:            }
0370:
0371:            /**
0372:             * Given a set of surviving members in the transitioanl view
0373:             * returns true if this stack is elected to be recovery leader.
0374:             *
0375:             */
0376:            private boolean isRecoveryLeader(SortedSet liveMembers) {
0377:                boolean recoveryLeader = false;
0378:                if (liveMembers.size() > 0) {
0379:                    recoveryLeader = localAddress.equals(liveMembers.first());
0380:                }
0381:
0382:                if (log.isInfoEnabled())
0383:                    log.info("live memebers are " + liveMembers);
0384:                if (log.isInfoEnabled())
0385:                    log.info("I am recovery leader?" + recoveryLeader);
0386:                return recoveryLeader;
0387:
0388:            }
0389:
0390:            public long getAllReceivedUpTo() {
0391:                return myAru;
0392:            }
0393:
0394:            public void installTransitionalView(Vector members) {
0395:                if (node != null)
0396:                    node.reconfigure(members);
0397:            }
0398:
0399:            /**
0400:             *  Total Token recovery protocol (TTRP)
0401:             *
0402:             *
0403:             *
0404:             *  Upon transition to recovery state, coordinator sends multiple reliable
0405:             *  unicasts message requesting each ring member to report it's allReceivedUpto
0406:             *  value. When all replies are received, a response list of allReceivedUpto
0407:             *  values is sorted and transformed into a set while dropping the lowest
0408:             *  allReceivedUpto value. For example , received response list  [4,4,5,6,7,7,8]
0409:             *  is transformed into [5,6,7,8] thus not including the lowest value 4.
0410:             *
0411:             *  The objective of the recovery protocol is to have each member receive all
0412:             *  messages up to maximum sequence value M from the response list, thus
0413:             *  satisfying virtual synchrony properties.
0414:             *
0415:             *  Note that if all members report the same allReceivedUpto values, then
0416:             *  virtual synchrony is satisfied (since all surviving members have seen
0417:             *  the same set of messages) and we can immediately inject operational
0418:             *  token which will enable installment of the new view.
0419:             *
0420:             *  Otherwise, a constructed set S of all allReceivedUpto values represent sequence ids
0421:             *  of messages that have to be received by all mebers prior to installing new
0422:             *  view thus satisfying virtual synchrony properties.
0423:             *
0424:             *  A transitional view, visible only to TOTAL_TOKEN layer is then installed
0425:             *  on the ring by a coordinator. Again a multiple unicast are used. A
0426:             *  transitional view is deduced from current view by excluding suspected members.
0427:             *  Coordinator creates a recovery token by appending the set S of sequence ids to
0428:             *  token retransmission request list. Recovery token is then inserted into
0429:             *  transitional ring view.
0430:             *
0431:             *  Upon reception of recovery token, ring members are not allowed to transmit
0432:             *  any additional new messages but only to retransmit messages from the
0433:             *  specified token retransmission request list.
0434:             *
0435:             *  When all member detect that they have received all messages upto sequence
0436:             *  value M , the next member that first receives token converts it to operatioanl
0437:             *  token and  normal operational state is restored in all nodes.
0438:             *
0439:             *  If token is lost during recovery stage, recovery protocol is restarted.
0440:             *
0441:             * */
0442:            private void recover() {
0443:
0444:                if (isRecoveryLeader && state == RECOVERY_STATE) {
0445:
0446:                    if (log.isInfoEnabled())
0447:                        log.info("I am starting recovery now");
0448:
0449:                    Vector m = new Vector(liveMembersInRecovery);
0450:
0451:                    RspList list = callRemoteMethods(m, "getAllReceivedUpTo",
0452:                            NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0);
0453:                    //RspList list = callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
0454:                    Vector myAllReceivedUpTos = list.getResults();
0455:
0456:                    callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ,
0457:                            NULL_TYPES, GroupRequest.GET_ALL, 0);
0458:                    //callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
0459:                    Vector myAllReceivedUpTosConfirm = list.getResults();
0460:
0461:                    while (!myAllReceivedUpTos
0462:                            .equals(myAllReceivedUpTosConfirm)) {
0463:                        myAllReceivedUpTos = myAllReceivedUpTosConfirm;
0464:                        callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ,
0465:                                NULL_TYPES, GroupRequest.GET_ALL, 0);
0466:                        // callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
0467:                        myAllReceivedUpTosConfirm = list.getResults();
0468:
0469:                        if (log.isInfoEnabled())
0470:                            log.info("myAllReceivedUpto values are"
0471:                                    + myAllReceivedUpTos);
0472:                        if (log.isInfoEnabled())
0473:                            log.info("myAllReceivedUpto confirm values are "
0474:                                    + myAllReceivedUpTosConfirm);
0475:                    }
0476:
0477:                    if (log.isInfoEnabled())
0478:                        log.info("myAllReceivedUpto stabilized values are"
0479:                                + myAllReceivedUpTos);
0480:                    if (log.isInfoEnabled())
0481:                        log
0482:                                .info("installing transitional view to repair the ring...");
0483:
0484:                    callRemoteMethods(m, "installTransitionalView",
0485:                            new Object[] { m }, new String[] { Vector.class
0486:                                    .getName() }, GroupRequest.GET_ALL, 0);
0487:                    //callRemoteMethods(m, "installTransitionalView", m, GroupRequest.GET_ALL, 0);
0488:
0489:                    Vector xmits = prepareRecoveryRetransmissionList(myAllReceivedUpTos);
0490:                    RingToken injectToken = null;
0491:                    if (xmits.size() > 1) {
0492:
0493:                        if (log.isInfoEnabled())
0494:                            log
0495:                                    .info("VS not satisfied, injecting recovery token...");
0496:                        long aru = ((Long) xmits.firstElement()).longValue();
0497:                        long highest = ((Long) xmits.lastElement()).longValue();
0498:
0499:                        injectToken = new RingToken(RingToken.RECOVERY);
0500:                        injectToken.setHighestSequence(highest);
0501:                        injectToken.setAllReceivedUpto(aru);
0502:
0503:                        Collection rtr = injectToken
0504:                                .getRetransmissionRequests();
0505:                        rtr.addAll(xmits);
0506:                    } else {
0507:
0508:                        if (log.isInfoEnabled())
0509:                            log
0510:                                    .info("VS satisfied, injecting operational token...");
0511:                        injectToken = new RingToken();
0512:                        long sequence = ((Long) xmits.firstElement())
0513:                                .longValue();
0514:                        injectToken.setHighestSequence(sequence);
0515:                        injectToken.setAllReceivedUpto(sequence);
0516:                    }
0517:                    if (node != null)
0518:                        node.passToken(injectToken);
0519:                    tokenRetransmitter.resetTimeout();
0520:                }
0521:            }
0522:
0523:            /**
0524:             * Prepares a retransmissions list for recovery protocol
0525:             * given a collection of all myReceivedUpTo values as
0526:             * reported by polled surviving members.
0527:             *
0528:             *
0529:             *
0530:             */
0531:            private Vector prepareRecoveryRetransmissionList(Vector sequences) {
0532:                Collections.sort(sequences);
0533:                Long first = (Long) sequences.firstElement();
0534:                Long last = (Long) sequences.lastElement();
0535:
0536:                Vector retransmissions = new Vector();
0537:                if (first.equals(last)) {
0538:                    retransmissions.add(new Long(first.longValue()));
0539:                } else {
0540:                    for (long j = first.longValue() + 1; j <= last.longValue(); j++) {
0541:                        retransmissions.add(new Long(j));
0542:                    }
0543:                }
0544:                return retransmissions;
0545:            }
0546:
0547:            protected void updateView(View newMembers) {
0548:                super .updateView(newMembers);
0549:                Vector newViewMembers = newMembers.getMembers();
0550:                flowControl.viewChanged(newViewMembers.size());
0551:                if (node != null)
0552:                    node.reconfigure(newViewMembers);
0553:                boolean isCoordinator = localAddress.equals(newViewMembers
0554:                        .firstElement());
0555:                int memberSize = newViewMembers.size();
0556:
0557:                if (memberSize == 1 && isCoordinator && !tokenCirculating) {
0558:                    //create token for the first time , lets roll
0559:                    tokenCirculating = true;
0560:                    RingToken token = new RingToken();
0561:                    if (node != null)
0562:                        node.passToken(token);
0563:                    tokenRetransmitter.resetTimeout();
0564:                }
0565:                sleepTime = (20 / memberSize);
0566:            }
0567:
0568:            /**
0569:             * TOTAL_TOKEN's up-handler thread invokes this method after multicast
0570:             * message originating from some other TOTAL_TOKEN stack layer arrives at
0571:             * this stack layer.
0572:             *
0573:             * Up-handler thread coordinates it's access to a shared variables
0574:             * with TokenTransmitter thread.
0575:             *
0576:             * See tokenReceived() for details.
0577:             *
0578:             */
0579:            private void messageArrived(Message m) {
0580:                TotalTokenHeader h = (TotalTokenHeader) m.getHeader(getName());
0581:                long seq = h.getSeq();
0582:
0583:                synchronized (mutex) {
0584:                    if ((myAru + 1) <= seq) {
0585:                        if (seq > highestSeenSeq) {
0586:                            highestSeenSeq = seq;
0587:                        }
0588:
0589:                        receivedMessagesQueue.put(new Long(seq), m);
0590:                        if ((myAru + 1) == seq) {
0591:                            myAru = seq;
0592:                            passUp(new Event(Event.MSG, m));
0593:                        }
0594:                        if (isReceiveQueueHolePlugged()) {
0595:                            myAru = deliverMissingMessages();
0596:                        }
0597:                    }
0598:                }
0599:            }
0600:
0601:            /**
0602:             * Returns true if there is a hole in receive queue and at
0603:             * least one messages with sequence id consecutive to myAru.
0604:             *
0605:             *
0606:             */
0607:            private boolean isReceiveQueueHolePlugged() {
0608:                return ((myAru < highestSeenSeq) && receivedMessagesQueue
0609:                        .containsKey(new Long(myAru + 1)));
0610:            }
0611:
0612:            /**
0613:             * Delivers as much as possible messages from receive
0614:             * message queue as long as they are consecutive with
0615:             * respect to their sequence ids.
0616:             *
0617:             */
0618:            private long deliverMissingMessages() {
0619:                Map.Entry entry = null;
0620:                boolean inOrder = true;
0621:                long lastDelivered = myAru;
0622:                Set deliverySet = receivedMessagesQueue.tailMap(
0623:                        new Long(myAru + 1)).entrySet();
0624:
0625:                if (log.isInfoEnabled())
0626:                    log.info("hole getting plugged, prior muAru " + myAru);
0627:
0628:                for (Iterator iterator = deliverySet.iterator(); inOrder
0629:                        && iterator.hasNext();) {
0630:                    entry = (Map.Entry) iterator.next();
0631:                    long nextInQueue = ((Long) entry.getKey()).longValue();
0632:                    if (lastDelivered + 1 == nextInQueue) {
0633:                        Message m = (Message) entry.getValue();
0634:                        passUp(new Event(Event.MSG, m));
0635:                        lastDelivered++;
0636:                    } else {
0637:                        inOrder = false;
0638:                    }
0639:                }
0640:
0641:                if (log.isInfoEnabled())
0642:                    log.info("hole getting plugged, post muAru "
0643:                            + lastDelivered);
0644:                return lastDelivered;
0645:            }
0646:
0647:            /**
0648:             * Checks if the receivedMessageQueue has any missing sequence
0649:             * numbers in it, and if it does it finds holes in sequences from
0650:             * this stack's receivedMessageQueue and adds them to token retransmission
0651:             * list, thus informing other group members about messages missing
0652:             * from this stack.
0653:             *
0654:             *
0655:             */
0656:            private void updateTokenRtR(RingToken token) {
0657:                long holeLowerBound = 0;
0658:                long holeUpperBound = 0;
0659:                Long missingSequence = null;
0660:                Collection retransmissionList = null;
0661:
0662:                //any holes?
0663:                if (myAru < token.getHighestSequence()) {
0664:                    retransmissionList = token.getRetransmissionRequests();
0665:                    Set received = receivedMessagesQueue.tailMap(
0666:                            new Long(myAru + 1)).keySet();
0667:                    Iterator nonMissing = received.iterator();
0668:                    holeLowerBound = myAru;
0669:
0670:                    if (log.isDebugEnabled())
0671:                        log.debug("retransmission request prior"
0672:                                + retransmissionList);
0673:
0674:                    while (nonMissing.hasNext()) {
0675:                        Long seq = (Long) nonMissing.next();
0676:                        holeUpperBound = seq.longValue();
0677:
0678:                        while (holeLowerBound < holeUpperBound) {
0679:                            missingSequence = new Long(++holeLowerBound);
0680:                            retransmissionList.add(missingSequence);
0681:                        }
0682:                        holeLowerBound = holeUpperBound;
0683:                    }
0684:
0685:                    holeUpperBound = token.getHighestSequence();
0686:                    while (holeLowerBound < holeUpperBound) {
0687:                        missingSequence = new Long(++holeLowerBound);
0688:                        retransmissionList.add(missingSequence);
0689:                    }
0690:
0691:                    if (log.isDebugEnabled())
0692:                        log.debug("retransmission request after"
0693:                                + retransmissionList);
0694:                }
0695:            }
0696:
0697:            /**
0698:             * Sends messages in this stacks's outgoing queue and
0699:             * saves a copy of each outgoing message in case they got lost.
0700:             * If messages get lost it is thus guaranteed that each stack
0701:             * that sent any message has a copy of it ready for retransmitting.
0702:             *
0703:             * Each sent message is stamped by monotonically increasing
0704:             * sequence number starting from the highest sequence "seen"
0705:             * on the ring.
0706:             *
0707:             * Returns number of messages actually sent.  The number of
0708:             * sent messages is bounded above by the flow control
0709:             * algorithm (allowedCount) and bounded below by the number
0710:             * of pending messages in newMessagesQueue.
0711:             */
0712:            private int broadcastMessages(int allowedCount, RingToken token) {
0713:                List sendList = null;
0714:                synchronized (newMessagesQueue) {
0715:                    int queueSize = newMessagesQueue.size();
0716:
0717:                    if (queueSize <= 0) {
0718:                        return 0;
0719:                    }
0720:
0721:                    else if (queueSize > allowedCount) {
0722:                        sendList = new ArrayList(newMessagesQueue.subList(0,
0723:                                allowedCount));
0724:                        newMessagesQueue.removeAll(sendList);
0725:                    }
0726:
0727:                    else {
0728:                        sendList = new ArrayList();
0729:                        sendList.addAll(newMessagesQueue);
0730:                        newMessagesQueue.clear();
0731:                    }
0732:                }
0733:
0734:                long tokenSeq = token.getHighestSequence();
0735:
0736:                for (Iterator iterator = sendList.iterator(); iterator
0737:                        .hasNext();) {
0738:                    Message m = (Message) iterator.next();
0739:                    m.setSrc(localAddress);
0740:                    m.setDest(null); // mcast address
0741:                    m.putHeader(getName(), new TotalTokenHeader(++tokenSeq));
0742:                    receivedMessagesQueue.put(new Long(tokenSeq), m);
0743:                    passDown(new Event(Event.MSG, m));
0744:                }
0745:
0746:                if (token.getHighestSequence() == token.getAllReceivedUpto()) {
0747:                    token.setAllReceivedUpto(tokenSeq);
0748:                }
0749:                token.setHighestSequence(tokenSeq);
0750:                return sendList.size();
0751:            }
0752:
0753:            /**
0754:             * TokenTransmitter thread runs this method after receiving token.
0755:             * Thread is possibly blocked if up-handler thread is currently running
0756:             * through this stack i.e delivering an Event. Up-hanlder thread will
0757:             * notify blocked TokenTransmitter thread when it has delivered current
0758:             * Event so TokenTransmitter can proceed.
0759:             * TokenTransmitter thread in turn notifies possibly blocked up-handler thread
0760:             * when token has left the stack. Thus TokenTransmitter and up-hadler thread
0761:             * coordinate their access to shared variables(receivedMessageQueue and myAru).
0762:             *
0763:             * tokenReceived method and subsequent methods called from tokenReceived represent
0764:             * in some parts the totaly ordered algorithm presented in Amir's paper (see class
0765:             * header for link)
0766:             *
0767:             *
0768:             *
0769:             */
0770:            private void tokenReceived(RingToken token) {
0771:
0772:                if (log.isInfoEnabled())
0773:                    log.info(token.toString());
0774:                if (log.isDebugEnabled())
0775:                    log.debug(getState());
0776:
0777:                flowControl.setBacklog(newMessagesQueue.size());
0778:                flowControl.updateWindow(token);
0779:
0780:                blockSenderIfRequired();
0781:                unBlockSenderIfAcceptable();
0782:
0783:                long tokensAru = 0;
0784:                int broadcastCount = 0;
0785:                int rebroadcastCount = 0;
0786:                synchronized (mutex) {
0787:                    if (!tokenSeen) {
0788:                        long lastRoundAru = token.getHighestSequence()
0789:                                - token.getLastRoundBroadcastCount();
0790:                        if (myAru < token.getAllReceivedUpto()) {
0791:                            myAru = lastRoundAru;
0792:                        }
0793:                        //if(log.isInfoEnabled()) log.info("TOTAL_TOKEN.tokenReceived()", "tokenSeen " + myAru);
0794:                        tokenSeen = true;
0795:                    }
0796:
0797:                    if (token.getType() == RingToken.RECOVERY) {
0798:                        highestSeenSeq = token.getHighestSequence();
0799:                        if (highestSeenSeq == myAru) {
0800:                            if (log.isInfoEnabled())
0801:                                log.info("member node recovered");
0802:                            token.addRecoveredMember(localAddress);
0803:                        }
0804:                    }
0805:
0806:                    updateTokenRtR(token);
0807:
0808:                    int allowedToBroadcast = flowControl
0809:                            .getAllowedToBroadcast(token);
0810:                    rebroadcastCount = rebroadcastMessages(token);
0811:                    allowedToBroadcast -= rebroadcastCount;
0812:
0813:                    if (log.isInfoEnabled())
0814:                        log.info("myAllReceivedUpto" + myAru);
0815:                    if (log.isInfoEnabled())
0816:                        log.info("allowedToBroadcast" + allowedToBroadcast);
0817:                    if (log.isInfoEnabled())
0818:                        log.info("newMessagesQueue.size()"
0819:                                + newMessagesQueue.size());
0820:
0821:                    tokensAru = token.getAllReceivedUpto();
0822:
0823:                    if (myAru < tokensAru
0824:                            || localAddress.equals(token.getAruId())
0825:                            || token.getAruId() == null) {
0826:                        token.setAllReceivedUpto(myAru);
0827:                        if (token.getAllReceivedUpto() == token
0828:                                .getHighestSequence()) {
0829:                            token.setAruId(null);
0830:                        } else {
0831:                            token.setAruId(localAddress);
0832:                        }
0833:                    }
0834:                    if (allowedToBroadcast > 0
0835:                            && token.getType() == RingToken.OPERATIONAL) {
0836:                        broadcastCount = broadcastMessages(allowedToBroadcast,
0837:                                token);
0838:                    }
0839:
0840:                    if (tokensAru > lastRoundTokensAru) {
0841:                        removeStableMessages(receivedMessagesQueue,
0842:                                lastRoundTokensAru);
0843:                    }
0844:
0845:                } //end synchronized
0846:
0847:                //give CPU some breath
0848:                Util.sleep(sleepTime);
0849:
0850:                token.incrementTokenSequence();
0851:                token.addLastRoundBroadcastCount(broadcastCount
0852:                        - lastRoundTransmitCount);
0853:                token.addBacklog(flowControl.getBacklogDifference());
0854:                flowControl.setPreviousBacklog();
0855:                lastRoundTransmitCount = broadcastCount;
0856:                lastRoundRebroadcastCount = rebroadcastCount;
0857:                lastRoundTokensAru = tokensAru;
0858:            }
0859:
0860:            /**
0861:             *
0862:             * Rebroadcasts messages specified in token's retransmission
0863:             * request list if those messages are available in this stack.
0864:             * Returns number of rebroadcasted messages.
0865:             *
0866:             */
0867:            private int rebroadcastMessages(RingToken token) {
0868:                int rebroadCastCount = 0;
0869:                Collection rexmitRequests = token.getRetransmissionRequests();
0870:                if (rexmitRequests.size() > 0) {
0871:                    Collection rbl = getRebroadcastList(rexmitRequests);
0872:                    rebroadCastCount = rbl.size();
0873:                    if (rebroadCastCount > 0) {
0874:
0875:                        if (log.isInfoEnabled())
0876:                            log.info("rebroadcasting " + rbl);
0877:
0878:                        Long s = null;
0879:                        for (Iterator iterator = rbl.iterator(); iterator
0880:                                .hasNext();) {
0881:                            s = (Long) iterator.next();
0882:                            Message m = (Message) receivedMessagesQueue.get(s);
0883:                            passDown(new Event(Event.MSG, m));
0884:                        }
0885:                    }
0886:                }
0887:                return rebroadCastCount;
0888:            }
0889:
0890:            private void invalidateOnTokenloss() {
0891:                lastRoundTransmitCount = 0;
0892:                flowControl.invalidate();
0893:            }
0894:
0895:            /**
0896:             * Checks if the down pending queue's (newMessagesQueue) size is
0897:             * greater than blockSendingBacklogThreshold specified in the properties.
0898:             * If it is, client's sending thread is effectively blocked until
0899:             * down pending queue's size drops below unblockSendingBacklogThreshold.
0900:             *
0901:             *
0902:             */
0903:            private void blockSenderIfRequired() {
0904:                if (!senderBlocked
0905:                        && flowControl.getBacklog() > blockSendingBacklogThreshold) {
0906:                    synchronized (block_sending) {
0907:                        senderBlocked = true;
0908:                        while (senderBlocked) {
0909:                            try {
0910:                                block_sending.wait();
0911:                            } catch (InterruptedException e) {
0912:                            }
0913:                        }
0914:                    }
0915:                }
0916:            }
0917:
0918:            /**
0919:             * Checks if the down pending queue's (newMessagesQueue) size is
0920:             * smaller than unblockSendingBacklogThreshold specified in the properties.
0921:             * If it is, client's sending thread is effectively unblocked enabling
0922:             * new messages to be queued for transmission.
0923:             *
0924:             *
0925:             */
0926:            private void unBlockSenderIfAcceptable() {
0927:                if (senderBlocked
0928:                        && flowControl.getBacklog() < unblockSendingBacklogThreshold) {
0929:                    synchronized (block_sending) {
0930:                        senderBlocked = false;
0931:                        block_sending.notifyAll();
0932:                    }
0933:                }
0934:
0935:            }
0936:
0937:            /**
0938:             * Removes messages determined to be stable(i.e seen by all members)
0939:             * from the specified queue.  If the client also clears all reference
0940:             * to these messages (in application space) they become eligible for garabge
0941:             * collection.
0942:             *
0943:             *
0944:             */
0945:
0946:            private void removeStableMessages(TreeMap m, long upToSeq) {
0947:
0948:                if (m.size() > 0) {
0949:                    long first = ((Long) m.firstKey()).longValue();
0950:                    if (first > upToSeq) {
0951:                        upToSeq = first;
0952:                    }
0953:
0954:                    if (log.isDebugEnabled())
0955:                        log.debug("cutting queue first key " + m.firstKey()
0956:                                + " cut at " + upToSeq + " last key "
0957:                                + m.lastKey());
0958:                    SortedMap stable = m.headMap(new Long(upToSeq));
0959:                    stable.clear();
0960:                }
0961:            }
0962:
0963:            /**
0964:             * Determines a subset of message sequence numbers
0965:             * available for retransmission from this stack.
0966:             *
0967:             */
0968:            private Collection getRebroadcastList(Collection rtr) {
0969:                ArrayList rebroadcastList = new ArrayList(rtr);
0970:                rebroadcastList.retainAll(receivedMessagesQueue.keySet());
0971:                rtr.removeAll(rebroadcastList);
0972:                Collections.sort(rebroadcastList);
0973:                return rebroadcastList;
0974:            }
0975:
0976:            /**
0977:             * TokenTransimitter thread transmits the token to the next member
0978:             * in the logical ring as well as it receives token from the previous
0979:             * member in the ring. Smoothed ring roundtrip time is computed
0980:             * in order to detect token loss.  If the timeout expires AND this
0981:             * stack has received SUSPECT message, recovery protocol is invoked.
0982:             * See recover method for details.
0983:             *
0984:             */
0985:            private class TokenTransmitter extends Thread {
0986:                long rtt = 0;
0987:                long timer;
0988:                double srtt = 1000; //1 second to start
0989:                final double a = 0.09;
0990:                final int timeoutFactor = 10;
0991:                volatile boolean running = false;
0992:
0993:                private TokenTransmitter() {
0994:                    super (Util.getGlobalThreadGroup(), "TokenTransmitter");
0995:                    resetTimeout();
0996:                    running = true;
0997:                }
0998:
0999:                private void shutDown() {
1000:                    running = false;
1001:                }
1002:
1003:                private void recalculateTimeout() {
1004:                    long now = System.currentTimeMillis();
1005:                    if (timer > 0) {
1006:                        rtt = now - timer;
1007:                        srtt = (1 - a) * srtt + a * rtt;
1008:                    }
1009:                }
1010:
1011:                private double getTimeout() {
1012:                    return srtt * timeoutFactor;
1013:                }
1014:
1015:                private void resetTimeout() {
1016:                    timer = System.currentTimeMillis();
1017:                }
1018:
1019:                private boolean isRecoveryCompleted(RingToken token) {
1020:                    return liveMembersInRecovery.equals(token
1021:                            .getRecoveredMembers());
1022:                }
1023:
1024:                public void run() {
1025:                    while (running) {
1026:                        RingToken token = null;
1027:                        int timeout = 0;
1028:
1029:                        if (node == null) {
1030:                            // sleep some time, then retry
1031:                            Util.sleep(500);
1032:                            continue;
1033:                        }
1034:
1035:                        try {
1036:                            timeout = (int) getTimeout();
1037:
1038:                            if (log.isInfoEnabled())
1039:                                log.info("timeout(ms)=" + timeout);
1040:
1041:                            token = (RingToken) node.receiveToken(timeout);
1042:
1043:                            if (token.getType() == RingToken.OPERATIONAL
1044:                                    && state == RECOVERY_STATE) {
1045:                                state = OPERATIONAL_STATE;
1046:                            }
1047:
1048:                            tokenReceived(token);
1049:                            recalculateTimeout();
1050:
1051:                            if (token.getType() == RingToken.RECOVERY
1052:                                    && isRecoveryCompleted(token)) {
1053:
1054:                                if (log.isInfoEnabled())
1055:                                    log
1056:                                            .info("all members recovered, injecting operational token");
1057:                                token.setType(RingToken.OPERATIONAL);
1058:                            }
1059:                            node.passToken(token);
1060:                            resetTimeout();
1061:                        } catch (TokenLostException tle) {
1062:                            invalidateOnTokenloss();
1063:                            state = RECOVERY_STATE;
1064:                            recover();
1065:                        }
1066:                    }
1067:                }
1068:            }
1069:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.