Source Code Cross Referenced for NonBlockingCoordinator.java in  » Sevlet-Container » apache-tomcat-6.0.14 » org » apache » catalina » tribes » group » interceptors » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Sevlet Container » apache tomcat 6.0.14 » org.apache.catalina.tribes.group.interceptors 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Licensed to the Apache Software Foundation (ASF) under one or more
003:         * contributor license agreements.  See the NOTICE file distributed with
004:         * this work for additional information regarding copyright ownership.
005:         * The ASF licenses this file to You under the Apache License, Version 2.0
006:         * (the "License"); you may not use this file except in compliance with
007:         * the License.  You may obtain a copy of the License at
008:         * 
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         * 
011:         * Unless required by applicable law or agreed to in writing, software
012:         * distributed under the License is distributed on an "AS IS" BASIS,
013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         * See the License for the specific language governing permissions and
015:         */
016:        package org.apache.catalina.tribes.group.interceptors;
017:
018:        import java.util.concurrent.atomic.AtomicBoolean;
019:
020:        import org.apache.catalina.tribes.Channel;
021:        import org.apache.catalina.tribes.ChannelException;
022:        import org.apache.catalina.tribes.ChannelInterceptor;
023:        import org.apache.catalina.tribes.ChannelMessage;
024:        import org.apache.catalina.tribes.Member;
025:        import org.apache.catalina.tribes.UniqueId;
026:        import org.apache.catalina.tribes.group.AbsoluteOrder;
027:        import org.apache.catalina.tribes.group.ChannelInterceptorBase;
028:        import org.apache.catalina.tribes.group.InterceptorPayload;
029:        import org.apache.catalina.tribes.io.ChannelData;
030:        import org.apache.catalina.tribes.io.XByteBuffer;
031:        import org.apache.catalina.tribes.membership.MemberImpl;
032:        import org.apache.catalina.tribes.membership.Membership;
033:        import org.apache.catalina.tribes.util.Arrays;
034:        import org.apache.catalina.tribes.util.UUIDGenerator;
035:
036:        /**
037:         * <p>Title: Auto merging leader election algorithm</p>
038:         *
039:         * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
040:         *    it also merges groups automatically when members are discovered that weren't part of the group.
041:         *    </p>
042:         * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
043:         * </p>
044:         * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
045:         * to pass a token ring of the current membership.<br>
046:         * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br>
047:         * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
048:         * nodes are receiving pings from all the other nodes. 
049:         * meaning, that node{i} receives pings from node{all}-node{i}<br>
050:         * but the following could happen if a multicast problem occurs.
051:         * A has members {B,C,D}<br>
052:         * B has members {A,C}<br>
053:         * C has members {D,E}<br>
054:         * D has members {A,B,C,E}<br>
055:         * E has members {A,C,D}<br>
056:         * Because the default Tribes membership implementation, relies on the multicast packets to 
057:         * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br>
058:         * <br>
059:         * To best explain how this algorithm works, lets take the above example:
060:         * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
061:         * where messages overlap, as they all depend on absolute order<br>
062:         * Scenario 1: A,B,C,D,E all come online at the same time
063:         * Eval phase, A thinks of itself as leader, B thinks of A as leader,
064:         * C thinks of itself as leader, D,E think of A as leader<br>
065:         * Token phase:<br>
066:         * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br>
067:         * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br>
068:         * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br>
069:         * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br>
070:         * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br>
071:         * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br>
072:         * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br>
073:         * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br>
074:         * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br>
075:         * At this point, the state looks like<br>
076:         * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
077:         * B - {A-ldr, mbrs-A,B,C,D, id=X}<br>
078:         * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
079:         * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
080:         * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br>
081:         * <br>
082:         * A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader.
083:         * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have 
084:         * arrived at the same membership and all nodes are informed of each other.<br>
085:         * To synchronize the rest we simply perform the following check at A when A receives X:<br>
086:         * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br>
087:         * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B
088:         * When A receives X again, the token is complete. <br>
089:         * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
090:         * install and accept the view.
091:         * </p>
092:         * <p>
093:         * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br>
094:         * Lets also assume that C1 sees the following view {B,D,E}<br>
095:         * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br>
096:         * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br>
097:         * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br>
098:         * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br>
099:         * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br>
100:         * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
101:         * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
102:         * </p>
103:         * <p>To ensure that the view gets implemented at all nodes at the same time, 
104:         *    A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
105:         * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p>
106:         *
107:         * <p>The example above, of course can be simplified with a finite statemachine:<br>
108:         * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br>
109:         * Maybe I'll do a state diagram :)
110:         * </p>
111:         * <h2>State Diagrams</h2>
112:         * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br>
113:         * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br>
114:         * 
115:         * @author Filip Hanik
116:         * @version 1.0
117:         * 
118:         * 
119:         * 
120:         */
121:        public class NonBlockingCoordinator extends ChannelInterceptorBase {
122:
123:            /**
124:             * header for a coordination message
125:             */
126:            protected static final byte[] COORD_HEADER = new byte[] { -86, 38,
127:                    -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54,
128:                    13, 63 };
129:            /**
130:             * Coordination request
131:             */
132:            protected static final byte[] COORD_REQUEST = new byte[] { 104,
133:                    -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1,
134:                    -48, -49, 30 };
135:            /**
136:             * Coordination confirmation, for blocking installations
137:             */
138:            protected static final byte[] COORD_CONF = new byte[] { 67, 88,
139:                    107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86,
140:                    75, 20 };
141:
142:            /**
143:             * Alive message
144:             */
145:            protected static final byte[] COORD_ALIVE = new byte[] { 79, -121,
146:                    -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114,
147:                    -56, -46, -18, 102, 10, 34, -127, -9, 71, 115, -70, 72,
148:                    -101, 88, 72, -124, 127, 111, 74, 76, -116, 50, 111, 103,
149:                    65, 3, -77, 51, -35, 0, 119, 117, 9, -26, 119, 50, -75,
150:                    -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106,
151:                    -55 };
152:            /**
153:             * Time to wait for coordination timeout
154:             */
155:            protected long waitForCoordMsgTimeout = 15000;
156:            /**
157:             * Our current view
158:             */
159:            protected Membership view = null;
160:            /**
161:             * Out current viewId
162:             */
163:            protected UniqueId viewId;
164:
165:            /**
166:             * Our nonblocking membership
167:             */
168:            protected Membership membership = null;
169:
170:            /**
171:             * indicates that we are running an election 
172:             * and this is the one we are running
173:             */
174:            protected UniqueId suggestedviewId;
175:            protected Membership suggestedView;
176:
177:            protected boolean started = false;
178:            protected final int startsvc = 0xFFFF;
179:
180:            protected Object electionMutex = new Object();
181:
182:            protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false);
183:
/**
 * Creates the interceptor; all state keeps the defaults set by the
 * field initializers.
 */
public NonBlockingCoordinator() {
    super();
}
187:
188:            //============================================================================================================    
189:            //              COORDINATION HANDLING
190:            //============================================================================================================
191:
192:            public void startElection(boolean force) throws ChannelException {
193:                synchronized (electionMutex) {
194:                    MemberImpl local = (MemberImpl) getLocalMember(false);
195:                    MemberImpl[] others = (MemberImpl[]) membership
196:                            .getMembers();
197:                    fireInterceptorEvent(new CoordinationEvent(
198:                            CoordinationEvent.EVT_START_ELECT, this ,
199:                            "Election initated"));
200:                    if (others.length == 0) {
201:                        this .viewId = new UniqueId(UUIDGenerator
202:                                .randomUUID(false));
203:                        this .view = new Membership(local, AbsoluteOrder.comp,
204:                                true);
205:                        this .handleViewConf(this .createElectionMsg(local,
206:                                others, local), local, view);
207:                        return; //the only member, no need for an election
208:                    }
209:                    if (suggestedviewId != null) {
210:
211:                        if (view != null
212:                                && Arrays.diff(view, suggestedView, local).length == 0
213:                                && Arrays.diff(suggestedView, view, local).length == 0) {
214:                            suggestedviewId = null;
215:                            suggestedView = null;
216:                            fireInterceptorEvent(new CoordinationEvent(
217:                                    CoordinationEvent.EVT_ELECT_ABANDONED,
218:                                    this ,
219:                                    "Election abandoned, running election matches view"));
220:                        } else {
221:                            fireInterceptorEvent(new CoordinationEvent(
222:                                    CoordinationEvent.EVT_ELECT_ABANDONED,
223:                                    this ,
224:                                    "Election abandoned, election running"));
225:                        }
226:                        return; //election already running, I'm not allowed to have two of them
227:                    }
228:                    if (view != null
229:                            && Arrays.diff(view, membership, local).length == 0
230:                            && Arrays.diff(membership, view, local).length == 0) {
231:                        fireInterceptorEvent(new CoordinationEvent(
232:                                CoordinationEvent.EVT_ELECT_ABANDONED, this ,
233:                                "Election abandoned, view matches membership"));
234:                        return; //already have this view installed
235:                    }
236:                    int prio = AbsoluteOrder.comp.compare(local, others[0]);
237:                    MemberImpl leader = (prio < 0) ? local : others[0];//am I the leader in my view?
238:                    if (local.equals(leader) || force) {
239:                        CoordinationMessage msg = createElectionMsg(local,
240:                                others, leader);
241:                        suggestedviewId = msg.getId();
242:                        suggestedView = new Membership(local,
243:                                AbsoluteOrder.comp, true);
244:                        Arrays.fill(suggestedView, msg.getMembers());
245:                        fireInterceptorEvent(new CoordinationEvent(
246:                                CoordinationEvent.EVT_PROCESS_ELECT, this ,
247:                                "Election, sending request"));
248:                        sendElectionMsg(local, others[0], msg);
249:                    } else {
250:                        try {
251:                            coordMsgReceived.set(false);
252:                            fireInterceptorEvent(new CoordinationEvent(
253:                                    CoordinationEvent.EVT_WAIT_FOR_MSG, this ,
254:                                    "Election, waiting for request"));
255:                            electionMutex.wait(waitForCoordMsgTimeout);
256:                        } catch (InterruptedException x) {
257:                            Thread.currentThread().interrupted();
258:                        }
259:                        if (suggestedviewId == null
260:                                && (!coordMsgReceived.get())) {
261:                            //no message arrived, send the coord msg
262:                            //                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
263:                            //                    startElection(true);
264:                            fireInterceptorEvent(new CoordinationEvent(
265:                                    CoordinationEvent.EVT_ELECT_ABANDONED,
266:                                    this ,
267:                                    "Election abandoned, waiting timed out."));
268:                        } else {
269:                            fireInterceptorEvent(new CoordinationEvent(
270:                                    CoordinationEvent.EVT_ELECT_ABANDONED,
271:                                    this ,
272:                                    "Election abandoned, received a message"));
273:                        }
274:                    }//end if
275:
276:                }
277:            }
278:
279:            private CoordinationMessage createElectionMsg(MemberImpl local,
280:                    MemberImpl[] others, MemberImpl leader) {
281:                Membership m = new Membership(local, AbsoluteOrder.comp, true);
282:                Arrays.fill(m, others);
283:                MemberImpl[] mbrs = m.getMembers();
284:                m.reset();
285:                CoordinationMessage msg = new CoordinationMessage(leader,
286:                        local, mbrs, new UniqueId(UUIDGenerator
287:                                .randomUUID(true)), this .COORD_REQUEST);
288:                return msg;
289:            }
290:
291:            protected void sendElectionMsg(MemberImpl local, MemberImpl next,
292:                    CoordinationMessage msg) throws ChannelException {
293:                fireInterceptorEvent(new CoordinationEvent(
294:                        CoordinationEvent.EVT_SEND_MSG, this ,
295:                        "Sending election message to(" + next.getName() + ")"));
296:                super .sendMessage(new Member[] { next },
297:                        createData(msg, local), null);
298:            }
299:
300:            protected void sendElectionMsgToNextInline(MemberImpl local,
301:                    CoordinationMessage msg) throws ChannelException {
302:                int next = Arrays.nextIndex(local, msg.getMembers());
303:                int current = next;
304:                msg.leader = msg.getMembers()[0];
305:                boolean sent = false;
306:                while (!sent && current >= 0) {
307:                    try {
308:                        sendElectionMsg(local,
309:                                (MemberImpl) msg.getMembers()[current], msg);
310:                        sent = true;
311:                    } catch (ChannelException x) {
312:                        log.warn("Unable to send election message to:"
313:                                + msg.getMembers()[current]);
314:                        current = Arrays.nextIndex(msg.getMembers()[current],
315:                                msg.getMembers());
316:                        if (current == next)
317:                            throw x;
318:                    }
319:                }
320:            }
321:
322:            public Member getNextInLine(MemberImpl local, MemberImpl[] others) {
323:                MemberImpl result = null;
324:                for (int i = 0; i < others.length; i++) {
325:
326:                }
327:                return result;
328:            }
329:
330:            public ChannelData createData(CoordinationMessage msg,
331:                    MemberImpl local) {
332:                msg.write();
333:                ChannelData data = new ChannelData(true);
334:                data.setAddress(local);
335:                data.setMessage(msg.getBuffer());
336:                data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
337:                data.setTimestamp(System.currentTimeMillis());
338:                return data;
339:            }
340:
341:            protected void viewChange(UniqueId viewId, Member[] view) {
342:                //invoke any listeners
343:            }
344:
345:            protected boolean alive(Member mbr) {
346:                return TcpFailureDetector.memberAlive(mbr, COORD_ALIVE, false,
347:                        false, waitForCoordMsgTimeout, waitForCoordMsgTimeout,
348:                        getOptionFlag());
349:            }
350:
351:            protected Membership mergeOnArrive(CoordinationMessage msg,
352:                    Member sender) {
353:                fireInterceptorEvent(new CoordinationEvent(
354:                        CoordinationEvent.EVT_PRE_MERGE, this , "Pre merge"));
355:                MemberImpl local = (MemberImpl) getLocalMember(false);
356:                Membership merged = new Membership(local, AbsoluteOrder.comp,
357:                        true);
358:                Arrays.fill(merged, msg.getMembers());
359:                Arrays.fill(merged, getMembers());
360:                Member[] diff = Arrays.diff(merged, membership, local);
361:                for (int i = 0; i < diff.length; i++) {
362:                    if (!alive(diff[i]))
363:                        merged.removeMember((MemberImpl) diff[i]);
364:                    else
365:                        memberAdded(diff[i], false);
366:                }
367:                fireInterceptorEvent(new CoordinationEvent(
368:                        CoordinationEvent.EVT_POST_MERGE, this , "Post merge"));
369:                return merged;
370:            }
371:
372:            protected void processCoordMessage(CoordinationMessage msg,
373:                    Member sender) throws ChannelException {
374:                if (!coordMsgReceived.get()) {
375:                    coordMsgReceived.set(true);
376:                    synchronized (electionMutex) {
377:                        electionMutex.notifyAll();
378:                    }
379:                }
380:                msg.timestamp = System.currentTimeMillis();
381:                Membership merged = mergeOnArrive(msg, sender);
382:                if (isViewConf(msg))
383:                    handleViewConf(msg, sender, merged);
384:                else
385:                    handleToken(msg, sender, merged);
386:                ClassLoader loader;
387:
388:            }
389:
390:            protected void handleToken(CoordinationMessage msg, Member sender,
391:                    Membership merged) throws ChannelException {
392:                MemberImpl local = (MemberImpl) getLocalMember(false);
393:                if (local.equals(msg.getSource())) {
394:                    //my message msg.src=local
395:                    handleMyToken(local, msg, sender, merged);
396:                } else {
397:                    handleOtherToken(local, msg, sender, merged);
398:                }
399:            }
400:
401:            protected void handleMyToken(MemberImpl local,
402:                    CoordinationMessage msg, Member sender, Membership merged)
403:                    throws ChannelException {
404:                if (local.equals(msg.getLeader())) {
405:                    //no leadership change
406:                    if (Arrays.sameMembers(msg.getMembers(), merged
407:                            .getMembers())) {
408:                        msg.type = COORD_CONF;
409:                        super .sendMessage(Arrays
410:                                .remove(msg.getMembers(), local), createData(
411:                                msg, local), null);
412:                        handleViewConf(msg, local, merged);
413:                    } else {
414:                        //membership change
415:                        suggestedView = new Membership(local,
416:                                AbsoluteOrder.comp, true);
417:                        suggestedviewId = msg.getId();
418:                        Arrays.fill(suggestedView, merged.getMembers());
419:                        msg.view = (MemberImpl[]) merged.getMembers();
420:                        sendElectionMsgToNextInline(local, msg);
421:                    }
422:                } else {
423:                    //leadership change
424:                    suggestedView = null;
425:                    suggestedviewId = null;
426:                    msg.view = (MemberImpl[]) merged.getMembers();
427:                    sendElectionMsgToNextInline(local, msg);
428:                }
429:            }
430:
431:            protected void handleOtherToken(MemberImpl local,
432:                    CoordinationMessage msg, Member sender, Membership merged)
433:                    throws ChannelException {
434:                if (local.equals(msg.getLeader())) {
435:                    //I am the new leader
436:                    //startElection(false);
437:                } else {
438:                    msg.view = (MemberImpl[]) merged.getMembers();
439:                    sendElectionMsgToNextInline(local, msg);
440:                }
441:            }
442:
443:            protected void handleViewConf(CoordinationMessage msg,
444:                    Member sender, Membership merged) throws ChannelException {
445:                if (viewId != null && msg.getId().equals(viewId))
446:                    return;//we already have this view
447:                view = new Membership((MemberImpl) getLocalMember(false),
448:                        AbsoluteOrder.comp, true);
449:                Arrays.fill(view, msg.getMembers());
450:                viewId = msg.getId();
451:
452:                if (viewId.equals(suggestedviewId)) {
453:                    suggestedView = null;
454:                    suggestedviewId = null;
455:                }
456:
457:                if (suggestedView != null
458:                        && AbsoluteOrder.comp.compare(suggestedView
459:                                .getMembers()[0], merged.getMembers()[0]) < 0) {
460:                    suggestedView = null;
461:                    suggestedviewId = null;
462:                }
463:
464:                viewChange(viewId, view.getMembers());
465:                fireInterceptorEvent(new CoordinationEvent(
466:                        CoordinationEvent.EVT_CONF_RX, this , "Accepted View"));
467:
468:                if (suggestedviewId == null
469:                        && hasHigherPriority(merged.getMembers(), membership
470:                                .getMembers())) {
471:                    startElection(false);
472:                }
473:            }
474:
475:            protected boolean isViewConf(CoordinationMessage msg) {
476:                return Arrays.contains(msg.getType(), 0, COORD_CONF, 0,
477:                        COORD_CONF.length);
478:            }
479:
480:            protected boolean hasHigherPriority(Member[] complete,
481:                    Member[] local) {
482:                if (local == null || local.length == 0)
483:                    return false;
484:                if (complete == null || complete.length == 0)
485:                    return true;
486:                AbsoluteOrder.absoluteOrder(complete);
487:                AbsoluteOrder.absoluteOrder(local);
488:                return (AbsoluteOrder.comp.compare(complete[0], local[0]) > 0);
489:
490:            }
491:
492:            /**
493:             * Returns coordinator if one is available
494:             * @return Member
495:             */
496:            public Member getCoordinator() {
497:                return (view != null && view.hasMembers()) ? view.getMembers()[0]
498:                        : null;
499:            }
500:
501:            public Member[] getView() {
502:                return (view != null && view.hasMembers()) ? view.getMembers()
503:                        : new Member[0];
504:            }
505:
506:            public UniqueId getViewId() {
507:                return viewId;
508:            }
509:
510:            /**
511:             * Block in/out messages while a election is going on
512:             */
513:            protected void halt() {
514:
515:            }
516:
517:            /**
518:             * Release lock for in/out messages election is completed
519:             */
520:            protected void release() {
521:
522:            }
523:
524:            /**
525:             * Wait for an election to end
526:             */
527:            protected void waitForRelease() {
528:
529:            }
530:
531:            //============================================================================================================    
532:            //              OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE    
533:            //============================================================================================================
534:            public void start(int svc) throws ChannelException {
535:                if (membership == null)
536:                    setupMembership();
537:                if (started)
538:                    return;
539:                fireInterceptorEvent(new CoordinationEvent(
540:                        CoordinationEvent.EVT_START, this , "Before start"));
541:                super .start(startsvc);
542:                started = true;
543:                if (view == null)
544:                    view = new Membership((MemberImpl) super 
545:                            .getLocalMember(true), AbsoluteOrder.comp, true);
546:                fireInterceptorEvent(new CoordinationEvent(
547:                        CoordinationEvent.EVT_START, this , "After start"));
548:                startElection(false);
549:            }
550:
551:            public void stop(int svc) throws ChannelException {
552:                try {
553:                    halt();
554:                    synchronized (electionMutex) {
555:                        if (!started)
556:                            return;
557:                        started = false;
558:                        fireInterceptorEvent(new CoordinationEvent(
559:                                CoordinationEvent.EVT_STOP, this , "Before stop"));
560:                        super .stop(startsvc);
561:                        this .view = null;
562:                        this .viewId = null;
563:                        this .suggestedView = null;
564:                        this .suggestedviewId = null;
565:                        this .membership.reset();
566:                        fireInterceptorEvent(new CoordinationEvent(
567:                                CoordinationEvent.EVT_STOP, this , "After stop"));
568:                    }
569:                } finally {
570:                    release();
571:                }
572:            }
573:
574:            public void sendMessage(Member[] destination, ChannelMessage msg,
575:                    InterceptorPayload payload) throws ChannelException {
576:                waitForRelease();
577:                super .sendMessage(destination, msg, payload);
578:            }
579:
580:            public void messageReceived(ChannelMessage msg) {
581:                if (Arrays.contains(msg.getMessage().getBytesDirect(), 0,
582:                        COORD_ALIVE, 0, COORD_ALIVE.length)) {
583:                    //ignore message, its an alive message
584:                    fireInterceptorEvent(new CoordinationEvent(
585:                            CoordinationEvent.EVT_MSG_ARRIVE, this ,
586:                            "Alive Message"));
587:
588:                } else if (Arrays.contains(msg.getMessage().getBytesDirect(),
589:                        0, COORD_HEADER, 0, COORD_HEADER.length)) {
590:                    try {
591:                        CoordinationMessage cmsg = new CoordinationMessage(msg
592:                                .getMessage());
593:                        Member[] cmbr = cmsg.getMembers();
594:                        fireInterceptorEvent(new CoordinationEvent(
595:                                CoordinationEvent.EVT_MSG_ARRIVE, this ,
596:                                "Coord Msg Arrived("
597:                                        + Arrays.toNameString(cmbr) + ")"));
598:                        processCoordMessage(cmsg, msg.getAddress());
599:                    } catch (ChannelException x) {
600:                        log
601:                                .error(
602:                                        "Error processing coordination message. Could be fatal.",
603:                                        x);
604:                    }
605:                } else {
606:                    super .messageReceived(msg);
607:                }
608:            }
609:
610:            public boolean accept(ChannelMessage msg) {
611:                return super .accept(msg);
612:            }
613:
614:            public void memberAdded(Member member) {
615:                memberAdded(member, true);
616:            }
617:
618:            public void memberAdded(Member member, boolean elect) {
619:                try {
620:                    if (membership == null)
621:                        setupMembership();
622:                    if (membership.memberAlive((MemberImpl) member))
623:                        super .memberAdded(member);
624:                    try {
625:                        fireInterceptorEvent(new CoordinationEvent(
626:                                CoordinationEvent.EVT_MBR_ADD, this ,
627:                                "Member add(" + member.getName() + ")"));
628:                        if (started && elect)
629:                            startElection(false);
630:                    } catch (ChannelException x) {
631:                        log
632:                                .error(
633:                                        "Unable to start election when member was added.",
634:                                        x);
635:                    }
636:                } finally {
637:                }
638:
639:            }
640:
641:            public void memberDisappeared(Member member) {
642:                try {
643:
644:                    membership.removeMember((MemberImpl) member);
645:                    super .memberDisappeared(member);
646:                    try {
647:                        fireInterceptorEvent(new CoordinationEvent(
648:                                CoordinationEvent.EVT_MBR_DEL, this ,
649:                                "Member remove(" + member.getName() + ")"));
650:                        if (started && (isCoordinator() || isHighest()))
651:                            startElection(true); //to do, if a member disappears, only the coordinator can start
652:                    } catch (ChannelException x) {
653:                        log
654:                                .error(
655:                                        "Unable to start election when member was removed.",
656:                                        x);
657:                    }
658:                } finally {
659:                }
660:            }
661:
662:            public boolean isHighest() {
663:                Member local = getLocalMember(false);
664:                if (membership.getMembers().length == 0)
665:                    return true;
666:                else
667:                    return AbsoluteOrder.comp.compare(local, membership
668:                            .getMembers()[0]) <= 0;
669:            }
670:
671:            public boolean isCoordinator() {
672:                Member coord = getCoordinator();
673:                return coord != null && getLocalMember(false).equals(coord);
674:            }
675:
676:            public void heartbeat() {
677:                try {
678:                    MemberImpl local = (MemberImpl) getLocalMember(false);
679:                    if (view != null
680:                            && (Arrays.diff(view, membership, local).length != 0 || Arrays
681:                                    .diff(membership, view, local).length != 0)) {
682:                        if (isHighest()) {
683:                            fireInterceptorEvent(new CoordinationEvent(
684:                                    CoordinationEvent.EVT_START_ELECT, this ,
685:                                    "Heartbeat found inconsistency, restart election"));
686:                            startElection(true);
687:                        }
688:                    }
689:                } catch (Exception x) {
690:                    log.error("Unable to perform heartbeat.", x);
691:                } finally {
692:                    super .heartbeat();
693:                }
694:            }
695:
696:            /**
697:             * has members
698:             */
699:            public boolean hasMembers() {
700:
701:                return membership.hasMembers();
702:            }
703:
704:            /**
705:             * Get all current cluster members
706:             * @return all members or empty array
707:             */
708:            public Member[] getMembers() {
709:
710:                return membership.getMembers();
711:            }
712:
713:            /**
714:             *
715:             * @param mbr Member
716:             * @return Member
717:             */
718:            public Member getMember(Member mbr) {
719:
720:                return membership.getMember(mbr);
721:            }
722:
723:            /**
724:             * Return the member that represents this node.
725:             *
726:             * @return Member
727:             */
728:            public Member getLocalMember(boolean incAlive) {
729:                Member local = super .getLocalMember(incAlive);
730:                if (view == null && (local != null))
731:                    setupMembership();
732:                return local;
733:            }
734:
735:            protected synchronized void setupMembership() {
736:                if (membership == null) {
737:                    membership = new Membership((MemberImpl) super 
738:                            .getLocalMember(true), AbsoluteOrder.comp, false);
739:                }
740:            }
741:
742:            //============================================================================================================    
743:            //              HELPER CLASSES FOR COORDINATION
744:            //============================================================================================================
745:
746:            public static class CoordinationMessage {
747:                //X{A-ldr, A-src, mbrs-A,B,C,D}
748:                protected XByteBuffer buf;
749:                protected MemberImpl leader;
750:                protected MemberImpl source;
751:                protected MemberImpl[] view;
752:                protected UniqueId id;
753:                protected byte[] type;
754:                protected long timestamp = System.currentTimeMillis();
755:
756:                public CoordinationMessage(XByteBuffer buf) {
757:                    this .buf = buf;
758:                    parse();
759:                }
760:
761:                public CoordinationMessage(MemberImpl leader,
762:                        MemberImpl source, MemberImpl[] view, UniqueId id,
763:                        byte[] type) {
764:                    this .buf = new XByteBuffer(4096, false);
765:                    this .leader = leader;
766:                    this .source = source;
767:                    this .view = view;
768:                    this .id = id;
769:                    this .type = type;
770:                    this .write();
771:                }
772:
773:                public byte[] getHeader() {
774:                    return NonBlockingCoordinator.COORD_HEADER;
775:                }
776:
777:                public MemberImpl getLeader() {
778:                    if (leader == null)
779:                        parse();
780:                    return leader;
781:                }
782:
783:                public MemberImpl getSource() {
784:                    if (source == null)
785:                        parse();
786:                    return source;
787:                }
788:
789:                public UniqueId getId() {
790:                    if (id == null)
791:                        parse();
792:                    return id;
793:                }
794:
795:                public MemberImpl[] getMembers() {
796:                    if (view == null)
797:                        parse();
798:                    return view;
799:                }
800:
801:                public byte[] getType() {
802:                    if (type == null)
803:                        parse();
804:                    return type;
805:                }
806:
807:                public XByteBuffer getBuffer() {
808:                    return this .buf;
809:                }
810:
811:                public void parse() {
812:                    //header
813:                    int offset = 16;
814:                    //leader
815:                    int ldrLen = buf.toInt(buf.getBytesDirect(), offset);
816:                    offset += 4;
817:                    byte[] ldr = new byte[ldrLen];
818:                    System.arraycopy(buf.getBytesDirect(), offset, ldr, 0,
819:                            ldrLen);
820:                    leader = MemberImpl.getMember(ldr);
821:                    offset += ldrLen;
822:                    //source
823:                    int srcLen = buf.toInt(buf.getBytesDirect(), offset);
824:                    offset += 4;
825:                    byte[] src = new byte[srcLen];
826:                    System.arraycopy(buf.getBytesDirect(), offset, src, 0,
827:                            srcLen);
828:                    source = MemberImpl.getMember(src);
829:                    offset += srcLen;
830:                    //view
831:                    int mbrCount = buf.toInt(buf.getBytesDirect(), offset);
832:                    offset += 4;
833:                    view = new MemberImpl[mbrCount];
834:                    for (int i = 0; i < view.length; i++) {
835:                        int mbrLen = buf.toInt(buf.getBytesDirect(), offset);
836:                        offset += 4;
837:                        byte[] mbr = new byte[mbrLen];
838:                        System.arraycopy(buf.getBytesDirect(), offset, mbr, 0,
839:                                mbrLen);
840:                        view[i] = MemberImpl.getMember(mbr);
841:                        offset += mbrLen;
842:                    }
843:                    //id
844:                    this .id = new UniqueId(buf.getBytesDirect(), offset, 16);
845:                    offset += 16;
846:                    type = new byte[16];
847:                    System.arraycopy(buf.getBytesDirect(), offset, type, 0,
848:                            type.length);
849:                    offset += 16;
850:
851:                }
852:
853:                public void write() {
854:                    buf.reset();
855:                    //header
856:                    buf.append(COORD_HEADER, 0, COORD_HEADER.length);
857:                    //leader
858:                    byte[] ldr = leader.getData(false, false);
859:                    buf.append(ldr.length);
860:                    buf.append(ldr, 0, ldr.length);
861:                    ldr = null;
862:                    //source
863:                    byte[] src = source.getData(false, false);
864:                    buf.append(src.length);
865:                    buf.append(src, 0, src.length);
866:                    src = null;
867:                    //view
868:                    buf.append(view.length);
869:                    for (int i = 0; i < view.length; i++) {
870:                        byte[] mbr = view[i].getData(false, false);
871:                        buf.append(mbr.length);
872:                        buf.append(mbr, 0, mbr.length);
873:                    }
874:                    //id
875:                    buf.append(id.getBytes(), 0, id.getBytes().length);
876:                    buf.append(type, 0, type.length);
877:                }
878:            }
879:
880:            public void fireInterceptorEvent(InterceptorEvent event) {
881:                if (event instanceof  CoordinationEvent
882:                        && ((CoordinationEvent) event).type == CoordinationEvent.EVT_CONF_RX)
883:                    log.info(event);
884:            }
885:
886:            public static class CoordinationEvent implements  InterceptorEvent {
887:                public static final int EVT_START = 1;
888:                public static final int EVT_MBR_ADD = 2;
889:                public static final int EVT_MBR_DEL = 3;
890:                public static final int EVT_START_ELECT = 4;
891:                public static final int EVT_PROCESS_ELECT = 5;
892:                public static final int EVT_MSG_ARRIVE = 6;
893:                public static final int EVT_PRE_MERGE = 7;
894:                public static final int EVT_POST_MERGE = 8;
895:                public static final int EVT_WAIT_FOR_MSG = 9;
896:                public static final int EVT_SEND_MSG = 10;
897:                public static final int EVT_STOP = 11;
898:                public static final int EVT_CONF_RX = 12;
899:                public static final int EVT_ELECT_ABANDONED = 13;
900:
901:                int type;
902:                ChannelInterceptor interceptor;
903:                Member coord;
904:                Member[] mbrs;
905:                String info;
906:                Membership view;
907:                Membership suggestedView;
908:
909:                public CoordinationEvent(int type,
910:                        ChannelInterceptor interceptor, String info) {
911:                    this .type = type;
912:                    this .interceptor = interceptor;
913:                    this .coord = ((NonBlockingCoordinator) interceptor)
914:                            .getCoordinator();
915:                    this .mbrs = ((NonBlockingCoordinator) interceptor).membership
916:                            .getMembers();
917:                    this .info = info;
918:                    this .view = ((NonBlockingCoordinator) interceptor).view;
919:                    this .suggestedView = ((NonBlockingCoordinator) interceptor).suggestedView;
920:                }
921:
922:                public int getEventType() {
923:                    return type;
924:                }
925:
926:                public String getEventTypeDesc() {
927:                    switch (type) {
928:                    case EVT_START:
929:                        return "EVT_START:" + info;
930:                    case EVT_MBR_ADD:
931:                        return "EVT_MBR_ADD:" + info;
932:                    case EVT_MBR_DEL:
933:                        return "EVT_MBR_DEL:" + info;
934:                    case EVT_START_ELECT:
935:                        return "EVT_START_ELECT:" + info;
936:                    case EVT_PROCESS_ELECT:
937:                        return "EVT_PROCESS_ELECT:" + info;
938:                    case EVT_MSG_ARRIVE:
939:                        return "EVT_MSG_ARRIVE:" + info;
940:                    case EVT_PRE_MERGE:
941:                        return "EVT_PRE_MERGE:" + info;
942:                    case EVT_POST_MERGE:
943:                        return "EVT_POST_MERGE:" + info;
944:                    case EVT_WAIT_FOR_MSG:
945:                        return "EVT_WAIT_FOR_MSG:" + info;
946:                    case EVT_SEND_MSG:
947:                        return "EVT_SEND_MSG:" + info;
948:                    case EVT_STOP:
949:                        return "EVT_STOP:" + info;
950:                    case EVT_CONF_RX:
951:                        return "EVT_CONF_RX:" + info;
952:                    case EVT_ELECT_ABANDONED:
953:                        return "EVT_ELECT_ABANDONED:" + info;
954:                    default:
955:                        return "Unknown";
956:                    }
957:                }
958:
959:                public ChannelInterceptor getInterceptor() {
960:                    return interceptor;
961:                }
962:
963:                public String toString() {
964:                    StringBuffer buf = new StringBuffer(
965:                            "CoordinationEvent[type=");
966:                    buf.append(type).append("\n\tLocal:");
967:                    Member local = interceptor.getLocalMember(false);
968:                    buf.append(local != null ? local.getName() : "").append(
969:                            "\n\tCoord:");
970:                    buf.append(coord != null ? coord.getName() : "").append(
971:                            "\n\tView:");
972:                    buf.append(
973:                            Arrays.toNameString(view != null ? view
974:                                    .getMembers() : null)).append(
975:                            "\n\tSuggested View:");
976:                    buf
977:                            .append(
978:                                    Arrays
979:                                            .toNameString(suggestedView != null ? suggestedView
980:                                                    .getMembers()
981:                                                    : null)).append(
982:                                    "\n\tMembers:");
983:                    buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:");
984:                    buf.append(info).append("]");
985:                    return buf.toString();
986:                }
987:            }
988:
989:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.