Source Code Cross Referenced for GMS.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » pbcast » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols.pbcast 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        package org.jgroups.protocols.pbcast;
0002:
0003:        import org.jgroups.*;
0004:        import org.jgroups.stack.Protocol;
0005:        import org.jgroups.util.*;
0006:        import org.jgroups.util.Queue;
0007:        import org.apache.commons.logging.Log;
0008:
0009:        import java.io.*;
0010:        import java.util.*;
0011:        import java.util.List;
0012:
0013:        /**
0014:         * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
0015:         * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
0016:         * any messages until they are members
0017:         * @author Bela Ban
0018:         * @version $Id: GMS.java,v 1.68.2.5 2007/04/27 08:03:55 belaban Exp $
0019:         */
0020:        public class GMS extends Protocol {
0021:            private GmsImpl impl = null;
0022:            Address local_addr = null;
0023:            final Membership members = new Membership(); // real membership
0024:            private final Membership tmp_members = new Membership(); // base for computing next view
0025:
0026:            /** Members joined but for which no view has been received yet */
0027:            private final Vector joining = new Vector(7);
0028:
0029:            /** Members excluded from group, but for which no view has been received yet */
0030:            private final Vector leaving = new Vector(7);
0031:
0032:            View view = null;
0033:            ViewId view_id = null;
0034:            private long ltime = 0;
0035:            long join_timeout = 5000;
0036:            long join_retry_timeout = 2000;
0037:            long flush_timeout = 4000;
0038:            long leave_timeout = 5000;
0039:            private long digest_timeout = 0; // time to wait for a digest (from PBCAST). should be fast
0040:            long merge_timeout = 10000; // time to wait for all MERGE_RSPS
0041:            private final Object impl_mutex = new Object(); // synchronizes event entry into impl
0042:            private final Promise digest_promise = new Promise(); // holds result of GET_DIGEST event
0043:            private final Promise flush_promise = new Promise();
0044:            boolean use_flush = false;
0045:            private final Hashtable impls = new Hashtable(3);
0046:            private boolean shun = false;
0047:            boolean merge_leader = false; // can I initiate a merge ?
0048:            private boolean print_local_addr = true;
0049:            boolean disable_initial_coord = false; // can the member become a coord on startup or not ?
0050:            /** Setting this to false disables concurrent startups. This is only used by unit testing code
0051:             * for testing merging. To everybody else: don't change it to false ! */
0052:            boolean handle_concurrent_startup = true;
0053:            /** Whether view bundling (http://jira.jboss.com/jira/browse/JGRP-144) should be enabled or not. Setting this to
0054:             * false forces each JOIN/LEAVE/SUPSECT request to be handled separately. By default these requests are processed
0055:             * together if they are queued at approximately the same time */
0056:            private boolean view_bundling = true;
0057:            private long max_bundling_time = 50; // 50ms max to wait for other JOIN, LEAVE or SUSPECT requests
0058:            static final String CLIENT = "Client";
0059:            static final String COORD = "Coordinator";
0060:            static final String PART = "Participant";
0061:            TimeScheduler timer = null;
0062:
0063:            /** Max number of old members to keep in history */
0064:            protected int num_prev_mbrs = 50;
0065:
0066:            /** Keeps track of old members (up to num_prev_mbrs) */
0067:            BoundedList prev_members = null;
0068:
0069:            int num_views = 0;
0070:
0071:            /** Stores the last 20 views */
0072:            BoundedList prev_views = new BoundedList(20);
0073:
0074:            /** Class to process JOIN, LEAVE and MERGE requests */
0075:            private final ViewHandler view_handler = new ViewHandler();
0076:
0077:            /** To collect VIEW_ACKs from all members */
0078:            final AckCollector ack_collector = new AckCollector();
0079:
0080:            /** Time in ms to wait for all VIEW acks (0 == wait forever) */
0081:            long view_ack_collection_timeout = 2000;
0082:
0083:            /** How long should a Resumer wait until resuming the ViewHandler */
0084:            long resume_task_timeout = 20000;
0085:
0086:            boolean flushProtocolInStack = false;
0087:
0088:            public static final String name = "GMS";
0089:
0090:            public GMS() {
0091:                initState();
0092:            }
0093:
0094:            public String getName() {
0095:                return name;
0096:            }
0097:
0098:            public String getView() {
0099:                return view_id != null ? view_id.toString() : "null";
0100:            }
0101:
0102:            public int getNumberOfViews() {
0103:                return num_views;
0104:            }
0105:
0106:            public String getLocalAddress() {
0107:                return local_addr != null ? local_addr.toString() : "null";
0108:            }
0109:
0110:            public String getMembers() {
0111:                return members != null ? members.toString() : "[]";
0112:            }
0113:
0114:            public int getNumMembers() {
0115:                return members != null ? members.size() : 0;
0116:            }
0117:
0118:            public long getJoinTimeout() {
0119:                return join_timeout;
0120:            }
0121:
0122:            public void setJoinTimeout(long t) {
0123:                join_timeout = t;
0124:            }
0125:
0126:            public long getJoinRetryTimeout() {
0127:                return join_retry_timeout;
0128:            }
0129:
0130:            public void setJoinRetryTimeout(long t) {
0131:                join_retry_timeout = t;
0132:            }
0133:
0134:            public boolean isShun() {
0135:                return shun;
0136:            }
0137:
0138:            public void setShun(boolean s) {
0139:                shun = s;
0140:            }
0141:
0142:            public String printPreviousMembers() {
0143:                StringBuffer sb = new StringBuffer();
0144:                if (prev_members != null) {
0145:                    for (Enumeration en = prev_members.elements(); en
0146:                            .hasMoreElements();) {
0147:                        sb.append(en.nextElement()).append("\n");
0148:                    }
0149:                }
0150:                return sb.toString();
0151:            }
0152:
0153:            public int viewHandlerSize() {
0154:                return view_handler.size();
0155:            }
0156:
0157:            public boolean isViewHandlerSuspended() {
0158:                return view_handler.suspended();
0159:            }
0160:
0161:            public String dumpViewHandlerQueue() {
0162:                return view_handler.dumpQueue();
0163:            }
0164:
0165:            public String dumpViewHandlerHistory() {
0166:                return view_handler.dumpHistory();
0167:            }
0168:
0169:            public void suspendViewHandler() {
0170:                view_handler.suspend(null);
0171:            }
0172:
0173:            public void resumeViewHandler() {
0174:                view_handler.resumeForce();
0175:            }
0176:
0177:            Log getLog() {
0178:                return log;
0179:            }
0180:
0181:            ViewHandler getViewHandler() {
0182:                return view_handler;
0183:            }
0184:
0185:            public String printPreviousViews() {
0186:                StringBuffer sb = new StringBuffer();
0187:                for (Enumeration en = prev_views.elements(); en
0188:                        .hasMoreElements();) {
0189:                    sb.append(en.nextElement()).append("\n");
0190:                }
0191:                return sb.toString();
0192:            }
0193:
0194:            public boolean isCoordinator() {
0195:                Address coord = determineCoordinator();
0196:                return coord != null && local_addr != null
0197:                        && local_addr.equals(coord);
0198:            }
0199:
0200:            public void resetStats() {
0201:                super .resetStats();
0202:                num_views = 0;
0203:                prev_views.removeAll();
0204:            }
0205:
0206:            public Vector requiredDownServices() {
0207:                Vector retval = new Vector(3);
0208:                retval.addElement(new Integer(Event.GET_DIGEST));
0209:                retval.addElement(new Integer(Event.SET_DIGEST));
0210:                retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
0211:                return retval;
0212:            }
0213:
0214:            public void setImpl(GmsImpl new_impl) {
0215:                synchronized (impl_mutex) {
0216:                    if (impl == new_impl) // superfluous
0217:                        return;
0218:                    impl = new_impl;
0219:                    if (log.isDebugEnabled()) {
0220:                        String msg = (local_addr != null ? local_addr
0221:                                .toString()
0222:                                + " " : "")
0223:                                + "changed role to "
0224:                                + new_impl.getClass().getName();
0225:                        log.debug(msg);
0226:                    }
0227:                }
0228:            }
0229:
0230:            public GmsImpl getImpl() {
0231:                return impl;
0232:            }
0233:
0234:            public void init() throws Exception {
0235:                prev_members = new BoundedList(num_prev_mbrs);
0236:                timer = stack != null ? stack.timer : null;
0237:                if (timer == null)
0238:                    throw new Exception("GMS.init(): timer is null");
0239:                if (impl != null)
0240:                    impl.init();
0241:            }
0242:
0243:            public void start() throws Exception {
0244:                if (impl != null)
0245:                    impl.start();
0246:                if (!flushProtocolInStack && use_flush) {
0247:                    log
0248:                            .warn("use_flush is true, however, FLUSH protocol not found in stack.");
0249:                    use_flush = false;
0250:                }
0251:            }
0252:
0253:            public void stop() {
0254:                view_handler.stop(true);
0255:                if (impl != null)
0256:                    impl.stop();
0257:                if (prev_members != null)
0258:                    prev_members.removeAll();
0259:            }
0260:
0261:            public void becomeCoordinator() {
0262:                CoordGmsImpl tmp = (CoordGmsImpl) impls.get(COORD);
0263:                if (tmp == null) {
0264:                    tmp = new CoordGmsImpl(this );
0265:                    impls.put(COORD, tmp);
0266:                }
0267:                try {
0268:                    tmp.init();
0269:                } catch (Exception e) {
0270:                    log.error("exception switching to coordinator role", e);
0271:                }
0272:                setImpl(tmp);
0273:            }
0274:
0275:            public void becomeParticipant() {
0276:                ParticipantGmsImpl tmp = (ParticipantGmsImpl) impls.get(PART);
0277:
0278:                if (tmp == null) {
0279:                    tmp = new ParticipantGmsImpl(this );
0280:                    impls.put(PART, tmp);
0281:                }
0282:                try {
0283:                    tmp.init();
0284:                } catch (Exception e) {
0285:                    log.error("exception switching to participant", e);
0286:                }
0287:                setImpl(tmp);
0288:            }
0289:
0290:            public void becomeClient() {
0291:                ClientGmsImpl tmp = (ClientGmsImpl) impls.get(CLIENT);
0292:                if (tmp == null) {
0293:                    tmp = new ClientGmsImpl(this );
0294:                    impls.put(CLIENT, tmp);
0295:                }
0296:                try {
0297:                    tmp.init();
0298:                } catch (Exception e) {
0299:                    log.error("exception switching to client role", e);
0300:                }
0301:                setImpl(tmp);
0302:            }
0303:
0304:            boolean haveCoordinatorRole() {
0305:                return impl != null && impl instanceof  CoordGmsImpl;
0306:            }
0307:
0308:            /**
0309:             * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
0310:             * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
0311:             */
0312:            public View getNextView(Collection new_mbrs, Collection old_mbrs,
0313:                    Collection suspected_mbrs) {
0314:                Vector mbrs;
0315:                long vid;
0316:                View v;
0317:                Membership tmp_mbrs;
0318:                Address tmp_mbr;
0319:
0320:                synchronized (members) {
0321:                    if (view_id == null) {
0322:                        log.error("view_id is null");
0323:                        return null; // this should *never* happen !
0324:                    }
0325:                    vid = Math.max(view_id.getId(), ltime) + 1;
0326:                    ltime = vid;
0327:                    tmp_mbrs = tmp_members.copy(); // always operate on the temporary membership
0328:                    tmp_mbrs.remove(suspected_mbrs);
0329:                    tmp_mbrs.remove(old_mbrs);
0330:                    tmp_mbrs.add(new_mbrs);
0331:                    mbrs = tmp_mbrs.getMembers();
0332:                    v = new View(local_addr, vid, mbrs);
0333:
0334:                    // Update membership (see DESIGN for explanation):
0335:                    tmp_members.set(mbrs);
0336:
0337:                    // Update joining list (see DESIGN for explanation)
0338:                    if (new_mbrs != null) {
0339:                        for (Iterator it = new_mbrs.iterator(); it.hasNext();) {
0340:                            tmp_mbr = (Address) it.next();
0341:                            if (!joining.contains(tmp_mbr))
0342:                                joining.addElement(tmp_mbr);
0343:                        }
0344:                    }
0345:
0346:                    // Update leaving list (see DESIGN for explanations)
0347:                    if (old_mbrs != null) {
0348:                        for (Iterator it = old_mbrs.iterator(); it.hasNext();) {
0349:                            Address addr = (Address) it.next();
0350:                            if (!leaving.contains(addr))
0351:                                leaving.add(addr);
0352:                        }
0353:                    }
0354:                    if (suspected_mbrs != null) {
0355:                        for (Iterator it = suspected_mbrs.iterator(); it
0356:                                .hasNext();) {
0357:                            Address addr = (Address) it.next();
0358:                            if (!leaving.contains(addr))
0359:                                leaving.add(addr);
0360:                        }
0361:                    }
0362:                    return v;
0363:                }
0364:            }
0365:
0366:            /**
0367:             Compute a new view, given the current view, the new members and the suspected/left
0368:             members. Then simply mcast the view to all members. This is different to the VS GMS protocol,
0369:             in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in
0370:             the current view before proceeding to install the next view.
0371:
0372:             The members for the new view are computed as follows:
0373:             <pre>
0374:                           existing          leaving        suspected          joining
0375:
0376:             1. new_view      y                 n               n                 y
0377:             2. tmp_view      y                 y               n                 y
0378:             (view_dest)
0379:             </pre>
0380:
0381:             <ol>
0382:             <li>
0383:             The new view to be installed includes the existing members plus the joining ones and
0384:             excludes the leaving and suspected members.
0385:             <li>
0386:             A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
0387:             (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
0388:             to the new view, leaving members are <em>included</em> since they have are waiting for a
0389:             view in which they are not members any longer before they leave. So, if we did not set a
0390:             temporary view, joining members would not receive the view (signalling that they have been
0391:             joined successfully). The temporary view is essentially the current view plus the joining
0392:             members (old members are still part of the current view).
0393:             </ol>
0394:             */
0395:            public void castViewChange(Vector new_mbrs, Vector old_mbrs,
0396:                    Vector suspected_mbrs) {
0397:                View new_view;
0398:
0399:                // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
0400:                new_view = getNextView(new_mbrs, old_mbrs, suspected_mbrs);
0401:                castViewChange(new_view, null);
0402:            }
0403:
0404:            public void castViewChange(View new_view, Digest digest) {
0405:                castViewChangeWithDest(new_view, digest, null);
0406:            }
0407:
0408:            /**
0409:             * Broadcasts the new view and digest, and waits for acks from all members in the list given as argument.
0410:             * If the list is null, we take the members who are part of new_view
0411:             * @param new_view
0412:             * @param digest
0413:             * @param members
0414:             */
0415:            public void castViewChangeWithDest(View new_view, Digest digest,
0416:                    java.util.List members) {
0417:                Message view_change_msg;
0418:                GmsHeader hdr;
0419:                long start, stop;
0420:                ViewId vid = new_view.getVid();
0421:                int size = -1;
0422:
0423:                if (members == null || members.size() == 0)
0424:                    members = new_view.getMembers();
0425:
0426:                if (log.isTraceEnabled())
0427:                    log.trace("mcasting view {" + new_view + "} ("
0428:                            + new_view.size() + " mbrs)\n");
0429:
0430:                start = System.currentTimeMillis();
0431:                view_change_msg = new Message(); // bcast to all members
0432:                hdr = new GmsHeader(GmsHeader.VIEW, new_view);
0433:                hdr.my_digest = digest;
0434:                view_change_msg.putHeader(name, hdr);
0435:
0436:                ack_collector.reset(vid, members);
0437:                size = ack_collector.size();
0438:                passDown(new Event(Event.MSG, view_change_msg));
0439:
0440:                try {
0441:                    ack_collector.waitForAllAcks(view_ack_collection_timeout);
0442:                    stop = System.currentTimeMillis();
0443:                    if (log.isTraceEnabled())
0444:                        log.trace("received all ACKs (" + size + ") for " + vid
0445:                                + " in " + (stop - start) + "ms");
0446:                } catch (TimeoutException e) {
0447:                    log.warn("failed to collect all ACKs (" + size
0448:                            + ") for view " + new_view + " after "
0449:                            + view_ack_collection_timeout
0450:                            + "ms, missing ACKs from "
0451:                            + ack_collector.printMissing() + " (received="
0452:                            + ack_collector.printReceived() + "), local_addr="
0453:                            + local_addr);
0454:                }
0455:            }
0456:
0457:            /**
0458:             * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass
0459:             * of View), then digest will be non-null and has to be set before installing the view.
0460:             */
0461:            public void installView(View new_view, Digest digest) {
0462:                if (digest != null)
0463:                    mergeDigest(digest);
0464:                installView(new_view);
0465:            }
0466:
0467:            /**
0468:             * Sets the new view and sends a VIEW_CHANGE event up and down the stack.
0469:             */
0470:            public void installView(View new_view) {
0471:                Address coord;
0472:                int rc;
0473:                ViewId vid = new_view.getVid();
0474:                Vector mbrs = new_view.getMembers();
0475:
0476:                if (log.isDebugEnabled())
0477:                    log.debug("[local_addr=" + local_addr + "] view is "
0478:                            + new_view);
0479:                if (stats) {
0480:                    num_views++;
0481:                    prev_views.add(new_view);
0482:                }
0483:
0484:                ack_collector.handleView(new_view);
0485:
0486:                // Discards view with id lower than our own. Will be installed without check if first view
0487:                if (view_id != null) {
0488:                    rc = vid.compareTo(view_id);
0489:                    if (rc <= 0) {
0490:                        if (log.isTraceEnabled() && rc < 0) // only scream if view is smaller, silently discard same views
0491:                            log.trace("[" + local_addr
0492:                                    + "] received view < current view;"
0493:                                    + " discarding it (current vid: " + view_id
0494:                                    + ", new vid: " + vid + ')');
0495:                        return;
0496:                    }
0497:                }
0498:
0499:                ltime = Math.max(vid.getId(), ltime); // compute Lamport logical time
0500:
0501:                /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
0502:                This ensures that messages sent in view V1 are only received by members of V1 */
0503:                if (checkSelfInclusion(mbrs) == false) {
0504:                    // only shun if this member was previously part of the group. avoids problem where multiple
0505:                    // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view
0506:                    // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership
0507:                    // bela Nov 20 2003
0508:                    if (shun && local_addr != null
0509:                            && prev_members.contains(local_addr)) {
0510:                        if (log.isWarnEnabled())
0511:                            log
0512:                                    .warn("I ("
0513:                                            + local_addr
0514:                                            + ") am not a member of view "
0515:                                            + new_view
0516:                                            + ", shunning myself and leaving the group (prev_members are "
0517:                                            + prev_members
0518:                                            + ", current view is " + view + ")");
0519:                        if (impl != null)
0520:                            impl.handleExit();
0521:                        passUp(new Event(Event.EXIT));
0522:                    } else {
0523:                        if (log.isWarnEnabled())
0524:                            log.warn("I (" + local_addr
0525:                                    + ") am not a member of view " + new_view
0526:                                    + "; discarding view");
0527:                    }
0528:                    return;
0529:                }
0530:
0531:                synchronized (members) { // serialize access to views
0532:                    // assign new_view to view_id
0533:                    if (new_view instanceof  MergeView)
0534:                        view = new View(new_view.getVid(), new_view
0535:                                .getMembers());
0536:                    else
0537:                        view = new_view;
0538:                    view_id = vid.copy();
0539:
0540:                    // Set the membership. Take into account joining members
0541:                    if (mbrs != null && mbrs.size() > 0) {
0542:                        members.set(mbrs);
0543:                        tmp_members.set(members);
0544:                        joining.removeAll(mbrs); // remove all members in mbrs from joining
0545:                        // remove all elements from 'leaving' that are not in 'mbrs'
0546:                        leaving.retainAll(mbrs);
0547:
0548:                        tmp_members.add(joining); // add members that haven't yet shown up in the membership
0549:                        tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership
0550:
0551:                        // add to prev_members
0552:                        for (Iterator it = mbrs.iterator(); it.hasNext();) {
0553:                            Address addr = (Address) it.next();
0554:                            if (!prev_members.contains(addr))
0555:                                prev_members.add(addr);
0556:                        }
0557:                    }
0558:
0559:                    // Send VIEW_CHANGE event up and down the stack:
0560:                    Event view_event = new Event(Event.VIEW_CHANGE, new_view
0561:                            .clone());
0562:                    // changed order of passing view up and down (http://jira.jboss.com/jira/browse/JGRP-347)
0563:                    passUp(view_event);
0564:                    passDown(view_event); // needed e.g. by failure detector or UDP
0565:
0566:                    coord = determineCoordinator();
0567:                    // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) {
0568:                    // changed on suggestion by yaronr and Nicolas Piedeloupe
0569:                    if (coord != null && coord.equals(local_addr)
0570:                            && !haveCoordinatorRole()) {
0571:                        becomeCoordinator();
0572:                    } else {
0573:                        if (haveCoordinatorRole() && !local_addr.equals(coord))
0574:                            becomeParticipant();
0575:                    }
0576:                }
0577:            }
0578:
0579:            protected Address determineCoordinator() {
0580:                synchronized (members) {
0581:                    return members != null && members.size() > 0 ? (Address) members
0582:                            .elementAt(0)
0583:                            : null;
0584:                }
0585:            }
0586:
0587:            /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */
0588:            protected boolean wouldBeNewCoordinator(Address potential_new_coord) {
0589:                Address new_coord;
0590:
0591:                if (potential_new_coord == null)
0592:                    return false;
0593:
0594:                synchronized (members) {
0595:                    if (members.size() < 2)
0596:                        return false;
0597:                    new_coord = (Address) members.elementAt(1); // member at 2nd place
0598:                    return new_coord != null
0599:                            && new_coord.equals(potential_new_coord);
0600:                }
0601:            }
0602:
0603:            /** Returns true if local_addr is member of mbrs, else false */
0604:            protected boolean checkSelfInclusion(Vector mbrs) {
0605:                Object mbr;
0606:                if (mbrs == null)
0607:                    return false;
0608:                for (int i = 0; i < mbrs.size(); i++) {
0609:                    mbr = mbrs.elementAt(i);
0610:                    if (mbr != null && local_addr.equals(mbr))
0611:                        return true;
0612:                }
0613:                return false;
0614:            }
0615:
0616:            public View makeView(Vector mbrs) {
0617:                Address coord = null;
0618:                long id = 0;
0619:
0620:                if (view_id != null) {
0621:                    coord = view_id.getCoordAddress();
0622:                    id = view_id.getId();
0623:                }
0624:                return new View(coord, id, mbrs);
0625:            }
0626:
0627:            public View makeView(Vector mbrs, ViewId vid) {
0628:                Address coord = null;
0629:                long id = 0;
0630:
0631:                if (vid != null) {
0632:                    coord = vid.getCoordAddress();
0633:                    id = vid.getId();
0634:                }
0635:                return new View(coord, id, mbrs);
0636:            }
0637:
0638:            /** Send down a SET_DIGEST event */
0639:            public void setDigest(Digest d) {
0640:                passDown(new Event(Event.SET_DIGEST, d));
0641:            }
0642:
0643:            /** Send down a MERGE_DIGEST event */
0644:            public void mergeDigest(Digest d) {
0645:                passDown(new Event(Event.MERGE_DIGEST, d));
0646:            }
0647:
0648:            /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or
0649:             timeout, whichever occurs first */
0650:            public Digest getDigest() {
0651:                Digest ret = null;
0652:
0653:                digest_promise.reset();
0654:                passDown(Event.GET_DIGEST_EVT);
0655:                try {
0656:                    ret = (Digest) digest_promise
0657:                            .getResultWithTimeout(digest_timeout);
0658:                } catch (TimeoutException e) {
0659:                    if (log.isErrorEnabled())
0660:                        log.error("digest could not be fetched from below");
0661:                }
0662:                return ret;
0663:            }
0664:
0665:            boolean startFlush(View new_view, int numberOfAttempts) {
0666:                boolean successfulFlush = false;
0667:                Vector membersInNewView = new_view.getMembers();
0668:                if (membersInNewView == null || membersInNewView.isEmpty()) {
0669:                    //there are no members to FLUSH
0670:                    successfulFlush = true;
0671:                } else {
0672:                    flush_promise.reset();
0673:                    passUp(new Event(Event.SUSPEND, new_view));
0674:                    try {
0675:                        Boolean r = (Boolean) flush_promise
0676:                                .getResultWithTimeout(flush_timeout);
0677:                        successfulFlush = r.booleanValue();
0678:                    } catch (TimeoutException e) {
0679:                        log
0680:                                .warn("GMS coordinator "
0681:                                        + local_addr
0682:                                        + " timed out waiting for flush responses after "
0683:                                        + flush_timeout + " msec");
0684:                    }
0685:
0686:                    if (!successfulFlush && numberOfAttempts > 0) {
0687:
0688:                        long backOffSleepTime = Util.random(5000);
0689:                        if (log.isInfoEnabled())
0690:                            log
0691:                                    .info("Flush in progress detected at GMS coordinator "
0692:                                            + local_addr
0693:                                            + ". Backing off for "
0694:                                            + backOffSleepTime
0695:                                            + " ms. Attempts left "
0696:                                            + numberOfAttempts);
0697:
0698:                        Util.sleepRandom(backOffSleepTime);
0699:                        successfulFlush = startFlush(new_view,
0700:                                --numberOfAttempts);
0701:                    }
0702:                }
0703:                return successfulFlush;
0704:            }
0705:
0706:            void stopFlush(View view) {
0707:
0708:                //since we did not call startFlush on
0709:                //empty view do not call RESUME either 
0710:                if (view != null && view.getMembers().isEmpty())
0711:                    return;
0712:
0713:                if (log.isDebugEnabled()) {
0714:                    log.debug("sending RESUME event");
0715:                }
0716:                passUp(new Event(Event.RESUME));
0717:            }
0718:
0719:            public void up(Event evt) {
0720:                Object obj;
0721:                Message msg;
0722:                GmsHeader hdr;
0723:                MergeData merge_data;
0724:
0725:                switch (evt.getType()) {
0726:
0727:                case Event.MSG:
0728:                    msg = (Message) evt.getArg();
0729:                    obj = msg.getHeader(name);
0730:                    if (obj == null || !(obj instanceof  GmsHeader))
0731:                        break;
0732:                    hdr = (GmsHeader) msg.removeHeader(name);
0733:                    switch (hdr.type) {
0734:                    case GmsHeader.JOIN_REQ:
0735:                        view_handler.add(new Request(Request.JOIN, hdr.mbr,
0736:                                false, null));
0737:                        break;
0738:                    case GmsHeader.JOIN_RSP:
0739:                        impl.handleJoinResponse(hdr.join_rsp);
0740:                        break;
0741:                    case GmsHeader.LEAVE_REQ:
0742:                        if (log.isDebugEnabled())
0743:                            log.debug("received LEAVE_REQ for " + hdr.mbr
0744:                                    + " from " + msg.getSrc());
0745:                        if (hdr.mbr == null) {
0746:                            if (log.isErrorEnabled())
0747:                                log.error("LEAVE_REQ's mbr field is null");
0748:                            return;
0749:                        }
0750:                        view_handler.add(new Request(Request.LEAVE, hdr.mbr,
0751:                                false, null));
0752:                        break;
0753:                    case GmsHeader.LEAVE_RSP:
0754:                        impl.handleLeaveResponse();
0755:                        break;
0756:                    case GmsHeader.VIEW:
0757:                        if (hdr.view == null) {
0758:                            if (log.isErrorEnabled())
0759:                                log.error("[VIEW]: view == null");
0760:                            return;
0761:                        }
0762:
0763:                        // send VIEW_ACK to sender of view
0764:                        Address coord = msg.getSrc();
0765:                        Message view_ack = new Message(coord, null, null);
0766:                        GmsHeader tmphdr = new GmsHeader(GmsHeader.VIEW_ACK,
0767:                                hdr.view);
0768:                        view_ack.putHeader(name, tmphdr);
0769:                        if (log.isTraceEnabled())
0770:                            log.trace("sending VIEW_ACK to " + coord);
0771:                        passDown(new Event(Event.MSG, view_ack));
0772:                        impl.handleViewChange(hdr.view, hdr.my_digest);
0773:                        break;
0774:
0775:                    case GmsHeader.VIEW_ACK:
0776:                        Object sender = msg.getSrc();
0777:                        ack_collector.ack(sender);
0778:                        return; // don't pass further up
0779:
0780:                    case GmsHeader.MERGE_REQ:
0781:                        impl.handleMergeRequest(msg.getSrc(), hdr.merge_id);
0782:                        break;
0783:
0784:                    case GmsHeader.MERGE_RSP:
0785:                        merge_data = new MergeData(msg.getSrc(), hdr.view,
0786:                                hdr.my_digest);
0787:                        merge_data.merge_rejected = hdr.merge_rejected;
0788:                        impl.handleMergeResponse(merge_data, hdr.merge_id);
0789:                        break;
0790:
0791:                    case GmsHeader.INSTALL_MERGE_VIEW:
0792:                        impl.handleMergeView(new MergeData(msg.getSrc(),
0793:                                hdr.view, hdr.my_digest), hdr.merge_id);
0794:                        break;
0795:
0796:                    case GmsHeader.CANCEL_MERGE:
0797:                        impl.handleMergeCancelled(hdr.merge_id);
0798:                        break;
0799:
0800:                    default:
0801:                        if (log.isErrorEnabled())
0802:                            log.error("GmsHeader with type=" + hdr.type
0803:                                    + " not known");
0804:                    }
0805:                    return; // don't pass up
0806:
0807:                case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
0808:                case Event.DISCONNECT_OK: // dito (e.g. sent by TP layer). Don't send up the stack
0809:                    return;
0810:
0811:                case Event.SET_LOCAL_ADDRESS:
0812:                    local_addr = (Address) evt.getArg();
0813:                    if (print_local_addr) {
0814:                        System.out
0815:                                .println("\n-------------------------------------------------------\n"
0816:                                        + "GMS: address is "
0817:                                        + local_addr
0818:                                        + "\n-------------------------------------------------------");
0819:                    }
0820:                    break; // pass up
0821:
0822:                case Event.SUSPECT:
0823:                    Address suspected = (Address) evt.getArg();
0824:                    view_handler.add(new Request(Request.SUSPECT, suspected,
0825:                            true, null));
0826:                    ack_collector.suspect(suspected);
0827:                    break; // pass up
0828:
0829:                case Event.UNSUSPECT:
0830:                    impl.unsuspect((Address) evt.getArg());
0831:                    return; // discard
0832:
0833:                case Event.MERGE:
0834:                    view_handler.add(new Request(Request.MERGE, null, false,
0835:                            (Vector) evt.getArg()));
0836:                    return; // don't pass up
0837:                }
0838:
0839:                if (impl.handleUpEvent(evt))
0840:                    passUp(evt);
0841:            }
0842:
0843:            /**
0844:             This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs
0845:             to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer
0846:             responds with a GET_DIGEST_OK event.<p>
0847:             However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process
0848:             the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler
0849:             thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on which
0850:             JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then discarded, because
0851:             it won't be processed twice.
0852:             */
0853:            public void receiveUpEvent(Event evt) {
0854:                switch (evt.getType()) {
0855:                case Event.GET_DIGEST_OK:
0856:                    digest_promise.setResult(evt.getArg());
0857:                    return; // don't pass further up
0858:                }
0859:                super .receiveUpEvent(evt);
0860:            }
0861:
0862:            public void down(Event evt) {
0863:                switch (evt.getType()) {
0864:
0865:                case Event.CONNECT:
0866:                    Object arg = null;
0867:                    passDown(evt);
0868:                    if (local_addr == null)
0869:                        if (log.isFatalEnabled())
0870:                            log.fatal("[CONNECT] local_addr is null");
0871:                    try {
0872:                        impl.join(local_addr);
0873:                    } catch (SecurityException e) {
0874:                        arg = e;
0875:                    }
0876:                    passUp(new Event(Event.CONNECT_OK, arg));
0877:                    return; // don't pass down: was already passed down
0878:
0879:                case Event.DISCONNECT:
0880:                    impl.leave((Address) evt.getArg());
0881:                    if (!(impl instanceof  CoordGmsImpl)) {
0882:                        passUp(new Event(Event.DISCONNECT_OK));
0883:                        initState(); // in case connect() is called again
0884:                    }
0885:                    break; // pass down
0886:                case Event.SUSPEND_OK:
0887:                    flush_promise.setResult(Boolean.TRUE);
0888:                    break;
0889:
0890:                case Event.SUSPEND_FAILED:
0891:                    flush_promise.setResult(Boolean.FALSE);
0892:                    break;
0893:
0894:                case Event.CONFIG:
0895:                    Map config = (Map) evt.getArg();
0896:                    if (config != null && config.containsKey("flush_timeout")) {
0897:                        Long ftimeout = (Long) config.get("flush_timeout");
0898:                        use_flush = true;
0899:                        flush_timeout = ftimeout.longValue();
0900:                    }
0901:                    if ((config != null && !config
0902:                            .containsKey("flush_suported"))) {
0903:                        flushProtocolInStack = true;
0904:                    }
0905:                    break;
0906:                }
0907:
0908:                passDown(evt);
0909:            }
0910:
0911:            /** Setup the Protocol instance according to the configuration string */
0912:            public boolean setProperties(Properties props) {
0913:                String str;
0914:
0915:                super .setProperties(props);
0916:                str = props.getProperty("shun");
0917:                if (str != null) {
0918:                    shun = Boolean.valueOf(str).booleanValue();
0919:                    props.remove("shun");
0920:                }
0921:
0922:                str = props.getProperty("merge_leader");
0923:                if (str != null) {
0924:                    merge_leader = Boolean.valueOf(str).booleanValue();
0925:                    props.remove("merge_leader");
0926:                }
0927:
0928:                str = props.getProperty("print_local_addr");
0929:                if (str != null) {
0930:                    print_local_addr = Boolean.valueOf(str).booleanValue();
0931:                    props.remove("print_local_addr");
0932:                }
0933:
0934:                str = props.getProperty("join_timeout"); // time to wait for JOIN
0935:                if (str != null) {
0936:                    join_timeout = Long.parseLong(str);
0937:                    props.remove("join_timeout");
0938:                }
0939:
0940:                str = props.getProperty("join_retry_timeout"); // time to wait between JOINs
0941:                if (str != null) {
0942:                    join_retry_timeout = Long.parseLong(str);
0943:                    props.remove("join_retry_timeout");
0944:                }
0945:
0946:                str = props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
0947:                if (str != null) {
0948:                    leave_timeout = Long.parseLong(str);
0949:                    props.remove("leave_timeout");
0950:                }
0951:
0952:                str = props.getProperty("merge_timeout"); // time to wait for MERGE_RSPS from subgroup coordinators
0953:                if (str != null) {
0954:                    merge_timeout = Long.parseLong(str);
0955:                    props.remove("merge_timeout");
0956:                }
0957:
0958:                str = props.getProperty("digest_timeout"); // time to wait for GET_DIGEST_OK from PBCAST
0959:                if (str != null) {
0960:                    digest_timeout = Long.parseLong(str);
0961:                    props.remove("digest_timeout");
0962:                }
0963:
0964:                str = props.getProperty("view_ack_collection_timeout");
0965:                if (str != null) {
0966:                    view_ack_collection_timeout = Long.parseLong(str);
0967:                    props.remove("view_ack_collection_timeout");
0968:                }
0969:
0970:                str = props.getProperty("resume_task_timeout");
0971:                if (str != null) {
0972:                    resume_task_timeout = Long.parseLong(str);
0973:                    props.remove("resume_task_timeout");
0974:                }
0975:
0976:                str = props.getProperty("disable_initial_coord");
0977:                if (str != null) {
0978:                    disable_initial_coord = Boolean.valueOf(str).booleanValue();
0979:                    props.remove("disable_initial_coord");
0980:                }
0981:
0982:                str = props.getProperty("handle_concurrent_startup");
0983:                if (str != null) {
0984:                    handle_concurrent_startup = Boolean.valueOf(str)
0985:                            .booleanValue();
0986:                    props.remove("handle_concurrent_startup");
0987:                }
0988:
0989:                str = props.getProperty("num_prev_mbrs");
0990:                if (str != null) {
0991:                    num_prev_mbrs = Integer.parseInt(str);
0992:                    props.remove("num_prev_mbrs");
0993:                }
0994:
0995:                str = props.getProperty("use_flush");
0996:                if (str != null) {
0997:                    use_flush = Boolean.valueOf(str).booleanValue();
0998:                    props.remove("use_flush");
0999:                }
1000:
1001:                str = props.getProperty("flush_timeout");
1002:                if (str != null) {
1003:                    flush_timeout = Long.parseLong(str);
1004:                    props.remove("flush_timeout");
1005:                }
1006:
1007:                str = props.getProperty("view_bundling");
1008:                if (str != null) {
1009:                    view_bundling = Boolean.valueOf(str).booleanValue();
1010:                    props.remove("view_bundling");
1011:                }
1012:
1013:                str = props.getProperty("max_bundling_time");
1014:                if (str != null) {
1015:                    max_bundling_time = Long.parseLong(str);
1016:                    props.remove("max_bundling_time");
1017:                }
1018:
1019:                if (props.size() > 0) {
1020:                    log.error("the following properties are not recognized: "
1021:                            + props);
1022:                    return false;
1023:                }
1024:                return true;
1025:            }
1026:
1027:            /* ------------------------------- Private Methods --------------------------------- */
1028:
1029:            final void initState() {
1030:                becomeClient();
1031:                view_id = null;
1032:                view = null;
1033:            }
1034:
1035:            /* --------------------------- End of Private Methods ------------------------------- */
1036:
1037:            public static class GmsHeader extends Header implements  Streamable {
1038:                public static final byte JOIN_REQ = 1;
1039:                public static final byte JOIN_RSP = 2;
1040:                public static final byte LEAVE_REQ = 3;
1041:                public static final byte LEAVE_RSP = 4;
1042:                public static final byte VIEW = 5;
1043:                public static final byte MERGE_REQ = 6;
1044:                public static final byte MERGE_RSP = 7;
1045:                public static final byte INSTALL_MERGE_VIEW = 8;
1046:                public static final byte CANCEL_MERGE = 9;
1047:                public static final byte VIEW_ACK = 10;
1048:
1049:                byte type = 0;
1050:                View view = null; // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW
1051:                Address mbr = null; // used when type=JOIN_REQ or LEAVE_REQ
1052:                JoinRsp join_rsp = null; // used when type=JOIN_RSP
1053:                Digest my_digest = null; // used when type=MERGE_RSP or INSTALL_MERGE_VIEW
1054:                ViewId merge_id = null; // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE
1055:                boolean merge_rejected = false; // used when type=MERGE_RSP
1056:
1057:                public GmsHeader() {
1058:                } // used for Externalization
1059:
1060:                public GmsHeader(byte type) {
1061:                    this .type = type;
1062:                }
1063:
1064:                /** Used for VIEW header */
1065:                public GmsHeader(byte type, View view) {
1066:                    this .type = type;
1067:                    this .view = view;
1068:                }
1069:
1070:                /** Used for JOIN_REQ or LEAVE_REQ header */
1071:                public GmsHeader(byte type, Address mbr) {
1072:                    this .type = type;
1073:                    this .mbr = mbr;
1074:                }
1075:
1076:                /** Used for JOIN_RSP header */
1077:                public GmsHeader(byte type, JoinRsp join_rsp) {
1078:                    this .type = type;
1079:                    this .join_rsp = join_rsp;
1080:                }
1081:
1082:                public byte getType() {
1083:                    return type;
1084:                }
1085:
1086:                public Address getMemeber() {
1087:                    return mbr;
1088:                }
1089:
1090:                public String toString() {
1091:                    StringBuffer sb = new StringBuffer("GmsHeader");
1092:                    sb.append('[' + type2String(type) + ']');
1093:                    switch (type) {
1094:                    case JOIN_REQ:
1095:                        sb.append(": mbr=" + mbr);
1096:                        break;
1097:
1098:                    case JOIN_RSP:
1099:                        sb.append(": join_rsp=" + join_rsp);
1100:                        break;
1101:
1102:                    case LEAVE_REQ:
1103:                        sb.append(": mbr=" + mbr);
1104:                        break;
1105:
1106:                    case LEAVE_RSP:
1107:                        break;
1108:
1109:                    case VIEW:
1110:                    case VIEW_ACK:
1111:                        sb.append(": view=" + view);
1112:                        break;
1113:
1114:                    case MERGE_REQ:
1115:                        sb.append(": merge_id=" + merge_id);
1116:                        break;
1117:
1118:                    case MERGE_RSP:
1119:                        sb.append(": view=" + view + ", digest=" + my_digest
1120:                                + ", merge_rejected=" + merge_rejected
1121:                                + ", merge_id=" + merge_id);
1122:                        break;
1123:
1124:                    case INSTALL_MERGE_VIEW:
1125:                        sb.append(": view=" + view + ", digest=" + my_digest);
1126:                        break;
1127:
1128:                    case CANCEL_MERGE:
1129:                        sb.append(", <merge cancelled>, merge_id=" + merge_id);
1130:                        break;
1131:                    }
1132:                    return sb.toString();
1133:                }
1134:
1135:                public static String type2String(int type) {
1136:                    switch (type) {
1137:                    case JOIN_REQ:
1138:                        return "JOIN_REQ";
1139:                    case JOIN_RSP:
1140:                        return "JOIN_RSP";
1141:                    case LEAVE_REQ:
1142:                        return "LEAVE_REQ";
1143:                    case LEAVE_RSP:
1144:                        return "LEAVE_RSP";
1145:                    case VIEW:
1146:                        return "VIEW";
1147:                    case MERGE_REQ:
1148:                        return "MERGE_REQ";
1149:                    case MERGE_RSP:
1150:                        return "MERGE_RSP";
1151:                    case INSTALL_MERGE_VIEW:
1152:                        return "INSTALL_MERGE_VIEW";
1153:                    case CANCEL_MERGE:
1154:                        return "CANCEL_MERGE";
1155:                    case VIEW_ACK:
1156:                        return "VIEW_ACK";
1157:                    default:
1158:                        return "<unknown>";
1159:                    }
1160:                }
1161:
1162:                public void writeExternal(ObjectOutput out) throws IOException {
1163:                    out.writeByte(type);
1164:                    out.writeObject(view);
1165:                    out.writeObject(mbr);
1166:                    out.writeObject(join_rsp);
1167:                    out.writeObject(my_digest);
1168:                    out.writeObject(merge_id);
1169:                    out.writeBoolean(merge_rejected);
1170:                }
1171:
1172:                public void readExternal(ObjectInput in) throws IOException,
1173:                        ClassNotFoundException {
1174:                    type = in.readByte();
1175:                    view = (View) in.readObject();
1176:                    mbr = (Address) in.readObject();
1177:                    join_rsp = (JoinRsp) in.readObject();
1178:                    my_digest = (Digest) in.readObject();
1179:                    merge_id = (ViewId) in.readObject();
1180:                    merge_rejected = in.readBoolean();
1181:                }
1182:
1183:                public void writeTo(DataOutputStream out) throws IOException {
1184:                    out.writeByte(type);
1185:                    boolean isMergeView = view != null
1186:                            && view instanceof  MergeView;
1187:                    out.writeBoolean(isMergeView);
1188:                    Util.writeStreamable(view, out);
1189:                    Util.writeAddress(mbr, out);
1190:                    Util.writeStreamable(join_rsp, out);
1191:                    Util.writeStreamable(my_digest, out);
1192:                    Util.writeStreamable(merge_id, out); // kludge: we know merge_id is a ViewId
1193:                    out.writeBoolean(merge_rejected);
1194:                }
1195:
1196:                public void readFrom(DataInputStream in) throws IOException,
1197:                        IllegalAccessException, InstantiationException {
1198:                    type = in.readByte();
1199:                    boolean isMergeView = in.readBoolean();
1200:                    if (isMergeView)
1201:                        view = (View) Util.readStreamable(MergeView.class, in);
1202:                    else
1203:                        view = (View) Util.readStreamable(View.class, in);
1204:                    mbr = Util.readAddress(in);
1205:                    join_rsp = (JoinRsp) Util.readStreamable(JoinRsp.class, in);
1206:                    my_digest = (Digest) Util.readStreamable(Digest.class, in);
1207:                    merge_id = (ViewId) Util.readStreamable(ViewId.class, in);
1208:                    merge_rejected = in.readBoolean();
1209:                }
1210:
1211:                public long size() {
1212:                    long retval = Global.BYTE_SIZE * 2; // type + merge_rejected
1213:
1214:                    retval += Global.BYTE_SIZE; // presence view
1215:                    retval += Global.BYTE_SIZE; // MergeView or View
1216:                    if (view != null)
1217:                        retval += view.serializedSize();
1218:
1219:                    retval += Util.size(mbr);
1220:
1221:                    retval += Global.BYTE_SIZE; // presence of join_rsp
1222:                    if (join_rsp != null)
1223:                        retval += join_rsp.serializedSize();
1224:
1225:                    retval += Global.BYTE_SIZE; // presence for my_digest
1226:                    if (my_digest != null)
1227:                        retval += my_digest.serializedSize();
1228:
1229:                    retval += Global.BYTE_SIZE; // presence for merge_id
1230:                    if (merge_id != null)
1231:                        retval += merge_id.serializedSize();
1232:                    return retval;
1233:                }
1234:
1235:            }
1236:
1237:            public static class Request {
1238:                static final int JOIN = 1;
1239:                static final int LEAVE = 2;
1240:                static final int SUSPECT = 3;
1241:                static final int MERGE = 4;
1242:                static final int VIEW = 5;
1243:
1244:                int type = -1;
1245:                Address mbr;
1246:                boolean suspected;
1247:                Vector coordinators;
1248:                View view;
1249:                Digest digest;
1250:                List target_members;
1251:
1252:                Request(int type) {
1253:                    this .type = type;
1254:                }
1255:
1256:                Request(int type, Address mbr, boolean suspected,
1257:                        Vector coordinators) {
1258:                    this .type = type;
1259:                    this .mbr = mbr;
1260:                    this .suspected = suspected;
1261:                    this .coordinators = coordinators;
1262:                }
1263:
1264:                public int getType() {
1265:                    return type;
1266:                }
1267:
1268:                public String toString() {
1269:                    switch (type) {
1270:                    case JOIN:
1271:                        return "JOIN(" + mbr + ")";
1272:                    case LEAVE:
1273:                        return "LEAVE(" + mbr + ", " + suspected + ")";
1274:                    case SUSPECT:
1275:                        return "SUSPECT(" + mbr + ")";
1276:                    case MERGE:
1277:                        return "MERGE(" + coordinators + ")";
1278:                    case VIEW:
1279:                        return "VIEW (" + view.getVid() + ")";
1280:                    }
1281:                    return "<invalid (type=" + type + ")";
1282:                }
1283:
1284:                /**
1285:                 * Specifies whether this request can be processed with other request simultaneously
1286:                 */
1287:                public boolean canBeProcessedTogether(Request other) {
1288:                    if (other == null)
1289:                        return false;
1290:                    int other_type = other.getType();
1291:                    return (type == JOIN || type == LEAVE || type == SUSPECT)
1292:                            && (other_type == JOIN || other_type == LEAVE || other_type == SUSPECT);
1293:                }
1294:            }
1295:
1296:            /**
1297:             * Class which processes JOIN, LEAVE and MERGE requests. Requests are queued and processed in FIFO order
1298:             * @author Bela Ban
1299:             * @version $Id: GMS.java,v 1.68.2.5 2007/04/27 08:03:55 belaban Exp $
1300:             */
1301:            class ViewHandler implements  Runnable {
1302:                volatile Thread thread;
1303:                Queue q = new Queue(); // Queue<Request>
1304:                boolean suspended = false;
1305:                final static long INTERVAL = 5000;
1306:                private static final long MAX_COMPLETION_TIME = 10000;
1307:                /** Maintains a list of the last 20 requests */
1308:                private final BoundedList history = new BoundedList(20);
1309:
1310:                /** Map<Object,TimeScheduler.CancellableTask>. Keeps track of Resumer tasks which have not fired yet */
1311:                private final Map resume_tasks = new HashMap();
1312:                private Object merge_id = null;
1313:
1314:                void add(Request req) {
1315:                    add(req, false, false);
1316:                }
1317:
1318:                synchronized void add(Request req, boolean at_head,
1319:                        boolean unsuspend) {
1320:                    if (suspended && !unsuspend) {
1321:                        log.warn("queue is suspended; request " + req
1322:                                + " is discarded");
1323:                        return;
1324:                    }
1325:                    start(unsuspend);
1326:                    try {
1327:                        if (at_head)
1328:                            q.addAtHead(req);
1329:                        else
1330:                            q.add(req);
1331:                        history.add(new Date() + ": " + req.toString());
1332:                    } catch (QueueClosedException e) {
1333:                        if (log.isTraceEnabled())
1334:                            log.trace("queue is closed; request " + req
1335:                                    + " is discarded");
1336:                    }
1337:                }
1338:
1339:                void waitUntilCompleted(long timeout) {
1340:                    waitUntilCompleted(timeout, false);
1341:                }
1342:
1343:                synchronized void waitUntilCompleted(long timeout,
1344:                        boolean resume) {
1345:                    if (thread != null) {
1346:                        try {
1347:                            thread.join(timeout);
1348:                        } catch (InterruptedException e) {
1349:                        }
1350:                    }
1351:                    if (resume)
1352:                        resumeForce();
1353:                }
1354:
1355:                /**
1356:                 * Waits until the current request has been processes, then clears the queue and discards new
1357:                 * requests from now on
1358:                 */
1359:                public synchronized void suspend(Object merge_id) {
1360:                    if (suspended)
1361:                        return;
1362:                    suspended = true;
1363:                    this .merge_id = merge_id;
1364:                    q.clear();
1365:                    waitUntilCompleted(MAX_COMPLETION_TIME);
1366:                    q.close(true);
1367:                    if (log.isTraceEnabled())
1368:                        log.trace("suspended ViewHandler");
1369:                    Resumer r = new Resumer(resume_task_timeout, merge_id,
1370:                            resume_tasks, this );
1371:                    resume_tasks.put(merge_id, r);
1372:                    timer.add(r);
1373:                }
1374:
1375:                public synchronized void resume(Object merge_id) {
1376:                    if (!suspended)
1377:                        return;
1378:                    boolean same_merge_id = this .merge_id != null
1379:                            && merge_id != null
1380:                            && this .merge_id.equals(merge_id);
1381:                    same_merge_id = same_merge_id
1382:                            || (this .merge_id == null && merge_id == null);
1383:
1384:                    if (!same_merge_id) {
1385:                        if (log.isWarnEnabled())
1386:                            log.warn("resume(" + merge_id + ") does not match "
1387:                                    + this .merge_id + ", ignoring resume()");
1388:                        return;
1389:                    }
1390:                    synchronized (resume_tasks) {
1391:                        TimeScheduler.CancellableTask task = (TimeScheduler.CancellableTask) resume_tasks
1392:                                .get(merge_id);
1393:                        if (task != null) {
1394:                            task.cancel();
1395:                            resume_tasks.remove(merge_id);
1396:                        }
1397:                    }
1398:                    resumeForce();
1399:                }
1400:
1401:                public synchronized void resumeForce() {
1402:                    if (q.closed())
1403:                        q.reset();
1404:                    suspended = false;
1405:                    if (log.isTraceEnabled())
1406:                        log.trace("resumed ViewHandler");
1407:                }
1408:
1409:                public void run() {
1410:                    long start, stop, wait_time;
1411:                    List requests = new LinkedList();
1412:                    while (Thread.currentThread().equals(thread)) {
1413:                        requests.clear();
1414:                        try {
1415:                            boolean keepGoing = false;
1416:                            start = System.currentTimeMillis();
1417:                            do {
1418:                                Request firstRequest = (Request) q
1419:                                        .remove(INTERVAL); // throws a TimeoutException if it runs into timeout
1420:                                requests.add(firstRequest);
1421:                                if (q.size() > 0) {
1422:                                    Request nextReq = (Request) q.peek();
1423:                                    keepGoing = view_bundling
1424:                                            && firstRequest
1425:                                                    .canBeProcessedTogether(nextReq);
1426:                                } else {
1427:                                    stop = System.currentTimeMillis();
1428:                                    wait_time = max_bundling_time
1429:                                            - (stop - start);
1430:                                    if (wait_time > 0)
1431:                                        Util.sleep(wait_time);
1432:                                    keepGoing = q.size() > 0;
1433:                                }
1434:                            } while (keepGoing);
1435:                            process(requests);
1436:                        } catch (QueueClosedException e) {
1437:                            break;
1438:                        } catch (TimeoutException e) {
1439:                            break;
1440:                        } catch (Throwable catchall) {
1441:                            Util.sleep(50);
1442:                        }
1443:                    }
1444:                }
1445:
1446:                public int size() {
1447:                    return q.size();
1448:                }
1449:
1450:                public boolean suspended() {
1451:                    return suspended;
1452:                }
1453:
1454:                public String dumpQueue() {
1455:                    StringBuffer sb = new StringBuffer();
1456:                    List v = q.values();
1457:                    for (Iterator it = v.iterator(); it.hasNext();) {
1458:                        sb.append(it.next() + "\n");
1459:                    }
1460:                    return sb.toString();
1461:                }
1462:
1463:                public String dumpHistory() {
1464:                    StringBuffer sb = new StringBuffer();
1465:                    for (Enumeration en = history.elements(); en
1466:                            .hasMoreElements();) {
1467:                        sb.append(en.nextElement() + "\n");
1468:                    }
1469:                    return sb.toString();
1470:                }
1471:
1472:                private void process(List requests) {
1473:                    if (requests.isEmpty())
1474:                        return;
1475:                    if (log.isTraceEnabled())
1476:                        log.trace("processing " + requests);
1477:                    Request firstReq = (Request) requests.get(0);
1478:                    switch (firstReq.type) {
1479:                    case Request.JOIN:
1480:                    case Request.LEAVE:
1481:                    case Request.SUSPECT:
1482:                        Collection newMembers = new LinkedHashSet(requests
1483:                                .size());
1484:                        Collection suspectedMembers = new LinkedHashSet(
1485:                                requests.size());
1486:                        Collection oldMembers = new LinkedHashSet(requests
1487:                                .size());
1488:                        for (Iterator i = requests.iterator(); i.hasNext();) {
1489:                            Request req = (Request) i.next();
1490:                            switch (req.type) {
1491:                            case Request.JOIN:
1492:                                newMembers.add(req.mbr);
1493:                                break;
1494:                            case Request.LEAVE:
1495:                                if (req.suspected)
1496:                                    suspectedMembers.add(req.mbr);
1497:                                else
1498:                                    oldMembers.add(req.mbr);
1499:                                break;
1500:                            case Request.SUSPECT:
1501:                                suspectedMembers.add(req.mbr);
1502:                                break;
1503:                            }
1504:                        }
1505:                        impl.handleMembershipChange(newMembers, oldMembers,
1506:                                suspectedMembers);
1507:                        break;
1508:                    case Request.MERGE:
1509:                        if (requests.size() > 1)
1510:                            log
1511:                                    .error("more than one MERGE request to process, ignoring the others");
1512:                        impl.merge(firstReq.coordinators);
1513:                        break;
1514:                    case Request.VIEW:
1515:                        if (requests.size() > 1)
1516:                            log
1517:                                    .error("more than one VIEW request to process, ignoring the others");
1518:                        try {
1519:                            if (use_flush) {
1520:                                boolean successfulFlush = startFlush(
1521:                                        firstReq.view, 3);
1522:                                if (successfulFlush) {
1523:                                    log
1524:                                            .info("Successful GMS flush by coordinator at "
1525:                                                    + getLocalAddress());
1526:                                }
1527:                            }
1528:                            castViewChangeWithDest(firstReq.view,
1529:                                    firstReq.digest, firstReq.target_members);
1530:                        } finally {
1531:                            if (use_flush)
1532:                                stopFlush(firstReq.view);
1533:                        }
1534:                        break;
1535:                    default:
1536:                        log.error("request " + firstReq.type
1537:                                + " is unknown; discarded");
1538:                    }
1539:                }
1540:
1541:                synchronized void start(boolean unsuspend) {
1542:                    if (q.closed())
1543:                        q.reset();
1544:                    if (unsuspend) {
1545:                        suspended = false;
1546:                        synchronized (resume_tasks) {
1547:                            TimeScheduler.CancellableTask task = (TimeScheduler.CancellableTask) resume_tasks
1548:                                    .get(merge_id);
1549:                            if (task != null) {
1550:                                task.cancel();
1551:                                resume_tasks.remove(merge_id);
1552:                            }
1553:                        }
1554:                    }
1555:                    merge_id = null;
1556:                    if (thread == null || !thread.isAlive()) {
1557:                        thread = new Thread(Util.getGlobalThreadGroup(), this ,
1558:                                "ViewHandler");
1559:                        thread.setDaemon(false); // thread cannot terminate if we have tasks left, e.g. when we as coord leave
1560:                        thread.start();
1561:                        if (log.isTraceEnabled())
1562:                            log.trace("ViewHandler started");
1563:                    }
1564:                }
1565:
1566:                synchronized void stop(boolean flush) {
1567:                    q.close(flush);
1568:                    TimeScheduler.CancellableTask task;
1569:                    synchronized (resume_tasks) {
1570:                        for (Iterator it = resume_tasks.values().iterator(); it
1571:                                .hasNext();) {
1572:                            task = (TimeScheduler.CancellableTask) it.next();
1573:                            task.cancel();
1574:                        }
1575:                        resume_tasks.clear();
1576:                    }
1577:                    merge_id = null;
1578:                    // resumeForce();
1579:                }
1580:            }
1581:
1582:            /**
1583:             * Resumer is a second line of defense: when the ViewHandler is suspended, it will be resumed when the current
1584:             * merge is cancelled, or when the merge completes. However, in a case where this never happens (this
1585:             * shouldn't be the case !), the Resumer will nevertheless resume the ViewHandler.
1586:             * We chose this strategy because ViewHandler is critical: if it is suspended indefinitely, we would
1587:             * not be able to process new JOIN requests ! So, this is for peace of mind, although it most likely
1588:             * will never be used...
1589:             */
1590:            static class Resumer implements  TimeScheduler.CancellableTask {
1591:                boolean cancelled = false;
1592:                long interval;
1593:                final Object token;
1594:                final Map tasks;
1595:                final ViewHandler handler;
1596:
1597:                public Resumer(long interval, final Object token, final Map t,
1598:                        final ViewHandler handler) {
1599:                    this .interval = interval;
1600:                    this .token = token;
1601:                    this .tasks = t;
1602:                    this .handler = handler;
1603:                }
1604:
1605:                public void cancel() {
1606:                    cancelled = true;
1607:                }
1608:
1609:                public boolean cancelled() {
1610:                    return cancelled;
1611:                }
1612:
1613:                public long nextInterval() {
1614:                    return interval;
1615:                }
1616:
1617:                public void run() {
1618:                    TimeScheduler.CancellableTask t;
1619:                    boolean execute = true;
1620:                    synchronized (tasks) {
1621:                        t = (TimeScheduler.CancellableTask) tasks.get(token);
1622:                        if (t != null) {
1623:                            t.cancel();
1624:                            execute = true;
1625:                        } else {
1626:                            execute = false;
1627:                        }
1628:                        tasks.remove(token);
1629:                    }
1630:                    if (execute) {
1631:                        handler.resume(token);
1632:                    }
1633:                }
1634:            }
1635:
1636:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.