Source Code Cross Referenced for FD.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: FD.java,v 1.40.2.2 2007/04/27 08:03:51 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
006:        import org.jgroups.*;
007:        import org.jgroups.stack.Protocol;
008:        import org.jgroups.util.*;
009:
010:        import java.io.*;
011:        import java.util.*;
012:        import java.util.List;
013:
014:        /**
015:         * Failure detection based on simple heartbeat protocol. Regularly polls members for
016:         * liveness. Multicasts SUSPECT messages when a member is not reachable. The simple
017:         * algorithm works as follows: the membership is known and ordered. Each HB protocol
018:         * periodically sends an 'are-you-alive' message to its *neighbor*. A neighbor is the next in
019:         * rank in the membership list, which is recomputed upon a view change. When a response hasn't
020:         * been received for n milliseconds and m tries, the corresponding member is suspected (and
021:         * eventually excluded if faulty).<p>
022:         * FD starts when it detects (in a view change notification) that there are at least
023:         * 2 members in the group. It stops running when the membership drops below 2.<p>
024:         * When a message is received from the monitored neighbor member, it causes the pinger thread to
025:         * 'skip' sending the next are-you-alive message. Thus, traffic is reduced.<p>
026:         * When we receive a ping from a member that's not in the membership list, we shun it by sending it a
027:         * NOT_MEMBER message. That member will then leave the group (and possibly rejoin). This is only done if
028:         * <code>shun</code> is true.
029:         * @author Bela Ban
030:         * @version $Revision: 1.40.2.2 $
031:         */
032:        public class FD extends Protocol {
033:            Address ping_dest = null; // member that Monitor.run() sends are-you-alive requests to; recomputed via getPingDest(List)
034:            Address local_addr = null; // our own address, assigned in up() on Event.SET_LOCAL_ADDRESS
035:            long timeout = 3000; // number of millisecs to wait for an are-you-alive msg
036:            long last_ack = System.currentTimeMillis(); // time at which ping_dest was last heard from (ack or regular message)
037:            int num_tries = 0; // missed heartbeats counted so far by Monitor.run(); reset when ping_dest is heard from
038:            int max_tries = 2; // number of times to send an are-you-alive msg (tot time= max_tries*timeout)
039:            final List members = new CopyOnWriteArrayList(); // refilled from the View on every Event.VIEW_CHANGE in down()
040:            final Hashtable invalid_pingers = new Hashtable(7); // keys=Address, val=Integer (number of pings from suspected mbrs)
041:
042:            /** Members from which we select ping_dest. May be a subset of {@link #members} */
043:            final List pingable_mbrs = new CopyOnWriteArrayList();
044:
045:            boolean shun = true; // enables sending NOT_MEMBER to non-members and reacting to a received NOT_MEMBER
046:            TimeScheduler timer = null; // taken from stack.timer in init()
047:            private Monitor monitor = null; // task that performs the actual monitoring for failure detection
048:            private final Object monitor_mutex = new Object(); // guards creation and cancellation of monitor
049:            protected int num_heartbeats = 0; // incremented for every heartbeat request sent by Monitor.run()
050:            protected int num_suspect_events = 0; // incremented in Monitor.run() when a member is suspected and stats is on
051:
052:            /** Transmits SUSPECT message until view change or UNSUSPECT is received */
053:            protected final Broadcaster bcast_task = new Broadcaster();
054:            final static String name = "FD"; // protocol name; also the key under which FdHeader is stored on messages
055:
056:            BoundedList suspect_history = new BoundedList(20); // members suspected by Monitor.run() (recorded only when stats is on)
057:
058:            public String getName() {
                    // Returns the protocol name constant; the same string is the key used for
                    // putHeader/getHeader/removeHeader of FdHeader throughout this class.
059:                return name;
060:            }
061:
062:            public String getLocalAddress() {
                    // String form of local_addr, or the literal text null while no
                    // SET_LOCAL_ADDRESS event has been seen yet.
063:                return local_addr != null ? local_addr.toString() : "null";
064:            }
065:
066:            public String getMembers() {
                    // String form of the current member list. The null branch is purely defensive:
                    // members is a final field initialised at declaration.
067:                return members != null ? members.toString() : "null";
068:            }
069:
070:            public String getPingableMembers() {
                    // String form of the list from which ping_dest is chosen. The null branch is
                    // defensive only, since pingable_mbrs is a final, initialised field.
071:                return pingable_mbrs != null ? pingable_mbrs.toString()
072:                        : "null";
073:            }
074:
075:            public String getPingDest() {
                    // String form of the member currently being pinged, or the literal text null
                    // when no ping destination is selected.
076:                return ping_dest != null ? ping_dest.toString() : "null";
077:            }
078:
079:            public int getNumberOfHeartbeatsSent() {
                    // Count of heartbeat requests sent by Monitor.run() since the last resetStats().
080:                return num_heartbeats;
081:            }
082:
083:            public int getNumSuspectEventsGenerated() {
                    // Count of suspicions raised by Monitor.run() since the last resetStats().
                    // The counter is only incremented while the stats flag is set.
084:                return num_suspect_events;
085:            }
086:
087:            public long getTimeout() {
                    // Heartbeat timeout in milliseconds; Monitor.nextInterval() returns the same value.
088:                return timeout;
089:            }
090:
091:            public void setTimeout(long timeout) {
                    // Stores the new heartbeat timeout (milliseconds). The value is not validated.
092:                this .timeout = timeout;
093:            }
094:
095:            public int getMaxTries() {
                    // Threshold that num_tries must reach in Monitor.run() before ping_dest is
                    // suspected; also the ping-count threshold in shunInvalidHeartbeatSender().
096:                return max_tries;
097:            }
098:
099:            public void setMaxTries(int max_tries) {
                    // Stores the new missed-heartbeat threshold. The value is not validated.
100:                this .max_tries = max_tries;
101:            }
102:
103:            public int getCurrentNumTries() {
                    // Missed heartbeats counted so far for the current ping_dest. The counter is
                    // reset to 0 whenever an ack or a regular message from ping_dest is received.
104:                return num_tries;
105:            }
106:
107:            public boolean isShun() {
                    // True when shunning is enabled: non-members that keep pinging us are sent
                    // NOT_MEMBER, and a NOT_MEMBER we receive makes us pass up Event.EXIT.
108:                return shun;
109:            }
110:
111:            public void setShun(boolean flag) {
                    // Enables or disables shunning (see the NOT_MEMBER handling in up()).
112:                this .shun = flag;
113:            }
114:
115:            public String printSuspectHistory() {
                    // Renders suspect_history as text, one entry per line, each prefixed by a date.
                    // NOTE(review): the date is created here with new Date(), i.e. it is the time
                    // of printing, not the time the member was suspected; suspect_history only
                    // stores the address (see Monitor.run()).
116:                StringBuffer sb = new StringBuffer();
117:                for (Enumeration en = suspect_history.elements(); en
118:                        .hasMoreElements();) {
119:                    sb.append(new Date()).append(": ").append(en.nextElement())
120:                            .append("\n");
121:                }
122:                return sb.toString();
123:            }
124:
125:            public boolean setProperties(Properties props) {
                    // Configures the protocol from props. Recognised keys are timeout, max_tries and
                    // shun; each one found is parsed and removed from props. Returns false (after
                    // logging an error) if any property is left over, true otherwise.
                    // Long.parseLong / Integer.parseInt throw NumberFormatException on malformed
                    // numbers; that exception is not caught here.
126:                String str;
127:
                    // the return value of the superclass call is ignored
128:                super .setProperties(props);
129:                str = props.getProperty("timeout");
130:                if (str != null) {
131:                    timeout = Long.parseLong(str);
132:                    props.remove("timeout");
133:                }
134:
135:                str = props.getProperty("max_tries"); // before suspecting a member
136:                if (str != null) {
137:                    max_tries = Integer.parseInt(str);
138:                    props.remove("max_tries");
139:                }
140:
141:                str = props.getProperty("shun");
142:                if (str != null) {
                        // Boolean.valueOf yields true only for the text true (case-insensitive)
143:                    shun = Boolean.valueOf(str).booleanValue();
144:                    props.remove("shun");
145:                }
146:
                    // anything still present was not consumed by this protocol (or its superclass)
147:                if (!props.isEmpty()) {
148:                    log.error("the following properties are not recognized: "
149:                            + props);
150:                    return false;
151:                }
152:                return true;
153:            }
154:
155:            public void resetStats() {
                    // Zeroes the heartbeat and suspect counters and calls removeAll() on the
                    // suspect history.
156:                num_heartbeats = num_suspect_events = 0;
157:                suspect_history.removeAll();
158:            }
159:
160:            public void init() throws Exception {
                    // Obtains the timer from the protocol stack. Throws Exception when either the
                    // stack or its timer is null, because the monitor task cannot be scheduled
                    // without it (see startMonitor()).
161:                if (stack != null && stack.timer != null)
162:                    timer = stack.timer;
163:                else
164:                    throw new Exception(
165:                            "FD.init(): timer cannot be retrieved from protocol stack");
166:            }
167:
168:            public void stop() {
                    // Cancels the heartbeat monitor task. Besides being a public entry point, this
                    // is also called from down() on VIEW_CHANGE and from up() when the ping
                    // destination changes.
169:                stopMonitor();
170:            }
171:
172:            private Object getPingDest(List mbrs) {
                    // Picks the neighbour to ping: the element that follows local_addr in mbrs,
                    // wrapping around to the first element when local_addr is last.
                    // Returns null when mbrs is null, has fewer than 2 elements, local_addr is
                    // null, or local_addr does not occur in mbrs.
173:                Object tmp, retval = null;
174:
175:                if (mbrs == null || mbrs.size() < 2 || local_addr == null)
176:                    return null;
177:                for (int i = 0; i < mbrs.size(); i++) {
178:                    tmp = mbrs.get(i);
179:                    if (local_addr.equals(tmp)) {
                            // we are the last element: wrap around to the head of the list
180:                        if (i + 1 >= mbrs.size())
181:                            retval = mbrs.get(0);
182:                        else
183:                            retval = mbrs.get(i + 1);
184:                        break;
185:                    }
186:                }
187:                return retval;
188:            }
189:
190:            private void startMonitor() {
                    // Ensures a live monitor task is scheduled. All work happens under monitor_mutex.
191:                synchronized (monitor_mutex) {
                        // a monitor whose started flag has been cleared is stale; forget it
192:                    if (monitor != null && monitor.started == false) {
193:                        monitor = null;
194:                    }
                        // nothing is done when a monitor that is still started already exists
195:                    if (monitor == null) {
196:                        monitor = createMonitor();
197:                        last_ack = System.currentTimeMillis(); // start from scratch
198:                        timer.add(monitor, true); // fixed-rate scheduling
199:                        num_tries = 0;
200:                    }
201:                }
202:            }
203:
204:            private void stopMonitor() {
                    // Under monitor_mutex: clears the started flag of the current monitor (so its
                    // cancelled() returns true) and drops the reference. No-op when no monitor exists.
205:                synchronized (monitor_mutex) {
206:                    if (monitor != null) {
207:                        monitor.stop();
208:                        monitor = null;
209:                    }
210:                }
211:            }
212:
213:            protected Monitor createMonitor() {
                    // Factory for the monitor task used by startMonitor(). Being protected, it is
                    // presumably meant as an override point for subclasses - confirm against subclasses.
214:                return new Monitor();
215:            }
216:
217:            public void up(Event evt) {
                    // Handles events travelling up the stack.
                    //  - SET_LOCAL_ADDRESS: remembers our address, then passes the event up.
                    //  - MSG without an FdHeader: counts as a sign of life if it comes from
                    //    ping_dest, then is passed up.
                    //  - MSG with an FdHeader: handled here according to the header type and
                    //    NOT passed up (the method returns after the inner switch).
                    //  - any other event: passed up unchanged.
218:                Message msg;
219:                FdHeader hdr;
220:                Object sender, tmphdr;
221:
222:                switch (evt.getType()) {
223:
224:                case Event.SET_LOCAL_ADDRESS:
225:                    local_addr = (Address) evt.getArg();
226:                    break;
227:
228:                case Event.MSG:
229:                    msg = (Message) evt.getArg();
230:                    tmphdr = msg.getHeader(name);
231:                    if (tmphdr == null || !(tmphdr instanceof  FdHeader)) {
                            // regular traffic from the monitored member is as good as a heartbeat ack
232:                        if (ping_dest != null
233:                                && (sender = msg.getSrc()) != null) {
234:                            if (ping_dest.equals(sender)) {
235:                                last_ack = System.currentTimeMillis();
236:                                if (log.isTraceEnabled())
237:                                    log.trace("received msg from " + sender
238:                                            + " (counts as ack)");
239:                                num_tries = 0;
240:                            }
241:                        }
242:                        break; // message did not originate from FD layer, just pass up
243:                    }
244:
                        // FD message: strip our header and dispatch on its type
245:                    hdr = (FdHeader) msg.removeHeader(name);
246:                    switch (hdr.type) {
247:                    case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack
248:                        Address hb_sender = msg.getSrc();
249:                        if (log.isTraceEnabled())
250:                            log.trace("received are-you-alive from "
251:                                    + hb_sender + ", sending response");
                            // 1. Always answer, even if the sender turns out not to be a member
252:                        sendHeartbeatResponse(hb_sender);
253:
254:                        // 2. Shun the sender of a HEARTBEAT message if that sender is not a member. This will cause
255:                        //    the sender to leave the group (and possibly rejoin it later)
256:                        if (shun)
257:                            shunInvalidHeartbeatSender(hb_sender);
258:                        break; // don't pass up !
259:
260:                    case FdHeader.HEARTBEAT_ACK: // heartbeat ack
                            // ack from the member we are monitoring: record it and reset the miss counter
261:                        if (ping_dest != null && ping_dest.equals(hdr.from)) {
262:                            last_ack = System.currentTimeMillis();
263:                            num_tries = 0;
264:                            if (log.isDebugEnabled())
265:                                log.debug("received ack from " + hdr.from);
266:                        } else {
267:                            /* modified by Luís Palma Nunes Mendes on 11 Aug 2006 
268:                             * By not doing this check, if we keep receiving HEARTBEAT_ACK messages from
269:                             * other members than ping_dest, Monitor Thread would be restarted every time,
270:                             * taking down the timeouts with it. This inhibits ping_dest Failure Detection.
271:                             */
272:                            synchronized (this ) {
                                    // what ping_dest would be if it were recomputed right now
273:                                Address previewNextPingDest = (Address) getPingDest(pingable_mbrs);
274:                                /* We are only interested to stop or restart the monitor thread iff the current target ping_dest is going
275:                                   to change */
276:                                if (log.isDebugEnabled())
277:                                    log
278:                                            .debug("Recevied Ack. is invalid (was from: "
279:                                                    + hdr.from + "), ");
                                    // true when the recomputed destination differs from the current one
                                    // (including one of the two being null and the other not)
280:                                if ((previewNextPingDest != null
281:                                        && ping_dest != null && !previewNextPingDest
282:                                        .equals(ping_dest))
283:                                        || (previewNextPingDest != null && ping_dest == null)
284:                                        || (previewNextPingDest == null && ping_dest != null)) {
285:                                    stop();
286:                                    ping_dest = previewNextPingDest;
287:                                    if (log.isDebugEnabled())
288:                                        log
289:                                                .debug("changing to a new destination: "
290:                                                        + ping_dest);
291:                                    if (ping_dest != null) {
292:                                        try {
293:                                            startMonitor();
294:                                        } catch (Exception ex) {
295:                                            if (log.isWarnEnabled())
296:                                                log
297:                                                        .warn("exception when calling startMonitor(): "
298:                                                                + ex);
299:                                        }
300:                                    }
301:                                } else if (ping_dest == null) {
                                        // both the current and the recomputed destination are null
302:                                    if (log.isTraceEnabled())
303:                                        log
304:                                                .trace("and ping_dest is null, stop Monitor");
305:                                    stop();
306:                                } else {
                                        // destination unchanged: leave the running monitor and its counters alone
307:                                    if (log.isTraceEnabled())
308:                                        log
309:                                                .trace("but we must keep pinging same destination");
310:                                }
311:                            }
312:                        }
313:                        break;
314:
315:                    case FdHeader.SUSPECT:
316:                        if (hdr.mbrs != null) {
317:                            if (log.isTraceEnabled())
318:                                log.trace("[SUSPECT] suspect hdr is " + hdr);
319:                            for (int i = 0; i < hdr.mbrs.size(); i++) {
320:                                Address m = (Address) hdr.mbrs.elementAt(i);
321:                                if (local_addr != null && m.equals(local_addr)) {
                                        // we are the one being suspected: prove we are alive and
                                        // do not generate SUSPECT events for ourselves
322:                                    if (log.isWarnEnabled())
323:                                        log
324:                                                .warn("I was suspected by "
325:                                                        + msg.getSrc()
326:                                                        + "; ignoring the SUSPECT "
327:                                                        + "message and sending back a HEARTBEAT_ACK");
328:                                    sendHeartbeatResponse(msg.getSrc());
329:                                    continue;
330:                                } else {
                                        // stop considering the suspected member and pick a new neighbour
331:                                    pingable_mbrs.remove(m);
332:                                    ping_dest = (Address) getPingDest(pingable_mbrs);
333:                                }
                                    // notify the layers above and below about the suspected member
334:                                passUp(new Event(Event.SUSPECT, m));
335:                                passDown(new Event(Event.SUSPECT, m));
336:                            }
337:                        }
338:                        break;
339:
340:                    case FdHeader.NOT_MEMBER:
                            // another member told us we are not in its view; only acted upon when shun is on
341:                        if (shun) {
342:                            if (log.isDebugEnabled())
343:                                log
344:                                        .debug("[NOT_MEMBER] I'm being shunned; exiting");
345:                            passUp(new Event(Event.EXIT));
346:                        }
347:                        break;
348:                    }
                        // FD-internal messages are consumed here and never reach the layers above
349:                    return;
350:                }
351:                passUp(evt); // pass up to the layer above us
352:            }
353:
354:            public void down(Event evt) {
                    // Handles events travelling down the stack. Every event is passed down; in
                    // addition VIEW_CHANGE rebuilds the membership state and restarts monitoring,
                    // and UNSUSPECT removes a member from the suspect list.
355:                View v;
356:
357:                switch (evt.getType()) {
358:                case Event.VIEW_CHANGE:
                        // forward the view first, then cancel the monitor before touching the lists
359:                    passDown(evt);
360:                    stop();
361:                    synchronized (this ) {
362:                        v = (View) evt.getArg();
363:                        members.clear();
364:                        members.addAll(v.getMembers());
                            // let the broadcaster reconcile its suspect list with the new membership
365:                        bcast_task.adjustSuspectedMembers(members);
                            // after a view change every member is pingable again
366:                        pingable_mbrs.clear();
367:                        pingable_mbrs.addAll(members);
368:                        ping_dest = (Address) getPingDest(pingable_mbrs);
                            // getPingDest() returns null for fewer than 2 members, so no monitor runs then
369:                        if (ping_dest != null) {
370:                            try {
371:                                startMonitor();
372:                            } catch (Exception ex) {
373:                                if (log.isWarnEnabled())
374:                                    log
375:                                            .warn("exception when calling startMonitor(): "
376:                                                    + ex);
377:                            }
378:                        }
379:                    }
380:                    break;
381:
382:                case Event.UNSUSPECT:
                        // update our own state before forwarding the event
383:                    unsuspect((Address) evt.getArg());
384:                    passDown(evt);
385:                    break;
386:
387:                default:
388:                    passDown(evt);
389:                    break;
390:                }
391:            }
392:
393:            private void sendHeartbeatResponse(Address dest) {
                    // Sends a HEARTBEAT_ACK to dest. The header carries local_addr in its from
                    // field, which is what the receiver compares against its ping_dest in up().
394:                Message hb_ack = new Message(dest, null, null);
395:                FdHeader tmp_hdr = new FdHeader(FdHeader.HEARTBEAT_ACK);
396:                tmp_hdr.from = local_addr;
397:                hb_ack.putHeader(name, tmp_hdr);
398:                passDown(new Event(Event.MSG, hb_ack));
399:            }
400:
401:            private void unsuspect(Address mbr) {
                    // Takes mbr off the broadcaster's suspect list, then rebuilds pingable_mbrs as
                    // (members minus the members still suspected) and recomputes ping_dest.
                    // The monitor task is not restarted here.
402:                bcast_task.removeSuspectedMember(mbr);
403:                pingable_mbrs.clear();
404:                pingable_mbrs.addAll(members);
405:                pingable_mbrs.removeAll(bcast_task.getSuspectedMembers());
406:                ping_dest = (Address) getPingDest(pingable_mbrs);
407:            }
408:
409:            /**
410:             * If sender is not a member, send a NOT_MEMBER to sender (after n pings received)
411:             */
412:            private void shunInvalidHeartbeatSender(Address hb_sender) {
                    // Counts heartbeats received from a sender that is not in members (count kept
                    // in invalid_pingers). Once the stored count has reached max_tries, a
                    // NOT_MEMBER message is sent to the sender and its counter is discarded.
                    // Does nothing when hb_sender is null or is a member.
413:                int num_pings = 0;
414:                Message shun_msg;
415:
416:                if (hb_sender != null && members != null
417:                        && !members.contains(hb_sender)) {
418:                    if (invalid_pingers.containsKey(hb_sender)) {
419:                        num_pings = ((Integer) invalid_pingers.get(hb_sender))
420:                                .intValue();
421:                        if (num_pings >= max_tries) {
422:                            if (log.isDebugEnabled())
423:                                log.debug(hb_sender + " is not in " + members
424:                                        + " ! Shunning it");
425:                            shun_msg = new Message(hb_sender, null, null);
426:                            shun_msg.putHeader(name, new FdHeader(
427:                                    FdHeader.NOT_MEMBER));
428:                            passDown(new Event(Event.MSG, shun_msg));
                                // forget the sender so that counting starts afresh if it pings again
429:                            invalid_pingers.remove(hb_sender);
430:                        } else {
                                // below the threshold: just record one more ping
431:                            num_pings++;
432:                            invalid_pingers.put(hb_sender, new Integer(
433:                                    num_pings));
434:                        }
435:                    } else {
                            // first ping from this non-member: start its counter at 1
436:                        num_pings++;
437:                        invalid_pingers.put(hb_sender, new Integer(num_pings));
438:                    }
439:                }
440:            }
441:
442:            public static class FdHeader extends Header implements  Streamable {
                    // Header attached (under the key FD.name) to every message generated by FD.
                    // type selects the message kind; mbrs and from are optional payload and may
                    // be null. Two serialisation forms are provided: writeExternal/readExternal
                    // (ObjectOutput/ObjectInput) and writeTo/readFrom (Data streams).
443:                public static final byte HEARTBEAT = 0;
444:                public static final byte HEARTBEAT_ACK = 1;
445:                public static final byte SUSPECT = 2;
446:                public static final byte NOT_MEMBER = 3; // received as response by pinged mbr when we are not a member
447:
448:                byte type = HEARTBEAT;
449:                Vector mbrs = null; // suspected members; read by FD.up() for SUSPECT headers
450:                Address from = null; // member who detected that suspected_mbr has failed
451:
452:                public FdHeader() {
453:                } // used for externalization
454:
455:                public FdHeader(byte type) {
                        // header with the given type and no payload (mbrs and from stay null)
456:                    this .type = type;
457:                }
458:
459:                public FdHeader(byte type, Vector mbrs, Address from) {
                        // header with payload; the mbrs Vector is stored by reference, not copied
460:                    this (type);
461:                    this .mbrs = mbrs;
462:                    this .from = from;
463:                }
464:
465:                public String toString() {
                        // human-readable form; only SUSPECT includes the payload fields
466:                    switch (type) {
467:                    case HEARTBEAT:
468:                        return "[FD: heartbeat]";
469:                    case HEARTBEAT_ACK:
470:                        return "[FD: heartbeat ack]";
471:                    case SUSPECT:
472:                        return "[FD: SUSPECT (suspected_mbrs=" + mbrs
473:                                + ", from=" + from + ")]";
474:                    case NOT_MEMBER:
475:                        return "[FD: NOT_MEMBER]";
476:                    default:
477:                        return "[FD: unknown type (" + type + ")]";
478:                    }
479:                }
480:
481:                public void writeExternal(ObjectOutput out) throws IOException {
                        // Layout: type byte, presence flag for mbrs, then (if present) the element
                        // count followed by each address, and finally from.
482:                    out.writeByte(type);
483:                    if (mbrs == null)
484:                        out.writeBoolean(false);
485:                    else {
486:                        out.writeBoolean(true);
487:                        out.writeInt(mbrs.size());
488:                        for (Iterator it = mbrs.iterator(); it.hasNext();) {
489:                            Address addr = (Address) it.next();
490:                            Marshaller.write(addr, out);
491:                        }
492:                    }
493:                    Marshaller.write(from, out);
494:                }
495:
496:                public void readExternal(ObjectInput in) throws IOException,
497:                        ClassNotFoundException {
                        // Mirror image of writeExternal(); mbrs is left untouched when the presence
                        // flag is false.
498:                    type = in.readByte();
499:                    boolean mbrs_not_null = in.readBoolean();
500:                    if (mbrs_not_null) {
501:                        int len = in.readInt();
                            // 11 is only the Vector's initial capacity, independent of len
502:                        mbrs = new Vector(11);
503:                        for (int i = 0; i < len; i++) {
504:                            Address addr = (Address) Marshaller.read(in);
505:                            mbrs.add(addr);
506:                        }
507:                    }
508:                    from = (Address) Marshaller.read(in);
509:                }
510:
511:                public long size() {
                        // one byte for type plus whatever Util.size() reports for mbrs and from;
                        // presumably this matches the bytes produced by writeTo() - confirm against Util
512:                    int retval = Global.BYTE_SIZE; // type
513:                    retval += Util.size(mbrs);
514:                    retval += Util.size(from);
515:                    return retval;
516:                }
517:
518:                public void writeTo(DataOutputStream out) throws IOException {
                        // stream form: type byte, then mbrs and from via the Util address helpers
519:                    out.writeByte(type);
520:                    Util.writeAddresses(mbrs, out);
521:                    Util.writeAddress(from, out);
522:                }
523:
524:                public void readFrom(DataInputStream in) throws IOException,
525:                        IllegalAccessException, InstantiationException {
                        // reads the fields in the same order writeTo() wrote them
526:                    type = in.readByte();
527:                    mbrs = (Vector) Util.readAddresses(in, Vector.class);
528:                    from = Util.readAddress(in);
529:                }
530:
531:            }
532:
533:            protected class Monitor implements  TimeScheduler.Task {
                    // Periodic task (registered with the timer in FD.startMonitor()) that sends a
                    // heartbeat request to ping_dest on every run and suspects ping_dest once
                    // enough heartbeats have gone unanswered.
                    // NOTE(review): started is a plain boolean written by stop() and read by
                    // cancelled(); if these run on different threads its visibility is not
                    // guaranteed by this class alone - confirm how the timer calls cancelled().
534:                boolean started = true;
535:
536:                public void stop() {
                        // marks the task as cancelled; it is not removed from the timer here
537:                    started = false;
538:                }
539:
540:                public boolean cancelled() {
541:                    return !started;
542:                }
543:
544:                public long nextInterval() {
                        // the current FD timeout is the interval between runs
545:                    return timeout;
546:                }
547:
548:                public void run() {
549:                    Message hb_req;
550:                    long not_heard_from; // time in msecs we haven't heard from ping_dest
551:
                        // nobody to ping: log the current state and skip this round
552:                    if (ping_dest == null) {
553:                        if (log.isWarnEnabled())
554:                            log.warn("ping_dest is null: members=" + members
555:                                    + ", pingable_mbrs=" + pingable_mbrs
556:                                    + ", local_addr=" + local_addr);
557:                        return;
558:                    }
559:
560:                    // 1. send heartbeat request
561:                    hb_req = new Message(ping_dest, null, null);
562:                    hb_req.putHeader(name, new FdHeader(FdHeader.HEARTBEAT)); // send heartbeat request
563:                    if (log.isDebugEnabled())
564:                        log.debug("sending are-you-alive msg to " + ping_dest
565:                                + " (own address=" + local_addr + ')');
566:                    passDown(new Event(Event.MSG, hb_req));
567:                    num_heartbeats++;
568:
569:                    // 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been
570:                    //    received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install
571:                    //    a new view
572:                    not_heard_from = System.currentTimeMillis() - last_ack;
573:                    // quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003)
574:                    if (not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs
575:                        if (num_tries >= max_tries) {
576:                            if (log.isDebugEnabled())
577:                                log.debug("[" + local_addr
578:                                        + "]: received no heartbeat ack from "
579:                                        + ping_dest + " for " + (num_tries + 1)
580:                                        + " times ("
581:                                        + ((num_tries + 1) * timeout)
582:                                        + " milliseconds), suspecting it");
583:                            // broadcast a SUSPECT message to all members - loop until
584:                            // unsuspect or view change is received
585:                            bcast_task.addSuspectedMember(ping_dest);
                                // restart counting for whichever member is monitored next
586:                            num_tries = 0;
                                // statistics are only maintained while the stats flag is set
587:                            if (stats) {
588:                                num_suspect_events++;
589:                                suspect_history.add(ping_dest);
590:                            }
591:                        } else {
                                // still below the threshold: count this miss and try again next run
592:                            if (log.isDebugEnabled())
593:                                log.debug("heartbeat missing from " + ping_dest
594:                                        + " (number=" + num_tries + ')');
595:                            num_tries++;
596:                        }
597:                    }
598:                }
599:
600:                public String toString() {
                        // text form of the started flag (true or false)
601:                    return Boolean.toString(started);
602:                }
603:
604:            }
605:
606:            /**
607:             * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
608:             * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
609:             * sure they are retransmitted until a view has been received which doesn't contain the suspected members
610:             * any longer. Then the task terminates.
611:             */
612:            protected final class Broadcaster {
613:                final Vector suspected_mbrs = new Vector(7);
614:                BroadcastTask task = null;
615:                private final Object bcast_mutex = new Object();
616:
617:                Vector getSuspectedMembers() {
618:                    return suspected_mbrs;
619:                }
620:
621:                /**
622:                 * Starts a new task, or - if already running - adds the argument to the running task.
623:                 * @param suspect
624:                 */
625:                private void startBroadcastTask(Address suspect) {
626:                    synchronized (bcast_mutex) {
627:                        if (task == null || task.cancelled()) {
628:                            task = new BroadcastTask((Vector) suspected_mbrs
629:                                    .clone());
630:                            task.addSuspectedMember(suspect);
631:                            task.run(); // run immediately the first time
632:                            timer.add(task); // then every timeout milliseconds, until cancelled
633:                            if (log.isTraceEnabled())
634:                                log.trace("BroadcastTask started");
635:                        } else {
636:                            task.addSuspectedMember(suspect);
637:                        }
638:                    }
639:                }
640:
641:                private void stopBroadcastTask() {
642:                    synchronized (bcast_mutex) {
643:                        if (task != null) {
644:                            task.stop();
645:                            task = null;
646:                        }
647:                    }
648:                }
649:
650:                /** Adds a suspected member. Starts the task if not yet running */
651:                protected void addSuspectedMember(Address mbr) {
652:                    if (mbr == null)
653:                        return;
654:                    if (!members.contains(mbr))
655:                        return;
656:                    boolean added = false;
657:                    synchronized (suspected_mbrs) {
658:                        if (!suspected_mbrs.contains(mbr)) {
659:                            suspected_mbrs.addElement(mbr);
660:                            added = true;
661:                        }
662:                    }
663:                    if (added)
664:                        startBroadcastTask(mbr);
665:                }
666:
667:                void removeSuspectedMember(Address suspected_mbr) {
668:                    if (suspected_mbr == null)
669:                        return;
670:                    if (log.isDebugEnabled())
671:                        log.debug("member is " + suspected_mbr);
672:                    synchronized (suspected_mbrs) {
673:                        suspected_mbrs.removeElement(suspected_mbr);
674:                        if (suspected_mbrs.isEmpty())
675:                            stopBroadcastTask();
676:                    }
677:                }
678:
679:                void removeAll() {
680:                    synchronized (suspected_mbrs) {
681:                        suspected_mbrs.removeAllElements();
682:                        stopBroadcastTask();
683:                    }
684:                }
685:
686:                /** Removes all elements from suspected_mbrs that are <em>not</em> in the new membership */
687:                void adjustSuspectedMembers(List new_mbrship) {
688:                    if (new_mbrship == null || new_mbrship.isEmpty())
689:                        return;
690:                    StringBuffer sb = new StringBuffer();
691:                    synchronized (suspected_mbrs) {
692:                        sb.append("suspected_mbrs: ").append(suspected_mbrs);
693:                        suspected_mbrs.retainAll(new_mbrship);
694:                        if (suspected_mbrs.isEmpty())
695:                            stopBroadcastTask();
696:                        sb.append(", after adjustment: ")
697:                                .append(suspected_mbrs);
698:                        log.debug(sb.toString());
699:                    }
700:                }
701:            }
702:
703:            protected final class BroadcastTask implements  TimeScheduler.Task {
704:                boolean cancelled = false;
705:                private final Vector suspected_members = new Vector();
706:
707:                BroadcastTask(Vector suspected_members) {
708:                    this .suspected_members.addAll(suspected_members);
709:                }
710:
711:                public void stop() {
712:                    cancelled = true;
713:                    suspected_members.clear();
714:                    if (log.isTraceEnabled())
715:                        log.trace("BroadcastTask stopped");
716:                }
717:
718:                public boolean cancelled() {
719:                    return cancelled;
720:                }
721:
722:                public long nextInterval() {
723:                    return FD.this .timeout;
724:                }
725:
726:                public void run() {
727:                    Message suspect_msg;
728:                    FD.FdHeader hdr;
729:
730:                    synchronized (suspected_members) {
731:                        if (suspected_members.isEmpty()) {
732:                            stop();
733:                            if (log.isDebugEnabled())
734:                                log.debug("task done (no suspected members)");
735:                            return;
736:                        }
737:
738:                        hdr = new FdHeader(FdHeader.SUSPECT);
739:                        hdr.mbrs = (Vector) suspected_members.clone();
740:                        hdr.from = local_addr;
741:                    }
742:                    suspect_msg = new Message(); // mcast SUSPECT to all members
743:                    suspect_msg.putHeader(name, hdr);
744:                    if (log.isDebugEnabled())
745:                        log
746:                                .debug("broadcasting SUSPECT message [suspected_mbrs="
747:                                        + suspected_members + "] to group");
748:                    passDown(new Event(Event.MSG, suspect_msg));
749:                    if (log.isDebugEnabled())
750:                        log.debug("task done");
751:                }
752:
753:                public void addSuspectedMember(Address suspect) {
754:                    if (suspect != null && !suspected_members.contains(suspect)) {
755:                        suspected_members.add(suspect);
756:                    }
757:                }
758:            }
759:
760:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.