Source Code Cross Referenced for CoordGmsImpl.java in » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » pbcast » Java Source Code / Java Documentation — Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols.pbcast 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        // $Id: CoordGmsImpl.java,v 1.52.2.2 2007/04/27 08:03:55 belaban Exp $
0002:
0003:        package org.jgroups.protocols.pbcast;
0004:
0005:        import org.jgroups.*;
0006:        import org.jgroups.util.TimeScheduler;
0007:
0008:        import java.util.*;
0009:
0010:        /**
0011:         * Coordinator role of the Group MemberShip (GMS) protocol. Accepts JOIN and LEAVE requests and emits view changes
0012:         * accordingly.
0013:         * @author Bela Ban
0014:         */
0015:        public class CoordGmsImpl extends GmsImpl {
0016:            private boolean merging = false;
0017:            private final MergeTask merge_task = new MergeTask();
0018:            private final Vector merge_rsps = new Vector(11);
0019:            // for MERGE_REQ/MERGE_RSP correlation, contains MergeData elements
0020:            private ViewId merge_id = null;
0021:
0022:            private Address merge_leader = null;
0023:
0024:            private MergeCanceller merge_canceller = null;
0025:
0026:            private final Object merge_canceller_mutex = new Object();
0027:
0028:            /** the max time in ms to suspend message garbage collection */
0029:            private final Long MAX_SUSPEND_TIMEOUT = new Long(30000);
0030:
/**
 * Creates the coordinator role of the GMS protocol.
 *
 * @param g the owning GMS protocol instance, handed to the GmsImpl superclass
 */
public CoordGmsImpl(GMS g) {
    super(g);
}
0034:
0035:            private void setMergeId(ViewId merge_id) {
0036:                this .merge_id = merge_id;
0037:                synchronized (merge_canceller_mutex) {
0038:                    if (this .merge_id != null) {
0039:                        stopMergeCanceller();
0040:                        merge_canceller = new MergeCanceller(this .merge_id,
0041:                                gms.merge_timeout);
0042:                        gms.timer.add(merge_canceller);
0043:                    } else { // merge completed
0044:                        stopMergeCanceller();
0045:                    }
0046:                }
0047:            }
0048:
0049:            private void stopMergeCanceller() {
0050:                synchronized (merge_canceller_mutex) {
0051:                    if (merge_canceller != null) {
0052:                        merge_canceller.cancel();
0053:                        merge_canceller = null;
0054:                    }
0055:                }
0056:            }
0057:
0058:            public void init() throws Exception {
0059:                super .init();
0060:                cancelMerge();
0061:            }
0062:
0063:            public void join(Address mbr) {
0064:                wrongMethod("join");
0065:            }
0066:
0067:            /** The coordinator itself wants to leave the group */
0068:            public void leave(Address mbr) {
0069:                if (mbr == null) {
0070:                    if (log.isErrorEnabled())
0071:                        log.error("member's address is null !");
0072:                    return;
0073:                }
0074:                if (mbr.equals(gms.local_addr))
0075:                    leaving = true;
0076:                gms.getViewHandler().add(
0077:                        new GMS.Request(GMS.Request.LEAVE, mbr, false, null));
0078:                gms.getViewHandler().stop(true); // wait until all requests have been processed, then close the queue and leave
0079:                gms.getViewHandler().waitUntilCompleted(gms.leave_timeout);
0080:            }
0081:
0082:            public void handleJoinResponse(JoinRsp join_rsp) {
0083:            }
0084:
0085:            public void handleLeaveResponse() {
0086:            }
0087:
0088:            public void suspect(Address mbr) {
0089:                if (mbr.equals(gms.local_addr)) {
0090:                    if (log.isWarnEnabled())
0091:                        log
0092:                                .warn("I am the coord and I'm being am suspected -- will probably leave shortly");
0093:                    return;
0094:                }
0095:                Collection emptyVector = new LinkedHashSet(0);
0096:                Collection suspected = new LinkedHashSet(1);
0097:                suspected.add(mbr);
0098:                handleMembershipChange(emptyVector, emptyVector, suspected);
0099:            }
0100:
0101:            public void unsuspect(Address mbr) {
0102:
0103:            }
0104:
0105:            /**
0106:             * Invoked upon receiving a MERGE event from the MERGE layer. Starts the merge protocol.
0107:             * See description of protocol in DESIGN.
0108:             * @param other_coords A list of coordinators (including myself) found by MERGE protocol
0109:             */
0110:            public void merge(Vector other_coords) {
0111:                Membership tmp;
0112:
0113:                if (merging) {
0114:                    if (log.isWarnEnabled())
0115:                        log
0116:                                .warn("merge already in progress, discarded MERGE event (I am "
0117:                                        + gms.local_addr + ")");
0118:                    return;
0119:                }
0120:                merge_leader = null;
0121:                if (other_coords == null) {
0122:                    if (log.isWarnEnabled())
0123:                        log
0124:                                .warn("list of other coordinators is null. Will not start merge.");
0125:                    return;
0126:                }
0127:
0128:                if (other_coords.size() <= 1) {
0129:                    if (log.isErrorEnabled())
0130:                        log.error("number of coordinators found is "
0131:                                + other_coords.size()
0132:                                + "; will not perform merge");
0133:                    return;
0134:                }
0135:
0136:                /* Establish deterministic order, so that coords can elect leader */
0137:                tmp = new Membership(other_coords);
0138:                tmp.sort();
0139:                merge_leader = (Address) tmp.elementAt(0);
0140:                // if(log.isDebugEnabled()) log.debug("coordinators in merge protocol are: " + tmp);
0141:                if (merge_leader.equals(gms.local_addr) || gms.merge_leader) {
0142:                    if (log.isTraceEnabled())
0143:                        log
0144:                                .trace("I ("
0145:                                        + gms.local_addr
0146:                                        + ") will be the leader. Starting the merge task");
0147:                    startMergeTask(other_coords);
0148:                } else {
0149:                    if (log.isTraceEnabled())
0150:                        log.trace("I (" + gms.local_addr
0151:                                + ") am not the merge leader, "
0152:                                + "waiting for merge leader (" + merge_leader
0153:                                + ")to initiate merge");
0154:                }
0155:            }
0156:
0157:            /**
0158:             * Get the view and digest and send back both (MergeData) in the form of a MERGE_RSP to the sender.
0159:             * If a merge is already in progress, send back a MergeData with the merge_rejected field set to true.
0160:             */
0161:            public void handleMergeRequest(Address sender, ViewId merge_id) {
0162:                Digest digest;
0163:                View view;
0164:
0165:                if (sender == null) {
0166:                    if (log.isErrorEnabled())
0167:                        log
0168:                                .error("sender == null; cannot send back a response");
0169:                    return;
0170:                }
0171:                if (merging) {
0172:                    if (log.isErrorEnabled())
0173:                        log.error("merge already in progress");
0174:                    sendMergeRejectedResponse(sender, merge_id);
0175:                    return;
0176:                }
0177:                merging = true;
0178:
0179:                /* Clears the view handler queue and discards all JOIN/LEAVE/MERGE requests until after the MERGE  */
0180:                gms.getViewHandler().suspend(merge_id);
0181:
0182:                setMergeId(merge_id);
0183:                if (log.isDebugEnabled())
0184:                    log.debug("sender=" + sender + ", merge_id=" + merge_id);
0185:                digest = gms.getDigest();
0186:                view = new View(gms.view_id.copy(), gms.members.getMembers());
0187:                gms.passDown(new Event(Event.ENABLE_UNICASTS_TO, sender));
0188:                sendMergeResponse(sender, view, digest);
0189:            }
0190:
0191:            private MergeData getMergeResponse(Address sender, ViewId merge_id) {
0192:                Digest digest;
0193:                View view;
0194:                MergeData retval;
0195:
0196:                if (sender == null) {
0197:                    if (log.isErrorEnabled())
0198:                        log
0199:                                .error("sender == null; cannot send back a response");
0200:                    return null;
0201:                }
0202:                if (merging) {
0203:                    if (log.isErrorEnabled())
0204:                        log.error("merge already in progress");
0205:                    retval = new MergeData(sender, null, null);
0206:                    retval.merge_rejected = true;
0207:                    return retval;
0208:                }
0209:                merging = true;
0210:                setMergeId(merge_id);
0211:                if (log.isDebugEnabled())
0212:                    log.debug("sender=" + sender + ", merge_id=" + merge_id);
0213:
0214:                try {
0215:                    digest = gms.getDigest();
0216:                    view = new View(gms.view_id.copy(), gms.members
0217:                            .getMembers());
0218:                    retval = new MergeData(sender, view, digest);
0219:                    retval.view = view;
0220:                    retval.digest = digest;
0221:                } catch (NullPointerException null_ex) {
0222:                    return null;
0223:                }
0224:                return retval;
0225:            }
0226:
0227:            public void handleMergeResponse(MergeData data, ViewId merge_id) {
0228:                if (data == null) {
0229:                    if (log.isErrorEnabled())
0230:                        log.error("merge data is null");
0231:                    return;
0232:                }
0233:                if (merge_id == null || this .merge_id == null) {
0234:                    if (log.isErrorEnabled())
0235:                        log.error("merge_id (" + merge_id
0236:                                + ") or this.merge_id (" + this .merge_id
0237:                                + ") is null (sender=" + data.getSender()
0238:                                + ").");
0239:                    return;
0240:                }
0241:
0242:                if (!this .merge_id.equals(merge_id)) {
0243:                    if (log.isErrorEnabled())
0244:                        log.error("this.merge_id (" + this .merge_id
0245:                                + ") is different from merge_id (" + merge_id
0246:                                + ')');
0247:                    return;
0248:                }
0249:
0250:                synchronized (merge_rsps) {
0251:                    if (!merge_rsps.contains(data)) {
0252:                        merge_rsps.addElement(data);
0253:                        merge_rsps.notifyAll();
0254:                    }
0255:                }
0256:            }
0257:
0258:            /**
0259:             * If merge_id is not equal to this.merge_id then discard.
0260:             * Else cast the view/digest to all members of this group.
0261:             */
0262:            public void handleMergeView(MergeData data, ViewId merge_id) {
0263:                if (merge_id == null || this .merge_id == null
0264:                        || !this .merge_id.equals(merge_id)) {
0265:                    if (log.isErrorEnabled())
0266:                        log
0267:                                .error("merge_ids don't match (or are null); merge view discarded");
0268:                    return;
0269:                }
0270:                java.util.List my_members = gms.view != null ? gms.view
0271:                        .getMembers() : null;
0272:
0273:                // only send to our *current* members, if we have A and B being merged (we are B), then we would *not*
0274:                // receive a VIEW_ACK from A because A doesn't see us in the pre-merge view yet and discards the view
0275:
0276:                GMS.Request req = new GMS.Request(GMS.Request.VIEW);
0277:                req.view = data.view;
0278:                req.digest = data.digest;
0279:                req.target_members = my_members;
0280:                gms.getViewHandler().add(req, true, // at head so it is processed next
0281:                        true); // un-suspend the queue
0282:                merging = false;
0283:            }
0284:
0285:            public void handleMergeCancelled(ViewId merge_id) {
0286:                if (merge_id != null && this .merge_id != null
0287:                        && this .merge_id.equals(merge_id)) {
0288:                    if (log.isDebugEnabled())
0289:                        log.debug("merge was cancelled (merge_id=" + merge_id
0290:                                + ", local_addr=" + gms.local_addr + ")");
0291:                    setMergeId(null);
0292:                    this .merge_leader = null;
0293:                    merging = false;
0294:                    gms.getViewHandler().resume(merge_id);
0295:                }
0296:            }
0297:
0298:            private void cancelMerge() {
0299:                Object tmp = merge_id;
0300:                if (merge_id != null && log.isDebugEnabled())
0301:                    log.debug("cancelling merge (merge_id=" + merge_id + ')');
0302:                setMergeId(null);
0303:                this .merge_leader = null;
0304:                stopMergeTask();
0305:                merging = false;
0306:                synchronized (merge_rsps) {
0307:                    merge_rsps.clear();
0308:                }
0309:                gms.getViewHandler().resume(tmp);
0310:            }
0311:
0312:            /**
0313:             * Computes the new view (including the newly joined member) and get the digest from PBCAST.
0314:             * Returns both in the form of a JoinRsp
0315:             */
0316:            /*private synchronized void handleJoin(Address mbr) {
0317:                View v;
0318:                Digest d, tmp;
0319:                JoinRsp join_rsp;
0320:
0321:                if(mbr == null) {
0322:                    if(log.isErrorEnabled()) log.error("mbr is null");
0323:                    return;
0324:                }
0325:                if(gms.local_addr.equals(mbr)) {
0326:                    if(log.isErrorEnabled()) log.error("cannot join myself !");
0327:                    return;
0328:                }
0329:                if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
0330:                if(gms.members.contains(mbr)) { // already joined: return current digest and membership
0331:                    if(log.isWarnEnabled())
0332:                        log.warn(mbr + " already present; returning existing view " + gms.view);
0333:                    join_rsp=new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest());
0334:                    sendJoinResponse(join_rsp, mbr);
0335:                    return;
0336:                }
0337:
0338:                try {
0339:                    // we cannot garbage collect during joining a new member *if* we're the only member
0340:                    // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those in the
0341:                    // digest returned to the client, so the client will *not* be able to ask for retransmission of those
0342:                    // messages if he misses them
0343:                    gms.passDown(new Event(Event.SUSPEND_STABLE, MAX_SUSPEND_TIMEOUT));
0344:                    Vector new_mbrs=new Vector(1);
0345:                    new_mbrs.addElement(mbr);
0346:                    tmp=gms.getDigest(); // get existing digest
0347:                    if(tmp == null) {
0348:                        if(log.isErrorEnabled()) log.error("received null digest from GET_DIGEST: will cause JOIN to fail");
0349:                        return;
0350:                    }
0351:
0352:                    d=new Digest(tmp.size() + 1); // create a new digest, which contains 1 more member
0353:                    d.add(tmp); // add the existing digest to the new one
0354:                    d.add(mbr, 0, 0); // ... and add the new member. it's first seqno will be 1
0355:                    v=gms.getNextView(new_mbrs, null, null);
0356:                    if(log.isDebugEnabled()) log.debug("joined member " + mbr + ", view is " + v);
0357:                    join_rsp=new JoinRsp(v, d);
0358:
0359:                    // 2. Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
0360:                    //    in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
0361:                    // Check NAKACK's TMP_VIEW handling for details
0362:                    if(join_rsp.getView() != null)
0363:                        gms.passDown(new Event(Event.TMP_VIEW, join_rsp.getView()));
0364:
0365:                    Vector tmp_mbrs=join_rsp.getView() != null? new Vector(join_rsp.getView().getMembers()) : null;
0366:
0367:                    if(gms.use_flush) {
0368:
0369:                        //3a. FLUSH protocol is in use. First we FLUSH current members. Then we send a
0370:                        // view to a joining member and we will wait for his ACK together with view
0371:                        // ACKs from current members (castViewChangeWithDest). After all ACKS have been
0372:                        // collected, FLUSH is stopped (below in finally clause) and thus members are
0373:                        // allowed to send messages again.
0374:                        gms.startFlush(join_rsp.getView());
0375:                        sendJoinResponse(join_rsp, mbr);
0376:                        gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs);
0377:                    }
0378:                    else {
0379:                        //3b. Broadcast the new view
0380:                        // we'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout),
0381:                        // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in
0382:                        // view V2 which may get dropped by existing members because they're still in view V1.
0383:                        // (http://jira.jboss.com/jira/browse/JGRP-235)
0384:
0385:                        if(tmp_mbrs != null)
0386:                            tmp_mbrs.remove(mbr); // exclude the newly joined member from VIEW_ACKs
0387:
0388:                        gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs);
0389:
0390:                        // 4. Return result to client
0391:                        sendJoinResponse(join_rsp, mbr);
0392:                    }
0393:
0394:                }
0395:                finally {
0396:                    if(gms.use_flush)
0397:                        gms.stopFlush();
0398:                    gms.passDown(new Event(Event.RESUME_STABLE));
0399:                }
0400:            }
0401:             */
0402:
0403:            /**
0404:              Exclude <code>mbr</code> from the membership. If <code>suspected</code> is true, then
0405:              this member crashed and therefore is forced to leave, otherwise it is leaving voluntarily.
0406:             */
0407:            /*private void handleLeave(Address mbr, boolean suspected) {
0408:                View new_view=null;
0409:                Vector v=new Vector(1);
0410:                v.addElement(mbr);
0411:
0412:                // contains either leaving mbrs or suspected mbrs
0413:                if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
0414:                if(!gms.members.contains(mbr)) {
0415:                    if(log.isTraceEnabled()) log.trace("mbr " + mbr + " is not a member !");
0416:                    return;
0417:                }
0418:
0419:                if(gms.view_id == null) {
0420:                    // we're probably not the coord anymore (we just left ourselves), let someone else do it
0421:                    // (client will retry when it doesn't get a response
0422:                    if(log.isDebugEnabled())
0423:                        log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving +
0424:                                "); the new coordinator will handle the leave request");
0425:                    return;
0426:                }
0427:                try {
0428:                    sendLeaveResponse(mbr); // send an ack to the leaving member
0429:                    if(suspected)
0430:                        new_view=gms.getNextView(null, null, v);
0431:                    else
0432:                        new_view=gms.getNextView(null, v, null);
0433:
0434:                    if(gms.use_flush) {
0435:                        gms.startFlush(new_view);
0436:                    }
0437:                    gms.castViewChange(new_view, null);
0438:                }
0439:                finally {
0440:                    if(gms.use_flush) {
0441:                        gms.stopFlush();
0442:                    }
0443:                }
0444:                if(leaving) {
0445:                    gms.passUp(new Event(Event.DISCONNECT_OK));
0446:                    gms.initState(); // in case connect() is called again
0447:                }
0448:            }*/
0449:
0450:            public void handleMembershipChange(Collection new_mbrs,
0451:                    Collection leaving_mbrs, Collection suspected_mbrs) {
0452:                if (new_mbrs == null)
0453:                    new_mbrs = new LinkedHashSet(0);
0454:                if (suspected_mbrs == null)
0455:                    suspected_mbrs = new LinkedHashSet(0);
0456:                if (leaving_mbrs == null)
0457:                    leaving_mbrs = new LinkedHashSet(0);
0458:                boolean joining_mbrs = !new_mbrs.isEmpty();
0459:
0460:                new_mbrs.remove(gms.local_addr); // remove myself - cannot join myself (already joined)
0461:
0462:                if (gms.view_id == null) {
0463:                    // we're probably not the coord anymore (we just left ourselves), let someone else do it
0464:                    // (client will retry when it doesn't get a response)
0465:                    if (log.isDebugEnabled())
0466:                        log
0467:                                .debug("gms.view_id is null, I'm not the coordinator anymore (leaving="
0468:                                        + leaving
0469:                                        + "); the new coordinator will handle the leave request");
0470:                    return;
0471:                }
0472:
0473:                Vector current_members = gms.members.getMembers();
0474:                leaving_mbrs.retainAll(current_members); // remove all elements of leaving_mbrs which are not current members
0475:                if (suspected_mbrs.remove(gms.local_addr)) {
0476:                    if (log.isWarnEnabled())
0477:                        log
0478:                                .warn("I am the coord and I'm being suspected -- will probably leave shortly");
0479:                }
0480:                suspected_mbrs.retainAll(current_members); // remove all elements of suspected_mbrs which are not current members
0481:
0482:                // for the members that have already joined, return the current digest and membership
0483:                for (Iterator it = new_mbrs.iterator(); it.hasNext();) {
0484:                    Address mbr = (Address) it.next();
0485:                    if (gms.members.contains(mbr)) { // already joined: return current digest and membership
0486:                        if (log.isWarnEnabled())
0487:                            log
0488:                                    .warn(mbr
0489:                                            + " already present; returning existing view "
0490:                                            + gms.view);
0491:                        JoinRsp join_rsp = new JoinRsp(new View(gms.view_id,
0492:                                gms.members.getMembers()), gms.getDigest());
0493:                        sendJoinResponse(join_rsp, mbr);
0494:                        it.remove();
0495:                    }
0496:                }
0497:
0498:                if (new_mbrs.isEmpty() && leaving_mbrs.isEmpty()
0499:                        && suspected_mbrs.isEmpty()) {
0500:                    if (log.isTraceEnabled())
0501:                        log
0502:                                .trace("found no members to add or remove, will not create new view");
0503:                    return;
0504:                }
0505:
0506:                JoinRsp join_rsp = null;
0507:                View new_view = gms.getNextView(new_mbrs, leaving_mbrs,
0508:                        suspected_mbrs);
0509:                if (log.isDebugEnabled())
0510:                    log.debug("new=" + new_mbrs + ", suspected="
0511:                            + suspected_mbrs + ", leaving=" + leaving_mbrs
0512:                            + ", new view: " + new_view);
0513:                try {
0514:
0515:                    // we cannot garbage collect during joining a new member *if* we're the only member
0516:                    // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those
0517:                    // in the digest returned to the client, so the client will *not* be able to ask for retransmission
0518:                    // of those messages if he misses them
0519:                    if (joining_mbrs) {
0520:                        gms.passDown(new Event(Event.SUSPEND_STABLE,
0521:                                MAX_SUSPEND_TIMEOUT));
0522:                        Digest d = null, tmp = gms.getDigest(); // get existing digest
0523:                        if (tmp == null)
0524:                            log
0525:                                    .error("received null digest from GET_DIGEST: will cause JOIN to fail");
0526:                        else {
0527:                            // create a new digest, which contains the new member
0528:                            d = new Digest(tmp.size() + new_mbrs.size());
0529:                            d.add(tmp); // add the existing digest to the new one
0530:                            for (Iterator i = new_mbrs.iterator(); i.hasNext();)
0531:                                d.add((Address) i.next(), 0, 0); // ... and add the new members. their first seqno will be 1
0532:                        }
0533:                        join_rsp = new JoinRsp(new_view, d);
0534:                    }
0535:
0536:                    sendLeaveResponses(leaving_mbrs); // no-op if no leaving members
0537:
0538:                    // Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
0539:                    // in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
0540:                    // Check NAKACK's TMP_VIEW handling for details
0541:                    if (new_view != null)
0542:                        gms.passDown(new Event(Event.TMP_VIEW, new_view));
0543:
0544:                    Vector tmp_mbrs = new_view != null ? new Vector(new_view
0545:                            .getMembers()) : null;
0546:                    if (gms.use_flush) {
0547:                        // First we flush current members. Then we send a view to all joining member and we wait for their ACKs
0548:                        // together with ACKs from current members. After all ACKS have been collected, FLUSH is stopped
0549:                        // (below in finally clause) and members are allowed to send messages again                                      
0550:                        boolean successfulFlush = gms.startFlush(new_view, 3);
0551:                        if (successfulFlush) {
0552:                            log.info("Successful GMS flush by coordinator at "
0553:                                    + gms.getLocalAddress());
0554:                        }
0555:                        sendJoinResponses(join_rsp, new_mbrs); // might be a no-op if no joining members
0556:                        gms.castViewChangeWithDest(new_view, null, tmp_mbrs);
0557:                    } else {
0558:                        if (tmp_mbrs != null) // exclude the newly joined member from VIEW_ACKs
0559:                            tmp_mbrs.removeAll(new_mbrs);
0560:                        // Broadcast the new view
0561:                        // we'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout),
0562:                        // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in
0563:                        // view V2 which may get dropped by existing members because they're still in view V1.
0564:                        // (http://jira.jboss.com/jira/browse/JGRP-235)
0565:                        gms.castViewChangeWithDest(new_view, null, tmp_mbrs);
0566:                        sendJoinResponses(join_rsp, new_mbrs); // Return result to newly joined clients (if there are any)
0567:                    }
0568:                } finally {
0569:                    if (joining_mbrs)
0570:                        gms.passDown(new Event(Event.RESUME_STABLE));
0571:                    if (gms.use_flush)
0572:                        gms.stopFlush(new_view);
0573:                    if (leaving) {
0574:                        gms.passUp(new Event(Event.DISCONNECT_OK));
0575:                        gms.initState(); // in case connect() is called again
0576:                    }
0577:                }
0578:            }
0579:
0580:            /**
0581:             * Called by the GMS when a VIEW is received.
0582:             * @param new_view The view to be installed
0583:             * @param digest   If view is a MergeView, digest contains the seqno digest of all members and has to
0584:             *                 be set by GMS
0585:             */
0586:            public void handleViewChange(View new_view, Digest digest) {
0587:                Vector mbrs = new_view.getMembers();
0588:                if (log.isDebugEnabled()) {
0589:                    if (digest != null)
0590:                        log.debug("view=" + new_view + ", digest=" + digest);
0591:                    else
0592:                        log.debug("view=" + new_view);
0593:                }
0594:
0595:                if (leaving && !mbrs.contains(gms.local_addr))
0596:                    return;
0597:                gms.installView(new_view, digest);
0598:            }
0599:
0600:            public void handleExit() {
0601:                cancelMerge();
0602:            }
0603:
0604:            public void stop() {
0605:                super .stop(); // sets leaving=false
0606:                stopMergeTask();
0607:            }
0608:
0609:            /* ------------------------------------------ Private methods ----------------------------------------- */
0610:
0611:            void startMergeTask(Vector coords) {
0612:                synchronized (merge_task) {
0613:                    merge_task.start(coords);
0614:                }
0615:            }
0616:
0617:            void stopMergeTask() {
0618:                synchronized (merge_task) {
0619:                    merge_task.stop();
0620:                }
0621:            }
0622:
0623:            private void sendJoinResponses(JoinRsp rsp, Collection c) {
0624:                if (c != null && rsp != null) {
0625:                    for (Iterator it = c.iterator(); it.hasNext();) {
0626:                        sendJoinResponse(rsp, (Address) it.next());
0627:                    }
0628:                }
0629:            }
0630:
0631:            private void sendJoinResponse(JoinRsp rsp, Address dest) {
0632:                Message m = new Message(dest, null, null);
0633:                GMS.GmsHeader hdr = new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP,
0634:                        rsp);
0635:                m.putHeader(gms.getName(), hdr);
0636:                gms.passDown(new Event(Event.MSG, m));
0637:            }
0638:
0639:            private void sendLeaveResponses(Collection c) {
0640:                for (Iterator i = c.iterator(); i.hasNext();) {
0641:                    Message msg = new Message((Address) i.next(), null, null); // send an ack to the leaving member
0642:                    GMS.GmsHeader hdr = new GMS.GmsHeader(
0643:                            GMS.GmsHeader.LEAVE_RSP);
0644:                    msg.putHeader(gms.getName(), hdr);
0645:                    gms.passDown(new Event(Event.MSG, msg));
0646:                }
0647:            }
0648:
0649:            /**
0650:             * Sends a MERGE_REQ to all coords and populates a list of MergeData (in merge_rsps). Returns after coords.size()
0651:             * response have been received, or timeout msecs have elapsed (whichever is first).<p>
0652:             * If a subgroup coordinator rejects the MERGE_REQ (e.g. because of participation in a different merge),
0653:             * <em>that member will be removed from coords !</em>
0654:             * @param coords A list of Addresses of subgroup coordinators (inluding myself)
0655:             * @param timeout Max number of msecs to wait for the merge responses from the subgroup coords
0656:             */
0657:            private void getMergeDataFromSubgroupCoordinators(Vector coords,
0658:                    long timeout) {
0659:                Message msg;
0660:                GMS.GmsHeader hdr;
0661:
0662:                long curr_time, time_to_wait, end_time, start, stop;
0663:                int num_rsps_expected;
0664:
0665:                if (coords == null || coords.size() <= 1) {
0666:                    if (log.isErrorEnabled())
0667:                        log.error("coords == null or size <= 1");
0668:                    return;
0669:                }
0670:
0671:                start = System.currentTimeMillis();
0672:                MergeData tmp;
0673:                synchronized (merge_rsps) {
0674:                    merge_rsps.removeAllElements();
0675:                    if (log.isDebugEnabled())
0676:                        log.debug("sending MERGE_REQ to " + coords);
0677:                    Address coord;
0678:                    for (int i = 0; i < coords.size(); i++) {
0679:                        coord = (Address) coords.elementAt(i);
0680:                        if (gms.local_addr != null
0681:                                && gms.local_addr.equals(coord)) {
0682:                            tmp = getMergeResponse(gms.local_addr, merge_id);
0683:                            if (tmp != null)
0684:                                merge_rsps.add(tmp);
0685:                            continue;
0686:                        }
0687:
0688:                        // this allows UNICAST to remove coord from previous_members in case of a merge
0689:                        gms
0690:                                .passDown(new Event(Event.ENABLE_UNICASTS_TO,
0691:                                        coord));
0692:
0693:                        msg = new Message(coord, null, null);
0694:                        hdr = new GMS.GmsHeader(GMS.GmsHeader.MERGE_REQ);
0695:                        hdr.mbr = gms.local_addr;
0696:                        hdr.merge_id = merge_id;
0697:                        msg.putHeader(gms.getName(), hdr);
0698:                        gms.passDown(new Event(Event.MSG, msg));
0699:                    }
0700:
0701:                    // wait until num_rsps_expected >= num_rsps or timeout elapsed
0702:                    num_rsps_expected = coords.size();
0703:                    curr_time = System.currentTimeMillis();
0704:                    end_time = curr_time + timeout;
0705:                    while (end_time > curr_time) {
0706:                        time_to_wait = end_time - curr_time;
0707:                        if (log.isDebugEnabled())
0708:                            log.debug("waiting " + time_to_wait
0709:                                    + " msecs for merge responses");
0710:                        if (merge_rsps.size() < num_rsps_expected) {
0711:                            try {
0712:                                merge_rsps.wait(time_to_wait);
0713:                            } catch (Exception ex) {
0714:                            }
0715:                        }
0716:                        if (log.isDebugEnabled())
0717:                            log
0718:                                    .debug("num_rsps_expected="
0719:                                            + num_rsps_expected
0720:                                            + ", actual responses="
0721:                                            + merge_rsps.size());
0722:
0723:                        if (merge_rsps.size() >= num_rsps_expected)
0724:                            break;
0725:                        curr_time = System.currentTimeMillis();
0726:                    }
0727:                    stop = System.currentTimeMillis();
0728:                    if (log.isTraceEnabled())
0729:                        log.trace("collected " + merge_rsps.size()
0730:                                + " merge response(s) in " + (stop - start)
0731:                                + "ms");
0732:                }
0733:            }
0734:
0735:            /**
0736:             * Generates a unique merge id by taking the local address and the current time
0737:             */
0738:            private ViewId generateMergeId() {
0739:                return new ViewId(gms.local_addr, System.currentTimeMillis());
0740:                // we're (ab)using ViewId as a merge id
0741:            }
0742:
0743:            /**
0744:             * Merge all MergeData. All MergeData elements should be disjunct (both views and digests). However,
0745:             * this method is prepared to resolve duplicate entries (for the same member). Resolution strategy for
0746:             * views is to merge only 1 of the duplicate members. Resolution strategy for digests is to take the higher
0747:             * seqnos for duplicate digests.<p>
0748:             * After merging all members into a Membership and subsequent sorting, the first member of the sorted membership
0749:             * will be the new coordinator.
0750:             * @param v A list of MergeData items. Elements with merge_rejected=true were removed before. Is guaranteed
0751:             *          not to be null and to contain at least 1 member.
0752:             */
0753:            private MergeData consolidateMergeData(Vector v) {
0754:                MergeData ret;
0755:                MergeData tmp_data;
0756:                long logical_time = 0; // for new_vid
0757:                ViewId new_vid, tmp_vid;
0758:                MergeView new_view;
0759:                View tmp_view;
0760:                Membership new_mbrs = new Membership();
0761:                int num_mbrs;
0762:                Digest new_digest;
0763:                Address new_coord;
0764:                Vector subgroups = new Vector(11);
0765:                // contains a list of Views, each View is a subgroup
0766:
0767:                for (int i = 0; i < v.size(); i++) {
0768:                    tmp_data = (MergeData) v.elementAt(i);
0769:                    if (log.isDebugEnabled())
0770:                        log.debug("merge data is " + tmp_data);
0771:                    tmp_view = tmp_data.getView();
0772:                    if (tmp_view != null) {
0773:                        tmp_vid = tmp_view.getVid();
0774:                        if (tmp_vid != null) {
0775:                            // compute the new view id (max of all vids +1)
0776:                            logical_time = Math.max(logical_time, tmp_vid
0777:                                    .getId());
0778:                        }
0779:                        // merge all membership lists into one (prevent duplicates)
0780:                        new_mbrs.add(tmp_view.getMembers());
0781:                        subgroups.addElement(tmp_view.clone());
0782:                    }
0783:                }
0784:
0785:                // the new coordinator is the first member of the consolidated & sorted membership list
0786:                new_mbrs.sort();
0787:                num_mbrs = new_mbrs.size();
0788:                new_coord = num_mbrs > 0 ? (Address) new_mbrs.elementAt(0)
0789:                        : null;
0790:                if (new_coord == null) {
0791:                    if (log.isErrorEnabled())
0792:                        log.error("new_coord == null");
0793:                    return null;
0794:                }
0795:                // should be the highest view ID seen up to now plus 1
0796:                new_vid = new ViewId(new_coord, logical_time + 1);
0797:
0798:                // determine the new view
0799:                new_view = new MergeView(new_vid, new_mbrs.getMembers(),
0800:                        subgroups);
0801:                if (log.isDebugEnabled())
0802:                    log.debug("new merged view will be " + new_view);
0803:
0804:                // determine the new digest
0805:                new_digest = consolidateDigests(v, num_mbrs);
0806:                if (new_digest == null) {
0807:                    if (log.isErrorEnabled())
0808:                        log.error("digest could not be consolidated");
0809:                    return null;
0810:                }
0811:                if (log.isDebugEnabled())
0812:                    log.debug("consolidated digest=" + new_digest);
0813:                ret = new MergeData(gms.local_addr, new_view, new_digest);
0814:                return ret;
0815:            }
0816:
0817:            /**
0818:             * Merge all digests into one. For each sender, the new value is min(low_seqno), max(high_seqno),
0819:             * max(high_seqno_seen)
0820:             */
0821:            private Digest consolidateDigests(Vector v, int num_mbrs) {
0822:                MergeData data;
0823:                Digest tmp_digest, retval = new Digest(num_mbrs);
0824:
0825:                for (int i = 0; i < v.size(); i++) {
0826:                    data = (MergeData) v.elementAt(i);
0827:                    tmp_digest = data.getDigest();
0828:                    if (tmp_digest == null) {
0829:                        if (log.isErrorEnabled())
0830:                            log.error("tmp_digest == null; skipping");
0831:                        continue;
0832:                    }
0833:                    retval.merge(tmp_digest);
0834:                }
0835:                return retval;
0836:            }
0837:
0838:            /**
0839:             * Sends the new view and digest to all subgroup coordinors in coords. Each coord will in turn
0840:             * <ol>
0841:             * <li>cast the new view and digest to all the members of its subgroup (MergeView)
0842:             * <li>on reception of the view, if it is a MergeView, each member will set the digest and install
0843:             *     the new view
0844:             * </ol>
0845:             */
0846:            private void sendMergeView(Vector coords,
0847:                    MergeData combined_merge_data) {
0848:                Message msg;
0849:                GMS.GmsHeader hdr;
0850:                Address coord;
0851:                View v;
0852:                Digest d;
0853:
0854:                if (coords == null || combined_merge_data == null)
0855:                    return;
0856:
0857:                v = combined_merge_data.view;
0858:                d = combined_merge_data.digest;
0859:                if (v == null || d == null) {
0860:                    if (log.isErrorEnabled())
0861:                        log
0862:                                .error("view or digest is null, cannot send consolidated merge view/digest");
0863:                    return;
0864:                }
0865:
0866:                if (log.isTraceEnabled())
0867:                    log.trace("sending merge view " + v.getVid()
0868:                            + " to coordinators " + coords);
0869:
0870:                for (int i = 0; i < coords.size(); i++) {
0871:                    coord = (Address) coords.elementAt(i);
0872:                    msg = new Message(coord, null, null);
0873:                    hdr = new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW);
0874:                    hdr.view = v;
0875:                    hdr.my_digest = d;
0876:                    hdr.merge_id = merge_id;
0877:                    msg.putHeader(gms.getName(), hdr);
0878:                    gms.passDown(new Event(Event.MSG, msg));
0879:                }
0880:            }
0881:
0882:            /**
0883:             * Send back a response containing view and digest to sender
0884:             */
0885:            private void sendMergeResponse(Address sender, View view,
0886:                    Digest digest) {
0887:                Message msg = new Message(sender, null, null);
0888:                GMS.GmsHeader hdr = new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP);
0889:                hdr.merge_id = merge_id;
0890:                hdr.view = view;
0891:                hdr.my_digest = digest;
0892:                msg.putHeader(gms.getName(), hdr);
0893:                if (log.isDebugEnabled())
0894:                    log.debug("response=" + hdr);
0895:                gms.passDown(new Event(Event.MSG, msg));
0896:            }
0897:
0898:            private void sendMergeCancelledMessage(Vector coords,
0899:                    ViewId merge_id) {
0900:                Message msg;
0901:                GMS.GmsHeader hdr;
0902:                Address coord;
0903:
0904:                if (coords == null || merge_id == null) {
0905:                    if (log.isErrorEnabled())
0906:                        log.error("coords or merge_id == null");
0907:                    return;
0908:                }
0909:                for (int i = 0; i < coords.size(); i++) {
0910:                    coord = (Address) coords.elementAt(i);
0911:                    msg = new Message(coord, null, null);
0912:                    hdr = new GMS.GmsHeader(GMS.GmsHeader.CANCEL_MERGE);
0913:                    hdr.merge_id = merge_id;
0914:                    msg.putHeader(gms.getName(), hdr);
0915:                    gms.passDown(new Event(Event.MSG, msg));
0916:                }
0917:            }
0918:
0919:            /** Removed rejected merge requests from merge_rsps and coords */
0920:            private void removeRejectedMergeRequests(Vector coords) {
0921:                MergeData data;
0922:                for (Iterator it = merge_rsps.iterator(); it.hasNext();) {
0923:                    data = (MergeData) it.next();
0924:                    if (data.merge_rejected) {
0925:                        if (data.getSender() != null && coords != null)
0926:                            coords.removeElement(data.getSender());
0927:                        it.remove();
0928:                        if (log.isDebugEnabled())
0929:                            log.debug("removed element " + data);
0930:                    }
0931:                }
0932:            }
0933:
0934:            /* --------------------------------------- End of Private methods ------------------------------------- */
0935:
0936:            /**
0937:             * Starts the merge protocol (only run by the merge leader). Essentially sends a MERGE_REQ to all
0938:             * coordinators of all subgroups found. Each coord receives its digest and view and returns it.
0939:             * The leader then computes the digest and view for the new group from the return values. Finally, it
0940:             * sends this merged view/digest to all subgroup coordinators; each coordinator will install it in their
0941:             * subgroup.
0942:             */
0943:            private class MergeTask implements  Runnable {
0944:                Thread t = null;
0945:                Vector coords = null; // list of subgroup coordinators to be contacted
0946:
0947:                public void start(Vector coords) {
0948:                    if (t == null || !t.isAlive()) {
0949:                        this .coords = (Vector) (coords != null ? coords.clone()
0950:                                : null);
0951:                        t = new Thread(this , "MergeTask");
0952:                        t.setDaemon(true);
0953:                        t.start();
0954:                    }
0955:                }
0956:
0957:                public void stop() {
0958:                    Thread tmp = t;
0959:                    if (isRunning()) {
0960:                        t = null;
0961:                        tmp.interrupt();
0962:                    }
0963:                    t = null;
0964:                    coords = null;
0965:                }
0966:
0967:                public boolean isRunning() {
0968:                    return t != null && t.isAlive();
0969:                }
0970:
0971:                /**
0972:                 * Runs the merge protocol as a leader
0973:                 */
0974:                public void run() {
0975:                    MergeData combined_merge_data;
0976:
0977:                    if (merging == true) {
0978:                        if (log.isWarnEnabled())
0979:                            log
0980:                                    .warn("merge is already in progress, terminating");
0981:                        return;
0982:                    }
0983:
0984:                    if (log.isDebugEnabled())
0985:                        log.debug("merge task started, coordinators are "
0986:                                + this .coords);
0987:                    try {
0988:
0989:                        /* 1. Generate a merge_id that uniquely identifies the merge in progress */
0990:                        setMergeId(generateMergeId());
0991:
0992:                        /* 2. Fetch the current Views/Digests from all subgroup coordinators */
0993:                        getMergeDataFromSubgroupCoordinators(coords,
0994:                                gms.merge_timeout);
0995:
0996:                        /* 3. Remove rejected MergeData elements from merge_rsp and coords (so we'll send the new view only
0997:                           to members who accepted the merge request) */
0998:                        removeRejectedMergeRequests(coords);
0999:
1000:                        if (merge_rsps.size() <= 1) {
1001:                            if (log.isWarnEnabled())
1002:                                log
1003:                                        .warn("merge responses from subgroup coordinators <= 1 ("
1004:                                                + merge_rsps
1005:                                                + "). Cancelling merge");
1006:                            sendMergeCancelledMessage(coords, merge_id);
1007:                            return;
1008:                        }
1009:
1010:                        /* 4. Combine all views and digests into 1 View/1 Digest */
1011:                        combined_merge_data = consolidateMergeData(merge_rsps);
1012:                        if (combined_merge_data == null) {
1013:                            if (log.isErrorEnabled())
1014:                                log.error("combined_merge_data == null");
1015:                            sendMergeCancelledMessage(coords, merge_id);
1016:                            return;
1017:                        }
1018:
1019:                        /* 5. Don't allow JOINs or LEAVEs until we are done with the merge. Suspend() will clear the
1020:                              view handler queue, so no requests beyond this current MERGE request will be processed */
1021:                        gms.getViewHandler().suspend(merge_id);
1022:
1023:                        /* 6. Send the new View/Digest to all coordinators (including myself). On reception, they will
1024:                           install the digest and view in all of their subgroup members */
1025:                        sendMergeView(coords, combined_merge_data);
1026:                    } catch (Throwable ex) {
1027:                        if (log.isErrorEnabled())
1028:                            log.error("exception while merging", ex);
1029:                    } finally {
1030:                        sendMergeCancelledMessage(coords, merge_id);
1031:                        stopMergeCanceller(); // this is probably not necessary
1032:                        merging = false;
1033:                        merge_leader = null;
1034:                        if (log.isDebugEnabled())
1035:                            log.debug("merge task terminated");
1036:                        t = null;
1037:                    }
1038:                }
1039:            }
1040:
1041:            private class MergeCanceller implements  TimeScheduler.Task {
1042:                private Object my_merge_id = null;
1043:                private long timeout;
1044:                private boolean cancelled = false;
1045:
1046:                MergeCanceller(Object my_merge_id, long timeout) {
1047:                    this .my_merge_id = my_merge_id;
1048:                    this .timeout = timeout;
1049:                }
1050:
1051:                public boolean cancelled() {
1052:                    return cancelled;
1053:                }
1054:
1055:                public void cancel() {
1056:                    cancelled = true;
1057:                }
1058:
1059:                public long nextInterval() {
1060:                    return timeout;
1061:                }
1062:
1063:                public void run() {
1064:                    if (merge_id != null && my_merge_id.equals(merge_id)) {
1065:                        if (log.isTraceEnabled())
1066:                            log.trace("cancelling merge due to timer timeout ("
1067:                                    + timeout + " ms)");
1068:                        cancelMerge();
1069:                        cancelled = true;
1070:                    } else {
1071:                        if (log.isTraceEnabled())
1072:                            log
1073:                                    .trace("timer kicked in after "
1074:                                            + timeout
1075:                                            + " ms, but no (or different) merge was in progress: "
1076:                                            + "merge_id=" + merge_id
1077:                                            + ", my_merge_id=" + my_merge_id);
1078:                    }
1079:                }
1080:            }
1081:
1082:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.