Source code of STABLE.java (JGroups 2.4.1-sp3, package org.jgroups.protocols)
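
The Javadoc of `update()` in this listing describes how a received gossip is merged into local state: the seqno tables are combined by element-wise minimum and the `heard_from` tables by element-wise OR, and a stability broadcast follows once every member has been heard from. The sketch below illustrates only that merge rule on plain arrays. The class `GossipMergeSketch`, its method names, and the sample values are hypothetical and are not part of JGroups; the real method additionally marks the sender's own slot and runs under the protocol's lock.

```java
import java.util.Arrays;

/** Illustrative sketch only; this class is NOT part of JGroups. */
final class GossipMergeSketch {

    /** Element-wise minimum: a seqno is stable only if every member has seen it. */
    static long[] mergeSeqnos(long[] local, long[] gossip) {
        if (local.length != gossip.length)
            throw new IllegalArgumentException("arrays must describe the same membership");
        long[] merged = new long[local.length];
        for (int i = 0; i < local.length; i++)
            merged[i] = Math.min(local[i], gossip[i]);
        return merged;
    }

    /** Element-wise OR: whatever the sender has heard from, we have now heard from indirectly. */
    static boolean[] mergeHeardFrom(boolean[] local, boolean[] gossip) {
        if (local.length != gossip.length)
            throw new IllegalArgumentException("arrays must describe the same membership");
        boolean[] merged = new boolean[local.length];
        for (int i = 0; i < local.length; i++)
            merged[i] = local[i] || gossip[i];
        return merged;
    }

    /** True once every slot is set, i.e. the round is complete. */
    static boolean heardFromAll(boolean[] heard) {
        for (boolean b : heard)
            if (!b)
                return false;
        return true;
    }

    public static void main(String[] args) {
        // Group {P, Q, R} mapped to indices 0..2; all values are made up.
        long[] qSeqnos = {10, 12, 7};
        boolean[] qHeard = {false, true, false}; // Q has heard only from itself

        // Gossip sent by P, who has already heard from R.
        long[] pSeqnos = {11, 9, 8};
        boolean[] pHeard = {true, false, true};

        long[] seqnos = mergeSeqnos(qSeqnos, pSeqnos);
        boolean[] heard = mergeHeardFrom(qHeard, pHeard);

        System.out.println(Arrays.toString(seqnos)); // [10, 9, 7]
        System.out.println(heardFromAll(heard));     // true
    }
}
```

In this example Q never receives a gossip from R directly, yet P's gossip is enough to complete the round, which is the message-saving effect the `gossip()` Javadoc describes.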



001:        // $Id: STABLE.java,v 1.11.10.1 2007/04/27 08:03:52 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.*;
006:        import org.jgroups.blocks.GroupRequest;
007:        import org.jgroups.blocks.MethodCall;
008:        import org.jgroups.stack.RpcProtocol;
009:        import org.jgroups.util.TimeScheduler;
010:        import org.jgroups.util.Util;
011:
012:        import java.util.Properties;
013:        import java.util.Vector;
014:
015:        /**
016:         * Computes the broadcast messages that are stable; i.e., that have been received
017:         * by all members. Sends STABLE events up the stack when this is the case.
018:         * Uses a probabilistic scheme to do so, as described in:<br>
019:         * GSGC: An Efficient Gossip-Style Garbage Collection Scheme for Scalable
020:         * Reliable Multicast, K. Guo et al., 1997.
021:         * <p>
022:         * The only difference is that instead of using counters for an estimation of
023:         * messages received from each member, we retrieve this actual information
024:         * from the NAKACK layer (which must be present for the STABLE protocol to
025:         * work).
026:         * <p>
027:         * Note: the <tt>Event.MSG</tt> call path must be as lightweight as
028:         * possible. It should not request any lock for which there is a high
029:         * contention and/or long delay.
030:         * <p>
031:         * <pre>
032:         * Changes(igeorg - 2.VI.2001):
033:         * i. Thread-safety (in RPC calls most notably on the lines of Gianluca
034:         * Collot's bugfix)
035:         * ii. All slow calls (RPCs, seqnos requests, etc.) placed outside locks
036:         * iii. Removed redundant initialization in adaptation to a higher round
037:         * iv. heard_from[this member] is always set to true on every new round
038:         * (i.e. on every stability bcast).
039:         * v. Replaced gossip thread with <tt>TimeScheduler.Task</tt>
040:         * </pre>
041:         * <p>
042:         * [[[ TODO(igeorg - 2.VI.2001)
043:         * i. Faster stability convergence by better selection of gossip subsets
044:         * (replace Util.pickSubset()).
045:         * ii. Special mutex on the <tt>Event.MSG</tt> call path. I.e. replace
046:         * <tt>synchronized(this)</tt> with e.g. <tt>synchronized(msg_mutex)</tt>.
047:         * ]]] TODO
048:         */
049:        public class STABLE extends RpcProtocol {
050:            /** The protocol name */
051:            private static final String PROT_NAME = "STABLE";
052:
053:            /** Default subgroup size for gossiping, expressed as a fraction of the group's size (0.1 = 10%) */
054:            private static final double SUBSET_SIZE = 0.1;
055:
056:            /** Default max number of msgs to wait for before sending gossip */
057:            private static final int GOSSIP_MSG_INTERVAL = 100;
058:
059:            /** Default max time to wait before sending gossip (ms) */
060:            private static final int GOSSIP_INTERVAL = 10000;
061:
062:            private Address local_addr = null;
063:            private ViewId vid = null;
064:            private final Vector mbrs = new Vector(11);
065:
066:            /** gossip round */
067:            private long round = 1;
068:
069:            /** highest seqno received for each member (corresponds to membership) */
070:            private long[] seqnos = new long[0];
071:
072:            /** Array of members from which we have received a gossip in the current round */
073:            private boolean[] heard_from = new boolean[0];
074:
075:            /** Percentage of members to which gossip is sent (parameterizable by user) */
076:            private double subset = SUBSET_SIZE;
077:
078:            /** The gossiping task scheduler */
079:            private TimeScheduler sched = null;
080:
081:            private Task gossip_task;
082:
083:            /** wait for n messages until sending gossip ... */
084:            private int max_msgs = GOSSIP_MSG_INTERVAL;
085:
086:            /** ... or until max_wait_time has elapsed, whichever comes first */
087:            private long max_wait_time = GOSSIP_INTERVAL;
088:
089:            /** Current number of msgs left to be received before sending a gossip */
090:            private long num_msgs = max_msgs;
091:
092:            /** mutex for interacting with NAKACK layer (GET_MSGS_RECVD) */
093:            private final Object highest_seqnos_mutex = new Object();
094:
095:            /** Time to wait for a reply from NAKACK layer (GET_MSGS_RECVD) */
096:            private long highest_seqnos_timeout = 4000;
097:
098:            /**
099:             * @return this protocol name
100:             */
101:            public String getName() {
102:                return (PROT_NAME);
103:            }
104:
105:            /**
106:             * The events expected to be handled from some layer above:
107:             * <ul>
108:             * <li>
109:             * GET_MSGS_RECEIVED: NAKACK layer
110:             * </li>
111:             * </ul>
112:             * @return a list of events expected to be handled from some layer
113:             * above
114:             */
115:            public Vector requiredUpServices() {
116:                Vector retval = new Vector(1);
117:                retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));
118:                return retval;
119:            }
120:
121:            /**
122:             * Set the parameters for this layer.
123:             *
124:             * <ul>
125:             * <li>
126:             * <i>subset</i>: the percentage of the group's size to which the
127:             * msgs_seen_so_far gossip is sent periodically.</li>
128:             * <li>
129:             * <i>max_msgs</i>: the max number of msgs to wait for between two
130:             * consecutive gossipings.</li>
131:             * <li>
132:             * <i>max_wait_time</i>: the max time to wait for between two consecutive
133:             * gossipings.</li>
134:             * <li>
135:             * <i>highest_seqnos_timeout</i>: time (ms) to wait to receive from NAKACK
136:             * the array of highest deliverable seqnos
137:             * </li>
138:             * </ul>
139:             *
140:             * @param props the list of parameters
141:             */
142:            public boolean setProperties(Properties props) {
143:                String str;
144:
145:                super.setProperties(props);
146:                str = props.getProperty("subset");
147:                if (str != null) {
148:                    subset = Double.parseDouble(str); // subset is a double; avoids float rounding
149:                    props.remove("subset");
150:                }
151:
152:                str = props.getProperty("max_msgs");
153:                if (str != null) {
154:                    num_msgs = max_msgs = Integer.parseInt(str);
155:                    if (max_msgs <= 1) {
156:                        if (log.isFatalEnabled())
157:                            log
158:                                    .fatal("value for 'max_msgs' must be greater than 1 !");
159:                        return false;
160:                    }
161:                    props.remove("max_msgs");
162:                }
163:
164:                str = props.getProperty("max_wait_time");
165:                if (str != null) {
166:                    max_wait_time = Long.parseLong(str);
167:                    props.remove("max_wait_time");
168:                }
169:
170:                str = props.getProperty("highest_seqnos_timeout");
171:                if (str != null) {
172:                    highest_seqnos_timeout = Long.parseLong(str);
173:                    props.remove("highest_seqnos_timeout");
174:                }
175:
176:                if (props.size() > 0) {
177:                    log
178:                            .error("STABLE.setProperties(): these properties are not recognized: "
179:                                    + props);
180:
181:                    return false;
182:                }
183:                return true;
184:            }
185:
186:            /**
187:             * Start the layer:
188:             * i. Set the gossip task scheduler
189:             * ii. Reset the layer's state.
190:             * iii. Start the gossiping task
191:             */
192:            public void start() throws Exception {
193:                TimeScheduler timer;
194:
195:                super.start();
196:                timer = stack != null ? stack.timer : null;
197:                if (timer == null)
198:                    throw new Exception("STABLE.start(): timer is null");
199:
200:                sched = timer;
201:
202:                // we use only asynchronous method invocations...
203:                if (_corr != null)
204:                    _corr.setDeadlockDetection(false);
205:                initialize();
206:                startGossip();
207:            }
208:
209:            /**
210:             * Stop scheduling the gossip task
211:             */
212:            public void stop() {
213:                super.stop();
214:                synchronized (this) {
215:                    if (gossip_task != null)
216:                        gossip_task.cancel();
217:                    gossip_task = null;
218:                }
219:            }
220:
221:            /* ------------------------- Request handler methods ------------------ */
222:
223:            /**
224:             * Contains the highest sequence numbers as seen by <code>sender</code>
225:             *
226:             * @param view_id The view ID in which the gossip was sent. Must be the
227:             * same as ours, otherwise it is discarded
228:             *
229:             * @param gossip_round The round in which the gossip was sent
230:             *
231:             * @param gossip_seqnos A vector with the highest sequence numbers as
232:             * seen by <code>sender</code>
233:             *
234:             * @param heard The sender's <code>heard_from</code> array. This allows
235:             * us to minimize the gossip msgs for a given round as a member does not
236:             * have to receive gossip msgs from each member, but members pass gossips
237:             * they've received from others on in their own gossips. E.g. when a
238:             * member P (of group {P,Q,R}) receives a gossip from R, its own gossip
239:             * to Q might be {R,P}. Q, who hasn't received a gossip from R, will not
240:             * need to receive it anymore as it is already sent by P. This simple
241:             * scheme reduces the number of gossip messages needed.
242:             *
243:             * @param sender The sender of the gossip message (obviously :-))
244:             */
245:            public void gossip(ViewId view_id, long gossip_round,
246:                    long[] gossip_seqnos, boolean[] heard, Object sender) {
247:                Object[] params;
248:                MethodCall call;
249:
250:                synchronized (this) {
251:
252:                    if (log.isInfoEnabled())
253:                        log.info("sender=" + sender + ", round=" + gossip_round
254:                                + ", seqnos="
255:                                + Util.array2String(gossip_seqnos) + ", heard="
256:                                + Util.array2String(heard));
257:                    if (vid == null || view_id == null || !vid.equals(view_id)) {
258:
259:                        if (log.isInfoEnabled())
260:                            log
261:                                    .info("view IDs are different (" + vid
262:                                            + " != " + view_id
263:                                            + "). Discarding gossip received");
264:                        return;
265:                    }
266:                    if (gossip_round < this.round) {
267:
268:                        if (log.isInfoEnabled())
269:                            log
270:                                    .info("received a gossip from a previous round ("
271:                                            + gossip_round
272:                                            + "); my round is "
273:                                            + round + ". Discarding gossip");
274:                        return;
275:                    }
276:                    if (gossip_seqnos == null || seqnos == null
277:                            || seqnos.length != gossip_seqnos.length) {
278:
279:                        if (log.isWarnEnabled())
280:                            log
281:                                    .warn("sizes of seqnos and gossip_seqnos are not equal! "
282:                                            + "Discarding gossip");
283:                        return;
284:                    }
285:
286:                    // (1) If round greater than local round:
287:                    // i. Adjust the local to the received round
288:                    //
289:                    // (2)
290:                    // i. local_seqnos = arrayMin(local_seqnos, gossip_seqnos)
291:                    // ii. local_heard = arrayMax(local_heard, gossip_heard)
292:                    // iii. If heard from all, bcast our seqnos (stability vector)
293:                    if (round == gossip_round) {
294:                        update(sender, gossip_seqnos, heard);
295:                    } else if (round < gossip_round) {
296:
297:                        if (log.isInfoEnabled())
298:                            log.info("received a gossip from a higher round ("
299:                                    + gossip_round + "); adapting my round ("
300:                                    + round + ") to " + gossip_round);
301:                        round = gossip_round;
302:                        set(sender, gossip_seqnos, heard_from);
303:                    }
304:
305:                    if (log.isInfoEnabled())
306:                        log.info("heard_from=" + Util.array2String(heard_from));
307:                    if (!heardFromAll())
308:                        return;
309:
310:                    params = new Object[] { vid.clone(),
311:                            new Long(gossip_round), seqnos.clone(), local_addr };
312:                } // synchronized(this)
313:
314:                call = new MethodCall("stability", params, new String[] {
315:                        ViewId.class.getName(), long.class.getName(),
316:                        long[].class.getName(), Object.class.getName() });
317:                callRemoteMethods(null, call, GroupRequest.GET_NONE, 0);
318:            }
319:
320:            /**
321:             * Contains the highest message sequence numbers (for each member) that
322:             * can safely be deleted (because they have been seen by all members).
323:             */
324:            public void stability(ViewId view_id, long gossip_round,
325:                    long[] stability_vector, Object sender) {
326:                // i. Proceed to the next round; init the heard from list
327:                // ii. Send up the stability vector
328:                // iii. get a fresh copy of the highest deliverable seqnos
329:                synchronized (this) {
330:
331:                    if (log.isInfoEnabled())
332:                        log.info("sender=" + sender + ", round=" + gossip_round
333:                                + ", vector="
334:                                + Util.array2String(stability_vector));
335:                    if (vid == null || view_id == null || !vid.equals(view_id)) {
336:
337:                        if (log.isInfoEnabled())
338:                            log
339:                                    .info("view IDs are different (" + vid
340:                                            + " != " + view_id
341:                                            + "). Discarding stability message");
342:                        return;
343:                    }
344:
345:                    if (round > gossip_round)
346:                        return;
347:                    round = gossip_round + 1;
348:                    java.util.Arrays.fill(heard_from, false);
349:                    int self = mbrs.indexOf(local_addr); // -1 if we are not in the current view
350:                    if (self >= 0) heard_from[self] = true; // set under the lock, guarded against -1
351:                }
352:
353:                passUp(new Event(Event.STABLE, stability_vector));
354:                getHighestSeqnos();
355:            }
356:
357:            /* --------------------- End of Request handler methods --------------- */
358:
359:            /**
360:             * <b>Callback</b>. Called by superclass when event may be handled.
361:             * <p>
362:             * <b>Do not use <code>passUp</code> in this method as the event is passed
363:             * up by default by the superclass after this method returns !</b>
364:             *
365:             * @return boolean Defaults to true. If false, event will not be passed
366:             * up the stack.
367:             */
368:            public boolean handleUpEvent(Event evt) {
369:                switch (evt.getType()) {
370:                case Event.MSG:
371:                    if (!upMsg(evt))
372:                        return (false);
373:                    break;
374:                case Event.SET_LOCAL_ADDRESS:
375:                    local_addr = (Address) evt.getArg();
376:                    break;
377:                }
378:
379:                return true;
380:            }
381:
382:            /**
383:             * <b>Callback</b>. Called by superclass when event may be handled.
384:             * <p>
385:             * <b>Do not use <code>passDown</code> in this method as the event is
386:             * passed down by default by the superclass after this method returns !</b>
387:             *
388:             * @return boolean Defaults to true. If false, event will not be passed
389:             * down the stack.
390:             */
391:            public boolean handleDownEvent(Event evt) {
392:                switch (evt.getType()) {
393:                case Event.VIEW_CHANGE:
394:                    if (!downViewChange(evt))
395:                        return (false);
396:                    break;
397:                // does anyone else below need this msg except STABLE?
398:                case Event.GET_MSGS_RECEIVED_OK:
399:                    if (!downGetMsgsReceived(evt))
400:                        return (false);
401:                    break;
402:                }
403:
404:                return (true);
405:            }
406:
407:            /**
408:             * The gossip task that runs periodically
409:             */
410:            private void gossipRun() {
411:                num_msgs = max_msgs;
412:                sendGossip();
413:            }
414:
415:            /**
416:             * <pre>
417:             * Reset the state of msg garbage-collection:
418:             * i. Reset the table of highest seqnos seen by each member
419:             * ii. Reset the tbl of mbrs from which highest seqnos have been recorded
420:             * </pre>
421:             */
422:            private void initialize() {
423:                synchronized (this) {
424:                    seqnos = new long[mbrs.size()];
425:                    for (int i = 0; i < seqnos.length; i++)
426:                        seqnos[i] = -1;
427:
428:                    heard_from = new boolean[mbrs.size()];
429:                    for (int i = 0; i < heard_from.length; i++)
430:                        heard_from[i] = false;
431:                }
432:            }
433:
434:            /**
435:             * (1)<br>
436:             * Merge this member's table of highest seqnos seen by each member
437:             * with the one received from a gossip by another member. The result is
438:             * the element-wise minimum of the input arrays. For each entry:<br>
439:             *
440:             * <tt>seqno[mbr_i] = min(seqno[mbr_i], gossip_seqno[mbr_i])</tt>
441:             * <p>
442:             *
443:             * (2)<br>
444:             * Merge the <tt>heard from</tt> tables of this member and the sender of
445:             * the gossip. The resulting table is:<br>
446:             *
447:             * <tt>heard_from[mbr_i] = heard_from[mbr_i] | sender_heard[mbr_i]</tt>
448:             *
449:             * @param sender the sender of the gossip
450:             * @param gossip_seqnos the highest deliverable seqnos of the sender
451:             * @param gossip_heard_from the table of members sender has heard from
452:             *
453:             */
454:            private void update(Object sender, long[] gossip_seqnos,
455:                    boolean[] gossip_heard_from) {
456:                int index;
457:
458:                synchronized (this) {
459:                    index = mbrs.indexOf(sender);
460:                    if (index < 0) {
461:                        if (log.isWarnEnabled())
462:                            log.warn("sender " + sender
463:                                    + " not found in mbrs !");
464:                        return;
465:                    }
466:
467:                    for (int i = 0; i < gossip_seqnos.length; i++)
468:                        seqnos[i] = Math.min(seqnos[i], gossip_seqnos[i]);
469:
470:                    heard_from[index] = true;
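471:                    for (int i = 0; gossip_heard_from != null && i < Math.min(heard_from.length, gossip_heard_from.length); i++)
472:                        heard_from[i] |= gossip_heard_from[i]; // tolerate a missing or shorter table from the sender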
473:                }
474:            }
475:
476:            /**
477:             * Set the seqnos and heard_from arrays to those of the sender. The
478:             * method is called when the sender seems to know more than this member.
479:             * The situation occurs if either:
480:             * <ul>
481:             * <li>
482:             * sender.heard_from > this.heard_from, i.e. the sender has heard
483:             * from more members than we have</li>
484:             * <li>
485:             * sender.round > this.round, i.e. the sender is in a more recent round
486:             * than we are</li>
487:             * </ul>
488:             *
489:             * In both cases, this member is assigned the state of the sender
490:             */
491:            private void set(Object sender, long[] gossip_seqnos,
492:                    boolean[] gossip_heard_from) {
493:                int index;
494:
495:                synchronized (this) {
496:                    index = mbrs.indexOf(sender);
497:                    if (index < 0) {
498:                        if (log.isWarnEnabled())
499:                            log.warn("sender " + sender
500:                                    + " not found in mbrs !");
501:                        return;
502:                    }
503:
504:                    seqnos = gossip_seqnos;
505:                    heard_from = gossip_heard_from;
506:                }
507:            }
508:
509:            /**
510:             * @return true, if we have received the highest deliverable seqnos
511:             * directly or indirectly from all members
512:             */
513:            private boolean heardFromAll() {
514:                synchronized (this) {
515:                    if (heard_from == null)
516:                        return false;
517:                    for (int i = 0; i < heard_from.length; i++)
518:                        if (!heard_from[i])
519:                            return false;
520:                }
521:
522:                return true;
523:            }
524:
525:            /**
526:             * Send our <code>seqnos</code> array to a subset of the membership
527:             */
528:            private void sendGossip() {
529:                Vector gossip_subset;
530:                Object[] params;
531:                MethodCall call;
532:
533:                synchronized (this) {
534:                    gossip_subset = Util.pickSubset(mbrs, subset);
535:                    if (gossip_subset == null || gossip_subset.size() < 1) {
536:                        if (log.isWarnEnabled())
537:                            log.warn("picked empty subset !");
538:                        return;
539:                    }
540:
541:                    if (log.isInfoEnabled())
542:                        log.info("subset=" + gossip_subset + ", round=" + round
543:                                + ", seqnos=" + Util.array2String(seqnos));
544:
545:                    params = new Object[] { vid.clone(), new Long(round),
546:                            seqnos.clone(), heard_from.clone(), local_addr };
547:                }
548:
549:                call = new MethodCall("gossip", params, new String[] {
550:                        ViewId.class.getName(), long.class.getName(),
551:                        long[].class.getName(), boolean[].class.getName(),
552:                        Object.class.getName() });
553:                for (int i = 0; i < gossip_subset.size(); i++) {
554:                    try {
555:                        callRemoteMethod((Address) gossip_subset.get(i), call,
556:                                GroupRequest.GET_NONE, 0);
557:                    } catch (Exception e) {
558:                        if (log.isDebugEnabled())
559:                            log.debug("exception=" + e);
560:                    }
561:                }
562:            }
563:
564:            /**
565:             * Sends GET_MSGS_RECEIVED to NAKACK layer (above us !) and stores result
566:             * in <code>seqnos</code>. In case <code>seqnos</code> does not yet exist
567:             * it creates and initializes it.
568:             */
569:            private void getHighestSeqnos() {
570:                synchronized (highest_seqnos_mutex) {
571:                    passUp(new Event(Event.GET_MSGS_RECEIVED));
572:
573:                    try {
574:                        highest_seqnos_mutex.wait(highest_seqnos_timeout);
575:                    } catch (InterruptedException e) {
576:                        if (log.isErrorEnabled())
577:                            log
578:                                    .error("Interrupted while waiting for highest seqnos from NAKACK");
579:                    }
580:                }
581:            }
582:
583:            /**
584:             * Start scheduling the gossip task
585:             */
586:            private void startGossip() {
587:                synchronized (this) {
588:                    if (gossip_task != null)
589:                        gossip_task.cancel();
590:                    gossip_task = new Task(new Times(
591:                            new long[] { max_wait_time })); // honor the configured interval (defaults to GOSSIP_INTERVAL)
592:                    sched.add(gossip_task);
593:                }
594:            }
595:
596:            /**
597:             * Received a <tt>MSG</tt> event from a layer below
598:             *
599:             * A msg received:
600:             * If unicast ignore; if multicast and time for gossiping has been
601:             * reached, send out a gossip to a subset of the mbrs
602:             *
603:             * @return true if the event should be forwarded to the layer above
604:             */
605:            private boolean upMsg(Event e) {
606:                Message msg = (Message) e.getArg();
607:
608:                if (msg.getDest() != null
609:                        && (!msg.getDest().isMulticastAddress()))
610:                    return (true);
611:
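612:                synchronized (this) {
613:                    --num_msgs;
614:                    if (num_msgs > 0)
615:                        return (true);
616:                    num_msgs = max_msgs;
617:                    if (gossip_task == null) return (true); // stopped or not yet started: nothing to reschedule
618:                    gossip_task.cancel();
619:                    gossip_task = new Task(new Times(new long[] { 0,
620:                            max_wait_time })); // gossip now, then fall back to the configured interval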
621:                    sched.add(gossip_task);
622:                }
623:
624:                return (true);
625:            }
626:
627:            /**
628:             * Received a <tt>VIEW_CHANGE</tt> event from a layer above
629:             *
630:             * A new view:
631:             * i. Set the new mbrs list and the new view ID.
632:             * ii. Reset the highest deliverable seqnos seen
633:             *
634:             * @return true if the event should be forwarded to the layer below
635:             */
636:            private boolean downViewChange(Event e) {
637:                View v = (View) e.getArg();
638:                Vector new_mbrs = v.getMembers();
639:
640:                /*
641:                  // Could this ever happen? GMS is always sending non-null value
642:                  if(new_mbrs == null) {
643:                  / Trace.println(
644:                  "STABLE.handleDownEvent()", Trace.ERROR,
645:                  "Received VIEW_CHANGE event with null mbrs list");
646:                  break;
647:                  }
648:                 */
649:
650:                synchronized (this) {
651:                    vid = v.getVid();
652:                    mbrs.clear();
653:                    mbrs.addAll(new_mbrs);
654:                    initialize();
655:                }
656:
657:                return (true);
658:            }
659:
660:            /**
661:             * Received a <tt>GET_MSGS_RECEIVED_OK</tt> event from a layer above
662:             *
663:             * Updated list of highest deliverable seqnos:
664:             * i. Update the local copy of highest deliverable seqnos
665:             *
666:             * @return true if the event should be forwarded to the layer below
667:             */
668:            private boolean downGetMsgsReceived(Event e) {
669:                long[] new_seqnos = (long[]) e.getArg();
670:
671:                try {
672:                    synchronized (this ) {
673:                        if (new_seqnos == null)
674:                            return (true);
675:                        if (new_seqnos.length != seqnos.length) {
676:
677:                            if (log.isInfoEnabled())
678:                                log
679:                                        .info("GET_MSGS_RECEIVED: array of highest "
680:                                                + "seqnos seen so far (received from NAKACK layer) "
681:                                                + "has a different length ("
682:                                                + new_seqnos.length
683:                                                + ") from 'seqnos' array ("
684:                                                + seqnos.length + ')');
685:                            return (true);
686:                        }
687:                        System.arraycopy(new_seqnos, 0, seqnos, 0,
688:                                seqnos.length);
689:                    }
690:
691:                } finally {
692:                    synchronized (highest_seqnos_mutex) {
693:                        highest_seqnos_mutex.notifyAll();
694:                    }
695:                }
696:
697:                return (true);
698:            }
699:
    /**
     * Selects the next interval from the list. Once the end of the list is
     * reached, keeps returning the last value. The list of times should
     * sensibly be in increasing order.
     */
    private static class Times {
        private int next = 0;
        private long[] times;

        Times(long[] times) {
            if (times.length == 0)
                throw new IllegalArgumentException("times");
            this.times = times;
        }

        public synchronized long next() {
            if (next >= times.length)
                return (times[times.length - 1]);
            else
                return (times[next++]);
        }

        public long[] times() {
            return (times);
        }

        public synchronized void reset() {
            next = 0;
        }
    }

    /**
     * The gossiping task
     */
    private class Task implements TimeScheduler.Task {
        private final Times intervals;
        private boolean cancelled = false;

        Task(Times intervals) {
            this.intervals = intervals;
        }

        public long nextInterval() {
            return (intervals.next());
        }

        public boolean cancelled() {
            return (cancelled);
        }

        public void cancel() {
            cancelled = true;
        }

        public void run() {
            gossipRun();
        }
    }
}
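
The interval selection done by Times in the listing above can be tried in isolation. The sketch below is a standalone copy of that logic, not part of JGroups: the names IntervalDemo, Intervals and firstN are hypothetical, and the value 1000 is a made-up stand-in for GOSSIP_INTERVAL. It assumes the scheduler asks for one interval per run, which the listing itself does not show.

```java
import java.util.Arrays;

public class IntervalDemo {
    /** Standalone copy of the interval-selection logic (hypothetical name). */
    static final class Intervals {
        private int next = 0;
        private final long[] times;

        Intervals(long[] times) {
            if (times.length == 0)
                throw new IllegalArgumentException("times");
            this.times = times;
        }

        /** Returns the next interval; repeats the last one once the list is exhausted. */
        synchronized long next() {
            if (next >= times.length)
                return times[times.length - 1];
            return times[next++];
        }

        /** Restarts selection from the first interval. */
        synchronized void reset() {
            next = 0;
        }
    }

    /** Collects the first n values returned by next(). */
    static long[] firstN(Intervals intervals, int n) {
        long[] out = new long[n];
        for (int i = 0; i < n; i++)
            out[i] = intervals.next();
        return out;
    }

    public static void main(String[] args) {
        // 1000 is an illustrative value standing in for GOSSIP_INTERVAL
        Intervals intervals = new Intervals(new long[] { 0, 1000 });
        System.out.println(Arrays.toString(firstN(intervals, 4))); // prints [0, 1000, 1000, 1000]
        intervals.reset();
        System.out.println(intervals.next()); // prints 0
    }
}
```

With a list such as { 0, GOSSIP_INTERVAL }, the first requested interval is 0 and every later one is GOSSIP_INTERVAL, which matches how the listing rebuilds the gossip task after max_msgs messages.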