Source Code Cross Referenced for STABLE.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » pbcast » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols.pbcast 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: STABLE.java,v 1.46.6.1 2007/04/27 08:03:55 belaban Exp $
002:
003:        package org.jgroups.protocols.pbcast;
004:
005:        import org.jgroups.*;
006:        import org.jgroups.stack.Protocol;
007:        import org.jgroups.util.Streamable;
008:        import org.jgroups.util.TimeScheduler;
009:        import org.jgroups.util.Util;
010:
011:        import java.io.*;
012:        import java.util.Iterator;
013:        import java.util.Map;
014:        import java.util.Properties;
015:        import java.util.Vector;
016:
017:        /**
018:         * Computes the broadcast messages that are stable; i.e., have been received by all members. Sends
019:         * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
020:         * have been seen by all members.<p>
021:         * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
022:         * A stability vector, which maintains the highest seqno for each member and initially contains no data,
023:         * is updated when such a message is received. The entry for a member P is computed set to
024:         * min(entry[P], digest[P]). When messages from all members have been received, a stability
025:         * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
026:         * in the NAKACK layer).<p>
027:         * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
028:         * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
029:         * STABLE messages in the face of no activity.<br/>
030:         * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
031:         * a STABLE task will be started (unless it is already running).
032:         * @author Bela Ban
033:         */
034:        public class STABLE extends Protocol {
035:            Address local_addr = null;
036:            final Vector mbrs = new Vector();
037:            final Digest digest = new Digest(10); // keeps track of the highest seqnos from all members
038:            final Digest latest_local_digest = new Digest(10); // keeps track of the latest digests received from NAKACK
039:            final Vector heard_from = new Vector(); // keeps track of who we already heard from (STABLE_GOSSIP msgs)
040:
041:            /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
042:            long desired_avg_gossip = 20000;
043:
044:            /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
045:             * small number (> 0 !) if <code>max_bytes</code> is used */
046:            long stability_delay = 6000;
047:            private StabilitySendTask stability_task = null;
048:            final Object stability_mutex = new Object(); // to synchronize on stability_task
049:            private volatile StableTask stable_task = null; // bcasts periodic STABLE message (added to timer below)
050:            final Object stable_task_mutex = new Object(); // to sync on stable_task
051:            TimeScheduler timer = null; // to send periodic STABLE msgs (and STABILITY messages)
052:            static final String name = "STABLE";
053:
054:            /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
055:             * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
056:             * <code>stability_delay</code> should be set to a low number as well */
057:            long max_bytes = 0;
058:
059:            /** The total number of bytes received from unicast and multicast messages */
060:            long num_bytes_received = 0;
061:
062:            /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
063:             * handle STABILITY messages */
064:            boolean suspended = false;
065:
066:            boolean initialized = false;
067:
068:            private ResumeTask resume_task = null;
069:            final Object resume_task_mutex = new Object();
070:
071:            /** Number of gossip messages */
072:            int num_gossips = 0;
073:
074:            private static final long MAX_SUSPEND_TIME = 200000;
075:
076:            public String getName() {
077:                return name;
078:            }
079:
080:            public long getDesiredAverageGossip() {
081:                return desired_avg_gossip;
082:            }
083:
084:            public void setDesiredAverageGossip(long gossip_interval) {
085:                desired_avg_gossip = gossip_interval;
086:            }
087:
088:            public long getMaxBytes() {
089:                return max_bytes;
090:            }
091:
092:            public void setMaxBytes(long max_bytes) {
093:                this .max_bytes = max_bytes;
094:            }
095:
096:            public int getNumberOfGossipMessages() {
097:                return num_gossips;
098:            }
099:
100:            public void resetStats() {
101:                super .resetStats();
102:                num_gossips = 0;
103:            }
104:
105:            public Vector requiredDownServices() {
106:                Vector retval = new Vector();
107:                retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); // NAKACK layer
108:                return retval;
109:            }
110:
111:            public boolean setProperties(Properties props) {
112:                String str;
113:
114:                super .setProperties(props);
115:                str = props.getProperty("digest_timeout");
116:                if (str != null) {
117:                    props.remove("digest_timeout");
118:                    log
119:                            .error("digest_timeout has been deprecated; it will be ignored");
120:                }
121:
122:                str = props.getProperty("desired_avg_gossip");
123:                if (str != null) {
124:                    desired_avg_gossip = Long.parseLong(str);
125:                    props.remove("desired_avg_gossip");
126:                }
127:
128:                str = props.getProperty("stability_delay");
129:                if (str != null) {
130:                    stability_delay = Long.parseLong(str);
131:                    props.remove("stability_delay");
132:                }
133:
134:                str = props.getProperty("max_gossip_runs");
135:                if (str != null) {
136:                    props.remove("max_gossip_runs");
137:                    log
138:                            .error("max_gossip_runs has been deprecated and will be ignored");
139:                }
140:
141:                str = props.getProperty("max_bytes");
142:                if (str != null) {
143:                    max_bytes = Long.parseLong(str);
144:                    props.remove("max_bytes");
145:                }
146:
147:                str = props.getProperty("max_suspend_time");
148:                if (str != null) {
149:                    log
150:                            .error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
151:                    props.remove("max_suspend_time");
152:                }
153:
154:                if (props.size() > 0) {
155:                    log.error("these properties are not recognized: " + props);
156:
157:                    return false;
158:                }
159:                return true;
160:            }
161:
162:            private void suspend(long timeout) {
163:                if (!suspended) {
164:                    suspended = true;
165:                    if (log.isDebugEnabled())
166:                        log.debug("suspending message garbage collection");
167:                }
168:                startResumeTask(timeout); // will not start task if already running
169:            }
170:
171:            private void resume() {
172:                resetDigest(mbrs); // start from scratch
173:                suspended = false;
174:                if (log.isDebugEnabled())
175:                    log.debug("resuming message garbage collection");
176:                stopResumeTask();
177:            }
178:
179:            public void start() throws Exception {
180:                if (stack != null && stack.timer != null)
181:                    timer = stack.timer;
182:                else
183:                    throw new Exception(
184:                            "timer cannot be retrieved from protocol stack");
185:                if (desired_avg_gossip > 0)
186:                    startStableTask();
187:            }
188:
189:            public void stop() {
190:                stopStableTask();
191:                clearDigest();
192:            }
193:
194:            public void up(Event evt) {
195:                Message msg;
196:                StableHeader hdr;
197:                int type = evt.getType();
198:
199:                switch (type) {
200:
201:                case Event.MSG:
202:                    msg = (Message) evt.getArg();
203:
204:                    // only if message counting is enabled, and only for multicast messages
205:                    // fixes http://jira.jboss.com/jira/browse/JGRP-233
206:                    if (max_bytes > 0) {
207:                        Address dest = msg.getDest();
208:                        if (dest == null || dest.isMulticastAddress()) {
209:                            num_bytes_received += (long) Math.max(msg
210:                                    .getLength(), 24);
211:                            if (num_bytes_received >= max_bytes) {
212:                                if (log.isTraceEnabled()) {
213:                                    log.trace(new StringBuffer(
214:                                            "max_bytes has been reached (")
215:                                            .append(max_bytes).append(
216:                                                    ", bytes received=")
217:                                            .append(num_bytes_received).append(
218:                                                    "): triggers stable msg"));
219:                                }
220:                                num_bytes_received = 0;
221:                                // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
222:                                passDown(new Event(Event.GET_DIGEST_STABLE));
223:                            }
224:                        }
225:                    }
226:
227:                    hdr = (StableHeader) msg.removeHeader(name);
228:                    if (hdr == null)
229:                        break;
230:                    switch (hdr.type) {
231:                    case StableHeader.STABLE_GOSSIP:
232:                        handleStableMessage(msg.getSrc(), hdr.stableDigest);
233:                        break;
234:                    case StableHeader.STABILITY:
235:                        handleStabilityMessage(hdr.stableDigest, msg.getSrc());
236:                        break;
237:                    default:
238:                        if (log.isErrorEnabled())
239:                            log.error("StableHeader type " + hdr.type
240:                                    + " not known");
241:                    }
242:                    return; // don't pass STABLE or STABILITY messages up the stack
243:
244:                case Event.GET_DIGEST_STABLE_OK:
245:                    Digest d = (Digest) evt.getArg();
246:                    synchronized (latest_local_digest) {
247:                        latest_local_digest.replace(d);
248:                    }
249:                    if (log.isTraceEnabled())
250:                        log.trace("setting latest_local_digest from NAKACK: "
251:                                + d.printHighSeqnos());
252:                    sendStableMessage(d);
253:                    break;
254:
255:                case Event.VIEW_CHANGE:
256:                    View view = (View) evt.getArg();
257:                    handleViewChange(view);
258:                    break;
259:
260:                case Event.SET_LOCAL_ADDRESS:
261:                    local_addr = (Address) evt.getArg();
262:                    break;
263:                }
264:                passUp(evt);
265:            }
266:
267:            public void down(Event evt) {
268:                switch (evt.getType()) {
269:                case Event.VIEW_CHANGE:
270:                    View v = (View) evt.getArg();
271:                    handleViewChange(v);
272:                    break;
273:
274:                case Event.SUSPEND_STABLE:
275:                    long timeout = 0;
276:                    Object t = evt.getArg();
277:                    if (t != null && t instanceof  Long)
278:                        timeout = ((Long) t).longValue();
279:                    suspend(timeout);
280:                    break;
281:
282:                case Event.RESUME_STABLE:
283:                    resume();
284:                    break;
285:                }
286:                passDown(evt);
287:            }
288:
289:            public void runMessageGarbageCollection() {
290:                Digest copy;
291:                synchronized (digest) {
292:                    copy = digest.copy();
293:                }
294:                sendStableMessage(copy);
295:            }
296:
297:            /* --------------------------------------- Private Methods ---------------------------------------- */
298:
299:            private void handleViewChange(View v) {
300:                Vector tmp = v.getMembers();
301:                mbrs.clear();
302:                mbrs.addAll(tmp);
303:                adjustSenders(digest, tmp);
304:                adjustSenders(latest_local_digest, tmp);
305:                resetDigest(tmp);
306:                if (!initialized)
307:                    initialized = true;
308:            }
309:
310:            /** Digest and members are guaranteed to be non-null */
311:            private static void adjustSenders(Digest d, Vector members) {
312:                synchronized (d) {
313:                    // 1. remove all members from digest who are not in the view
314:                    Iterator it = d.senders.keySet().iterator();
315:                    Address mbr;
316:                    while (it.hasNext()) {
317:                        mbr = (Address) it.next();
318:                        if (!members.contains(mbr))
319:                            it.remove();
320:                    }
321:                    // 2. add members to digest which are in the new view but not in the digest
322:                    for (int i = 0; i < members.size(); i++) {
323:                        mbr = (Address) members.get(i);
324:                        if (!d.contains(mbr))
325:                            d.add(mbr, -1, -1);
326:                    }
327:                }
328:            }
329:
330:            private void clearDigest() {
331:                synchronized (digest) {
332:                    digest.clear();
333:                }
334:            }
335:
336:            /** Update my own digest from a digest received by somebody else. Returns whether the update was successful.
337:             *  Needs to be called with a lock on digest */
338:            private boolean updateLocalDigest(Digest d, Address sender) {
339:                if (d == null || d.size() == 0)
340:                    return false;
341:
342:                if (!initialized) {
343:                    if (log.isTraceEnabled())
344:                        log
345:                                .trace("STABLE message will not be handled as I'm not yet initialized");
346:                    return false;
347:                }
348:
349:                if (!digest.sameSenders(d)) {
350:                    if (log.isTraceEnabled())
351:                        log
352:                                .trace(new StringBuffer("received a digest ")
353:                                        .append(d.printHighSeqnos())
354:                                        .append(" from ")
355:                                        .append(sender)
356:                                        .append(
357:                                                " which has different members than mine (")
358:                                        .append(digest.printHighSeqnos())
359:                                        .append(
360:                                                "), discarding it and resetting heard_from list"));
361:                    // to avoid sending incorrect stability/stable msgs, we simply reset our heard_from list, see DESIGN
362:                    resetDigest(mbrs);
363:                    return false;
364:                }
365:
366:                StringBuffer sb = null;
367:                if (log.isTraceEnabled())
368:                    sb = new StringBuffer("my [").append(local_addr).append(
369:                            "] digest before: ").append(digest).append(
370:                            "\ndigest from ").append(sender).append(": ")
371:                            .append(d);
372:                Address mbr;
373:                long highest_seqno, my_highest_seqno, new_highest_seqno;
374:                long highest_seen_seqno, my_highest_seen_seqno, new_highest_seen_seqno;
375:                Map.Entry entry;
376:                org.jgroups.protocols.pbcast.Digest.Entry val;
377:                for (Iterator it = d.senders.entrySet().iterator(); it
378:                        .hasNext();) {
379:                    entry = (Map.Entry) it.next();
380:                    mbr = (Address) entry.getKey();
381:                    val = (org.jgroups.protocols.pbcast.Digest.Entry) entry
382:                            .getValue();
383:                    highest_seqno = val.high_seqno;
384:                    highest_seen_seqno = val.high_seqno_seen;
385:
386:                    // compute the minimum of the highest seqnos deliverable (for garbage collection)
387:                    my_highest_seqno = digest.highSeqnoAt(mbr);
388:                    // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
389:                    my_highest_seen_seqno = digest.highSeqnoSeenAt(mbr);
390:
391:                    new_highest_seqno = Math.min(my_highest_seqno,
392:                            highest_seqno);
393:                    new_highest_seen_seqno = Math.max(my_highest_seen_seqno,
394:                            highest_seen_seqno);
395:                    digest.setHighestDeliveredAndSeenSeqnos(mbr,
396:                            new_highest_seqno, new_highest_seen_seqno);
397:                }
398:                if (log.isTraceEnabled()) {
399:                    sb.append("\nmy [").append(local_addr).append(
400:                            "] digest after: ").append(digest).append("\n");
401:                    log.trace(sb);
402:                }
403:                return true;
404:            }
405:
406:            private void resetDigest(Vector new_members) {
407:                if (new_members == null || new_members.size() == 0)
408:                    return;
409:                synchronized (heard_from) {
410:                    heard_from.clear();
411:                    heard_from.addAll(new_members);
412:                }
413:
414:                Digest copy_of_latest;
415:                synchronized (latest_local_digest) {
416:                    copy_of_latest = latest_local_digest.copy();
417:                }
418:                synchronized (digest) {
419:                    digest.replace(copy_of_latest);
420:                    if (log.isTraceEnabled())
421:                        log.trace("resetting digest from NAKACK: "
422:                                + copy_of_latest.printHighSeqnos());
423:                }
424:            }
425:
426:            /**
427:             * Removes mbr from heard_from and returns true if this was the last member, otherwise false.
428:             * Resets the heard_from list (populates with membership)
429:             * @param mbr
430:             */
431:            private boolean removeFromHeardFromList(Address mbr) {
432:                synchronized (heard_from) {
433:                    heard_from.remove(mbr);
434:                    if (heard_from.size() == 0) {
435:                        resetDigest(this .mbrs);
436:                        return true;
437:                    }
438:                }
439:                return false;
440:            }
441:
442:            void startStableTask() {
443:                // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
444:                // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
445:                // 1 cycle: on the next message or view, we will start the task
446:                if (stable_task != null)
447:                    return;
448:                synchronized (stable_task_mutex) {
449:                    if (stable_task != null && stable_task.running()) {
450:                        return; // already running
451:                    }
452:                    stable_task = new StableTask();
453:                    timer.add(stable_task, true); // fixed-rate scheduling
454:                }
455:                if (log.isTraceEnabled())
456:                    log.trace("stable task started");
457:            }
458:
459:            void stopStableTask() {
460:                // contrary to startStableTask(), we don't need double-checked locking here because this method is not
461:                // called frequently
462:                synchronized (stable_task_mutex) {
463:                    if (stable_task != null) {
464:                        stable_task.stop();
465:                        stable_task = null;
466:                    }
467:                }
468:            }
469:
470:            void startResumeTask(long max_suspend_time) {
471:                max_suspend_time = (long) (max_suspend_time * 1.1); // little slack
472:                if (max_suspend_time <= 0)
473:                    max_suspend_time = MAX_SUSPEND_TIME;
474:
475:                synchronized (resume_task_mutex) {
476:                    if (resume_task != null && resume_task.running()) {
477:                        return; // already running
478:                    } else {
479:                        resume_task = new ResumeTask(max_suspend_time);
480:                        timer.add(resume_task, true); // fixed-rate scheduling
481:                    }
482:                }
483:                if (log.isDebugEnabled())
484:                    log.debug("resume task started, max_suspend_time="
485:                            + max_suspend_time);
486:            }
487:
488:            void stopResumeTask() {
489:                synchronized (resume_task_mutex) {
490:                    if (resume_task != null) {
491:                        resume_task.stop();
492:                        resume_task = null;
493:                    }
494:                }
495:            }
496:
497:            void startStabilityTask(Digest d, long delay) {
498:                synchronized (stability_mutex) {
499:                    if (stability_task != null && stability_task.running()) {
500:                    } else {
501:                        stability_task = new StabilitySendTask(d, delay); // runs only once
502:                        timer.add(stability_task, true);
503:                    }
504:                }
505:            }
506:
507:            void stopStabilityTask() {
508:                synchronized (stability_mutex) {
509:                    if (stability_task != null) {
510:                        stability_task.stop();
511:                        stability_task = null;
512:                    }
513:                }
514:            }
515:
516:            /**
517:             Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
518:             <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
519:             seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
520:             message, which results in garbage collection of messages lower than the ones in the stability vector. The
521:             maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
522:             for details).
523:             */
524:            private void handleStableMessage(Address sender, Digest d) {
525:                if (d == null || sender == null) {
526:                    if (log.isErrorEnabled())
527:                        log.error("digest or sender is null");
528:                    return;
529:                }
530:
531:                if (!initialized) {
532:                    if (log.isTraceEnabled())
533:                        log
534:                                .trace("STABLE message will not be handled as I'm not yet initialized");
535:                    return;
536:                }
537:
538:                if (suspended) {
539:                    if (log.isTraceEnabled())
540:                        log
541:                                .trace("STABLE message will not be handled as I'm suspended");
542:                    return;
543:                }
544:
545:                if (log.isTraceEnabled())
546:                    log.trace(new StringBuffer("received stable msg from ")
547:                            .append(sender).append(": ").append(
548:                                    d.printHighSeqnos()));
549:                if (!heard_from.contains(sender)) { // already received gossip from sender; discard it
550:                    if (log.isTraceEnabled())
551:                        log.trace("already received stable msg from " + sender);
552:                    return;
553:                }
554:
555:                Digest copy;
556:                synchronized (digest) {
557:                    boolean success = updateLocalDigest(d, sender);
558:                    if (!success) // we can only remove the sender from heard_from if *all* elements of my digest were updated
559:                        return;
560:                    copy = digest.copy();
561:                }
562:
563:                boolean was_last = removeFromHeardFromList(sender);
564:                if (was_last) {
565:                    sendStabilityMessage(copy);
566:                }
567:            }
568:
569:            /**
570:             * Bcasts a STABLE message of the current digest to all members. Message contains highest seqnos of all members
571:             * seen by this member. Highest seqnos are retrieved from the NAKACK layer below.
572:             * @param d A <em>copy</em> of this.digest
573:             */
574:            private void sendStableMessage(Digest d) {
575:                if (suspended) {
576:                    if (log.isTraceEnabled())
577:                        log
578:                                .trace("will not send STABLE message as I'm suspended");
579:                    return;
580:                }
581:
582:                if (d != null && d.size() > 0) {
583:                    if (log.isTraceEnabled())
584:                        log.trace("sending stable msg " + d.printHighSeqnos());
585:                    Message msg = new Message(); // mcast message
586:                    StableHeader hdr = new StableHeader(
587:                            StableHeader.STABLE_GOSSIP, d);
588:                    msg.putHeader(name, hdr);
589:                    num_gossips++;
590:                    passDown(new Event(Event.MSG, msg));
591:                }
592:            }
593:
594:            /**
595:             Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
596:             The reason for waiting a random amount of time is that, in the worst case, all members receive a
597:             STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
598:             STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
599:             elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
600:             waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
601:             discard S2.
602:             @param tmp A copy of te stability digest, so we don't need to copy it again
603:             */
604:            void sendStabilityMessage(Digest tmp) {
605:                long delay;
606:
607:                if (suspended) {
608:                    if (log.isTraceEnabled())
609:                        log
610:                                .trace("STABILITY message will not be sent as I'm suspended");
611:                    return;
612:                }
613:
614:                // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
615:                // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
616:                // STABILITY msg at the same time
617:                delay = Util.random(stability_delay);
618:                startStabilityTask(tmp, delay);
619:            }
620:
621:            void handleStabilityMessage(Digest d, Address sender) {
622:                if (d == null) {
623:                    if (log.isErrorEnabled())
624:                        log.error("stability digest is null");
625:                    return;
626:                }
627:
628:                if (!initialized) {
629:                    if (log.isTraceEnabled())
630:                        log
631:                                .trace("STABLE message will not be handled as I'm not yet initialized");
632:                    return;
633:                }
634:
635:                if (suspended) {
636:                    if (log.isDebugEnabled()) {
637:                        log
638:                                .debug("stability message will not be handled as I'm suspended");
639:                    }
640:                    return;
641:                }
642:
643:                if (log.isTraceEnabled())
644:                    log.trace(new StringBuffer("received stability msg from ")
645:                            .append(sender).append(": ").append(
646:                                    d.printHighSeqnos()));
647:                stopStabilityTask();
648:
649:                // we won't handle the gossip d, if d's members don't match the membership in my own digest,
650:                // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
651:                if (!this .digest.sameSenders(d)) {
652:                    if (log.isDebugEnabled()) {
653:                        log
654:                                .debug("received digest (digest="
655:                                        + d
656:                                        + ") which does not match my own digest ("
657:                                        + this .digest
658:                                        + "): ignoring digest and re-initializing own digest");
659:                    }
660:                    return;
661:                }
662:
663:                resetDigest(mbrs);
664:
665:                // pass STABLE event down the stack, so NAKACK can garbage collect old messages
666:                passDown(new Event(Event.STABLE, d));
667:            }
668:
669:            /* ------------------------------------End of Private Methods ------------------------------------- */
670:
671:            public static class StableHeader extends Header implements 
672:                    Streamable {
673:                public static final int STABLE_GOSSIP = 1;
674:                public static final int STABILITY = 2;
675:
676:                int type = 0;
677:                // Digest digest=new Digest();  // used for both STABLE_GOSSIP and STABILITY message
678:                Digest stableDigest = null; // changed by Bela April 4 2004
679:
680:                public StableHeader() {
681:                } // used for externalizable
682:
683:                public StableHeader(int type, Digest digest) {
684:                    this .type = type;
685:                    this .stableDigest = digest;
686:                }
687:
688:                static String type2String(int t) {
689:                    switch (t) {
690:                    case STABLE_GOSSIP:
691:                        return "STABLE_GOSSIP";
692:                    case STABILITY:
693:                        return "STABILITY";
694:                    default:
695:                        return "<unknown>";
696:                    }
697:                }
698:
699:                public String toString() {
700:                    StringBuffer sb = new StringBuffer();
701:                    sb.append('[');
702:                    sb.append(type2String(type));
703:                    sb.append("]: digest is ");
704:                    sb.append(stableDigest);
705:                    return sb.toString();
706:                }
707:
708:                public void writeExternal(ObjectOutput out) throws IOException {
709:                    out.writeInt(type);
710:                    if (stableDigest == null) {
711:                        out.writeBoolean(false);
712:                        return;
713:                    }
714:                    out.writeBoolean(true);
715:                    stableDigest.writeExternal(out);
716:                }
717:
718:                public void readExternal(ObjectInput in) throws IOException,
719:                        ClassNotFoundException {
720:                    type = in.readInt();
721:                    boolean digest_not_null = in.readBoolean();
722:                    if (digest_not_null) {
723:                        stableDigest = new Digest();
724:                        stableDigest.readExternal(in);
725:                    }
726:                }
727:
728:                public long size() {
729:                    long retval = Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
730:                    if (stableDigest != null)
731:                        retval += stableDigest.serializedSize();
732:                    return retval;
733:                }
734:
735:                public void writeTo(DataOutputStream out) throws IOException {
736:                    out.writeInt(type);
737:                    Util.writeStreamable(stableDigest, out);
738:                }
739:
740:                public void readFrom(DataInputStream in) throws IOException,
741:                        IllegalAccessException, InstantiationException {
742:                    type = in.readInt();
743:                    stableDigest = (Digest) Util.readStreamable(Digest.class,
744:                            in);
745:                }
746:
747:            }
748:
749:            /**
750:             Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
751:             However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
752:             stable_send task terminates only after a period of time within which no messages were either sent
753:             or received
754:             */
755:            private class StableTask implements  TimeScheduler.Task {
756:                boolean stopped = false;
757:
758:                public void stop() {
759:                    stopped = true;
760:                }
761:
762:                public boolean running() { // syntactic sugar
763:                    return !stopped;
764:                }
765:
766:                public boolean cancelled() {
767:                    return stopped;
768:                }
769:
770:                public long nextInterval() {
771:                    long interval = computeSleepTime();
772:                    if (interval <= 0)
773:                        return 10000;
774:                    else
775:                        return interval;
776:                }
777:
778:                public void run() {
779:                    if (suspended) {
780:                        if (log.isTraceEnabled())
781:                            log.trace("stable task will not run as suspended="
782:                                    + suspended);
783:                        return;
784:                    }
785:
786:                    // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
787:                    passDown(new Event(Event.GET_DIGEST_STABLE));
788:                }
789:
790:                long computeSleepTime() {
791:                    return getRandom((mbrs.size() * desired_avg_gossip * 2));
792:                }
793:
794:                long getRandom(long range) {
795:                    return (long) ((Math.random() * range) % range);
796:                }
797:            }
798:
799:            /**
800:             * Multicasts a STABILITY message.
801:             */
802:            private class StabilitySendTask implements  TimeScheduler.Task {
803:                Digest d = null;
804:                boolean stopped = false;
805:                long delay = 2000;
806:
807:                StabilitySendTask(Digest d, long delay) {
808:                    this .d = d;
809:                    this .delay = delay;
810:                }
811:
812:                public boolean running() {
813:                    return !stopped;
814:                }
815:
816:                public void stop() {
817:                    stopped = true;
818:                }
819:
820:                public boolean cancelled() {
821:                    return stopped;
822:                }
823:
824:                /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
825:                public long nextInterval() {
826:                    return delay;
827:                }
828:
829:                public void run() {
830:                    Message msg;
831:                    StableHeader hdr;
832:
833:                    if (suspended) {
834:                        if (log.isDebugEnabled()) {
835:                            log
836:                                    .debug("STABILITY message will not be sent as suspended="
837:                                            + suspended);
838:                        }
839:                        stopped = true;
840:                        return;
841:                    }
842:
843:                    if (d != null && !stopped) {
844:                        msg = new Message();
845:                        hdr = new StableHeader(StableHeader.STABILITY, d);
846:                        msg.putHeader(STABLE.name, hdr);
847:                        if (log.isTraceEnabled())
848:                            log.trace("sending stability msg "
849:                                    + d.printHighSeqnos());
850:                        passDown(new Event(Event.MSG, msg));
851:                        d = null;
852:                    }
853:                    stopped = true; // run only once
854:                }
855:            }
856:
857:            private class ResumeTask implements  TimeScheduler.Task {
858:                boolean running = true;
859:                long max_suspend_time = 0;
860:
861:                ResumeTask(long max_suspend_time) {
862:                    this .max_suspend_time = max_suspend_time;
863:                }
864:
865:                void stop() {
866:                    running = false;
867:                }
868:
869:                public boolean running() {
870:                    return running;
871:                }
872:
873:                public boolean cancelled() {
874:                    return running == false;
875:                }
876:
877:                public long nextInterval() {
878:                    return max_suspend_time;
879:                }
880:
881:                public void run() {
882:                    if (suspended)
883:                        log
884:                                .warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; "
885:                                        + "check why this event was not received (or increase max_suspend_time for large state transfers)");
886:                    resume();
887:                }
888:            }
889:
890:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.