Source Code Cross Referenced for TOTAL.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: TOTAL.java,v 1.13 2006/01/03 14:11:29 belaban Exp $
002:        package org.jgroups.protocols;
003:
004:        import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
005:        import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
006:        import org.jgroups.*;
007:        import org.jgroups.stack.AckSenderWindow;
008:        import org.jgroups.stack.Protocol;
009:        import org.jgroups.util.TimeScheduler;
010:        import org.jgroups.util.Streamable;
011:
012:        import java.io.*;
013:        import java.util.*;
014:
015:        /**
016:         * Implements the total ordering layer using a message sequencer
017:         * <p/>
018:         * <p/>
019:         * The protocol guarantees that all bcast sent messages will be delivered in
020:         * the same order to all members. For that it uses a sequencer which assignes
021:         * monotonically increasing sequence ID to broadcasts. Then all group members
022:         * deliver the bcasts in ascending sequence ID order.
023:         * <p/>
024:         * <ul>
025:         * <li>
026:         * When a bcast message comes down to this layer, it is placed in the pending
027:         * down queue. A bcast request is sent to the sequencer.</li>
028:         * <li>
029:         * When the sequencer receives a bcast request, it creates a bcast reply
030:         * message and assigns to it a monotonically increasing seqID and sends it back
031:         * to the source of the bcast request.</li>
032:         * <li>
033:         * When a broadcast reply is received, the corresponding bcast message is
034:         * assigned the received seqID. Then it is broadcasted.</li>
035:         * <li>
036:         * Received bcasts are placed in the up queue. The queue is sorted according
037:         * to the seqID of the bcast. Any message at the head of the up queue with a
038:         * seqID equal to the next expected seqID is delivered to the layer above.</li>
039:         * <li>
040:         * Unicast messages coming from the layer below are forwarded above.</li>
041:         * <li>
042:         * Unicast messages coming from the layer above are forwarded below.</li>
043:         * </ul>
044:         * <p/>
045:         * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages
046:         * coming from above are discarded!</i> Either the application must stop
047:         * sending messages when a <code>BLOCK</code> event is received from the
048:         * channel or a QUEUE layer should be placed above this one. Received messages
049:         * are still delivered above though.
050:         * <p/>
051:         * bcast requests are retransmitted periodically until a bcast reply is
052:         * received. In case a BCAST_REP is on its way during a BCAST_REQ
053:         * retransmission, then the next BCAST_REP will be to a non-existing
054:         * BCAST_REQ. So, a null BCAST message is sent to fill the created gap in
055:         * the seqID of all members.
056:         *
057:         * @author i.georgiadis@doc.ic.ac.uk
058:         * @author Bela Ban
059:         */
060:        public class TOTAL extends Protocol {
061:            /**
062:             * The header processed by the TOTAL layer and intended for TOTAL
063:             * inter-stack communication
064:             */
065:            public static class Header extends org.jgroups.Header implements 
066:                    Streamable {
067:                // Header types
068:                /**
069:                 * Null value for the tag
070:                 */
071:                public static final int NULL_TYPE = -1;
072:                /**
073:                 * Request to broadcast by the source
074:                 */
075:                public static final int REQ = 0;
076:                /**
077:                 * Reply to broadcast request.
078:                 */
079:                public static final int REP = 1;
080:                /**
081:                 * Unicast message
082:                 */
083:                public static final int UCAST = 2;
084:                /**
085:                 * Broadcast Message
086:                 */
087:                public static final int BCAST = 3;
088:
089:                /**
090:                 * The header's type tag
091:                 */
092:                public int type;
093:                /**
094:                 * The ID used by the message source to match replies from the
095:                 * sequencer
096:                 */
097:                public long localSequenceID;
098:                /**
099:                 * The ID imposing the total order of messages
100:                 */
101:                public long sequenceID;
102:
103:                /**
104:                 * used for externalization
105:                 */
106:                public Header() {
107:                }
108:
109:                /**
110:                 * Create a header for the TOTAL layer
111:                 *
112:                 * @param type       the header's type
113:                 * @param localSeqID the ID used by the sender of broadcasts to match
114:                 *                   requests with replies from the sequencer
115:                 * @param seqID      the ID imposing the total order of messages
116:                 * @throws IllegalArgumentException if the provided header type is
117:                 *                                  unknown
118:                 */
119:                public Header(int type, long localSeqID, long seqID) {
120:                    super ();
121:                    switch (type) {
122:                    case REQ:
123:                    case REP:
124:                    case UCAST:
125:                    case BCAST:
126:                        this .type = type;
127:                        break;
128:                    default:
129:                        this .type = NULL_TYPE;
130:                        throw new IllegalArgumentException("type");
131:                    }
132:                    this .localSequenceID = localSeqID;
133:                    this .sequenceID = seqID;
134:                }
135:
136:                /**
137:                 * For debugging purposes
138:                 */
139:                public String toString() {
140:                    StringBuffer buffer = new StringBuffer();
141:                    String typeName;
142:                    buffer.append("[TOTAL.Header");
143:                    switch (type) {
144:                    case REQ:
145:                        typeName = "REQ";
146:                        break;
147:                    case REP:
148:                        typeName = "REP";
149:                        break;
150:                    case UCAST:
151:                        typeName = "UCAST";
152:                        break;
153:                    case BCAST:
154:                        typeName = "BCAST";
155:                        break;
156:                    case NULL_TYPE:
157:                        typeName = "NULL_TYPE";
158:                        break;
159:                    default:
160:                        typeName = "";
161:                        break;
162:                    }
163:                    buffer.append(", type=" + typeName);
164:                    buffer.append(", " + "localID=" + localSequenceID);
165:                    buffer.append(", " + "seqID=" + sequenceID);
166:                    buffer.append(']');
167:
168:                    return (buffer.toString());
169:                }
170:
171:                /**
172:                 * Manual serialization
173:                 */
174:                public void writeExternal(ObjectOutput out) throws IOException {
175:                    out.writeInt(type);
176:                    out.writeLong(localSequenceID);
177:                    out.writeLong(sequenceID);
178:                }
179:
180:                /**
181:                 * Manual deserialization
182:                 */
183:                public void readExternal(ObjectInput in) throws IOException,
184:                        ClassNotFoundException {
185:                    type = in.readInt();
186:                    localSequenceID = in.readLong();
187:                    sequenceID = in.readLong();
188:                }
189:
190:                public void writeTo(DataOutputStream out) throws IOException {
191:                    out.writeInt(type);
192:                    out.writeLong(localSequenceID);
193:                    out.writeLong(sequenceID);
194:                }
195:
196:                public void readFrom(DataInputStream in) throws IOException,
197:                        IllegalAccessException, InstantiationException {
198:                    type = in.readInt();
199:                    localSequenceID = in.readLong();
200:                    sequenceID = in.readLong();
201:                }
202:
203:                public long size() {
204:                    return Global.INT_SIZE + Global.LONG_SIZE * 2;
205:                }
206:
207:            }
208:
209:            /**
210:             * The retransmission listener - It is called by the
211:             * <code>AckSenderWindow</code> when a retransmission should occur
212:             */
213:            private class Command implements  AckSenderWindow.RetransmitCommand {
214:                Command() {
215:                }
216:
217:                public void retransmit(long seqNo, Message msg) {
218:                    _retransmitBcastRequest(seqNo);
219:                }
220:            }
221:
222:            /**
223:             * Protocol name
224:             */
225:            private static final String PROT_NAME = "TOTAL";
226:            /**
227:             * Property names
228:             */
229:            private static final String TRACE_PROP = "trace";
230:
231:            /**
232:             * Average time between broadcast request retransmissions
233:             */
234:            private final long[] AVG_RETRANSMIT_INTERVAL = new long[] { 1000,
235:                    2000, 3000, 4000 };
236:
237:            /**
238:             * Null value for the IDs
239:             */
240:            private static final long NULL_ID = -1;
241:            // Layer sending states
242:            /**
243:             * No group has been joined yet
244:             */
245:            private static final int NULL_STATE = -1;
246:            /**
247:             * When set, all messages are sent/received
248:             */
249:            private static final int RUN = 0;
250:            /**
251:             * When set, only session-specific messages are sent/received, i.e. only
252:             * messages essential to the session's integrity
253:             */
254:            private static final int FLUSH = 1;
255:            /**
256:             * No message is sent to the layer below
257:             */
258:            private static final int BLOCK = 2;
259:
260:            /**
261:             * The state lock allowing multiple reads or a single write
262:             */
263:            private final ReadWriteLock stateLock = new WriterPreferenceReadWriteLock();
264:            /**
265:             * Protocol layer message-sending state
266:             */
267:            private int state = NULL_STATE;
268:            /**
269:             * The address of this stack
270:             */
271:            private Address addr = null;
272:            /**
273:             * The address of the sequencer
274:             */
275:            private Address sequencerAddr = null;
276:            /**
277:             * The sequencer's seq ID. The ID of the most recently broadcast reply
278:             * message
279:             */
280:            private long sequencerSeqID = NULL_ID;
281:            /**
282:             * The local sequence ID, i.e. the ID sent with the last broadcast request
283:             * message. This is increased with every broadcast request sent to the
284:             * sequencer and it's used to match the requests with the sequencer's
285:             * replies
286:             */
287:            private long localSeqID = NULL_ID;
288:            /**
289:             * The total order sequence ID. This is the ID of the most recently
290:             * delivered broadcast message. As the sequence IDs are increasing without
291:             * gaps, this is used to detect missing broadcast messages
292:             */
293:            private long seqID = NULL_ID;
294:            /**
295:             * The list of unanswered broadcast requests to the sequencer. The entries
296:             * are stored in increasing local sequence ID, i.e. in the order they were
297:             * <p/>
298:             * sent localSeqID -> Broadcast msg to be sent.
299:             */
300:            private SortedMap reqTbl;
301:            /**
302:             * The list of received broadcast messages that haven't yet been delivered
303:             * to the layer above. The entries are stored in increasing sequence ID,
304:             * i.e. in the order they must be delivered above
305:             * <p/>
306:             * seqID -> Received broadcast msg
307:             */
308:            private SortedMap upTbl;
309:            /**
310:             * Retranmitter for pending broadcast requests
311:             */
312:            private AckSenderWindow retransmitter;
313:
314:            /**
315:             * Print addresses in host_ip:port form to bypass DNS
316:             */
317:            private String addrToString(Object addr) {
318:                return (addr == null ? "<null>"
319:                        : ((addr instanceof  org.jgroups.stack.IpAddress) ? (((org.jgroups.stack.IpAddress) addr)
320:                                .getIpAddress().getHostAddress() + ':' + ((org.jgroups.stack.IpAddress) addr)
321:                                .getPort())
322:                                : addr.toString()));
323:            }
324:
325:            /**
326:             * @return this protocol's name
327:             */
328:            public String getName() {
329:                return PROT_NAME;
330:            }
331:
332:            /**
333:             * Configure the protocol based on the given list of properties
334:             *
335:             * @param properties the list of properties to use to setup this layer
336:             * @return false if there was any unrecognized property or a property with
337:             *         an invalid value
338:             */
339:            public boolean setProperties(Properties properties) {
340:                String value;
341:
342:                // trace
343:                // Parse & remove property but ignore it; use Trace.trace instead
344:                value = properties.getProperty(TRACE_PROP);
345:                if (value != null)
346:                    properties.remove(TRACE_PROP);
347:                if (properties.size() > 0) {
348:                    if (log.isErrorEnabled())
349:                        log
350:                                .error("The following properties are not recognized: "
351:                                        + properties);
352:                    return (false);
353:                }
354:                return (true);
355:            }
356:
357:            /**
358:             * Events that some layer below must handle
359:             *
360:             * @return the set of <code>Event</code>s that must be handled by some layer
361:             *         below
362:             */
363:            public Vector requiredDownServices() {
364:                return new Vector();
365:            }
366:
367:            /**
368:             * Events that some layer above must handle
369:             *
370:             * @return the set of <code>Event</code>s that must be handled by some
371:             *         layer above
372:             */
373:            public Vector requiredUpServices() {
374:                return new Vector();
375:            }
376:
377:            /**
378:             * Extract as many messages as possible from the pending up queue and send
379:             * them to the layer above
380:             */
381:            private void _deliverBcast() {
382:                Message msg;
383:                Header header;
384:
385:                synchronized (upTbl) {
386:                    while ((msg = (Message) upTbl.remove(new Long(seqID + 1))) != null) {
387:                        header = (Header) msg.removeHeader(getName());
388:                        if (header.localSequenceID != NULL_ID)
389:                            passUp(new Event(Event.MSG, msg));
390:                        ++seqID;
391:                    }
392:                } // synchronized(upTbl)
393:            }
394:
395:            /**
396:             * Add all undelivered bcasts sent by this member in the req queue and then
397:             * replay this queue
398:             */
399:            private void _replayBcast() {
400:                Iterator it;
401:                Message msg;
402:                Header header;
403:
404:                // i. Remove all undelivered bcasts sent by this member and place them
405:                // again in the pending bcast req queue
406:
407:                synchronized (upTbl) {
408:                    if (upTbl.size() > 0)
409:                        if (log.isInfoEnabled())
410:                            log.info("Replaying undelivered bcasts");
411:
412:                    it = upTbl.entrySet().iterator();
413:                    while (it.hasNext()) {
414:                        msg = (Message) ((Map.Entry) it.next()).getValue();
415:                        it.remove();
416:                        if (!msg.getSrc().equals(addr)) {
417:                            if (log.isInfoEnabled())
418:                                log
419:                                        .info("During replay: "
420:                                                + "discarding BCAST["
421:                                                + ((TOTAL.Header) msg
422:                                                        .getHeader(getName())).sequenceID
423:                                                + "] from "
424:                                                + addrToString(msg.getSrc()));
425:                            continue;
426:                        }
427:                        header = (Header) msg.removeHeader(getName());
428:                        if (header.localSequenceID == NULL_ID)
429:                            continue;
430:                        _sendBcastRequest(msg, header.localSequenceID);
431:                    }
432:                } // synchronized(upTbl)
433:            }
434:
435:            /**
436:             * Send a unicast message: Add a <code>UCAST</code> header
437:             *
438:             * @param msg the message to unicast
439:             * @return the message to send
440:             */
441:            private Message _sendUcast(Message msg) {
442:                msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID,
443:                        NULL_ID));
444:                return (msg);
445:            }
446:
447:            /**
448:             * Replace the original message with a broadcast request sent to the
449:             * sequencer. The original bcast message is stored locally until a reply to
450:             * bcast is received from the sequencer. This function has the side-effect
451:             * of increasing the <code>localSeqID</code>
452:             *
453:             * @param msg the message to broadcast
454:             */
455:            private void _sendBcastRequest(Message msg) {
456:                _sendBcastRequest(msg, ++localSeqID);
457:            }
458:
459:            /**
460:             * Replace the original message with a broadcast request sent to the
461:             * sequencer. The original bcast message is stored locally until a reply
462:             * to bcast is received from the sequencer
463:             *
464:             * @param msg the message to broadcast
465:             * @param id  the local sequence ID to use
466:             */
467:            private void _sendBcastRequest(Message msg, long id) {
468:
469:                // i. Store away the message while waiting for the sequencer's reply
470:                // ii. Send a bcast request immediatelly and also schedule a
471:                // retransmission
472:                synchronized (reqTbl) {
473:                    reqTbl.put(new Long(id), msg);
474:                }
475:                _transmitBcastRequest(id);
476:                retransmitter.add(id, msg);
477:            }
478:
479:            /**
480:             * Send the bcast request with the given localSeqID
481:             *
482:             * @param seqID the local sequence id of the
483:             */
484:            private void _transmitBcastRequest(long seqID) {
485:                Message reqMsg;
486:
487:                // i. If NULL_STATE, then ignore, just transient state before
488:                // shutting down the retransmission thread
489:                // ii. If blocked, be patient - reschedule
490:                // iii. If the request is not pending any more, acknowledge it
491:                // iv. Create a broadcast request and send it to the sequencer
492:
493:                if (state == NULL_STATE) {
494:                    if (log.isInfoEnabled())
495:                        log.info("Transmit BCAST_REQ[" + seqID
496:                                + "] in NULL_STATE");
497:                    return;
498:                }
499:                if (state == BLOCK)
500:                    return;
501:
502:                synchronized (reqTbl) {
503:                    if (!reqTbl.containsKey(new Long(seqID))) {
504:                        retransmitter.ack(seqID);
505:                        return;
506:                    }
507:                }
508:                reqMsg = new Message(sequencerAddr, addr, new byte[0]);
509:                reqMsg.putHeader(getName(), new Header(Header.REQ, seqID,
510:                        NULL_ID));
511:
512:                passDown(new Event(Event.MSG, reqMsg));
513:            }
514:
515:            /**
516:             * Receive a unicast message: Remove the <code>UCAST</code> header
517:             *
518:             * @param msg the received unicast message
519:             */
520:            private void _recvUcast(Message msg) {
521:                msg.removeHeader(getName());
522:            }
523:
524:            /**
525:             * Receive a broadcast message: Put it in the pending up queue and then
526:             * try to deliver above as many messages as possible
527:             *
528:             * @param msg the received broadcast message
529:             */
530:            private void _recvBcast(Message msg) {
531:                Header header = (Header) msg.getHeader(getName());
532:
533:                // i. Put the message in the up pending queue only if it's not
534:                // already there, as it seems that the event may be received
535:                // multiple times before a view change when all members are
536:                // negotiating a common set of stable msgs
537:                //
538:                // ii. Deliver as many messages as possible
539:
540:                synchronized (upTbl) {
541:                    if (header.sequenceID <= seqID)
542:                        return;
543:                    upTbl.put(new Long(header.sequenceID), msg);
544:                }
545:
546:                _deliverBcast();
547:            }
548:
549:            /**
550:             * Received a bcast request - Ignore if not the sequencer, else send a
551:             * bcast reply
552:             *
553:             * @param msg the broadcast request message
554:             */
555:            private void _recvBcastRequest(Message msg) {
556:                Header header;
557:                Message repMsg;
558:
559:                // i. If blocked, discard the bcast request
560:                // ii. Assign a seqID to the message and send it back to the requestor
561:
562:                if (!addr.equals(sequencerAddr)) {
563:                    if (log.isErrorEnabled())
564:                        log.error("Received bcast request "
565:                                + "but not a sequencer");
566:                    return;
567:                }
568:                if (state == BLOCK) {
569:                    if (log.isInfoEnabled())
570:                        log.info("Blocked, discard bcast req");
571:                    return;
572:                }
573:                header = (Header) msg.getHeader(getName());
574:                ++sequencerSeqID;
575:                repMsg = new Message(msg.getSrc(), addr, new byte[0]);
576:                repMsg.putHeader(getName(), new Header(Header.REP,
577:                        header.localSequenceID, sequencerSeqID));
578:
579:                passDown(new Event(Event.MSG, repMsg));
580:            }
581:
582:            /**
583:             * Received a bcast reply - Match with the pending bcast request and move
584:             * the message in the list of messages to be delivered above
585:             *
586:             * @param header the header of the bcast reply
587:             */
588:            private void _recvBcastReply(Header header) {
589:                Message msg;
590:                long id;
591:
592:                // i. If blocked, discard the bcast reply
593:                //
594:                // ii. Assign the received seqID to the message and broadcast it
595:                //
596:                // iii.
597:                // - Acknowledge the message to the retransmitter
598:                // - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps
599:                // - If localID == NULL_ID, it's a null BCAST, else normal BCAST
600:                // - Set the seq ID of the message to the one sent by the sequencer
601:
602:                if (state == BLOCK) {
603:                    if (log.isInfoEnabled())
604:                        log.info("Blocked, discard bcast rep");
605:                    return;
606:                }
607:
608:                synchronized (reqTbl) {
609:                    msg = (Message) reqTbl.remove(new Long(
610:                            header.localSequenceID));
611:                }
612:
613:                if (msg != null) {
614:                    retransmitter.ack(header.localSequenceID);
615:                    id = header.localSequenceID;
616:                } else {
617:                    if (log.isInfoEnabled())
618:                        log.info("Bcast reply to " + "non-existent BCAST_REQ["
619:                                + header.localSequenceID
620:                                + "], Sending NULL bcast");
621:                    id = NULL_ID;
622:                    msg = new Message(null, addr, new byte[0]);
623:                }
624:                msg.putHeader(getName(), new Header(Header.BCAST, id,
625:                        header.sequenceID));
626:
627:                passDown(new Event(Event.MSG, msg));
628:            }
629:
630:            /**
631:             * Resend the bcast request with the given localSeqID
632:             *
633:             * @param seqID the local sequence id of the
634:             */
635:            private void _retransmitBcastRequest(long seqID) {
636:                // *** Get a shared lock
637:                try {
638:                    stateLock.readLock().acquire();
639:                    try {
640:                        if (log.isInfoEnabled())
641:                            log.info("Retransmit BCAST_REQ[" + seqID + ']');
642:                        _transmitBcastRequest(seqID);
643:                    } finally {
644:                        stateLock.readLock().release();
645:                    }
646:                } catch (InterruptedException e) {
647:                    log.error("failed acquiring a read lock", e);
648:                }
649:            }
650:
651:            /* Up event handlers
652:             * If the return value is true the event travels further up the stack
653:             * else it won't be forwarded
654:             */
655:
656:            /**
657:             * Prepare for a VIEW_CHANGE: switch to flushing state
658:             *
659:             * @return true if the event is to be forwarded further up
660:             */
661:            private boolean _upBlock() {
662:                // *** Get an exclusive lock
663:                try {
664:                    stateLock.writeLock().acquire();
665:                    try {
666:                        state = FLUSH;
667:                        // *** Revoke the exclusive lock
668:                    } finally {
669:                        stateLock.writeLock().release();
670:                    }
671:                } catch (InterruptedException e) {
672:                    log.error("failed acquiring the write lock", e);
673:                }
674:
675:                return (true);
676:            }
677:
678:            /**
679:             * Handle an up MSG event
680:             *
681:             * @param event the MSG event
682:             * @return true if the event is to be forwarded further up
683:             */
684:            private boolean _upMsg(Event event) {
685:                Message msg;
686:                Object obj;
687:                Header header;
688:
689:                // *** Get a shared lock
690:                try {
691:                    stateLock.readLock().acquire();
692:                    try {
693:
694:                        // If NULL_STATE, shouldn't receive any msg on the up queue!
695:                        if (state == NULL_STATE) {
696:                            if (log.isErrorEnabled())
697:                                log.error("Up msg in NULL_STATE");
698:                            return (false);
699:                        }
700:
701:                        // Peek the header:
702:                        //
703:                        // (UCAST) A unicast message - Send up the stack
704:                        // (BCAST) A broadcast message - Handle specially
705:                        // (REQ) A broadcast request - Handle specially
706:                        // (REP) A broadcast reply from the sequencer - Handle specially
707:                        msg = (Message) event.getArg();
708:                        if (!((obj = msg.getHeader(getName())) instanceof  TOTAL.Header)) {
709:                            if (log.isErrorEnabled())
710:                                log.error("No TOTAL.Header found");
711:                            return (false);
712:                        }
713:                        header = (Header) obj;
714:
715:                        switch (header.type) {
716:                        case Header.UCAST:
717:                            _recvUcast(msg);
718:                            return (true);
719:                        case Header.BCAST:
720:                            _recvBcast(msg);
721:                            return (false);
722:                        case Header.REQ:
723:                            _recvBcastRequest(msg);
724:                            return (false);
725:                        case Header.REP:
726:                            _recvBcastReply(header);
727:                            return (false);
728:                        default:
729:                            if (log.isErrorEnabled())
730:                                log.error("Unknown header type");
731:                            return (false);
732:                        }
733:
734:                        // ** Revoke the shared lock
735:                    } finally {
736:                        stateLock.readLock().release();
737:                    }
738:                } catch (InterruptedException e) {
739:                    if (log.isErrorEnabled())
740:                        log.error(e.getMessage());
741:                }
742:
743:                return (true);
744:            }
745:
746:            /**
747:             * Set the address of this group member
748:             *
749:             * @param event the SET_LOCAL_ADDRESS event
750:             * @return true if event should be forwarded further up
751:             */
752:            private boolean _upSetLocalAddress(Event event) {
753:                // *** Get an exclusive lock
754:                try {
755:                    stateLock.writeLock().acquire();
756:                    try {
757:                        addr = (Address) event.getArg();
758:                    } finally {
759:                        stateLock.writeLock().release();
760:                    }
761:                } catch (InterruptedException e) {
762:                    log.error(e.getMessage());
763:                }
764:                return (true);
765:            }
766:
767:            /**
768:             * Handle view changes
769:             * <p/>
770:             * param event the VIEW_CHANGE event
771:             *
772:             * @return true if the event should be forwarded to the layer above
773:             */
774:            private boolean _upViewChange(Event event) {
775:                Object oldSequencerAddr;
776:
777:                // *** Get an exclusive lock
778:                try {
779:                    stateLock.writeLock().acquire();
780:                    try {
781:
782:                        state = RUN;
783:
784:                        // i. See if this member is the sequencer
785:                        // ii. If this is the sequencer, reset the sequencer's sequence ID
786:                        // iii. Reset the last received sequence ID
787:                        //
788:                        // iv. Replay undelivered bcasts: Put all the undelivered bcasts
789:                        // sent by us back to the req queue and discard the rest
790:                        oldSequencerAddr = sequencerAddr;
791:                        sequencerAddr = (Address) ((View) event.getArg())
792:                                .getMembers().elementAt(0);
793:                        if (addr.equals(sequencerAddr)) {
794:                            sequencerSeqID = NULL_ID;
795:                            if ((oldSequencerAddr == null)
796:                                    || (!addr.equals(oldSequencerAddr)))
797:                                if (log.isInfoEnabled())
798:                                    log.info("I'm the new sequencer");
799:                        }
800:                        seqID = NULL_ID;
801:                        _replayBcast();
802:
803:                        // *** Revoke the exclusive lock
804:                    } finally {
805:                        stateLock.writeLock().release();
806:                    }
807:                } catch (InterruptedException e) {
808:                    log.error(e.getMessage());
809:                }
810:
811:                return (true);
812:            }
813:
814:            /*
815:             * Down event handlers
816:             * If the return value is true the event travels further down the stack
817:             * else it won't be forwarded
818:             */
819:
820:            /**
821:             * Blocking confirmed - No messages should come from above until a
822:             * VIEW_CHANGE event is received. Switch to blocking state.
823:             *
824:             * @return true if event should travel further down
825:             */
826:            private boolean downBlockOk() {
827:                // *** Get an exclusive lock
828:                try {
829:                    stateLock.writeLock().acquire();
830:                    try {
831:                        state = BLOCK;
832:                    } finally {
833:                        stateLock.writeLock().release();
834:                    }
835:                } catch (InterruptedException e) {
836:                    log.error(e.getMessage());
837:                }
838:
839:                return (true);
840:            }
841:
842:            /**
843:             * A MSG event travelling down the stack. Forward unicast messages, treat
844:             * specially the broadcast messages.<br>
845:             * <p/>
846:             * If in <code>BLOCK</code> state, i.e. it has replied to a
847:             * <code>BLOCk_OK</code> and hasn't yet received a
848:             * <code>VIEW_CHANGE</code> event, messages are discarded<br>
849:             * <p/>
850:             * If in <code>FLUSH</code> state, forward unicast but queue broadcasts
851:             *
852:             * @param event the MSG event
853:             * @return true if event should travel further down
854:             */
855:            private boolean _downMsg(Event event) {
856:                Message msg;
857:
858:                // *** Get a shared lock
859:                try {
860:                    stateLock.readLock().acquire();
861:                    try {
862:
863:                        // i. Discard all msgs, if in NULL_STATE
864:                        // ii. Discard all msgs, if blocked
865:                        if (state == NULL_STATE) {
866:                            if (log.isErrorEnabled())
867:                                log.error("Discard msg in NULL_STATE");
868:                            return (false);
869:                        }
870:                        if (state == BLOCK) {
871:                            if (log.isErrorEnabled())
872:                                log.error("Blocked, discard msg");
873:                            return (false);
874:                        }
875:
876:                        msg = (Message) event.getArg();
877:                        if (msg.getDest() == null) {
878:                            _sendBcastRequest(msg);
879:                            return (false);
880:                        } else {
881:                            msg = _sendUcast(msg);
882:                            event.setArg(msg);
883:                        }
884:
885:                        // ** Revoke the shared lock
886:                    } finally {
887:                        stateLock.readLock().release();
888:                    }
889:                } catch (InterruptedException e) {
890:                    log.error(e.getMessage());
891:                }
892:
893:                return (true);
894:            }
895:
896:            /**
897:             * Prepare this layer to receive messages from above
898:             */
899:            public void start() throws Exception {
900:                TimeScheduler timer;
901:
902:                timer = stack != null ? stack.timer : null;
903:                if (timer == null)
904:                    throw new Exception("TOTAL.start(): timer is null");
905:
906:                reqTbl = new TreeMap();
907:                upTbl = new TreeMap();
908:                retransmitter = new AckSenderWindow(new Command(),
909:                        AVG_RETRANSMIT_INTERVAL);
910:            }
911:
912:            /**
913:             * Handle the stop() method travelling down the stack.
914:             * <p/>
915:             * The local addr is set to null, since after a Start->Stop->Start
916:             * sequence this member's addr is not guaranteed to be the same
917:             */
918:            public void stop() {
919:                try {
920:                    stateLock.writeLock().acquire();
921:                    try {
922:                        state = NULL_STATE;
923:                        retransmitter.reset();
924:                        reqTbl.clear();
925:                        upTbl.clear();
926:                        addr = null;
927:                    } finally {
928:                        stateLock.writeLock().release();
929:                    }
930:                } catch (InterruptedException e) {
931:                    log.error(e.getMessage());
932:                }
933:            }
934:
935:            /**
936:             * Process an event coming from the layer below
937:             *
938:             * @param event the event to process
939:             */
940:            public void up(Event event) {
941:                switch (event.getType()) {
942:                case Event.BLOCK:
943:                    if (!_upBlock())
944:                        return;
945:                    break;
946:                case Event.MSG:
947:                    if (!_upMsg(event))
948:                        return;
949:                    break;
950:                case Event.SET_LOCAL_ADDRESS:
951:                    if (!_upSetLocalAddress(event))
952:                        return;
953:                    break;
954:                case Event.VIEW_CHANGE:
955:                    if (!_upViewChange(event))
956:                        return;
957:                    break;
958:                default:
959:                    break;
960:                }
961:
962:                passUp(event);
963:            }
964:
965:            /**
966:             * Process an event coming from the layer above
967:             *
968:             * @param event the event to process
969:             */
970:            public void down(Event event) {
971:                switch (event.getType()) {
972:                case Event.BLOCK_OK:
973:                    if (!downBlockOk())
974:                        return;
975:                    break;
976:                case Event.MSG:
977:                    if (!_downMsg(event))
978:                        return;
979:                    break;
980:                default:
981:                    break;
982:                }
983:
984:                passDown(event);
985:            }
986:
987:            /**
988:             * Create the TOTAL layer
989:             */
990:            public TOTAL() {
991:            }
992:
993:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.