Source Code Cross Referenced for FD_PROB.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: FD_PROB.java,v 1.10.6.1 2007/04/27 08:03:52 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.*;
006:        import org.jgroups.stack.Protocol;
007:        import org.jgroups.util.Util;
008:        import org.jgroups.util.Streamable;
009:
010:        import java.io.*;
011:        import java.util.Enumeration;
012:        import java.util.Hashtable;
013:        import java.util.Properties;
014:        import java.util.Vector;
015:
016:        /**
017:         * Probabilistic failure detection protocol based on "A Gossip-Style Failure Detection Service"
018:         * by Renesse, Minsky and Hayden.<p>
019:         * Each member maintains a list of all other members: for each member P, 2 data are maintained, a heartbeat
020:         * counter and the time of the last increment of the counter. Each member periodically sends its own heartbeat
021:         * counter list to a randomly chosen member Q. Q updates its own heartbeat counter list and the associated
022:         * time (if counter was incremented). Each member periodically increments its own counter. If, when sending
023:         * its heartbeat counter list, a member P detects that another member Q's heartbeat counter was not incremented
024:         * for timeout seconds, Q will be suspected.<p>
025:         * This protocol can be used both with a PBCAST *and* regular stacks.
026:         * @author Bela Ban 1999
027:         * @version $Revision: 1.10.6.1 $
028:         */
029:        public class FD_PROB extends Protocol implements  Runnable {
030:            Address local_addr = null; // our own address; assigned when a SET_LOCAL_ADDRESS event arrives in up()
031:            Thread hb = null; // heartbeat thread that executes run(); null while no heartbeat thread is registered
032:            long timeout = 3000; // msecs without a timestamp update after which a member is suspected
033:            long gossip_interval = 1000; // pause between two heartbeat rounds in run(); presumably msecs like timeout -- confirm against Util.sleep()
034:            Vector members = null; // members of the most recent view; replaced on every VIEW_CHANGE in down()
035:            final Hashtable counters = new Hashtable(); // keys=Addresses, vals=FdEntries
036:            final Hashtable invalid_pingers = new Hashtable(); // keys=Address, vals=Integer (number of heartbeats received from senders that are not in 'members')
037:            int max_tries = 2; // number of heartbeats tolerated from a non-member before it is sent a NOT_MEMBER msg (see checkPingerValidity())
038:
039:            public String getName() {
040:                return "FD_PROB";
041:            }
042:
043:            public boolean setProperties(Properties props) {
044:                String str;
045:
046:                super .setProperties(props);
047:                str = props.getProperty("timeout");
048:                if (str != null) {
049:                    timeout = Long.parseLong(str);
050:                    props.remove("timeout");
051:                }
052:
053:                str = props.getProperty("gossip_interval");
054:                if (str != null) {
055:                    gossip_interval = Long.parseLong(str);
056:                    props.remove("gossip_interval");
057:                }
058:
059:                str = props.getProperty("max_tries");
060:                if (str != null) {
061:                    max_tries = Integer.parseInt(str);
062:                    props.remove("max_tries");
063:                }
064:
065:                if (props.size() > 0) {
066:                    log
067:                            .error("FD_PROB.setProperties(): the following properties are not recognized: "
068:                                    + props);
069:
070:                    return false;
071:                }
072:                return true;
073:            }
074:
075:            public void start() throws Exception {
076:                if (hb == null) {
077:                    hb = new Thread(this , "FD_PROB.HeartbeatThread");
078:                    hb.setDaemon(true);
079:                    hb.start();
080:                }
081:            }
082:
083:            public void stop() {
084:                Thread tmp = null;
085:                if (hb != null && hb.isAlive()) {
086:                    tmp = hb;
087:                    hb = null;
088:                    tmp.interrupt();
089:                    try {
090:                        tmp.join(timeout);
091:                    } catch (Exception ex) {
092:                    }
093:                }
094:                hb = null;
095:            }
096:
097:            public void up(Event evt) {
098:                Message msg;
099:                FdHeader hdr = null;
100:                Object obj;
101:
102:                switch (evt.getType()) {
103:
104:                case Event.SET_LOCAL_ADDRESS:
105:                    local_addr = (Address) evt.getArg();
106:                    break;
107:
108:                case Event.MSG:
109:                    msg = (Message) evt.getArg();
110:                    obj = msg.getHeader(getName());
111:                    if (obj == null || !(obj instanceof  FdHeader)) {
112:                        updateCounter(msg.getSrc()); // got a msg from this guy, reset its time (we heard from it now)
113:                        break;
114:                    }
115:
116:                    hdr = (FdHeader) msg.removeHeader(getName());
117:                    switch (hdr.type) {
118:                    case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack
119:                        if (checkPingerValidity(msg.getSrc()) == false) // false == sender of heartbeat is not a member
120:                            return;
121:
122:                        // 2. Update my own array of counters
123:
124:                        if (log.isInfoEnabled())
125:                            log.info("<-- HEARTBEAT from " + msg.getSrc());
126:                        updateCounters(hdr);
127:                        return; // don't pass up !
128:                    case FdHeader.NOT_MEMBER:
129:                        if (log.isWarnEnabled())
130:                            log.warn("NOT_MEMBER: I'm being shunned; exiting");
131:                        passUp(new Event(Event.EXIT));
132:                        return;
133:                    default:
134:                        if (log.isWarnEnabled())
135:                            log
136:                                    .warn("FdHeader type " + hdr.type
137:                                            + " not known");
138:                        return;
139:                    }
140:                }
141:                passUp(evt); // pass up to the layer above us
142:            }
143:
144:            public void down(Event evt) {
145:                int num_mbrs;
146:                Vector excluded_mbrs;
147:                FdEntry entry;
148:                Address mbr;
149:
150:                switch (evt.getType()) {
151:
152:                // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2
153:                case Event.VIEW_CHANGE:
154:                    passDown(evt);
155:                    synchronized (this ) {
156:                        View v = (View) evt.getArg();
157:
158:                        // mark excluded members
159:                        excluded_mbrs = computeExcludedMembers(members, v
160:                                .getMembers());
161:                        if (excluded_mbrs != null && excluded_mbrs.size() > 0) {
162:                            for (int i = 0; i < excluded_mbrs.size(); i++) {
163:                                mbr = (Address) excluded_mbrs.elementAt(i);
164:                                entry = (FdEntry) counters.get(mbr);
165:                                if (entry != null)
166:                                    entry.setExcluded(true);
167:                            }
168:                        }
169:
170:                        members = v != null ? v.getMembers() : null;
171:                        if (members != null) {
172:                            num_mbrs = members.size();
173:                            if (num_mbrs >= 2) {
174:                                if (hb == null) {
175:                                    try {
176:                                        start();
177:                                    } catch (Exception ex) {
178:                                        if (log.isWarnEnabled())
179:                                            log
180:                                                    .warn("exception when calling start(): "
181:                                                            + ex);
182:                                    }
183:                                }
184:                            } else
185:                                stop();
186:                        }
187:                    }
188:                    break;
189:
190:                default:
191:                    passDown(evt);
192:                    break;
193:                }
194:            }
195:
196:            /**
197:             Loop while more than 1 member available. Choose a member randomly (not myself !) and send a
198:             heartbeat. Wait for ack. If ack not received withing timeout, mcast SUSPECT message.
199:             */
200:            public void run() {
201:                Message hb_msg;
202:                FdHeader hdr;
203:                Address hb_dest, key;
204:                FdEntry entry;
205:                long curr_time, diff;
206:
207:                if (log.isInfoEnabled())
208:                    log.info("heartbeat thread was started");
209:
210:                while (hb != null && members.size() > 1) {
211:
212:                    // 1. Get a random member P (excluding ourself)
213:                    hb_dest = getHeartbeatDest();
214:                    if (hb_dest == null) {
215:                        if (log.isWarnEnabled())
216:                            log.warn("hb_dest is null");
217:                        Util.sleep(gossip_interval);
218:                        continue;
219:                    }
220:
221:                    // 2. Increment own counter
222:                    entry = (FdEntry) counters.get(local_addr);
223:                    if (entry == null) {
224:                        entry = new FdEntry();
225:                        counters.put(local_addr, entry);
226:                    }
227:                    entry.incrementCounter();
228:
229:                    // 3. Send heartbeat to P
230:                    hdr = createHeader();
231:                    if (hdr == null)
232:                        if (log.isWarnEnabled())
233:                            log
234:                                    .warn("header could not be created. Heartbeat will not be sent");
235:                        else {
236:                            hb_msg = new Message(hb_dest, null, null);
237:                            hb_msg.putHeader(getName(), hdr);
238:
239:                            if (log.isInfoEnabled())
240:                                log.info("--> HEARTBEAT to " + hb_dest);
241:                            passDown(new Event(Event.MSG, hb_msg));
242:                        }
243:
244:                    if (log.isInfoEnabled())
245:                        log.info("own counters are " + printCounters());
246:
247:                    // 4. Suspect members from which we haven't heard for timeout msecs
248:                    for (Enumeration e = counters.keys(); e.hasMoreElements();) {
249:                        curr_time = System.currentTimeMillis();
250:                        key = (Address) e.nextElement();
251:                        entry = (FdEntry) counters.get(key);
252:
253:                        if (entry.getTimestamp() > 0
254:                                && (diff = curr_time - entry.getTimestamp()) >= timeout) {
255:                            if (entry.excluded()) {
256:                                if (diff >= 2 * timeout) { // remove members marked as 'excluded' after 2*timeout msecs
257:                                    counters.remove(key);
258:                                    if (log.isInfoEnabled())
259:                                        log.info("removed " + key);
260:                                }
261:                            } else {
262:                                if (log.isInfoEnabled())
263:                                    log.info("suspecting " + key);
264:                                passUp(new Event(Event.SUSPECT, key));
265:                            }
266:                        }
267:                    }
268:                    Util.sleep(gossip_interval);
269:                } // end while
270:
271:                if (log.isInfoEnabled())
272:                    log.info("heartbeat thread was stopped");
273:            }
274:
275:            /* -------------------------------- Private Methods ------------------------------- */
276:
277:            Address getHeartbeatDest() {
278:                Address retval = null;
279:                int r, size;
280:                Vector members_copy;
281:
282:                if (members == null || members.size() < 2 || local_addr == null)
283:                    return null;
284:                members_copy = (Vector) members.clone();
285:                members_copy.removeElement(local_addr); // don't select myself as heartbeat destination
286:                size = members_copy.size();
287:                r = ((int) (Math.random() * (size + 1))) % size;
288:                retval = (Address) members_copy.elementAt(r);
289:                return retval;
290:            }
291:
292:            /** Create a header containing the counters for all members */
293:            FdHeader createHeader() {
294:                int num_mbrs = counters.size(), index = 0;
295:                FdHeader ret = null;
296:                Address key;
297:                FdEntry entry;
298:
299:                if (num_mbrs <= 0)
300:                    return null;
301:                ret = new FdHeader(FdHeader.HEARTBEAT, num_mbrs);
302:                for (Enumeration e = counters.keys(); e.hasMoreElements();) {
303:                    key = (Address) e.nextElement();
304:                    entry = (FdEntry) counters.get(key);
305:                    if (entry.excluded())
306:                        continue;
307:                    if (index >= ret.members.length) {
308:                        if (log.isWarnEnabled())
309:                            log.warn("index " + index + " is out of bounds ("
310:                                    + ret.members.length + ')');
311:                        break;
312:                    }
313:                    ret.members[index] = key;
314:                    ret.counters[index] = entry.getCounter();
315:                    index++;
316:                }
317:                return ret;
318:            }
319:
320:            /** Set my own counters values to max(own-counter, counter) */
321:            void updateCounters(FdHeader hdr) {
322:                Address key;
323:                FdEntry entry;
324:
325:                if (hdr == null || hdr.members == null || hdr.counters == null) {
326:                    if (log.isWarnEnabled())
327:                        log.warn("hdr is null or contains no counters");
328:                    return;
329:                }
330:
331:                for (int i = 0; i < hdr.members.length; i++) {
332:                    key = hdr.members[i];
333:                    if (key == null)
334:                        continue;
335:                    entry = (FdEntry) counters.get(key);
336:                    if (entry == null) {
337:                        entry = new FdEntry(hdr.counters[i]);
338:                        counters.put(key, entry);
339:                        continue;
340:                    }
341:
342:                    if (entry.excluded())
343:                        continue;
344:
345:                    // only update counter (and adjust timestamp) if new counter is greater then old one
346:                    entry.setCounter(Math.max(entry.getCounter(),
347:                            hdr.counters[i]));
348:                }
349:            }
350:
351:            /** Resets the counter for mbr */
352:            void updateCounter(Address mbr) {
353:                FdEntry entry;
354:
355:                if (mbr == null)
356:                    return;
357:                entry = (FdEntry) counters.get(mbr);
358:                if (entry != null)
359:                    entry.setTimestamp();
360:            }
361:
362:            String printCounters() {
363:                StringBuffer sb = new StringBuffer();
364:                Address mbr;
365:                FdEntry entry;
366:
367:                for (Enumeration e = counters.keys(); e.hasMoreElements();) {
368:                    mbr = (Address) e.nextElement();
369:                    entry = (FdEntry) counters.get(mbr);
370:                    sb.append("\n" + mbr + ": " + entry._toString());
371:                }
372:                return sb.toString();
373:            }
374:
375:            static Vector computeExcludedMembers(Vector old_mbrship,
376:                    Vector new_mbrship) {
377:                Vector ret = new Vector();
378:                if (old_mbrship == null || new_mbrship == null)
379:                    return ret;
380:                for (int i = 0; i < old_mbrship.size(); i++)
381:                    if (!new_mbrship.contains(old_mbrship.elementAt(i)))
382:                        ret.addElement(old_mbrship.elementAt(i));
383:                return ret;
384:            }
385:
386:            /** If hb_sender is not a member, send a SUSPECT to sender (after n pings received) */
387:            boolean checkPingerValidity(Object hb_sender) {
388:                int num_pings = 0;
389:                Message shun_msg;
390:                Header hdr;
391:
392:                if (hb_sender != null && members != null
393:                        && !members.contains(hb_sender)) {
394:                    if (invalid_pingers.containsKey(hb_sender)) {
395:                        num_pings = ((Integer) invalid_pingers.get(hb_sender))
396:                                .intValue();
397:                        if (num_pings >= max_tries) {
398:                            if (log.isErrorEnabled())
399:                                log.error("sender " + hb_sender
400:                                        + " is not member in " + members
401:                                        + " ! Telling it to leave group");
402:                            shun_msg = new Message((Address) hb_sender, null,
403:                                    null);
404:                            hdr = new FdHeader(FdHeader.NOT_MEMBER);
405:                            shun_msg.putHeader(getName(), hdr);
406:                            passDown(new Event(Event.MSG, shun_msg));
407:                            invalid_pingers.remove(hb_sender);
408:                        } else {
409:                            num_pings++;
410:                            invalid_pingers.put(hb_sender, new Integer(
411:                                    num_pings));
412:                        }
413:                    } else {
414:                        num_pings++;
415:                        invalid_pingers.put(hb_sender, new Integer(num_pings));
416:                    }
417:                    return false;
418:                } else
419:                    return true;
420:            }
421:
422:            /* ----------------------------- End of Private Methods --------------------------- */
423:
424:            public static class FdHeader extends Header implements  Streamable {
425:                static final byte HEARTBEAT = 1; // sent periodically to a random member
426:                static final byte NOT_MEMBER = 2; // sent to the sender, when it is not a member anymore (shunned)
427:
428:                byte type = HEARTBEAT;
429:                Address[] members = null;
430:                long[] counters = null; // correlates with 'members' (same indexes)
431:
432:                public FdHeader() {
433:                } // used for externalization
434:
435:                FdHeader(byte type) {
436:                    this .type = type;
437:                }
438:
439:                FdHeader(byte type, int num_elements) {
440:                    this (type);
441:                    members = new Address[num_elements];
442:                    counters = new long[num_elements];
443:                }
444:
445:                public String toString() {
446:                    switch (type) {
447:                    case HEARTBEAT:
448:                        return "[FD_PROB: HEARTBEAT]";
449:                    case NOT_MEMBER:
450:                        return "[FD_PROB: NOT_MEMBER]";
451:                    default:
452:                        return "[FD_PROB: unknown type (" + type + ")]";
453:                    }
454:                }
455:
456:                public String printDetails() {
457:                    StringBuffer sb = new StringBuffer();
458:                    Address mbr;
459:
460:                    if (members != null && counters != null)
461:                        for (int i = 0; i < members.length; i++) {
462:                            mbr = members[i];
463:                            if (mbr == null)
464:                                sb.append("\n<null>");
465:                            else
466:                                sb.append("\n" + mbr);
467:                            sb.append(": " + counters[i]);
468:                        }
469:                    return sb.toString();
470:                }
471:
472:                public void writeExternal(ObjectOutput out) throws IOException {
473:                    out.writeByte(type);
474:
475:                    if (members != null) {
476:                        out.writeInt(members.length);
477:                        out.writeObject(members);
478:                    } else
479:                        out.writeInt(0);
480:
481:                    if (counters != null) {
482:                        out.writeInt(counters.length);
483:                        for (int i = 0; i < counters.length; i++)
484:                            out.writeLong(counters[i]);
485:                    } else
486:                        out.writeInt(0);
487:                }
488:
489:                public void readExternal(ObjectInput in) throws IOException,
490:                        ClassNotFoundException {
491:                    int num;
492:                    type = in.readByte();
493:
494:                    num = in.readInt();
495:                    if (num == 0)
496:                        members = null;
497:                    else {
498:                        members = (Address[]) in.readObject();
499:                    }
500:
501:                    num = in.readInt();
502:                    if (num == 0)
503:                        counters = null;
504:                    else {
505:                        counters = new long[num];
506:                        for (int i = 0; i < counters.length; i++)
507:                            counters[i] = in.readLong();
508:                    }
509:                }
510:
511:                public long size() {
512:                    long retval = Global.BYTE_SIZE;
513:                    retval += Global.SHORT_SIZE; // number of members
514:                    if (members != null && members.length > 0) {
515:                        for (int i = 0; i < members.length; i++) {
516:                            Address member = members[i];
517:                            retval += Util.size(member);
518:                        }
519:                    }
520:
521:                    retval += Global.SHORT_SIZE; // counters
522:                    if (counters != null && counters.length > 0) {
523:                        retval += counters.length * Global.LONG_SIZE;
524:                    }
525:
526:                    return retval;
527:                }
528:
529:                public void writeTo(DataOutputStream out) throws IOException {
530:                    out.writeByte(type);
531:                    if (members == null || members.length == 0)
532:                        out.writeShort(0);
533:                    else {
534:                        out.writeShort(members.length);
535:                        for (int i = 0; i < members.length; i++) {
536:                            Address member = members[i];
537:                            Util.writeAddress(member, out);
538:                        }
539:                    }
540:
541:                    if (counters == null || counters.length == 0) {
542:                        out.writeShort(0);
543:                    } else {
544:                        out.writeShort(counters.length);
545:                        for (int i = 0; i < counters.length; i++) {
546:                            long counter = counters[i];
547:                            out.writeLong(counter);
548:                        }
549:                    }
550:                }
551:
552:                public void readFrom(DataInputStream in) throws IOException,
553:                        IllegalAccessException, InstantiationException {
554:                    type = in.readByte();
555:                    short len = in.readShort();
556:                    if (len > 0) {
557:                        members = new Address[len];
558:                        for (int i = 0; i < len; i++) {
559:                            members[i] = Util.readAddress(in);
560:                        }
561:                    }
562:
563:                    len = in.readShort();
564:                    if (len > 0) {
565:                        counters = new long[len];
566:                        for (int i = 0; i < counters.length; i++) {
567:                            counters[i] = in.readLong();
568:                        }
569:                    }
570:                }
571:
572:            }
573:
574:            private static class FdEntry {
575:                private long counter = 0; // heartbeat counter
576:                private long timestamp = 0; // last time the counter was incremented
577:                private boolean excluded = false; // set to true if member was excluded from group
578:
579:                FdEntry() {
580:
581:                }
582:
583:                FdEntry(long counter) {
584:                    this .counter = counter;
585:                    timestamp = System.currentTimeMillis();
586:                }
587:
588:                long getCounter() {
589:                    return counter;
590:                }
591:
592:                long getTimestamp() {
593:                    return timestamp;
594:                }
595:
596:                boolean excluded() {
597:                    return excluded;
598:                }
599:
600:                synchronized void setCounter(long new_counter) {
601:                    if (new_counter > counter) { // only set time if counter was incremented
602:                        timestamp = System.currentTimeMillis();
603:                        counter = new_counter;
604:                    }
605:                }
606:
607:                synchronized void incrementCounter() {
608:                    counter++;
609:                    timestamp = System.currentTimeMillis();
610:                }
611:
612:                synchronized void setTimestamp() {
613:                    timestamp = System.currentTimeMillis();
614:                }
615:
616:                synchronized void setExcluded(boolean flag) {
617:                    excluded = flag;
618:                }
619:
620:                public String toString() {
621:                    return "counter=" + counter + ", timestamp=" + timestamp
622:                            + ", excluded=" + excluded;
623:                }
624:
625:                public String _toString() {
626:                    return "counter=" + counter + ", age="
627:                            + (System.currentTimeMillis() - timestamp)
628:                            + ", excluded=" + excluded;
629:                }
630:            }
631:
632:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.