001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: */
016: package org.apache.catalina.tribes.group.interceptors;
017:
018: import java.util.concurrent.atomic.AtomicBoolean;
019:
020: import org.apache.catalina.tribes.Channel;
021: import org.apache.catalina.tribes.ChannelException;
022: import org.apache.catalina.tribes.ChannelInterceptor;
023: import org.apache.catalina.tribes.ChannelMessage;
024: import org.apache.catalina.tribes.Member;
025: import org.apache.catalina.tribes.UniqueId;
026: import org.apache.catalina.tribes.group.AbsoluteOrder;
027: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
028: import org.apache.catalina.tribes.group.InterceptorPayload;
029: import org.apache.catalina.tribes.io.ChannelData;
030: import org.apache.catalina.tribes.io.XByteBuffer;
031: import org.apache.catalina.tribes.membership.MemberImpl;
032: import org.apache.catalina.tribes.membership.Membership;
033: import org.apache.catalina.tribes.util.Arrays;
034: import org.apache.catalina.tribes.util.UUIDGenerator;
035:
036: /**
037: * <p>Title: Auto merging leader election algorithm</p>
038: *
039: * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
040: * it also merges groups automatically when members are discovered that weren't part of the group.
041: * </p>
042: * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
043: * </p>
044: * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
045: * to pass a token ring of the current membership.<br>
046: * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br>
047: * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
048: * nodes are receiving pings from all the other nodes.
049: * meaning, that node{i} receives pings from node{all}-node{i}<br>
050: * but the following could happen if a multicast problem occurs.
051: * A has members {B,C,D}<br>
052: * B has members {A,C}<br>
053: * C has members {D,E}<br>
054: * D has members {A,B,C,E}<br>
055: * E has members {A,C,D}<br>
056: * Because the default Tribes membership implementation, relies on the multicast packets to
057: * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br>
058: * <br>
059: * To best explain how this algorithm works, lets take the above example:
060: * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
061: * where messages overlap, as they all depend on absolute order<br>
062: * Scenario 1: A,B,C,D,E all come online at the same time
063: * Eval phase, A thinks of itself as leader, B thinks of A as leader,
064: * C thinks of itself as leader, D,E think of A as leader<br>
065: * Token phase:<br>
066: * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br>
067: * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br>
068: * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br>
069: * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br>
070: * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br>
071: * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br>
072: * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br>
073: * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br>
074: * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br>
075: * At this point, the state looks like<br>
076: * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
077: * B - {A-ldr, mbrs-A,B,C,D, id=X}<br>
078: * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
079: * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
080: * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br>
081: * <br>
082: * A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader.
083: * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have
084: * arrived at the same membership and all nodes are informed of each other.<br>
085: * To synchronize the rest we simply perform the following check at A when A receives X:<br>
086: * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br>
087: * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B
088: * When A receives X again, the token is complete. <br>
089: * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
090: * install and accept the view.
091: * </p>
092: * <p>
093: * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br>
094: * Lets also assume that C1 sees the following view {B,D,E}<br>
095: * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br>
096: * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br>
097: * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br>
098: * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br>
099: * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br>
100: * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
101: * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
102: * </p>
103: * <p>To ensure that the view gets implemented at all nodes at the same time,
104: * A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.</p>
105: * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p>
106: *
107: * <p>The example above can, of course, be expressed more compactly as a finite state machine.<br>
108: * A full written description of that state machine is not available yet;<br>
109: * the state diagrams linked below illustrate the two main transitions.
110: * </p>
111: * <h2>State Diagrams</h2>
112: * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br>
113: * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br>
114: *
115: * @author Filip Hanik
116: * @version 1.0
117: *
118: *
119: *
120: */
121: public class NonBlockingCoordinator extends ChannelInterceptorBase {
122:
123: /**
124: * header for a coordination message
125: */
126: protected static final byte[] COORD_HEADER = new byte[] { -86, 38,
127: -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54,
128: 13, 63 };
129: /**
130: * Coordination request
131: */
132: protected static final byte[] COORD_REQUEST = new byte[] { 104,
133: -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1,
134: -48, -49, 30 };
135: /**
136: * Coordination confirmation, for blocking installations
137: */
138: protected static final byte[] COORD_CONF = new byte[] { 67, 88,
139: 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86,
140: 75, 20 };
141:
142: /**
143: * Alive message
144: */
145: protected static final byte[] COORD_ALIVE = new byte[] { 79, -121,
146: -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114,
147: -56, -46, -18, 102, 10, 34, -127, -9, 71, 115, -70, 72,
148: -101, 88, 72, -124, 127, 111, 74, 76, -116, 50, 111, 103,
149: 65, 3, -77, 51, -35, 0, 119, 117, 9, -26, 119, 50, -75,
150: -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106,
151: -55 };
152: /**
153: * Time to wait for coordination timeout
154: */
155: protected long waitForCoordMsgTimeout = 15000;
156: /**
157: * Our current view
158: */
159: protected Membership view = null;
160: /**
161: * Out current viewId
162: */
163: protected UniqueId viewId;
164:
165: /**
166: * Our nonblocking membership
167: */
168: protected Membership membership = null;
169:
170: /**
171: * indicates that we are running an election
172: * and this is the one we are running
173: */
174: protected UniqueId suggestedviewId;
175: protected Membership suggestedView;
176:
177: protected boolean started = false;
178: protected final int startsvc = 0xFFFF;
179:
180: protected Object electionMutex = new Object();
181:
182: protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false);
183:
184: public NonBlockingCoordinator() {
185: super ();
186: }
187:
188: //============================================================================================================
189: // COORDINATION HANDLING
190: //============================================================================================================
191:
192: public void startElection(boolean force) throws ChannelException {
193: synchronized (electionMutex) {
194: MemberImpl local = (MemberImpl) getLocalMember(false);
195: MemberImpl[] others = (MemberImpl[]) membership
196: .getMembers();
197: fireInterceptorEvent(new CoordinationEvent(
198: CoordinationEvent.EVT_START_ELECT, this ,
199: "Election initated"));
200: if (others.length == 0) {
201: this .viewId = new UniqueId(UUIDGenerator
202: .randomUUID(false));
203: this .view = new Membership(local, AbsoluteOrder.comp,
204: true);
205: this .handleViewConf(this .createElectionMsg(local,
206: others, local), local, view);
207: return; //the only member, no need for an election
208: }
209: if (suggestedviewId != null) {
210:
211: if (view != null
212: && Arrays.diff(view, suggestedView, local).length == 0
213: && Arrays.diff(suggestedView, view, local).length == 0) {
214: suggestedviewId = null;
215: suggestedView = null;
216: fireInterceptorEvent(new CoordinationEvent(
217: CoordinationEvent.EVT_ELECT_ABANDONED,
218: this ,
219: "Election abandoned, running election matches view"));
220: } else {
221: fireInterceptorEvent(new CoordinationEvent(
222: CoordinationEvent.EVT_ELECT_ABANDONED,
223: this ,
224: "Election abandoned, election running"));
225: }
226: return; //election already running, I'm not allowed to have two of them
227: }
228: if (view != null
229: && Arrays.diff(view, membership, local).length == 0
230: && Arrays.diff(membership, view, local).length == 0) {
231: fireInterceptorEvent(new CoordinationEvent(
232: CoordinationEvent.EVT_ELECT_ABANDONED, this ,
233: "Election abandoned, view matches membership"));
234: return; //already have this view installed
235: }
236: int prio = AbsoluteOrder.comp.compare(local, others[0]);
237: MemberImpl leader = (prio < 0) ? local : others[0];//am I the leader in my view?
238: if (local.equals(leader) || force) {
239: CoordinationMessage msg = createElectionMsg(local,
240: others, leader);
241: suggestedviewId = msg.getId();
242: suggestedView = new Membership(local,
243: AbsoluteOrder.comp, true);
244: Arrays.fill(suggestedView, msg.getMembers());
245: fireInterceptorEvent(new CoordinationEvent(
246: CoordinationEvent.EVT_PROCESS_ELECT, this ,
247: "Election, sending request"));
248: sendElectionMsg(local, others[0], msg);
249: } else {
250: try {
251: coordMsgReceived.set(false);
252: fireInterceptorEvent(new CoordinationEvent(
253: CoordinationEvent.EVT_WAIT_FOR_MSG, this ,
254: "Election, waiting for request"));
255: electionMutex.wait(waitForCoordMsgTimeout);
256: } catch (InterruptedException x) {
257: Thread.currentThread().interrupted();
258: }
259: if (suggestedviewId == null
260: && (!coordMsgReceived.get())) {
261: //no message arrived, send the coord msg
262: // fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
263: // startElection(true);
264: fireInterceptorEvent(new CoordinationEvent(
265: CoordinationEvent.EVT_ELECT_ABANDONED,
266: this ,
267: "Election abandoned, waiting timed out."));
268: } else {
269: fireInterceptorEvent(new CoordinationEvent(
270: CoordinationEvent.EVT_ELECT_ABANDONED,
271: this ,
272: "Election abandoned, received a message"));
273: }
274: }//end if
275:
276: }
277: }
278:
279: private CoordinationMessage createElectionMsg(MemberImpl local,
280: MemberImpl[] others, MemberImpl leader) {
281: Membership m = new Membership(local, AbsoluteOrder.comp, true);
282: Arrays.fill(m, others);
283: MemberImpl[] mbrs = m.getMembers();
284: m.reset();
285: CoordinationMessage msg = new CoordinationMessage(leader,
286: local, mbrs, new UniqueId(UUIDGenerator
287: .randomUUID(true)), this .COORD_REQUEST);
288: return msg;
289: }
290:
291: protected void sendElectionMsg(MemberImpl local, MemberImpl next,
292: CoordinationMessage msg) throws ChannelException {
293: fireInterceptorEvent(new CoordinationEvent(
294: CoordinationEvent.EVT_SEND_MSG, this ,
295: "Sending election message to(" + next.getName() + ")"));
296: super .sendMessage(new Member[] { next },
297: createData(msg, local), null);
298: }
299:
300: protected void sendElectionMsgToNextInline(MemberImpl local,
301: CoordinationMessage msg) throws ChannelException {
302: int next = Arrays.nextIndex(local, msg.getMembers());
303: int current = next;
304: msg.leader = msg.getMembers()[0];
305: boolean sent = false;
306: while (!sent && current >= 0) {
307: try {
308: sendElectionMsg(local,
309: (MemberImpl) msg.getMembers()[current], msg);
310: sent = true;
311: } catch (ChannelException x) {
312: log.warn("Unable to send election message to:"
313: + msg.getMembers()[current]);
314: current = Arrays.nextIndex(msg.getMembers()[current],
315: msg.getMembers());
316: if (current == next)
317: throw x;
318: }
319: }
320: }
321:
322: public Member getNextInLine(MemberImpl local, MemberImpl[] others) {
323: MemberImpl result = null;
324: for (int i = 0; i < others.length; i++) {
325:
326: }
327: return result;
328: }
329:
330: public ChannelData createData(CoordinationMessage msg,
331: MemberImpl local) {
332: msg.write();
333: ChannelData data = new ChannelData(true);
334: data.setAddress(local);
335: data.setMessage(msg.getBuffer());
336: data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
337: data.setTimestamp(System.currentTimeMillis());
338: return data;
339: }
340:
341: protected void viewChange(UniqueId viewId, Member[] view) {
342: //invoke any listeners
343: }
344:
345: protected boolean alive(Member mbr) {
346: return TcpFailureDetector.memberAlive(mbr, COORD_ALIVE, false,
347: false, waitForCoordMsgTimeout, waitForCoordMsgTimeout,
348: getOptionFlag());
349: }
350:
351: protected Membership mergeOnArrive(CoordinationMessage msg,
352: Member sender) {
353: fireInterceptorEvent(new CoordinationEvent(
354: CoordinationEvent.EVT_PRE_MERGE, this , "Pre merge"));
355: MemberImpl local = (MemberImpl) getLocalMember(false);
356: Membership merged = new Membership(local, AbsoluteOrder.comp,
357: true);
358: Arrays.fill(merged, msg.getMembers());
359: Arrays.fill(merged, getMembers());
360: Member[] diff = Arrays.diff(merged, membership, local);
361: for (int i = 0; i < diff.length; i++) {
362: if (!alive(diff[i]))
363: merged.removeMember((MemberImpl) diff[i]);
364: else
365: memberAdded(diff[i], false);
366: }
367: fireInterceptorEvent(new CoordinationEvent(
368: CoordinationEvent.EVT_POST_MERGE, this , "Post merge"));
369: return merged;
370: }
371:
372: protected void processCoordMessage(CoordinationMessage msg,
373: Member sender) throws ChannelException {
374: if (!coordMsgReceived.get()) {
375: coordMsgReceived.set(true);
376: synchronized (electionMutex) {
377: electionMutex.notifyAll();
378: }
379: }
380: msg.timestamp = System.currentTimeMillis();
381: Membership merged = mergeOnArrive(msg, sender);
382: if (isViewConf(msg))
383: handleViewConf(msg, sender, merged);
384: else
385: handleToken(msg, sender, merged);
386: ClassLoader loader;
387:
388: }
389:
390: protected void handleToken(CoordinationMessage msg, Member sender,
391: Membership merged) throws ChannelException {
392: MemberImpl local = (MemberImpl) getLocalMember(false);
393: if (local.equals(msg.getSource())) {
394: //my message msg.src=local
395: handleMyToken(local, msg, sender, merged);
396: } else {
397: handleOtherToken(local, msg, sender, merged);
398: }
399: }
400:
401: protected void handleMyToken(MemberImpl local,
402: CoordinationMessage msg, Member sender, Membership merged)
403: throws ChannelException {
404: if (local.equals(msg.getLeader())) {
405: //no leadership change
406: if (Arrays.sameMembers(msg.getMembers(), merged
407: .getMembers())) {
408: msg.type = COORD_CONF;
409: super .sendMessage(Arrays
410: .remove(msg.getMembers(), local), createData(
411: msg, local), null);
412: handleViewConf(msg, local, merged);
413: } else {
414: //membership change
415: suggestedView = new Membership(local,
416: AbsoluteOrder.comp, true);
417: suggestedviewId = msg.getId();
418: Arrays.fill(suggestedView, merged.getMembers());
419: msg.view = (MemberImpl[]) merged.getMembers();
420: sendElectionMsgToNextInline(local, msg);
421: }
422: } else {
423: //leadership change
424: suggestedView = null;
425: suggestedviewId = null;
426: msg.view = (MemberImpl[]) merged.getMembers();
427: sendElectionMsgToNextInline(local, msg);
428: }
429: }
430:
431: protected void handleOtherToken(MemberImpl local,
432: CoordinationMessage msg, Member sender, Membership merged)
433: throws ChannelException {
434: if (local.equals(msg.getLeader())) {
435: //I am the new leader
436: //startElection(false);
437: } else {
438: msg.view = (MemberImpl[]) merged.getMembers();
439: sendElectionMsgToNextInline(local, msg);
440: }
441: }
442:
443: protected void handleViewConf(CoordinationMessage msg,
444: Member sender, Membership merged) throws ChannelException {
445: if (viewId != null && msg.getId().equals(viewId))
446: return;//we already have this view
447: view = new Membership((MemberImpl) getLocalMember(false),
448: AbsoluteOrder.comp, true);
449: Arrays.fill(view, msg.getMembers());
450: viewId = msg.getId();
451:
452: if (viewId.equals(suggestedviewId)) {
453: suggestedView = null;
454: suggestedviewId = null;
455: }
456:
457: if (suggestedView != null
458: && AbsoluteOrder.comp.compare(suggestedView
459: .getMembers()[0], merged.getMembers()[0]) < 0) {
460: suggestedView = null;
461: suggestedviewId = null;
462: }
463:
464: viewChange(viewId, view.getMembers());
465: fireInterceptorEvent(new CoordinationEvent(
466: CoordinationEvent.EVT_CONF_RX, this , "Accepted View"));
467:
468: if (suggestedviewId == null
469: && hasHigherPriority(merged.getMembers(), membership
470: .getMembers())) {
471: startElection(false);
472: }
473: }
474:
475: protected boolean isViewConf(CoordinationMessage msg) {
476: return Arrays.contains(msg.getType(), 0, COORD_CONF, 0,
477: COORD_CONF.length);
478: }
479:
480: protected boolean hasHigherPriority(Member[] complete,
481: Member[] local) {
482: if (local == null || local.length == 0)
483: return false;
484: if (complete == null || complete.length == 0)
485: return true;
486: AbsoluteOrder.absoluteOrder(complete);
487: AbsoluteOrder.absoluteOrder(local);
488: return (AbsoluteOrder.comp.compare(complete[0], local[0]) > 0);
489:
490: }
491:
492: /**
493: * Returns coordinator if one is available
494: * @return Member
495: */
496: public Member getCoordinator() {
497: return (view != null && view.hasMembers()) ? view.getMembers()[0]
498: : null;
499: }
500:
501: public Member[] getView() {
502: return (view != null && view.hasMembers()) ? view.getMembers()
503: : new Member[0];
504: }
505:
506: public UniqueId getViewId() {
507: return viewId;
508: }
509:
510: /**
511: * Block in/out messages while a election is going on
512: */
513: protected void halt() {
514:
515: }
516:
517: /**
518: * Release lock for in/out messages election is completed
519: */
520: protected void release() {
521:
522: }
523:
524: /**
525: * Wait for an election to end
526: */
527: protected void waitForRelease() {
528:
529: }
530:
531: //============================================================================================================
532: // OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE
533: //============================================================================================================
534: public void start(int svc) throws ChannelException {
535: if (membership == null)
536: setupMembership();
537: if (started)
538: return;
539: fireInterceptorEvent(new CoordinationEvent(
540: CoordinationEvent.EVT_START, this , "Before start"));
541: super .start(startsvc);
542: started = true;
543: if (view == null)
544: view = new Membership((MemberImpl) super
545: .getLocalMember(true), AbsoluteOrder.comp, true);
546: fireInterceptorEvent(new CoordinationEvent(
547: CoordinationEvent.EVT_START, this , "After start"));
548: startElection(false);
549: }
550:
551: public void stop(int svc) throws ChannelException {
552: try {
553: halt();
554: synchronized (electionMutex) {
555: if (!started)
556: return;
557: started = false;
558: fireInterceptorEvent(new CoordinationEvent(
559: CoordinationEvent.EVT_STOP, this , "Before stop"));
560: super .stop(startsvc);
561: this .view = null;
562: this .viewId = null;
563: this .suggestedView = null;
564: this .suggestedviewId = null;
565: this .membership.reset();
566: fireInterceptorEvent(new CoordinationEvent(
567: CoordinationEvent.EVT_STOP, this , "After stop"));
568: }
569: } finally {
570: release();
571: }
572: }
573:
574: public void sendMessage(Member[] destination, ChannelMessage msg,
575: InterceptorPayload payload) throws ChannelException {
576: waitForRelease();
577: super .sendMessage(destination, msg, payload);
578: }
579:
580: public void messageReceived(ChannelMessage msg) {
581: if (Arrays.contains(msg.getMessage().getBytesDirect(), 0,
582: COORD_ALIVE, 0, COORD_ALIVE.length)) {
583: //ignore message, its an alive message
584: fireInterceptorEvent(new CoordinationEvent(
585: CoordinationEvent.EVT_MSG_ARRIVE, this ,
586: "Alive Message"));
587:
588: } else if (Arrays.contains(msg.getMessage().getBytesDirect(),
589: 0, COORD_HEADER, 0, COORD_HEADER.length)) {
590: try {
591: CoordinationMessage cmsg = new CoordinationMessage(msg
592: .getMessage());
593: Member[] cmbr = cmsg.getMembers();
594: fireInterceptorEvent(new CoordinationEvent(
595: CoordinationEvent.EVT_MSG_ARRIVE, this ,
596: "Coord Msg Arrived("
597: + Arrays.toNameString(cmbr) + ")"));
598: processCoordMessage(cmsg, msg.getAddress());
599: } catch (ChannelException x) {
600: log
601: .error(
602: "Error processing coordination message. Could be fatal.",
603: x);
604: }
605: } else {
606: super .messageReceived(msg);
607: }
608: }
609:
610: public boolean accept(ChannelMessage msg) {
611: return super .accept(msg);
612: }
613:
614: public void memberAdded(Member member) {
615: memberAdded(member, true);
616: }
617:
618: public void memberAdded(Member member, boolean elect) {
619: try {
620: if (membership == null)
621: setupMembership();
622: if (membership.memberAlive((MemberImpl) member))
623: super .memberAdded(member);
624: try {
625: fireInterceptorEvent(new CoordinationEvent(
626: CoordinationEvent.EVT_MBR_ADD, this ,
627: "Member add(" + member.getName() + ")"));
628: if (started && elect)
629: startElection(false);
630: } catch (ChannelException x) {
631: log
632: .error(
633: "Unable to start election when member was added.",
634: x);
635: }
636: } finally {
637: }
638:
639: }
640:
641: public void memberDisappeared(Member member) {
642: try {
643:
644: membership.removeMember((MemberImpl) member);
645: super .memberDisappeared(member);
646: try {
647: fireInterceptorEvent(new CoordinationEvent(
648: CoordinationEvent.EVT_MBR_DEL, this ,
649: "Member remove(" + member.getName() + ")"));
650: if (started && (isCoordinator() || isHighest()))
651: startElection(true); //to do, if a member disappears, only the coordinator can start
652: } catch (ChannelException x) {
653: log
654: .error(
655: "Unable to start election when member was removed.",
656: x);
657: }
658: } finally {
659: }
660: }
661:
662: public boolean isHighest() {
663: Member local = getLocalMember(false);
664: if (membership.getMembers().length == 0)
665: return true;
666: else
667: return AbsoluteOrder.comp.compare(local, membership
668: .getMembers()[0]) <= 0;
669: }
670:
671: public boolean isCoordinator() {
672: Member coord = getCoordinator();
673: return coord != null && getLocalMember(false).equals(coord);
674: }
675:
676: public void heartbeat() {
677: try {
678: MemberImpl local = (MemberImpl) getLocalMember(false);
679: if (view != null
680: && (Arrays.diff(view, membership, local).length != 0 || Arrays
681: .diff(membership, view, local).length != 0)) {
682: if (isHighest()) {
683: fireInterceptorEvent(new CoordinationEvent(
684: CoordinationEvent.EVT_START_ELECT, this ,
685: "Heartbeat found inconsistency, restart election"));
686: startElection(true);
687: }
688: }
689: } catch (Exception x) {
690: log.error("Unable to perform heartbeat.", x);
691: } finally {
692: super .heartbeat();
693: }
694: }
695:
696: /**
697: * has members
698: */
699: public boolean hasMembers() {
700:
701: return membership.hasMembers();
702: }
703:
704: /**
705: * Get all current cluster members
706: * @return all members or empty array
707: */
708: public Member[] getMembers() {
709:
710: return membership.getMembers();
711: }
712:
713: /**
714: *
715: * @param mbr Member
716: * @return Member
717: */
718: public Member getMember(Member mbr) {
719:
720: return membership.getMember(mbr);
721: }
722:
723: /**
724: * Return the member that represents this node.
725: *
726: * @return Member
727: */
728: public Member getLocalMember(boolean incAlive) {
729: Member local = super .getLocalMember(incAlive);
730: if (view == null && (local != null))
731: setupMembership();
732: return local;
733: }
734:
735: protected synchronized void setupMembership() {
736: if (membership == null) {
737: membership = new Membership((MemberImpl) super
738: .getLocalMember(true), AbsoluteOrder.comp, false);
739: }
740: }
741:
742: //============================================================================================================
743: // HELPER CLASSES FOR COORDINATION
744: //============================================================================================================
745:
746: public static class CoordinationMessage {
747: //X{A-ldr, A-src, mbrs-A,B,C,D}
748: protected XByteBuffer buf;
749: protected MemberImpl leader;
750: protected MemberImpl source;
751: protected MemberImpl[] view;
752: protected UniqueId id;
753: protected byte[] type;
754: protected long timestamp = System.currentTimeMillis();
755:
756: public CoordinationMessage(XByteBuffer buf) {
757: this .buf = buf;
758: parse();
759: }
760:
761: public CoordinationMessage(MemberImpl leader,
762: MemberImpl source, MemberImpl[] view, UniqueId id,
763: byte[] type) {
764: this .buf = new XByteBuffer(4096, false);
765: this .leader = leader;
766: this .source = source;
767: this .view = view;
768: this .id = id;
769: this .type = type;
770: this .write();
771: }
772:
773: public byte[] getHeader() {
774: return NonBlockingCoordinator.COORD_HEADER;
775: }
776:
777: public MemberImpl getLeader() {
778: if (leader == null)
779: parse();
780: return leader;
781: }
782:
783: public MemberImpl getSource() {
784: if (source == null)
785: parse();
786: return source;
787: }
788:
789: public UniqueId getId() {
790: if (id == null)
791: parse();
792: return id;
793: }
794:
795: public MemberImpl[] getMembers() {
796: if (view == null)
797: parse();
798: return view;
799: }
800:
801: public byte[] getType() {
802: if (type == null)
803: parse();
804: return type;
805: }
806:
807: public XByteBuffer getBuffer() {
808: return this .buf;
809: }
810:
811: public void parse() {
812: //header
813: int offset = 16;
814: //leader
815: int ldrLen = buf.toInt(buf.getBytesDirect(), offset);
816: offset += 4;
817: byte[] ldr = new byte[ldrLen];
818: System.arraycopy(buf.getBytesDirect(), offset, ldr, 0,
819: ldrLen);
820: leader = MemberImpl.getMember(ldr);
821: offset += ldrLen;
822: //source
823: int srcLen = buf.toInt(buf.getBytesDirect(), offset);
824: offset += 4;
825: byte[] src = new byte[srcLen];
826: System.arraycopy(buf.getBytesDirect(), offset, src, 0,
827: srcLen);
828: source = MemberImpl.getMember(src);
829: offset += srcLen;
830: //view
831: int mbrCount = buf.toInt(buf.getBytesDirect(), offset);
832: offset += 4;
833: view = new MemberImpl[mbrCount];
834: for (int i = 0; i < view.length; i++) {
835: int mbrLen = buf.toInt(buf.getBytesDirect(), offset);
836: offset += 4;
837: byte[] mbr = new byte[mbrLen];
838: System.arraycopy(buf.getBytesDirect(), offset, mbr, 0,
839: mbrLen);
840: view[i] = MemberImpl.getMember(mbr);
841: offset += mbrLen;
842: }
843: //id
844: this .id = new UniqueId(buf.getBytesDirect(), offset, 16);
845: offset += 16;
846: type = new byte[16];
847: System.arraycopy(buf.getBytesDirect(), offset, type, 0,
848: type.length);
849: offset += 16;
850:
851: }
852:
853: public void write() {
854: buf.reset();
855: //header
856: buf.append(COORD_HEADER, 0, COORD_HEADER.length);
857: //leader
858: byte[] ldr = leader.getData(false, false);
859: buf.append(ldr.length);
860: buf.append(ldr, 0, ldr.length);
861: ldr = null;
862: //source
863: byte[] src = source.getData(false, false);
864: buf.append(src.length);
865: buf.append(src, 0, src.length);
866: src = null;
867: //view
868: buf.append(view.length);
869: for (int i = 0; i < view.length; i++) {
870: byte[] mbr = view[i].getData(false, false);
871: buf.append(mbr.length);
872: buf.append(mbr, 0, mbr.length);
873: }
874: //id
875: buf.append(id.getBytes(), 0, id.getBytes().length);
876: buf.append(type, 0, type.length);
877: }
878: }
879:
880: public void fireInterceptorEvent(InterceptorEvent event) {
881: if (event instanceof CoordinationEvent
882: && ((CoordinationEvent) event).type == CoordinationEvent.EVT_CONF_RX)
883: log.info(event);
884: }
885:
886: public static class CoordinationEvent implements InterceptorEvent {
887: public static final int EVT_START = 1;
888: public static final int EVT_MBR_ADD = 2;
889: public static final int EVT_MBR_DEL = 3;
890: public static final int EVT_START_ELECT = 4;
891: public static final int EVT_PROCESS_ELECT = 5;
892: public static final int EVT_MSG_ARRIVE = 6;
893: public static final int EVT_PRE_MERGE = 7;
894: public static final int EVT_POST_MERGE = 8;
895: public static final int EVT_WAIT_FOR_MSG = 9;
896: public static final int EVT_SEND_MSG = 10;
897: public static final int EVT_STOP = 11;
898: public static final int EVT_CONF_RX = 12;
899: public static final int EVT_ELECT_ABANDONED = 13;
900:
901: int type;
902: ChannelInterceptor interceptor;
903: Member coord;
904: Member[] mbrs;
905: String info;
906: Membership view;
907: Membership suggestedView;
908:
909: public CoordinationEvent(int type,
910: ChannelInterceptor interceptor, String info) {
911: this .type = type;
912: this .interceptor = interceptor;
913: this .coord = ((NonBlockingCoordinator) interceptor)
914: .getCoordinator();
915: this .mbrs = ((NonBlockingCoordinator) interceptor).membership
916: .getMembers();
917: this .info = info;
918: this .view = ((NonBlockingCoordinator) interceptor).view;
919: this .suggestedView = ((NonBlockingCoordinator) interceptor).suggestedView;
920: }
921:
922: public int getEventType() {
923: return type;
924: }
925:
926: public String getEventTypeDesc() {
927: switch (type) {
928: case EVT_START:
929: return "EVT_START:" + info;
930: case EVT_MBR_ADD:
931: return "EVT_MBR_ADD:" + info;
932: case EVT_MBR_DEL:
933: return "EVT_MBR_DEL:" + info;
934: case EVT_START_ELECT:
935: return "EVT_START_ELECT:" + info;
936: case EVT_PROCESS_ELECT:
937: return "EVT_PROCESS_ELECT:" + info;
938: case EVT_MSG_ARRIVE:
939: return "EVT_MSG_ARRIVE:" + info;
940: case EVT_PRE_MERGE:
941: return "EVT_PRE_MERGE:" + info;
942: case EVT_POST_MERGE:
943: return "EVT_POST_MERGE:" + info;
944: case EVT_WAIT_FOR_MSG:
945: return "EVT_WAIT_FOR_MSG:" + info;
946: case EVT_SEND_MSG:
947: return "EVT_SEND_MSG:" + info;
948: case EVT_STOP:
949: return "EVT_STOP:" + info;
950: case EVT_CONF_RX:
951: return "EVT_CONF_RX:" + info;
952: case EVT_ELECT_ABANDONED:
953: return "EVT_ELECT_ABANDONED:" + info;
954: default:
955: return "Unknown";
956: }
957: }
958:
959: public ChannelInterceptor getInterceptor() {
960: return interceptor;
961: }
962:
963: public String toString() {
964: StringBuffer buf = new StringBuffer(
965: "CoordinationEvent[type=");
966: buf.append(type).append("\n\tLocal:");
967: Member local = interceptor.getLocalMember(false);
968: buf.append(local != null ? local.getName() : "").append(
969: "\n\tCoord:");
970: buf.append(coord != null ? coord.getName() : "").append(
971: "\n\tView:");
972: buf.append(
973: Arrays.toNameString(view != null ? view
974: .getMembers() : null)).append(
975: "\n\tSuggested View:");
976: buf
977: .append(
978: Arrays
979: .toNameString(suggestedView != null ? suggestedView
980: .getMembers()
981: : null)).append(
982: "\n\tMembers:");
983: buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:");
984: buf.append(info).append("]");
985: return buf.toString();
986: }
987: }
988:
989: }
|