Source Code Cross Referenced for GMS.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: GMS.java,v 1.16.10.1 2007/04/27 08:03:51 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.*;
006:        import org.jgroups.blocks.GroupRequest;
007:        import org.jgroups.blocks.MethodCall;
008:        import org.jgroups.stack.Protocol;
009:        import org.jgroups.stack.RpcProtocol;
010:        import org.jgroups.util.Queue;
011:        import org.jgroups.util.QueueClosedException;
012:
013:        import java.util.Hashtable;
014:        import java.util.Properties;
015:        import java.util.Vector;
016:
017:        /**
018:         * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
019:         * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
020:         * any messages until they are members.
021:         * 
022:         * @author Bela Ban
023:         */
024:        public class GMS extends RpcProtocol implements  Runnable {
025:            private GmsImpl impl = null;
026:            public Address local_addr = null;
027:            public String group_addr = null;
028:            public final Membership mbrs = new Membership();
029:            public ViewId view_id = null;
030:            public long ltime = 0;
031:            public long join_timeout = 5000;
032:            public long join_retry_timeout = 2000;
033:            private long flush_timeout = 0; // 0=wait forever until FLUSH completes
034:            private long rebroadcast_timeout = 0; // 0=wait forever until REBROADCAST completes
035:            private long view_change_timeout = 10000; // until all handleViewChange() RPCs have returned
036:            public long leave_timeout = 5000;
037:            public final Object impl_mutex = new Object(); // synchronizes event entry into impl
038:            public final Object view_mutex = new Object(); // synchronizes view installations
039:            private Queue event_queue = new Queue(); // stores SUSPECT, MERGE events
040:            private Thread evt_thread = null;
041:            private final Object flush_mutex = new Object();
042:            private FlushRsp flush_rsp = null;
043:            private final Object rebroadcast_mutex = new Object();
044:            private boolean rebroadcast_unstable_msgs = true;
045:            private boolean print_local_addr = true;
046:            boolean disable_initial_coord = false; // can the member become a coord on startup or not ?
047:            private final Hashtable impls = new Hashtable();
048:            static final String CLIENT = "Client";
049:            static final String COORD = "Coordinator";
050:            static final String PART = "Participant";
051:
052:            public static final String name = "GMS";
053:
054:            public GMS() {
055:                initState();
056:            }
057:
058:            public String getName() {
059:                return name;
060:            }
061:
062:            public Vector requiredDownServices() {
063:                Vector retval = new Vector();
064:                retval.addElement(new Integer(Event.FLUSH));
065:                retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
066:                return retval;
067:            }
068:
069:            public void setImpl(GmsImpl new_impl) {
070:                synchronized (impl_mutex) {
071:                    impl = new_impl;
072:                    if (log.isInfoEnabled())
073:                        log.info("changed role to "
074:                                + new_impl.getClass().getName());
075:                }
076:            }
077:
078:            public void start() throws Exception {
079:                super .start();
080:                if (checkForViewEnforcer(up_prot) == false) {
081:                    if (log.isWarnEnabled())
082:                        log
083:                                .warn("I need protocol layer "
084:                                        + "VIEW_ENFORCER above me to discard messages sent to me while I'm "
085:                                        + "not yet a group member ! Otherwise, these messages will be delivered "
086:                                        + "to the application without checking...\n");
087:                }
088:
089:                if (_corr != null)
090:                    _corr.setDeadlockDetection(true);
091:                else
092:                    throw new Exception(
093:                            "GMS.start(): cannot set deadlock detection in corr, as it is null !");
094:            }
095:
096:            public void becomeCoordinator() {
097:                CoordGmsImpl tmp = (CoordGmsImpl) impls.get(COORD);
098:
099:                if (tmp == null) {
100:                    tmp = new CoordGmsImpl(this );
101:                    tmp.leaving = false;
102:                    tmp.received_last_view = false; // +++ ?
103:                    impls.put(COORD, tmp);
104:                }
105:
106:                setImpl(tmp);
107:            }
108:
109:            public void becomeParticipant() {
110:                ParticipantGmsImpl tmp = (ParticipantGmsImpl) impls.get(PART);
111:
112:                if (tmp == null) {
113:                    tmp = new ParticipantGmsImpl(this );
114:                    tmp.leaving = false;
115:                    tmp.received_final_view = false;
116:                    impls.put(PART, tmp);
117:                }
118:                setImpl(tmp);
119:            }
120:
121:            public void becomeClient() {
122:                ClientGmsImpl tmp = (ClientGmsImpl) impls.get(CLIENT);
123:
124:                if (tmp == null) {
125:                    tmp = new ClientGmsImpl(this );
126:                    impls.put(CLIENT, tmp);
127:                } else
128:                    tmp.init();
129:
130:                setImpl(tmp);
131:            }
132:
133:            boolean haveCoordinatorRole() {
134:                return impl != null && impl instanceof  CoordGmsImpl;
135:            }
136:
137:            /**
138:             * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
139:             * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
140:             */
141:            public View getNextView(Vector new_mbrs, Vector old_mbrs,
142:                    Vector suspected_mbrs) {
143:                Vector members;
144:                long vid;
145:                View v;
146:                Membership tmp_mbrs;
147:                Vector mbrs_to_remove = new Vector();
148:
149:                if (old_mbrs != null && old_mbrs.size() > 0)
150:                    for (int i = 0; i < old_mbrs.size(); i++)
151:                        mbrs_to_remove.addElement(old_mbrs.elementAt(i));
152:                if (suspected_mbrs != null && suspected_mbrs.size() > 0)
153:                    for (int i = 0; i < suspected_mbrs.size(); i++)
154:                        if (!mbrs_to_remove.contains(suspected_mbrs
155:                                .elementAt(i)))
156:                            mbrs_to_remove.addElement(suspected_mbrs
157:                                    .elementAt(i));
158:
159:                synchronized (view_mutex) {
160:                    vid = Math.max(view_id.getId(), ltime) + 1;
161:                    ltime = vid;
162:                    tmp_mbrs = this .mbrs.copy();
163:                    tmp_mbrs.merge(new_mbrs, mbrs_to_remove);
164:                    members = (Vector) tmp_mbrs.getMembers().clone();
165:                    v = new View(local_addr, vid, members);
166:                    return v;
167:                }
168:            }
169:
170:            /**
171:             * Return a copy of the current membership minus the suspected members: FLUSH request is not sent
172:             * to suspected members (because they won't respond, and not to joining members either.
173:             * It IS sent to leaving members (before they are allowed to leave).
174:             */
175:            Vector computeFlushDestination(Vector suspected_mbrs) {
176:                Vector ret = mbrs.getMembers(); // *copy* of current membership
177:                if (suspected_mbrs != null && suspected_mbrs.size() > 0)
178:                    for (int i = 0; i < suspected_mbrs.size(); i++)
179:                        ret.removeElement(suspected_mbrs.elementAt(i));
180:                return ret;
181:            }
182:
183:            /**
184:             * Compute the destination set to which to send a VIEW_CHANGE message. This is the current
185:             * members + the leaving members (old_mbrs) + the joining members (new_mbrs) - the suspected
186:             * members.
187:             */
188:            private Vector computeViewDestination(Vector new_mbrs,
189:                    Vector suspected_mbrs) {
190:                Vector ret = mbrs.getMembers(); // **copy* of current membership
191:                Address mbr;
192:
193:                // add new members
194:                if (new_mbrs != null) {
195:                    for (int i = 0; i < new_mbrs.size(); i++) {
196:                        mbr = (Address) new_mbrs.elementAt(i);
197:                        if (!ret.contains(mbr))
198:                            ret.addElement(new_mbrs.elementAt(i));
199:                    }
200:                }
201:
202:                // old members are still in existing membership, don't need to add them explicitely
203:
204:                // remove suspected members
205:                if (suspected_mbrs != null) {
206:                    for (int i = 0; i < suspected_mbrs.size(); i++) {
207:                        mbr = (Address) suspected_mbrs.elementAt(i);
208:                        ret.removeElement(mbr);
209:                    }
210:                }
211:                return ret;
212:            }
213:
214:            /**
215:             * FLUSH protocol.
216:             * Send to current mbrs - suspected_mbrs (not including new_mbrs, but including old_mbr)
217:             * Send TMP_VIEW event down,
218:             * this allows FLUSH/NAKACK to set membership correctly
219:             */
220:
221:            public void flush(Vector flush_dest, Vector suspected_mbrs) {
222:                Vector rebroadcast_msgs = new Vector();
223:
224:                if (suspected_mbrs == null)
225:                    suspected_mbrs = new Vector();
226:
227:                while (flush_dest.size() > 0) {
228:                    flush_rsp = null;
229:                    synchronized (flush_mutex) {
230:                        passDown(new Event(Event.FLUSH, flush_dest)); // send FLUSH to members in flush_dest
231:                        if (flush_rsp == null) {
232:                            try {
233:                                flush_mutex.wait(flush_timeout);
234:                            } catch (Exception e) {
235:                            }
236:                        }
237:                    }
238:                    if (flush_rsp == null) {
239:                        break;
240:                    }
241:
242:                    if (rebroadcast_unstable_msgs
243:                            && flush_rsp.unstable_msgs != null
244:                            && flush_rsp.unstable_msgs.size() > 0) {
245:                        Message m;
246:                        for (int i = 0; i < flush_rsp.unstable_msgs.size(); i++) {
247:                            m = (Message) flush_rsp.unstable_msgs.elementAt(i);
248:
249:                            // just add msg, NAKACK.RESEND will weed out duplicates based on
250:                            // <sender:id> before re-broadcasting msgs
251:                            rebroadcast_msgs.addElement(m);
252:                        }
253:                    }
254:
255:                    if (flush_rsp.result == true)
256:                        break;
257:                    else {
258:                        if (flush_rsp.failed_mbrs != null) {
259:                            for (int i = 0; i < flush_rsp.failed_mbrs.size(); i++) {
260:                                flush_dest.removeElement(flush_rsp.failed_mbrs
261:                                        .elementAt(i));
262:                                suspected_mbrs.addElement(flush_rsp.failed_mbrs
263:                                        .elementAt(i));
264:                            }
265:                        }
266:                    }
267:                } // while
268:                if (log.isInfoEnabled())
269:                    log.info("flushing completed.");
270:
271:                // Rebroadcast unstable messages
272:                if (rebroadcast_unstable_msgs && rebroadcast_msgs.size() > 0) {
273:
274:                    if (log.isInfoEnabled())
275:                        log.info("re-broadcasting unstable messages ("
276:                                + rebroadcast_msgs.size() + ')');
277:                    // NAKACK layer will rebroadcast the msgs (using the same seqnos assigned earlier)
278:                    synchronized (rebroadcast_mutex) {
279:                        passDown(new Event(Event.REBROADCAST_MSGS,
280:                                rebroadcast_msgs));
281:                        try {
282:                            rebroadcast_mutex.wait(rebroadcast_timeout);
283:                        } catch (Exception e) {
284:                        }
285:                    }
286:                    if (log.isInfoEnabled())
287:                        log.info("re-broadcasting messages completed");
288:                }
289:            }
290:
291:            /**
292:             * Compute a new view, given the current view, the new members and the suspected/left
293:             * members.  Run view update protocol to install a new view in all members (this involves
294:             * casting the new view to all members). The targets for FLUSH and VIEW mcasts are
295:             * computed as follows:<p>
296:             * <pre>
297:             * existing          leaving        suspected          joining
298:             * <p/>
299:             * 1. FLUSH         y                 y               n                 n
300:             * 2. new_view      y                 n               n                 y
301:             * 3. tmp_view      y                 y               n                 y
302:             * (view_dest)
303:             * </pre>
304:             * <p/>
305:             * <ol>
306:             * <li>
307:             * The FLUSH is only sent to the existing and leaving members (they are the only ones that might have
308:             * old messages not yet seen by the group. The suspected members would not answer anyway (because they
309:             * have failed) and the joining members have certainly no old messages.
310:             * <li>
311:             * The new view to be installed includes the existing members plus the joining ones and
312:             * excludes the leaving and suspected members.
313:             * <li>
314:             * A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
315:             * (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
316:             * to the new view, leaving members are <em>included</em> since they have are waiting for a
317:             * view in which they are not members any longer before they leave. So, if we did not set a
318:             * temporary view, joining members would not receive the view (signalling that they have been
319:             * joined successfully). The temporary view is essentially the current view plus the joining
320:             * members (old members are still part of the current view).
321:             * </ol>
322:             */
323:            public void castViewChange(Vector new_mbrs, Vector old_mbrs,
324:                    Vector suspected_mbrs) {
325:                View new_view, tmp_view;
326:                ViewId new_vid;
327:                Vector flush_dest = computeFlushDestination(suspected_mbrs); // members to which FLUSH/VIEW is sent
328:                Vector view_dest = computeViewDestination(new_mbrs,
329:                        suspected_mbrs); // dest for view change
330:
331:                // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
332:                new_view = getNextView(new_mbrs, old_mbrs, suspected_mbrs);
333:                new_vid = new_view.getVid();
334:
335:                if (log.isInfoEnabled())
336:                    log.info("FLUSH phase, flush_dest: " + flush_dest
337:                            + "\n\tview_dest: " + view_dest + "\n\tnew_view: "
338:                            + new_view + '\n');
339:                flush(flush_dest, suspected_mbrs);
340:                if (log.isInfoEnabled())
341:                    log.info("FLUSH phase done");
342:
343:                /* VIEW protocol. Send to current mbrs + new_mbrs + old_mbrs - suspected_mbrs.  Since
344:                   suspected members were removed from view_dest during the previous FLUSH round(s), we
345:                   only need to add the new members.  Send TMP_VIEW event down, this allows
346:                   FLUSH/NAKACK to set membership correctly */
347:                view_dest = computeViewDestination(new_mbrs, suspected_mbrs);
348:                tmp_view = new View(null, view_dest);
349:
350:                Event view_event = new Event(Event.TMP_VIEW, tmp_view); // so the VIEW msg is sent to the correct mbrs
351:                passDown(view_event); // needed e.g. by failure detector or UDP
352:
353:                if (log.isInfoEnabled())
354:                    log.info("mcasting view {" + new_vid + ", " + view_dest
355:                            + '}');
356:                passDown(new Event(Event.SWITCH_NAK_ACK)); // use ACK scheme for view bcast
357:                Object[] args = new Object[] { new_vid, new_view.getMembers() /* these are the mbrs in the new view */};
358:                MethodCall call = new MethodCall("handleViewChange", args,
359:                        new String[] { ViewId.class.getName(),
360:                                Vector.class.getName() });
361:                callRemoteMethods(view_dest, // send to all members in 'view_dest'
362:                        call, GroupRequest.GET_ALL, view_change_timeout);
363:                if (log.isInfoEnabled())
364:                    log.info("mcasting view completed");
365:                passDown(new Event(Event.SWITCH_NAK)); // back to normal NAKs ...
366:            }
367:
368:            /**
369:             * Assigns the new ltime. Installs view and view_id. Changes role to coordinator if necessary.
370:             * Sends VIEW_CHANGE event up and down the stack.
371:             */
372:            public void installView(ViewId new_view, Vector mbrs) {
373:                Object coord;
374:                int rc;
375:
376:                synchronized (view_mutex) { // serialize access to views
377:                    ltime = Math.max(new_view.getId(), ltime); // compute Lamport logical time
378:                    if (log.isInfoEnabled())
379:                        log.info("received view change, vid=" + new_view);
380:
381:                    /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
382:                       This ensures that messages sent in view V1 are only received by members of V1 */
383:                    if (checkSelfInclusion(mbrs) == false) {
384:                        if (log.isWarnEnabled())
385:                            log.warn("I'm not member of " + mbrs
386:                                    + ", discarding");
387:                        return;
388:                    }
389:
390:                    if (view_id == null) {
391:                        view_id = (ViewId) new_view.clone();
392:                    } else {
393:                        rc = new_view.compareTo(view_id); // rc should always be a positive number
394:                        if (rc <= 0) { // don't accept view id lower than our own
395:                            if (log.isWarnEnabled())
396:                                log
397:                                        .warn("received view <= current view; discarding it ! "
398:                                                + "(view_id: "
399:                                                + view_id
400:                                                + ", new_view: "
401:                                                + new_view
402:                                                + ')');
403:                            return;
404:                        } else { // the check for vid equality was okay, assign new_view to view_id
405:                            if (new_view.getCoordAddress() != null) {
406:                                view_id = new ViewId(
407:                                        new_view.getCoordAddress(), new_view
408:                                                .getId());
409:                            } else {
410:                                view_id = new ViewId(view_id.getCoordAddress(),
411:                                        new_view.getId());
412:                            }
413:                        }
414:                    }
415:
416:                    if (mbrs != null && mbrs.size() > 0)
417:                        this .mbrs.set(mbrs);
418:
419:                    // Send VIEW_CHANGE event up and down the stack:
420:                    Event view_event = new Event(Event.VIEW_CHANGE,
421:                            makeView(this .mbrs.getMembers()));
422:                    passDown(view_event); // needed e.g. by failure detector or UDP
423:                    passUp(view_event);
424:
425:                    coord = determineCoordinator();
426:                    if (coord != null && coord.equals(local_addr)) {
427:                        if (!haveCoordinatorRole()) // this avoids deadlock on coordinator - when suspect/join occurs simultaneously
428:                            becomeCoordinator();
429:                    } else {
430:                        if (haveCoordinatorRole() && !local_addr.equals(coord))
431:                            becomeParticipant();
432:                    }
433:                }
434:            }
435:
436:            protected Address determineCoordinator() {
437:                synchronized (mbrs) {
438:                    return mbrs != null && mbrs.size() > 0 ? (Address) mbrs
439:                            .elementAt(0) : null;
440:                }
441:            }
442:
443:            /**
444:             * Returns true if local_addr is member of mbrs, else false
445:             */
446:            protected boolean checkSelfInclusion(Vector mbrs) {
447:                Object mbr;
448:                if (mbrs == null)
449:                    return false;
450:                for (int i = 0; i < mbrs.size(); i++) {
451:                    mbr = mbrs.elementAt(i);
452:                    if (mbr != null && local_addr.equals(mbr))
453:                        return true;
454:                }
455:                return false;
456:            }
457:
458:            public View makeView(Vector mbrs) {
459:                Address coord = null;
460:                long id = 0;
461:
462:                if (view_id != null) {
463:                    coord = view_id.getCoordAddress();
464:                    id = view_id.getId();
465:                }
466:                return new View(coord, id, mbrs);
467:            }
468:
469:            public static View makeView(Vector mbrs, ViewId vid) {
470:                Address coord = null;
471:                long id = 0;
472:
473:                if (vid != null) {
474:                    coord = vid.getCoordAddress();
475:                    id = vid.getId();
476:                }
477:                return new View(coord, id, mbrs);
478:            }
479:
480:            /* ------------------------- Request handler methods ----------------------------- */
481:
482:            public void join(Address mbr) {
483:                synchronized (impl_mutex) {
484:                    impl.join(mbr);
485:                }
486:            }
487:
488:            public void leave(Address mbr) {
489:                synchronized (impl_mutex) {
490:                    impl.leave(mbr);
491:                }
492:            }
493:
494:            public void suspect(Address mbr) {
495:                synchronized (impl_mutex) {
496:                    impl.suspect(mbr);
497:                }
498:            }
499:
500:            public void merge(Vector other_coords) {
501:                synchronized (impl_mutex) {
502:                    impl.merge(other_coords);
503:                }
504:            }
505:
506:            public boolean handleJoin(Address mbr) {
507:                synchronized (impl_mutex) {
508:                    return impl.handleJoin(mbr);
509:                }
510:            }
511:
512:            public void handleLeave(Address mbr, boolean suspected) {
513:                synchronized (impl_mutex) {
514:                    impl.handleLeave(mbr, suspected);
515:                }
516:            }
517:
518:            public void handleViewChange(ViewId new_view, Vector mbrs) {
519:                //      synchronized (impl_mutex ) {
520:                impl.handleViewChange(new_view, mbrs);
521:                //      }
522:            }
523:
524:            public View handleMerge(ViewId other_vid, Vector other_members) {
525:                synchronized (impl_mutex) {
526:                    if (log.isTraceEnabled()) {
527:                        View v = impl.handleMerge(other_vid, other_members);
528:                        if (log.isInfoEnabled())
529:                            log.info("returning view: " + v);
530:                        return v;
531:                    }
532:                    return impl.handleMerge(other_vid, other_members);
533:                }
534:            }
535:
536:            public void handleSuspect(Address mbr) {
537:                synchronized (impl_mutex) {
538:                    impl.handleSuspect(mbr);
539:                }
540:            }
541:
542:            /* --------------------- End of Request handler methods -------------------------- */
543:
544:            boolean checkForViewEnforcer(Protocol up_protocol) {
545:                String prot_name;
546:
547:                if (up_protocol == null)
548:                    return false;
549:                prot_name = up_protocol.getName();
550:                if (prot_name != null && "VIEW_ENFORCER".equals(prot_name))
551:                    return true;
552:                return checkForViewEnforcer(up_protocol.getUpProtocol());
553:            }
554:
555:            /**
556:             * <b>Callback</b>. Called by superclass when event may be handled.<p>
557:             * <b>Do not use <code>PassUp</code> in this method as the event is passed up
558:             * by default by the superclass after this method returns !</b>
559:             * 
560:             * @return boolean Defaults to true. If false, event will not be passed up the stack.
561:             */
562:            public boolean handleUpEvent(Event evt) {
563:                switch (evt.getType()) {
564:
565:                case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
566:                case Event.DISCONNECT_OK: // dito (e.g. sent by UDP layer)
567:                    return false;
568:
569:                case Event.SET_LOCAL_ADDRESS:
570:                    local_addr = (Address) evt.getArg();
571:
572:                    if (print_local_addr) {
573:                        System.out
574:                                .println("\n-------------------------------------------------------\n"
575:                                        + "GMS: address is "
576:                                        + local_addr
577:                                        + "\n-------------------------------------------------------");
578:                    }
579:                    return true; // pass up
580:
581:                case Event.SUSPECT:
582:                    try {
583:                        event_queue.add(evt);
584:                    } catch (Exception e) {
585:                    }
586:                    return true; // pass up
587:
588:                case Event.MERGE:
589:                    try {
590:                        event_queue.add(evt);
591:                    } catch (Exception e) {
592:                    }
593:                    return false; // don't pass up
594:
595:                case Event.FLUSH_OK:
596:                    synchronized (flush_mutex) {
597:                        flush_rsp = (FlushRsp) evt.getArg();
598:                        flush_mutex.notifyAll();
599:                    }
600:                    return false; // don't pass up
601:
602:                case Event.REBROADCAST_MSGS_OK:
603:                    synchronized (rebroadcast_mutex) {
604:                        rebroadcast_mutex.notifyAll();
605:                    }
606:                    return false; // don't pass up
607:                }
608:
609:                return impl.handleUpEvent(evt);
610:            }
611:
612:            /**
613:             * <b>Callback</b>. Called by superclass when event may be handled.<p>
614:             * <b>Do not use <code>PassDown</code> in this method as the event is passed down
615:             * by default by the superclass after this method returns !</b>
616:             * 
617:             * @return boolean Defaults to true. If false, event will not be passed down the stack.
618:             */
619:            public boolean handleDownEvent(Event evt) {
620:                switch (evt.getType()) {
621:
622:                case Event.CONNECT:
623:                    passDown(evt);
624:                    try {
625:                        group_addr = (String) evt.getArg();
626:                    } catch (ClassCastException cce) {
627:                        if (log.isErrorEnabled())
628:                            log.error("group address must "
629:                                    + "be a string (group name) to make sense");
630:                    }
631:                    impl.join(local_addr);
632:                    passUp(new Event(Event.CONNECT_OK));
633:                    startEventHandlerThread();
634:                    return false; // don't pass down: was already passed down
635:
636:                case Event.DISCONNECT:
637:                    impl.leave((Address) evt.getArg());
638:                    passUp(new Event(Event.DISCONNECT_OK));
639:                    stopEventHandlerThread();
640:                    initState();
641:                    return true; // pass down
642:                }
643:
644:                return impl.handleDownEvent(evt);
645:            }
646:
647:            // Priority handling, otherwise GMS.down(DISCONNECT) would block !
648:            // Similar to FLUSH protocol
649:            public void receiveDownEvent(Event evt) {
650:                if (evt.getType() == Event.BLOCK_OK) {
651:                    passDown(evt);
652:                    return;
653:                }
654:                super .receiveDownEvent(evt);
655:            }
656:
657:            /**
658:             * Setup the Protocol instance acording to the configuration string
659:             */
660:            public boolean setProperties(Properties props) {
661:                String str;
662:
663:                super .setProperties(props);
664:                str = props.getProperty("join_timeout"); // time to wait for JOIN
665:                if (str != null) {
666:                    join_timeout = Long.parseLong(str);
667:                    props.remove("join_timeout");
668:                }
669:
670:                str = props.getProperty("print_local_addr");
671:                if (str != null) {
672:                    print_local_addr = Boolean.valueOf(str).booleanValue();
673:                    props.remove("print_local_addr");
674:                }
675:
676:                str = props.getProperty("view_change_timeout"); // time to wait for VIEW_CHANGE
677:                if (str != null) {
678:                    view_change_timeout = Long.parseLong(str);
679:                    props.remove("view_change_timeout");
680:                }
681:
682:                str = props.getProperty("join_retry_timeout"); // time to wait between JOINs
683:                if (str != null) {
684:                    join_retry_timeout = Long.parseLong(str);
685:                    props.remove("join_retry_timeout");
686:                }
687:
688:                str = props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
689:                if (str != null) {
690:                    leave_timeout = Long.parseLong(str);
691:                    props.remove("leave_timeout");
692:                }
693:
694:                str = props.getProperty("flush_timeout"); // time to wait until FLUSH completes (0=forever)
695:                if (str != null) {
696:                    flush_timeout = Long.parseLong(str);
697:                    props.remove("flush_timeout");
698:                }
699:
700:                str = props.getProperty("rebroadcast_unstable_msgs"); // bcast unstable msgs (recvd from FLUSH)
701:                if (str != null) {
702:                    rebroadcast_unstable_msgs = Boolean.valueOf(str)
703:                            .booleanValue();
704:                    props.remove("rebroadcast_unstable_msgs");
705:                }
706:
707:                str = props.getProperty("rebroadcast_timeout"); // time to wait until REBROADCAST_MSGS completes
708:                if (str != null) {
709:                    rebroadcast_timeout = Long.parseLong(str);
710:                    props.remove("rebroadcast_timeout");
711:                }
712:
713:                str = props.getProperty("disable_initial_coord"); // allow initial mbr to become coord or not
714:                if (str != null) {
715:                    disable_initial_coord = Boolean.valueOf(str).booleanValue();
716:                    props.remove("disable_initial_coord");
717:                }
718:
719:                if (props.size() > 0) {
720:                    log
721:                            .error("GMS.setProperties(): the following properties are not recognized: "
722:                                    + props);
723:
724:                    return false;
725:                }
726:                return true;
727:            }
728:
729:            public void run() {
730:                Event evt;
731:
732:                while (evt_thread != null && event_queue != null) {
733:                    try {
734:                        evt = (Event) event_queue.remove();
735:                        switch (evt.getType()) {
736:                        case Event.SUSPECT:
737:                            impl.suspect((Address) evt.getArg());
738:                            break;
739:                        case Event.MERGE:
740:                            impl.merge((Vector) evt.getArg());
741:                            break;
742:                        default:
743:                            if (log.isErrorEnabled())
744:                                log
745:                                        .error("event handler thread encountered event of type "
746:                                                + Event.type2String(evt
747:                                                        .getType())
748:                                                + ": not handled by me !");
749:                            break;
750:                        }
751:                    } catch (QueueClosedException closed) {
752:                        break;
753:                    } catch (Exception ex) {
754:                        if (log.isWarnEnabled())
755:                            log.warn("exception=" + ex);
756:                    }
757:                }
758:            }
759:
760:            /* ------------------------------- Private Methods --------------------------------- */
761:
762:            private void initState() {
763:                becomeClient();
764:                impl.init();
765:                view_id = null;
766:                if (mbrs != null)
767:                    mbrs.clear();
768:            }
769:
770:            private void startEventHandlerThread() {
771:                if (event_queue == null)
772:                    event_queue = new Queue();
773:                if (evt_thread == null) {
774:                    evt_thread = new Thread(this , "GMS.EventHandlerThread");
775:                    evt_thread.setDaemon(true);
776:                    evt_thread.start();
777:                }
778:            }
779:
780:            private void stopEventHandlerThread() {
781:                if (evt_thread != null) {
782:                    event_queue.close(false);
783:                    event_queue = null;
784:                    evt_thread = null;
785:                    return;
786:                }
787:
788:                if (event_queue != null) {
789:                    event_queue.close(false);
790:                    event_queue = null;
791:                }
792:            }
793:
794:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.