Source Code Cross Referenced for PBCAST.java in Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » pbcast

// $Id: PBCAST.java,v 1.16.6.1 2007/04/27 08:03:55 belaban Exp $

package org.jgroups.protocols.pbcast;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.stack.NakReceiverWindow;
import org.jgroups.stack.Protocol;
import org.jgroups.util.List;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;

import java.util.*;

/**
 * Implementation of probabilistic broadcast. Sends group messages via unreliable multicast. Gossips regularly to
 * a random subset of group members to retransmit missing messages. Gossiping is used both for bringing all
 * members to the same state (having received the same messages) and to garbage-collect messages seen by all members
 * (gc is piggybacked in gossip messages). See DESIGN for more details.
 * @author Bela Ban
 */
public class PBCAST extends Protocol implements Runnable {
    boolean operational = false;
    long seqno = 1; // seqno for messages. 1 for the first message
    long gossip_round = 1; // identifies the gossip (together with sender)
    Address local_addr = null;
    final Hashtable digest = new Hashtable(); // stores all messages from members (key: member, val: NakReceiverWindow)
    Thread gossip_thread = null;
    GossipHandler gossip_handler = null; // removes gossips and other requests from queue and handles them
    final Queue gossip_queue = new Queue(); // (bounded) queue for incoming gossip requests
    int max_queue = 100; // max elements in gossip_queue (bounded buffer)
    long gossip_interval = 5000; // gossip every 5 seconds
    double subset = 0.1; // send gossip messages to a subset consisting of 10% of the membership
    long desired_avg_gossip = 30000; // receive a gossip every 30 secs on average
    final Vector members = new Vector();
    final List gossip_list = new List(); // list of gossips received, we periodically purge it (FIFO)
    int max_gossip_cache = 100; // number of gossips to keep until gossip list is purged
    int gc_lag = 30; // how many seqnos should we lag behind (see DESIGN)
    final Hashtable invalid_gossipers = new Hashtable(); // keys=Address, val=Integer (number of gossips from suspected mbrs)
    final int max_invalid_gossips = 2; // max number of gossips from a non-member before that member is shunned
    Vector seen_list = null;
    boolean shun = false; // whether invalid gossipers will be shunned or not
    boolean dynamic = true; // whether to use a dynamic or static gossip_interval (dynamic overrides gossip_interval)
    boolean skip_sleep = true;
    boolean mcast_gossip = true; // use multicast for gossips (subset will be ignored, send to all members)

    public String getName() {
        return "PBCAST";
    }

    public Vector providedUpServices() {
        Vector retval = new Vector();
        retval.addElement(new Integer(Event.GET_DIGEST));
        retval.addElement(new Integer(Event.SET_DIGEST));
        retval.addElement(new Integer(Event.GET_DIGEST_STATE));
        return retval;
    }

    public void stop() {
        stopGossipThread();
        stopGossipHandler();
        operational = false;
    }

    public void up(Event evt) {
        Message m;
        PbcastHeader hdr;
        Object obj; // raw header, inspected before casting
        Address sender = null;

        switch (evt.getType()) {
        case Event.MSG:
            m = (Message) evt.getArg();
            if (m.getDest() != null && !m.getDest().isMulticastAddress()) {
                if (!(m.getHeader(getName()) instanceof PbcastHeader))
                    break; // unicast address: not null and not mcast, pass up unchanged
            }

            // discard all multicast messages until we become operational (transition from joiner to member)
            if (!operational) {
                if (log.isInfoEnabled())
                    log.info("event was discarded as I'm not yet operational. Event: "
                            + Util.printEvent(evt));
                return; // don't pass up
            }

            obj = m.getHeader(getName());
            if (obj instanceof PbcastHeader)
                hdr = (PbcastHeader) m.removeHeader(getName());
            else {
                sender = m.getSrc();

                // the header may be missing altogether, so guard against a null before calling getClass()
                if (log.isErrorEnabled())
                    log.error("PbcastHeader expected, but received header of type "
                            + (obj != null ? obj.getClass().getName() : "null")
                            + " from " + sender + ". Passing event up unchanged");
                break;
            }

            switch (hdr.type) {
            case PbcastHeader.MCAST_MSG: // messages are handled directly (high priority)
                handleUpMessage(m, hdr);
                return;

            // all other requests are put in the bounded gossip queue (discarded if full). this helps to ensure
            // that no 'gossip storms' will occur (overflowing the buffers and the network)
            case PbcastHeader.GOSSIP:
            case PbcastHeader.XMIT_REQ:
            case PbcastHeader.XMIT_RSP:
            case PbcastHeader.NOT_MEMBER:
                try {
                    if (gossip_queue.size() >= max_queue) {
                        if (log.isWarnEnabled())
                            log.warn("gossip request " + PbcastHeader.type2String(hdr.type)
                                    + " discarded because gossip_queue is full (number of elements="
                                    + gossip_queue.size() + ')');
                        return;
                    }
                    gossip_queue.add(new GossipEntry(hdr, m.getSrc(), m.getBuffer()));
                } catch (Exception ex) {
                    if (log.isWarnEnabled())
                        log.warn("exception adding request to gossip_queue, details=" + ex);
                }
                return;

            default:
                if (log.isErrorEnabled())
                    log.error("type (" + hdr.type + ") of PbcastHeader not known !");
                return;
            }

        case Event.SET_LOCAL_ADDRESS:
            local_addr = (Address) evt.getArg();
            break; // pass up
        }

        passUp(evt); // pass up by default
    }

    public void down(Event evt) {
        PbcastHeader hdr;
        Message m, copy;
        View v;
        Vector mbrs;
        Address key;
        NakReceiverWindow win;

        switch (evt.getType()) {

        case Event.MSG:
            m = (Message) evt.getArg();
            if (m.getDest() != null && !m.getDest().isMulticastAddress()) {
                break; // unicast address: not null and not mcast, pass down unchanged
            } else { // multicast address
                hdr = new PbcastHeader(PbcastHeader.MCAST_MSG, seqno);
                m.putHeader(getName(), hdr);

                // put message in NakReceiverWindow (to be on the safe side if we don't receive it ...)
                synchronized (digest) {
                    win = (NakReceiverWindow) digest.get(local_addr);
                    if (win == null) {
                        if (log.isInfoEnabled())
                            log.info("NakReceiverWindow for sender " + local_addr
                                    + " not found. Creating new NakReceiverWindow starting at seqno="
                                    + seqno);
                        win = new NakReceiverWindow(local_addr, seqno);
                        digest.put(local_addr, win);
                    }
                    copy = m.copy();
                    copy.setSrc(local_addr);
                    win.add(seqno, copy);
                }
                seqno++;
                break;
            }

        case Event.SET_DIGEST:
            setDigest((Digest) evt.getArg());
            return; // don't pass down

        case Event.GET_DIGEST: // don't pass down
            passUp(new Event(Event.GET_DIGEST_OK, getDigest()));
            return;

        case Event.GET_DIGEST_STATE: // don't pass down
            passUp(new Event(Event.GET_DIGEST_STATE_OK, getDigest()));
            return;

        case Event.VIEW_CHANGE:
            v = (View) evt.getArg();
            if (v == null) {
                if (log.isErrorEnabled())
                    log.error("view is null !");
                break;
            }
            mbrs = v.getMembers();

            // update internal membership list
            synchronized (members) {
                members.removeAllElements();
                for (int i = 0; i < mbrs.size(); i++)
                    members.addElement(mbrs.elementAt(i));
            }

            // delete all members in digest that are not in new membership list
            if (mbrs.size() > 0) {
                synchronized (digest) {
                    for (Enumeration e = digest.keys(); e.hasMoreElements();) {
                        key = (Address) e.nextElement();
                        if (!mbrs.contains(key)) {
                            win = (NakReceiverWindow) digest.get(key);
                            win.reset();
                            digest.remove(key);
                        }
                    }
                }
            }

            // add all members from new membership list that are not yet in digest
            for (int i = 0; i < mbrs.size(); i++) {
                key = (Address) mbrs.elementAt(i);
                if (!digest.containsKey(key)) {
                    digest.put(key, new NakReceiverWindow(key, 1));
                }
            }

            if (dynamic) {
                gossip_interval = computeGossipInterval(members.size(), desired_avg_gossip);

                if (log.isInfoEnabled())
                    log.info("VIEW_CHANGE: gossip_interval=" + gossip_interval);
                if (gossip_thread != null) {
                    skip_sleep = true;
                    gossip_thread.interrupt(); // wake up and sleep according to the new gossip_interval
                }
            }

            startGossipThread(); // will only be started if not yet running
            startGossipHandler();
            break;

        case Event.BECOME_SERVER:
            operational = true;
            break;
        }

        passDown(evt);
    }

    /** Gossip thread. Sends gossips containing a message digest every <code>gossip_interval</code> msecs */
    public void run() {
        while (gossip_thread != null) { // stopGossipThread() sets gossip_thread to null
            if (dynamic) {
                gossip_interval = computeGossipInterval(members.size(), desired_avg_gossip);

                if (log.isInfoEnabled())
                    log.info("gossip_interval=" + gossip_interval);
            }

            Util.sleep(gossip_interval);
            if (skip_sleep)
                skip_sleep = false;
            else
                sendGossip();
        }
    }

    /** Sets up the Protocol instance according to the configuration string */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        String str;

        str = props.getProperty("dynamic");
        if (str != null) {
            dynamic = Boolean.valueOf(str).booleanValue();
            props.remove("dynamic");
        }

        str = props.getProperty("shun");
        if (str != null) {
            shun = Boolean.valueOf(str).booleanValue();
            props.remove("shun");
        }

        str = props.getProperty("gossip_interval");
        if (str != null) {
            gossip_interval = Long.parseLong(str);
            props.remove("gossip_interval");
        }

        str = props.getProperty("mcast_gossip");
        if (str != null) {
            mcast_gossip = Boolean.valueOf(str).booleanValue();
            props.remove("mcast_gossip");
        }

        str = props.getProperty("subset");
        if (str != null) {
            subset = Double.parseDouble(str);
            props.remove("subset");
        }

        str = props.getProperty("desired_avg_gossip");
        if (str != null) {
            desired_avg_gossip = Long.parseLong(str);
            props.remove("desired_avg_gossip");
        }

        str = props.getProperty("max_queue");
        if (str != null) {
            max_queue = Integer.parseInt(str);
            props.remove("max_queue");
        }

        str = props.getProperty("max_gossip_cache");
        if (str != null) {
            max_gossip_cache = Integer.parseInt(str);
            props.remove("max_gossip_cache");
        }

        str = props.getProperty("gc_lag");
        if (str != null) {
            gc_lag = Integer.parseInt(str);
            props.remove("gc_lag");
        }

        // every recognized key has been removed by now, so anything left over is unknown
        if (props.size() > 0) {
            log.error("PBCAST.setProperties(): the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

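    /*
     * Editorial sketch, not part of the original source. setProperties() recognizes the keys
     * dynamic, shun, gossip_interval, mcast_gossip, subset, desired_avg_gossip, max_queue,
     * max_gossip_cache and gc_lag. Assuming the usual JGroups 2.x stack-string syntax of
     * NAME(key=value;key=value), and assuming this protocol is referred to as pbcast.PBCAST,
     * a stack entry might look like this:
     *
     *     pbcast.PBCAST(dynamic=false;gossip_interval=5000;mcast_gossip=true;gc_lag=30)
     *
     * The values for gossip_interval, mcast_gossip and gc_lag are simply the field defaults of
     * this class; dynamic is set to false here because dynamic=true overrides gossip_interval.
     * Any key that is still in props once these keys have been removed makes the method log an
     * error and return false.
     */
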
    /* --------------------------------- Private Methods --------------------------------------------- */

    /**
     Ensures that FIFO is observed for all messages for a certain member. The NakReceiverWindow corresponding
     to a certain sender is looked up in a hashtable. Then, the message is added to the NakReceiverWindow.
     As many messages as possible are then removed from the table and passed up.
     */
    void handleUpMessage(Message m, PbcastHeader hdr) {
        Address sender = m.getSrc();
        NakReceiverWindow win = null;
        Message tmpmsg;
        long tmp_seqno = hdr.seqno;

        if (sender == null) {
            if (log.isErrorEnabled())
                log.error("sender is null");
            return;
        }

        synchronized (digest) {
            win = (NakReceiverWindow) digest.get(sender);
            if (win == null) {
                if (log.isWarnEnabled())
                    log.warn("NakReceiverWindow for sender " + sender
                            + " not found. Creating new NakReceiverWindow starting at seqno="
                            + tmp_seqno);
                win = new NakReceiverWindow(sender, tmp_seqno);
                digest.put(sender, win);
            }

            // *************************************
            // The header was removed before, so we add it again for the NakReceiverWindow. When there is a
            // retransmission request, the header will already be attached to the message (both message and
            // header are *copied* into delivered_msgs when a message is removed from NakReceiverWindow).
            // *************************************
            m.putHeader(getName(), hdr);
            win.add(tmp_seqno, m);

            if (log.isInfoEnabled())
                log.info("receiver window for " + sender + " is " + win);

            // Try to remove as many messages as possible and send them up the stack
            while ((tmpmsg = win.remove()) != null) {
                tmpmsg.removeHeader(getName()); // need to remove header again, so upper protocols don't get confused
                passUp(new Event(Event.MSG, tmpmsg));
            }

            // Garbage collect messages if singleton member (because then we won't receive any gossips, triggering
            // garbage collection)
            if (members.size() == 1) {
                tmp_seqno = Math.max(tmp_seqno - gc_lag, 0);
                if (tmp_seqno > 0) { // nothing to collect while we are still within gc_lag of the start
                    if (log.isTraceEnabled())
                        log.trace("deleting messages < " + tmp_seqno + " from " + sender);
                    win.stable(tmp_seqno);
                }
            }
        }
    }

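    /*
     * Worked example for the singleton garbage collection in handleUpMessage() (an editorial
     * sketch, not part of the original source; the seqnos are made up and the default gc_lag
     * of 30 is assumed):
     *
     *     received seqno = 100  ->  max(100 - 30, 0) = 70  ->  win.stable(70) is called
     *     received seqno =  25  ->  max( 25 - 30, 0) =  0  ->  nothing is garbage collected
     *
     * So a lone member keeps roughly the last gc_lag messages of each sender window and starts
     * collecting only once a seqno larger than gc_lag has been received.
     */
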
    /**
     * Returns for each sender the 'highest seen' seqno from the digest. Highest seen means the
     * highest seqno without any gaps, e.g. if for a sender P the messages 2 3 4 6 7 were received,
     * then only 2, 3 and 4 can be delivered, so 4 is the highest seen. 6 and 7 cannot be delivered
     * because 5 is missing. If there are no messages, the highest seen seqno is -1.
     */
    Digest getDigest() {
        Digest ret = new Digest(digest.size());
        long highest_seqno, lowest_seqno;
        Address key;
        NakReceiverWindow win;

        for (Enumeration e = digest.keys(); e.hasMoreElements();) {
            key = (Address) e.nextElement();
            win = (NakReceiverWindow) digest.get(key);
            lowest_seqno = win.getLowestSeen();
            highest_seqno = win.getHighestSeen();
            ret.add(key, lowest_seqno, highest_seqno);
        }

        if (log.isInfoEnabled())
            log.info("digest is " + ret);

        return ret;
    }

    /**
     * Sets (or resets) the contents of the 'digest' table. Its current messages will be deleted and the
     * NakReceiverWindows reset.
     */
    void setDigest(Digest d) {
        NakReceiverWindow win;

        long tmp_seqno = 1;

        synchronized (digest) {
            for (Enumeration e = digest.elements(); e.hasMoreElements();) {
                win = (NakReceiverWindow) e.nextElement();
                win.reset();
            }
            digest.clear();

            Map.Entry entry;
            Address sender;
            org.jgroups.protocols.pbcast.Digest.Entry val;
            for (Iterator it = d.senders.entrySet().iterator(); it.hasNext();) {
                entry = (Map.Entry) it.next();
                sender = (Address) entry.getKey();
                if (sender == null) {
                    if (log.isErrorEnabled())
                        log.error("cannot set item because sender is null");
                    continue;
                }
                val = (org.jgroups.protocols.pbcast.Digest.Entry) entry.getValue();
                tmp_seqno = val.high_seqno;
                // next to expect, digest had *last* seen !
                digest.put(sender, new NakReceiverWindow(sender, tmp_seqno + 1));
            }
        }
    }

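    /*
     * Worked example for setDigest() (an editorial sketch with made-up values, not part of the
     * original source): suppose the incoming digest holds high_seqno = 41 for a sender P.
     *
     *     before:  digest = { P -> window with buffered messages, Q -> window ... }
     *     after:   digest = { P -> new NakReceiverWindow(P, 42) }
     *
     * Every existing window is reset and the table cleared first, so Q disappears unless the
     * incoming digest also lists it. P's new window starts at 42 because the digest carries the
     * last seqno seen (41), while the window is constructed with the next seqno to expect.
     */
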
    String printDigest() {
        long highest_seqno;
        Address key;
        NakReceiverWindow win;
        StringBuffer sb = new StringBuffer();

        for (Enumeration e = digest.keys(); e.hasMoreElements();) {
            key = (Address) e.nextElement();
            win = (NakReceiverWindow) digest.get(key);
            highest_seqno = win.getHighestSeen();
            sb.append(key + ": " + highest_seqno + '\n');
        }
        return sb.toString();
    }

    String printIncomingMessageQueue() {
        StringBuffer sb = new StringBuffer();
        NakReceiverWindow win;

        win = (NakReceiverWindow) digest.get(local_addr);
        sb.append(win);
        return sb.toString();
    }

    void startGossipThread() {
        if (gossip_thread == null) {
            gossip_thread = new Thread(this);
            gossip_thread.setDaemon(true);
            gossip_thread.start();
        }
    }

    void stopGossipThread() {
        Thread tmp;

        if (gossip_thread != null) {
            if (gossip_thread.isAlive()) {
                tmp = gossip_thread;
                gossip_thread = null;
                tmp.interrupt();
                tmp = null;
            }
        }
        gossip_thread = null;
    }

    void startGossipHandler() {
        if (gossip_handler == null) {
            gossip_handler = new GossipHandler(gossip_queue);
            gossip_handler.start();
        }
    }

    void stopGossipHandler() {
        if (gossip_handler != null) {
            gossip_handler.stop();
            gossip_handler = null;
        }
    }

0551:            /**
0552:             * Send a gossip message with a message digest of the highest seqnos seen per sender to a subset
0553:             * of the current membership. Exclude self (I receive all mcasts sent by myself).
0554:             */
0555:            void sendGossip() {
0556:                Vector current_mbrs = (Vector) members.clone();
0557:                Vector subset_mbrs = null;
0558:                Gossip gossip = null;
0559:                Message msg;
0560:                Address dest;
0561:                PbcastHeader hdr;
0562:
0563:                if (local_addr != null)
0564:                    current_mbrs.remove(local_addr); // don't pick myself
0565:
0566:                if (mcast_gossip) { // send gossip to all members using a multicast
0567:                    gossip = new Gossip(local_addr, gossip_round, getDigest()
0568:                            .copy(), null); // not_seen list is null, prevents forwarding
0569:                    for (int i = 0; i < current_mbrs.size(); i++)
0570:                        // all members have seen this gossip. Used for garbage collection
0571:                        gossip.addToSeenList((Address) current_mbrs
0572:                                .elementAt(i));
0573:                    hdr = new PbcastHeader(gossip, PbcastHeader.GOSSIP);
0574:                    msg = new Message(); // null dest == multicast to all members
0575:                    msg.putHeader(getName(), hdr);
0576:
0577:                    if (log.isInfoEnabled())
0578:                        log.info("(from " + local_addr
0579:                                + ") multicasting gossip " + gossip.shortForm()
0580:                                + " to all members");
0581:
0582:                    passDown(new Event(Event.MSG, msg));
0583:                } else {
0584:                    subset_mbrs = Util.pickSubset(current_mbrs, subset);
0585:
0586:                    for (int i = 0; i < subset_mbrs.size(); i++) {
0587:                        gossip = new Gossip(local_addr, gossip_round,
0588:                                getDigest().copy(), (Vector) current_mbrs
0589:                                        .clone());
0590:                        gossip.addToSeenList(local_addr);
0591:                        hdr = new PbcastHeader(gossip, PbcastHeader.GOSSIP);
0592:                        dest = (Address) subset_mbrs.elementAt(i);
0593:                        msg = new Message(dest);
0594:                        msg.putHeader(getName(), hdr);
0595:
0596:                        if (log.isInfoEnabled())
0597:                            log.info("(from " + local_addr
0598:                                    + ") sending gossip " + gossip.shortForm()
0599:                                    + " to " + dest);
0600:
0601:                        passDown(new Event(Event.MSG, msg));
0602:                    }
0603:                }
0604:
0605:                gossip_round++;
0606:            }
0607:
0608:            /**
0609:             * MOST IMPORTANT METHOD IN THIS CLASS !! This guy really decides how a gossip reaches all members,
0610:             * or whether it will flood the network !<p>
0611:             * Scrutinize the gossip received and request retransmission of messages that we haven't received yet.
0612:             * A gossip has a digest which carries for each sender the lowest and highest seqno seen. We check
0613:             * this range against our own digest and request retransmission of missing messages if needed.<br>
0614:             * <em>See DESIGN for a description of this method</em>
0615:             */
0616:            void handleGossip(Gossip gossip) {
0617:                long my_low = 0, my_high = 0, their_low, their_high;
0618:                Hashtable ht = null;
0619:                Digest their_digest;
0620:                NakReceiverWindow win;
0621:                Message msg;
0622:                Address dest;
0623:                Vector new_dests;
0624:                PbcastHeader hdr;
0625:                List missing_msgs; // list of missing messages (for retransmission) (List of Longs)
0626:
0627:                if (gossip == null || gossip.digest == null) {
0628:                    if (log.isWarnEnabled())
0629:                        log.warn("gossip is null or digest is null");
0630:                    return;
0631:                }
0632:
0633:                if (log.isTraceEnabled())
0634:                    log.trace("(from " + local_addr + ") received gossip "
0635:                            + gossip.shortForm() + " from " + gossip.sender);
0636:
0637:                /* 1. If gossip sender is null, we cannot ask it for missing messages anyway, so discard gossip ! */
0638:                if (gossip.sender == null) {
0639:                    if (log.isErrorEnabled())
0640:                        log
0641:                                .error("sender of gossip is null; "
0642:                                        + "don't know where to send XMIT_REQ to. Discarding gossip");
0643:                    return;
0644:                }
0645:
0646:                /* 2. Don't process the gossip if the sender of the gossip is not a member anymore. If it is a newly
0647:                   joined member, discard it as well (we can't tell the difference). Once the new member has been
0648:                   added to the membership, its gossips will be processed */
0649:                if (!members.contains(gossip.sender)) {
0650:                    if (log.isWarnEnabled())
0651:                        log
0652:                                .warn("sender "
0653:                                        + gossip.sender
0654:                                        + " is not a member. Gossip will not be processed");
0655:                    if (shun)
0656:                        shunInvalidGossiper(gossip.sender);
0657:                    return;
0658:                }
0659:
0660:                /* 3. If this gossip was received before, just discard it and return (don't process the
0661:                   same gossip twice). This prevents flooding of the gossip sender with retransmission reqs */
0662:                while (gossip_list.size() >= max_gossip_cache)
0663:                    // first delete oldest gossips
0664:                    gossip_list.removeFromHead();
0665:
0666:                if (gossip_list.contains(gossip)) // already received, don't re-broadcast
0667:                    return;
0668:                else
0669:                    gossip_list.add(gossip.copy()); // add to list of received gossips
0670:
0671:                /* 4. Send a HEARD_FROM event containing all members in the gossip-chain down to the FD layer.
0672:                   This ensures that we don't suspect them */
0673:                seen_list = gossip.getSeenList();
0674:                if (seen_list.size() > 0)
0675:                    passDown(new Event(Event.HEARD_FROM, seen_list.clone()));
0676:
0677:                /* 5. Compare their digest against ours. Find out if some messages in their digest are
0678:                   not in our digest. If yes, put them in the 'ht' hashtable for retransmission */
0679:                their_digest = gossip.digest;
0680:
0681:                Map.Entry entry;
0682:                Address sender;
0683:                org.jgroups.protocols.pbcast.Digest.Entry val;
0684:                for (Iterator it = their_digest.senders.entrySet().iterator(); it
0685:                        .hasNext();) {
0686:                    entry = (Map.Entry) it.next();
0687:                    sender = (Address) entry.getKey();
0688:                    val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
0689:                            .getValue();
0690:                    their_low = val.low_seqno;
0691:                    their_high = val.high_seqno;
0692:                    if (their_low == 0 && their_high == 0)
0693:                        continue; // won't have any messages for this sender, don't even re-send
0694:
0695:                    win = (NakReceiverWindow) digest.get(sender);
0696:                    if (win == null) {
0697:                        // this specific sender in this digest is probably not a member anymore, new digests
0698:                        // won't contain it. for now, just ignore it. if it is a new member, it will be in the next
0699:                        // gossips
0700:
0701:                        if (log.isWarnEnabled())
0702:                            log.warn("sender " + sender
0703:                                    + " not found, skipping...");
0704:                        continue;
0705:                    }
0706:
0707:                    my_low = win.getLowestSeen();
0708:                    my_high = win.getHighestSeen();
0709:                    if (my_high < their_high) {
0710:                        // changed by Bela (June 26 2003) - replaced my_high with my_low (not tested though !)
0711:                        // no retransmission request when my_low + 1 < their_low
0712:                        if (my_low + 1 >= their_low) {
0713:                            missing_msgs = win.getMissingMessages(my_high,
0714:                                    their_high);
0715:                            if (missing_msgs != null) {
0716:                                if (log.isInfoEnabled())
0717:                                    log.info("asking " + gossip.sender
0718:                                            + " for retransmission of "
0719:                                            + sender + ", missing messages: "
0720:                                            + missing_msgs + "\nwin for "
0721:                                            + sender + ":\n" + win + '\n');
0722:                                if (ht == null)
0723:                                    ht = new Hashtable();
0724:                                ht.put(sender, missing_msgs);
0725:                            }
0726:                        }
0727:                    }
0728:                }
0729:
0730:                /* 6. Send a XMIT_REQ to the sender of the gossip. The sender will then resend those messages as
0731:                   an XMIT_RSP unicast message (the messages are in its buffer, as a List) */
0732:                // nothing is missing if 'ht' is null or empty: no XMIT_REQ needed
0733:                if (ht != null && ht.size() > 0) {
0734:                    hdr = new PbcastHeader(PbcastHeader.XMIT_REQ);
0735:                    hdr.xmit_reqs = ht;
0736:
0737:                    if (log.isInfoEnabled())
0738:                        log.info("sending XMIT_REQ to " + gossip.sender);
0739:                    msg = new Message(gossip.sender, null, null);
0740:                    msg.putHeader(getName(), hdr);
0741:                    passDown(new Event(Event.MSG, msg));
0742:                }
0743:
0744:                /* 7. Remove myself from 'not_seen' list. If not_seen list is empty, we can garbage-collect messages
0745:                   smaller than the digest. Since all the members have seen the gossip, it will not be re-sent */
0746:                gossip.removeFromNotSeenList(local_addr);
0747:                if (gossip.sizeOfNotSeenList() == 0) {
0748:                    garbageCollect(gossip.digest);
0749:                    return;
0750:                }
0751:
0752:                /* 8. If we make it to this point, re-send to subset of remaining members in 'not_seen' list */
0753:                new_dests = Util.pickSubset(gossip.getNotSeenList(), subset);
0754:
0755:                if (log.isInfoEnabled())
0756:                    log.info("(from " + local_addr + ") forwarding gossip "
0757:                            + gossip.shortForm() + " to " + new_dests);
0758:                gossip.addToSeenList(local_addr);
0759:                for (int i = 0; i < new_dests.size(); i++) {
0760:                    dest = (Address) new_dests.elementAt(i);
0761:                    msg = new Message(dest, null, null);
0762:                    hdr = new PbcastHeader(gossip.copy(), PbcastHeader.GOSSIP);
0763:                    msg.putHeader(getName(), hdr);
0764:                    passDown(new Event(Event.MSG, msg));
0765:                }
0766:            }
0767:
0768:            /**
0769:             * Find the messages indicated in <code>xmit_reqs</code> and re-send them to
0770:             * <code>requester</code>
0771:             */
0772:            void handleXmitRequest(Address requester, Hashtable xmit_reqs) {
0773:                NakReceiverWindow win;
0774:                Address sender;
0775:                List msgs, missing_msgs, xmit_msgs;
0776:                Message msg;
0777:
0778:                if (requester == null) {
0779:                    if (log.isErrorEnabled())
0780:                        log.error("requester is null");
0781:                    return;
0782:                }
0783:
0784:                if (log.isInfoEnabled())
0785:                    log.info("retransmission requests are "
0786:                            + printXmitReqs(xmit_reqs));
0787:                for (Enumeration e = xmit_reqs.keys(); e.hasMoreElements();) {
0788:                    sender = (Address) e.nextElement();
0789:                    win = (NakReceiverWindow) digest.get(sender);
0790:                    if (win == null) {
0791:                        if (log.isWarnEnabled())
0792:                            log
0793:                                    .warn("sender "
0794:                                            + sender
0795:                                            + " not found in my digest; skipping retransmit request !");
0796:                        continue;
0797:                    }
0798:
0799:                    missing_msgs = (List) xmit_reqs.get(sender);
0800:                    msgs = win.getMessagesInList(missing_msgs); // msgs to be sent back to requester
0801:
0802:                    // re-send the messages to requester. don't add a header since they already have headers
0803:                    // (when added to the NakReceiverWindow, the headers were not removed)
0804:                    xmit_msgs = new List();
0805:                    for (Enumeration en = msgs.elements(); en.hasMoreElements();) {
0806:                        msg = ((Message) en.nextElement()).copy();
0807:                        xmit_msgs.add(msg);
0808:                    }
0809:
0810:                    // create a msg with the List of xmit_msgs as contents, add header
0811:                    msg = new Message(requester, null, xmit_msgs);
0812:                    msg.putHeader(getName(), new PbcastHeader(
0813:                            PbcastHeader.XMIT_RSP));
0814:                    passDown(new Event(Event.MSG, msg));
0815:                }
0816:            }
0817:
0818:            void handleXmitRsp(List xmit_msgs) {
0819:                Message m;
0820:                PbcastHeader hdr;
0821:
0822:                for (Enumeration e = xmit_msgs.elements(); e.hasMoreElements();) {
0823:                    m = (Message) e.nextElement();
0824:                    hdr = (PbcastHeader) m.removeHeader(getName());
0825:                    if (hdr == null) {
0826:                        if (log.isWarnEnabled()) log.warn("header is null, ignoring message");
0827:                    } else {
0828:                        if (log.isInfoEnabled())
0829:                            log.info("received #" + hdr.seqno + ", type="
0830:                                    + PbcastHeader.type2String(hdr.type)
0831:                                    + ", msg=" + m);
0832:                        handleUpMessage(m, hdr);
0833:                    }
0834:                }
0835:            }
0836:
0837:            String printXmitReqs(Hashtable xmit_reqs) {
0838:                StringBuffer sb = new StringBuffer();
0839:                Address key;
0840:                boolean first = true;
0841:
0842:                if (xmit_reqs == null)
0843:                    return "<null>";
0844:
0845:                for (Enumeration e = xmit_reqs.keys(); e.hasMoreElements();) {
0846:                    key = (Address) e.nextElement();
0847:                    if (!first) {
0848:                        sb.append(", ");
0849:                    } else
0850:                        first = false;
0851:                    sb.append(key + ": " + xmit_reqs.get(key));
0852:                }
0853:                return sb.toString();
0854:            }
0855:
0856:            void garbageCollect(Digest d) {
0857:                Address sender;
0858:                long tmp_seqno;
0859:                NakReceiverWindow win;
0860:                Map.Entry entry;
0861:                org.jgroups.protocols.pbcast.Digest.Entry val;
0862:
0863:                for (Iterator it = d.senders.entrySet().iterator(); it
0864:                        .hasNext();) {
0865:                    entry = (Map.Entry) it.next();
0866:                    sender = (Address) entry.getKey();
0867:                    val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
0868:                            .getValue();
0869:                    win = (NakReceiverWindow) digest.get(sender);
0870:                    if (win == null) {
0871:                        if (log.isDebugEnabled())
0872:                            log
0873:                                    .debug("sender "
0874:                                            + sender
0875:                                            + " not found in our message digest, skipping");
0876:                        continue;
0877:                    }
0878:                    tmp_seqno = val.high_seqno;
0879:                    tmp_seqno = Math.max(tmp_seqno - gc_lag, 0);
0880:                    if (tmp_seqno <= 0) {
0881:                        continue;
0882:                    }
0883:
0884:                    if (log.isTraceEnabled())
0885:                        log.trace("(from " + local_addr
0886:                                + ") GC: deleting messages < " + tmp_seqno
0887:                                + " from " + sender);
0888:                    win.stable(tmp_seqno);
0889:                }
0890:            }
0891:
0892:            /**
0893:             * If sender of gossip is not a member, send a NOT_MEMBER to sender (after n gossips received).
0894:             * This will cause that member to leave the group and possibly re-join.
0895:             */
0896:            void shunInvalidGossiper(Address invalid_gossiper) {
0897:                int num_pings = 0;
0898:                Message shun_msg;
0899:
0900:                if (invalid_gossipers.containsKey(invalid_gossiper)) {
0901:                    num_pings = ((Integer) invalid_gossipers
0902:                            .get(invalid_gossiper)).intValue();
0903:                    if (num_pings >= max_invalid_gossips) {
0904:
0905:                        if (log.isInfoEnabled())
0906:                            log.info("sender " + invalid_gossiper
0907:                                    + " is not member of " + members
0908:                                    + " ! Telling it to leave group");
0909:                        shun_msg = new Message(invalid_gossiper, null, null);
0910:                        shun_msg.putHeader(getName(), new PbcastHeader(
0911:                                PbcastHeader.NOT_MEMBER));
0912:                        passDown(new Event(Event.MSG, shun_msg));
0913:                        invalid_gossipers.remove(invalid_gossiper);
0914:                    } else {
0915:                        num_pings++;
0916:                        invalid_gossipers.put(invalid_gossiper, new Integer(
0917:                                num_pings));
0918:                    }
0919:                } else {
0920:                    num_pings++;
0921:                    invalid_gossipers.put(invalid_gossiper, new Integer(
0922:                            num_pings));
0923:                }
0924:            }
0925:
0926:            /** Computes the gossip_interval. See DESIGN for details */
0927:            long computeGossipInterval(int num_mbrs, double desired_avg_gossip) {
0928:                return getRandom((long) (num_mbrs * desired_avg_gossip * 2));
0929:            }
0930:
0931:            long getRandom(long range) {
0932:                return (long) ((Math.random() * range) % range);
0933:            }
0934:
0935:            /* ------------------------------- End of Private Methods ---------------------------------------- */
0936:
0937:            private static class GossipEntry {
0938:                PbcastHeader hdr = null;
0939:                Address sender = null;
0940:                byte[] data = null;
0941:
0942:                GossipEntry(PbcastHeader hdr, Address sender, byte[] data) {
0943:                    this.hdr = hdr;
0944:                    this.sender = sender;
0945:                    this.data = data;
0946:                }
0947:
0948:                public String toString() {
0949:                    return "hdr=" + hdr + ", sender=" + sender + ", data="
0950:                            + (data != null ? data.length + " bytes" : "null");
0951:                }
0952:            }
0953:
0954:            /**
0955:             Handles gossip and retransmission requests. Removes requests from a (bounded) queue.
0956:             */
0957:            private class GossipHandler implements Runnable {
0958:                Thread t = null;
0959:                final Queue queue;
0960:
0961:                GossipHandler(Queue q) {
0962:                    queue = q;
0963:                }
0964:
0965:                void start() {
0966:                    if (t == null) {
0967:                        t = new Thread(this, "PBCAST.GossipHandlerThread");
0968:                        t.setDaemon(true);
0969:                        t.start();
0970:                    }
0971:                }
0972:
0973:                void stop() {
0974:                    Thread tmp;
0975:                    if (t != null && t.isAlive()) {
0976:                        tmp = t;
0977:                        t = null;
0978:                        if (queue != null)
0979:                            queue.close(false); // don't flush elements
0980:                        tmp.interrupt();
0981:                    }
0982:                    t = null;
0983:                }
0984:
0985:                public void run() {
0986:                    GossipEntry entry;
0987:                    PbcastHeader hdr;
0988:                    List xmit_msgs;
0989:                    byte[] data;
0990:
0991:                    while (t != null && queue != null) {
0992:                        try {
0993:                            entry = (GossipEntry) queue.remove();
0994:                            hdr = entry.hdr;
0995:                            if (hdr == null) {
0996:                                if (log.isErrorEnabled())
0997:                                    log
0998:                                            .error("gossip entry has no PbcastHeader");
0999:                                continue;
1000:                            }
1001:
1002:                            switch (hdr.type) {
1003:
1004:                            case PbcastHeader.GOSSIP:
1005:                                handleGossip(hdr.gossip);
1006:                                break;
1007:
1008:                            case PbcastHeader.XMIT_REQ:
1009:                                if (hdr.xmit_reqs == null) {
1010:                                    if (log.isWarnEnabled())
1011:                                        log.warn("request is null !");
1012:                                    break;
1013:                                }
1014:                                handleXmitRequest(entry.sender, hdr.xmit_reqs);
1015:                                break;
1016:
1017:                            case PbcastHeader.XMIT_RSP:
1018:                                data = entry.data;
1019:                                if (data == null) {
1020:                                    if (log.isWarnEnabled())
1021:                                        log
1022:                                                .warn("buffer is null (no xmitted msgs)");
1023:                                    break;
1024:                                }
1025:                                try {
1026:                                    xmit_msgs = (List) Util
1027:                                            .objectFromByteBuffer(data);
1028:                                } catch (Exception ex) {
1029:                                    if (log.isErrorEnabled())
1030:                                        log
1031:                                                .error(
1032:                                                        "failed creating retransmitted messages from buffer",
1033:                                                        ex);
1034:                                    break;
1035:                                }
1036:                                handleXmitRsp(xmit_msgs);
1037:                                break;
1038:
1039:                            case PbcastHeader.NOT_MEMBER: // we are shunned
1040:                                if (shun) {
1041:                                    if (log.isInfoEnabled())
1042:                                        log
1043:                                                .info("I am being shunned. Will leave and re-join");
1044:                                    passUp(new Event(Event.EXIT));
1045:                                }
1046:                                break;
1047:
1048:                            default:
1049:                                if (log.isErrorEnabled())
1050:                                    log.error("type (" + hdr.type
1051:                                            + ") of PbcastHeader not known !");
1052:                                break; // keep the handler thread alive on unknown header types
1053:                            }
1054:                        } catch (QueueClosedException closed) {
1055:                            break;
1056:                        }
1057:                    }
1058:                }
1059:            }
1060:
1061:        }
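
Step 5 of handleGossip() decides, per sender, whether to ask the gossip sender for retransmission by comparing the local seqno range with the range in the received digest. The decision can be sketched in isolation as below. This is a simplified illustration, not JGroups code: the class DigestCompareSketch and the method seqnosToRequest are hypothetical names, and the sketch assumes that every seqno above my_high up to their_high is missing, whereas the listing delegates that lookup to NakReceiverWindow.getMissingMessages().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch of the range check in handleGossip(); not part of JGroups. */
class DigestCompareSketch {

    /**
     * Returns the seqnos to request for one sender, or an empty list if
     * no request should be sent. Mirrors the three guards in the listing.
     */
    static List<Long> seqnosToRequest(long myLow, long myHigh,
                                      long theirLow, long theirHigh) {
        List<Long> missing = new ArrayList<Long>();
        if (theirLow == 0 && theirHigh == 0)
            return missing; // peer holds no messages for this sender
        if (myHigh >= theirHigh)
            return missing; // we have seen at least as much as the peer
        if (myLow + 1 < theirLow)
            return missing; // same guard as the listing: no request in this case
        // Assumption: everything in (myHigh, theirHigh] is missing locally.
        for (long seqno = myHigh + 1; seqno <= theirHigh; seqno++)
            missing.add(Long.valueOf(seqno));
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(seqnosToRequest(3, 5, 2, 8)); // prints [6, 7, 8]
        System.out.println(seqnosToRequest(1, 5, 3, 8)); // prints []
    }
}
```

In the listing, the per-sender results are collected in the 'ht' hashtable and sent to the gossip sender as a single XMIT_REQ.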
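
The counting in shunInvalidGossiper() can be hard to follow from the nested branches: a non-member's gossips are counted, and NOT_MEMBER is sent only once the stored count has reached max_invalid_gossips, after which the counter is reset. A minimal sketch of that bookkeeping follows. The class ShunCounterSketch and the method recordInvalidGossip are hypothetical names, and String stands in for Address to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone sketch of the counter in shunInvalidGossiper(); not part of JGroups. */
class ShunCounterSketch {
    private final Map<String, Integer> invalidGossipers = new HashMap<String, Integer>();
    private final int maxInvalidGossips;

    ShunCounterSketch(int maxInvalidGossips) {
        this.maxInvalidGossips = maxInvalidGossips;
    }

    /** Records one gossip from a non-member; returns true when NOT_MEMBER should be sent. */
    boolean recordInvalidGossip(String sender) {
        Integer count = invalidGossipers.get(sender);
        if (count != null && count.intValue() >= maxInvalidGossips) {
            invalidGossipers.remove(sender); // reset, as the listing does after shunning
            return true;
        }
        // First sighting stores 1; later sightings increment the stored count.
        invalidGossipers.put(sender, Integer.valueOf(count == null ? 1 : count.intValue() + 1));
        return false;
    }

    public static void main(String[] args) {
        ShunCounterSketch counter = new ShunCounterSketch(2);
        System.out.println(counter.recordInvalidGossip("A")); // prints false
        System.out.println(counter.recordInvalidGossip("A")); // prints false
        System.out.println(counter.recordInvalidGossip("A")); // prints true
    }
}
```

Under this logic the shun message goes out on gossip number max_invalid_gossips + 1 from the same sender, not on gossip number max_invalid_gossips.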