001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.test.interceptors;
018:
019: import org.apache.catalina.tribes.Channel;
020: import org.apache.catalina.tribes.Member;
021: import org.apache.catalina.tribes.group.GroupChannel;
022: import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
023: import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
024: import junit.framework.TestCase;
025: import junit.framework.TestResult;
026: import junit.framework.TestSuite;
027:
028: public class TestNonBlockingCoordinator extends TestCase {
029:
030: GroupChannel[] channels = null;
031: NonBlockingCoordinator[] coordinators = null;
032: int channelCount = 10;
033: Thread[] threads = null;
034:
035: protected void setUp() throws Exception {
036: System.out.println("Setup");
037: super .setUp();
038: channels = new GroupChannel[channelCount];
039: coordinators = new NonBlockingCoordinator[channelCount];
040: threads = new Thread[channelCount];
041: for (int i = 0; i < channelCount; i++) {
042: channels[i] = new GroupChannel();
043: coordinators[i] = new NonBlockingCoordinator();
044: channels[i].addInterceptor(coordinators[i]);
045: channels[i].addInterceptor(new TcpFailureDetector());
046: final int j = i;
047: threads[i] = new Thread() {
048: public void run() {
049: try {
050: channels[j].start(Channel.DEFAULT);
051: Thread.sleep(50);
052: } catch (Exception x) {
053: x.printStackTrace();
054: }
055: }
056: };
057: }
058: for (int i = 0; i < channelCount; i++)
059: threads[i].start();
060: for (int i = 0; i < channelCount; i++)
061: threads[i].join();
062: Thread.sleep(1000);
063: }
064:
065: public void testCoord1() throws Exception {
066: for (int i = 1; i < channelCount; i++)
067: assertEquals("Message count expected to be equal.",
068: channels[i - 1].getMembers().length, channels[i]
069: .getMembers().length);
070: Member member = coordinators[0].getCoordinator();
071: int cnt = 0;
072: while (member == null && (cnt++ < 100))
073: try {
074: Thread.sleep(100);
075: member = coordinators[0].getCoordinator();
076: } catch (Exception x) {
077: }
078: for (int i = 0; i < channelCount; i++)
079: super
080: .assertEquals(member, coordinators[i]
081: .getCoordinator());
082: System.out.println("Coordinator[1] is:" + member);
083:
084: }
085:
086: public void testCoord2() throws Exception {
087: Member member = coordinators[1].getCoordinator();
088: System.out.println("Coordinator[2a] is:" + member);
089: int index = -1;
090: for (int i = 0; i < channelCount; i++) {
091: if (channels[i].getLocalMember(false).equals(member)) {
092: System.out.println("Shutting down:"
093: + channels[i].getLocalMember(true).toString());
094: channels[i].stop(Channel.DEFAULT);
095: index = i;
096: }
097: }
098: int dead = index;
099: Thread.sleep(1000);
100: if (index == 0)
101: index = 1;
102: else
103: index = 0;
104: System.out.println("Member count:"
105: + channels[index].getMembers().length);
106: member = coordinators[index].getCoordinator();
107: for (int i = 1; i < channelCount; i++)
108: if (i != dead)
109: super .assertEquals(member, coordinators[i]
110: .getCoordinator());
111: System.out.println("Coordinator[2b] is:" + member);
112: }
113:
114: protected void tearDown() throws Exception {
115: System.out.println("tearDown");
116: super .tearDown();
117: for (int i = 0; i < channelCount; i++) {
118: channels[i].stop(Channel.DEFAULT);
119: }
120: }
121:
122: public static void main(String[] args) throws Exception {
123: TestSuite suite = new TestSuite();
124: suite.addTestSuite(TestNonBlockingCoordinator.class);
125: suite.run(new TestResult());
126: }
127:
128: }
|