Source Code Cross Referenced for MultiplexerTest.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » tests » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.tests 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        package org.jgroups.tests;
0002:
0003:        import junit.framework.Test;
0004:        import junit.framework.TestSuite;
0005:        import org.jgroups.*;
0006:        import org.jgroups.mux.MuxChannel;
0007:        import org.jgroups.stack.IpAddress;
0008:        import org.jgroups.stack.ProtocolStack;
0009:        import org.jgroups.stack.Protocol;
0010:        import org.jgroups.util.Util;
0011:
0012:        import java.util.*;
0013:        import java.io.*;
0014:
0015:        /**
0016:         * Test the multiplexer functionality provided by JChannelFactory
0017:         * @author Bela Ban
0018:         * @version $Id: MultiplexerTest.java,v 1.31.2.1 2006/12/04 22:45:49 vlada Exp $
0019:         */
0020:        public class MultiplexerTest extends ChannelTestBase {
0021:            private Cache c1, c2, c1_repl, c2_repl;
0022:            private Channel ch1, ch2, ch1_repl, ch2_repl;
0023:            JChannelFactory factory, factory2;
0024:
/**
 * Creates the test case.
 *
 * @param name test name, handed to the base class constructor
 */
public MultiplexerTest(String name) {
    super(name);
}
0028:
0029:            public void setUp() throws Exception {
0030:                super .setUp();
0031:                factory = new JChannelFactory();
0032:                factory.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
0033:
0034:                factory2 = new JChannelFactory();
0035:                factory2.setMultiplexerConfig(MUX_CHANNEL_CONFIG);
0036:            }
0037:
0038:            public void tearDown() throws Exception {
0039:                if (ch1_repl != null)
0040:                    ch1_repl.close();
0041:                if (ch2_repl != null)
0042:                    ch2_repl.close();
0043:                if (ch1 != null)
0044:                    ch1.close();
0045:                if (ch2 != null)
0046:                    ch2.close();
0047:                if (ch1 != null) {
0048:                    assertFalse(((MuxChannel) ch1).getChannel().isOpen());
0049:                    assertFalse(((MuxChannel) ch1).getChannel().isConnected());
0050:                }
0051:                if (ch2 != null) {
0052:                    assertFalse(((MuxChannel) ch2).getChannel().isOpen());
0053:                    assertFalse(((MuxChannel) ch2).getChannel().isConnected());
0054:                }
0055:                if (ch1_repl != null) {
0056:                    assertFalse(((MuxChannel) ch1_repl).getChannel().isOpen());
0057:                    assertFalse(((MuxChannel) ch1_repl).getChannel()
0058:                            .isConnected());
0059:                }
0060:                if (ch2_repl != null) {
0061:                    assertFalse(((MuxChannel) ch2_repl).getChannel().isOpen());
0062:                    assertFalse(((MuxChannel) ch2_repl).getChannel()
0063:                            .isConnected());
0064:                }
0065:
0066:                if (c1 != null)
0067:                    c1.clear();
0068:                if (c2 != null)
0069:                    c2.clear();
0070:                if (c1_repl != null)
0071:                    c1_repl.clear();
0072:                if (c2_repl != null)
0073:                    c2_repl.clear();
0074:
0075:                ch1_repl = ch2_repl = ch1 = ch2 = null;
0076:                c1 = c2 = c1_repl = c2_repl = null;
0077:
0078:                super .tearDown();
0079:            }
0080:
0081:            public void testReplicationWithOneChannel() throws Exception {
0082:                ch1 = factory.createMultiplexerChannel(
0083:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0084:                ch1.connect("bla");
0085:                c1 = new Cache(ch1, "cache-1");
0086:                assertEquals("cache has to be empty initially", 0, c1.size());
0087:                c1.put("name", "Bela");
0088:                Util.sleep(300); // we need to wait because replication is asynchronous here
0089:                assertEquals(1, c1.size());
0090:                assertEquals("Bela", c1.get("name"));
0091:            }
0092:
0093:            public void testLifecycle() throws Exception {
0094:                ch1 = factory.createMultiplexerChannel(
0095:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0096:                assertTrue(ch1.isOpen());
0097:                assertFalse(ch1.isConnected());
0098:
0099:                ch1.connect("bla");
0100:                assertTrue(ch1.isOpen());
0101:                assertTrue(ch1.isConnected());
0102:
0103:                ch2 = factory.createMultiplexerChannel(
0104:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0105:                assertTrue(ch2.isOpen());
0106:                assertFalse(ch2.isConnected());
0107:
0108:                ch2.connect("bla");
0109:                assertTrue(ch2.isOpen());
0110:                assertTrue(ch2.isConnected());
0111:
0112:                ch2.disconnect();
0113:                assertTrue(ch2.isOpen());
0114:                assertFalse(ch2.isConnected());
0115:
0116:                ch2.connect("bla");
0117:                assertTrue(ch2.isOpen());
0118:                assertTrue(ch2.isConnected());
0119:
0120:                ch2.disconnect();
0121:                assertTrue(ch2.isOpen());
0122:                assertFalse(ch2.isConnected());
0123:
0124:                ch2.close();
0125:                assertFalse(ch2.isOpen());
0126:                assertFalse(ch2.isConnected());
0127:
0128:                ch2 = factory.createMultiplexerChannel(
0129:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0130:                ch2.connect("bla");
0131:                assertTrue(ch2.isOpen());
0132:                assertTrue(ch2.isConnected());
0133:
0134:                ch2.close();
0135:                assertFalse(ch2.isOpen());
0136:                assertFalse(ch2.isConnected());
0137:            }
0138:
0139:            public void testDisconnect() throws Exception {
0140:                ch1 = factory.createMultiplexerChannel(
0141:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0142:                assertTrue(ch1.isOpen());
0143:                assertFalse(ch1.isConnected());
0144:                assertTrue(((MuxChannel) ch1).getChannel().isOpen());
0145:                assertFalse(((MuxChannel) ch1).getChannel().isConnected());
0146:
0147:                ch1.connect("bla");
0148:                assertTrue(ch1.isOpen());
0149:                assertTrue(ch1.isConnected());
0150:                assertTrue(((MuxChannel) ch1).getChannel().isOpen());
0151:                assertTrue(((MuxChannel) ch1).getChannel().isConnected());
0152:
0153:                ch2 = factory.createMultiplexerChannel(
0154:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0155:                assertTrue(ch2.isOpen());
0156:                assertFalse(ch2.isConnected());
0157:
0158:                ch1.disconnect();
0159:                assertTrue(ch1.isOpen());
0160:                assertFalse(ch1.isConnected());
0161:
0162:                ch1.connect("bla");
0163:                assertTrue(ch1.isOpen());
0164:                assertTrue(ch1.isConnected());
0165:
0166:                ch1.close();
0167:                assertFalse(ch1.isOpen());
0168:                assertFalse(ch1.isConnected());
0169:                assertTrue(((MuxChannel) ch1).getChannel().isOpen());
0170:                assertTrue(((MuxChannel) ch1).getChannel().isConnected());
0171:
0172:                ch2.close();
0173:                assertFalse(ch2.isOpen());
0174:                assertFalse(ch2.isConnected());
0175:            }
0176:
0177:            public void testDisconnect2() throws Exception {
0178:                ch1 = factory.createMultiplexerChannel(
0179:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0180:                assertTrue(ch1.isOpen());
0181:                assertFalse(ch1.isConnected());
0182:
0183:                ch1.connect("bla");
0184:                assertTrue(ch1.isOpen());
0185:                assertTrue(ch1.isConnected());
0186:
0187:                ch2 = factory.createMultiplexerChannel(
0188:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0189:                assertTrue(ch2.isOpen());
0190:                assertFalse(ch2.isConnected());
0191:
0192:                ch1.disconnect();
0193:                assertTrue(ch1.isOpen());
0194:                assertFalse(ch1.isConnected());
0195:
0196:                assertTrue(ch2.isOpen());
0197:                assertFalse(ch2.isConnected());
0198:
0199:                ch1.connect("bla");
0200:                assertTrue(ch1.isOpen());
0201:                assertTrue(ch1.isConnected());
0202:
0203:                assertTrue(ch2.isOpen());
0204:                assertFalse(ch2.isConnected());
0205:            }
0206:
0207:            public void testClose() throws Exception {
0208:                ch1 = factory.createMultiplexerChannel(
0209:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0210:                ch1.connect("bla");
0211:                ch2 = factory.createMultiplexerChannel(
0212:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0213:                ch2.connect("bla");
0214:                ch1.close();
0215:                ch2.close();
0216:            }
0217:
0218:            public void testReplicationWithTwoChannels() throws Exception {
0219:                ch1 = factory.createMultiplexerChannel(
0220:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0221:                c1 = new Cache(ch1, "cache-1");
0222:                assertEquals("cache has to be empty initially", 0, c1.size());
0223:                ch1.connect("bla");
0224:
0225:                ch1_repl = factory2.createMultiplexerChannel(
0226:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0227:                c1_repl = new Cache(ch1_repl, "cache-1-repl");
0228:                assertEquals("cache has to be empty initially", 0, c1_repl
0229:                        .size());
0230:                ch1_repl.connect("bla");
0231:
0232:                View v = ch1_repl.getView();
0233:                assertNotNull(v);
0234:                assertEquals(2, v.size());
0235:
0236:                // System.out.println("****** [c1] PUT(name, Bela) *******");
0237:                c1.put("name", "Bela");
0238:                if (ch1.flushSupported())
0239:                    ch1.startFlush(5000, true);
0240:                else
0241:                    Util.sleep(10000);
0242:
0243:                System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);
0244:
0245:                assertEquals(1, c1.size());
0246:                assertEquals("Bela", c1.get("name"));
0247:
0248:                assertEquals(1, c1_repl.size());
0249:                assertEquals("Bela", c1_repl.get("name"));
0250:
0251:                c1.put("id", new Long(322649));
0252:                c1_repl.put("hobbies", "biking");
0253:                c1_repl.put("bike", "Centurion");
0254:                if (ch1.flushSupported())
0255:                    ch1.startFlush(5000, true);
0256:                else
0257:                    Util.sleep(10000);
0258:
0259:                System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);
0260:
0261:                assertEquals(4, c1.size());
0262:                assertEquals(4, c1_repl.size());
0263:
0264:                assertEquals(new Long(322649), c1.get("id"));
0265:                assertEquals(new Long(322649), c1_repl.get("id"));
0266:
0267:                assertEquals("biking", c1.get("hobbies"));
0268:                assertEquals("biking", c1_repl.get("hobbies"));
0269:
0270:                assertEquals("Centurion", c1.get("bike"));
0271:                assertEquals("Centurion", c1_repl.get("bike"));
0272:            }
0273:
0274:            public void testReplicationWithReconnect() throws Exception {
0275:                ch1 = factory.createMultiplexerChannel(
0276:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0277:                ch1.connect("bla");
0278:                c1 = new Cache(ch1, "cache-1");
0279:                assertEquals("cache has to be empty initially", 0, c1.size());
0280:                c1.put("name", "Bela");
0281:                Util.sleep(300); // we need to wait because replication is asynchronous here
0282:                assertEquals(1, c1.size());
0283:                assertEquals("Bela", c1.get("name"));
0284:
0285:                ch1.disconnect();
0286:
0287:                ch1.connect("bla");
0288:
0289:                c2 = new Cache(ch1, "cache-1");
0290:                assertEquals("cache has to be empty initially", 0, c2.size());
0291:                c2.put("name", "Bela");
0292:                Util.sleep(300); // we need to wait because replication is asynchronous here
0293:                assertEquals(1, c2.size());
0294:                assertEquals("Bela", c2.get("name"));
0295:
0296:            }
0297:
0298:            public void testStateTransfer() throws Exception {
0299:                ch1 = factory.createMultiplexerChannel(
0300:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0301:                ch1.connect("bla");
0302:                c1 = new Cache(ch1, "cache-1");
0303:                assertEquals("cache has to be empty initially", 0, c1.size());
0304:
0305:                ch1_repl = factory2.createMultiplexerChannel(
0306:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0307:
0308:                c1.put("name", "Bela");
0309:                c1.put("id", new Long(322649));
0310:                c1.put("hobbies", "biking");
0311:                c1.put("bike", "Centurion");
0312:
0313:                ch1_repl.connect("bla");
0314:                c1_repl = new Cache(ch1_repl, "cache-1-repl");
0315:                boolean rc = ch1_repl.getState(null, 5000);
0316:                System.out.println("state transfer: " + rc);
0317:                Util.sleep(500);
0318:
0319:                System.out.println("c1_repl: " + c1_repl);
0320:                assertEquals("initial state should have been transferred", 4,
0321:                        c1_repl.size());
0322:
0323:                assertEquals(new Long(322649), c1.get("id"));
0324:                assertEquals(new Long(322649), c1_repl.get("id"));
0325:
0326:                assertEquals("biking", c1.get("hobbies"));
0327:                assertEquals("biking", c1_repl.get("hobbies"));
0328:
0329:                assertEquals("Centurion", c1.get("bike"));
0330:                assertEquals("Centurion", c1_repl.get("bike"));
0331:            }
0332:
0333:            public void testStateTransferWithTwoApplications() throws Exception {
0334:                ch1 = factory.createMultiplexerChannel(
0335:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0336:                ch1.connect("bla");
0337:                c1 = new Cache(ch1, "cache-1");
0338:                assertEquals("cache has to be empty initially", 0, c1.size());
0339:
0340:                ch2 = factory.createMultiplexerChannel(
0341:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0342:                ch2.connect("bla");
0343:                c2 = new Cache(ch2, "cache-2");
0344:                assertEquals("cache has to be empty initially", 0, c2.size());
0345:
0346:                ch1_repl = factory2.createMultiplexerChannel(
0347:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0348:
0349:                ch2_repl = factory2.createMultiplexerChannel(
0350:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0351:
0352:                c1.put("name", "cache-1");
0353:                c2.put("name", "cache-2");
0354:
0355:                ch1_repl.connect("bla");
0356:                c1_repl = new Cache(ch1_repl, "cache-1-repl");
0357:                boolean rc = ch1_repl.getState(null, 5000);
0358:                System.out.println("state transfer: " + rc);
0359:
0360:                ch2_repl.connect("bla");
0361:                c2_repl = new Cache(ch2_repl, "cache-2-repl");
0362:                rc = ch2_repl.getState(null, 5000);
0363:                System.out.println("state transfer: " + rc);
0364:                Util.sleep(500);
0365:
0366:                System.out.println("Caches after state transfers:");
0367:                System.out.println("c1: " + c1);
0368:                System.out.println("c1_repl: " + c1_repl);
0369:                System.out.println("c2: " + c2);
0370:                System.out.println("c2_repl: " + c2_repl);
0371:
0372:                assertEquals(1, c1.size());
0373:                assertEquals(1, c1_repl.size());
0374:
0375:                assertEquals(1, c2.size());
0376:                assertEquals(1, c2_repl.size());
0377:
0378:                assertEquals("cache-1", c1.get("name"));
0379:                assertEquals("cache-1", c1_repl.get("name"));
0380:
0381:                assertEquals("cache-2", c2.get("name"));
0382:                assertEquals("cache-2", c2_repl.get("name"));
0383:            }
0384:
0385:            public void testStateTransferWithRegistration() throws Exception {
0386:                ch1 = factory.createMultiplexerChannel(
0387:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0388:                ch1.connect("bla");
0389:                c1 = new Cache(ch1, "cache-1");
0390:                assertEquals("cache has to be empty initially", 0, c1.size());
0391:
0392:                ch2 = factory.createMultiplexerChannel(
0393:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0394:                ch2.connect("bla");
0395:                c2 = new Cache(ch2, "cache-2");
0396:                assertEquals("cache has to be empty initially", 0, c2.size());
0397:                c1.put("name", "cache-1");
0398:                c2.put("name", "cache-2");
0399:
0400:                ch1_repl = factory2.createMultiplexerChannel(
0401:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1", true, null); // register for state transfer
0402:                ch2_repl = factory2.createMultiplexerChannel(
0403:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2", true, null); // register for state transfer
0404:
0405:                ch1_repl.connect("bla");
0406:                c1_repl = new Cache(ch1_repl, "cache-1-repl");
0407:                boolean rc = ch1_repl.getState(null, 5000); // this will *not* trigger the state transfer protocol
0408:                System.out.println("state transfer: " + rc);
0409:
0410:                ch2_repl.connect("bla");
0411:                c2_repl = new Cache(ch2_repl, "cache-2-repl");
0412:                rc = ch2_repl.getState(null, 5000); // only *this* will trigger the state transfer
0413:                System.out.println("state transfer: " + rc);
0414:                Util.sleep(500);
0415:
0416:                System.out.println("Caches after state transfers:");
0417:                System.out.println("c1: " + c1);
0418:                System.out.println("c1_repl: " + c1_repl);
0419:                System.out.println("c2: " + c2);
0420:                System.out.println("c2_repl: " + c2_repl);
0421:
0422:                assertEquals(1, c1.size());
0423:                assertEquals(1, c1_repl.size());
0424:
0425:                assertEquals(1, c2.size());
0426:                assertEquals(1, c2_repl.size());
0427:
0428:                assertEquals("cache-1", c1.get("name"));
0429:                assertEquals("cache-1", c1_repl.get("name"));
0430:
0431:                assertEquals("cache-2", c2.get("name"));
0432:                assertEquals("cache-2", c2_repl.get("name"));
0433:                c1.clear();
0434:                c1_repl.clear();
0435:                c2.clear();
0436:                c2_repl.clear();
0437:            }
0438:
0439:            private void setCorrectPortRange(Channel ch) {
0440:                ProtocolStack stack = ((MuxChannel) ch).getProtocolStack();
0441:                Protocol tcpping = stack.findProtocol("TCPPING");
0442:                if (tcpping == null)
0443:                    return;
0444:
0445:                Properties props = tcpping.getProperties();
0446:                String port_range = props.getProperty("port_range");
0447:                if (port_range != null) {
0448:                    System.out.println("port_range in TCPPING: " + port_range
0449:                            + ", setting it to 2");
0450:                    port_range = "2";
0451:                    Properties p = new Properties();
0452:                    // p.putAll(props);
0453:                    p.setProperty("port_range", port_range);
0454:                    tcpping.setProperties(p);
0455:                }
0456:            }
0457:
0458:            public void testStateTransferWithReconnect() throws Exception {
0459:                ch1 = factory.createMultiplexerChannel(
0460:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0461:                setCorrectPortRange(ch1);
0462:
0463:                assertTrue(ch1.isOpen());
0464:                assertFalse(ch1.isConnected());
0465:                ch1.connect("bla");
0466:                assertTrue(ch1.isOpen());
0467:                assertTrue(ch1.isConnected());
0468:                assertServiceAndClusterView(ch1, 1, 1);
0469:
0470:                c1 = new Cache(ch1, "cache-1");
0471:                assertEquals("cache has to be empty initially", 0, c1.size());
0472:
0473:                ch1_repl = factory2.createMultiplexerChannel(
0474:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0475:                setCorrectPortRange(ch1_repl);
0476:                assertTrue(ch1_repl.isOpen());
0477:                assertFalse(ch1_repl.isConnected());
0478:
0479:                c1.put("name", "Bela");
0480:                c1.put("id", new Long(322649));
0481:                c1.put("hobbies", "biking");
0482:                c1.put("bike", "Centurion");
0483:
0484:                ch1_repl.connect("bla");
0485:                assertTrue(ch1_repl.isOpen());
0486:                assertTrue(ch1_repl.isConnected());
0487:                assertServiceAndClusterView(ch1_repl, 2, 2);
0488:                Util.sleep(500);
0489:                assertServiceAndClusterView(ch1, 2, 2);
0490:
0491:                c1_repl = new Cache(ch1_repl, "cache-1-repl");
0492:                boolean rc = ch1_repl.getState(null, 5000);
0493:                System.out.println("state transfer: " + rc);
0494:                Util.sleep(500);
0495:
0496:                System.out.println("c1_repl: " + c1_repl);
0497:                assertEquals("initial state should have been transferred", 4,
0498:                        c1_repl.size());
0499:                assertEquals(new Long(322649), c1.get("id"));
0500:                assertEquals(new Long(322649), c1_repl.get("id"));
0501:
0502:                assertEquals("biking", c1.get("hobbies"));
0503:                assertEquals("biking", c1_repl.get("hobbies"));
0504:
0505:                assertEquals("Centurion", c1.get("bike"));
0506:                assertEquals("Centurion", c1_repl.get("bike"));
0507:
0508:                ch1_repl.disconnect();
0509:                assertTrue(ch1_repl.isOpen());
0510:                assertFalse(ch1_repl.isConnected());
0511:                Util.sleep(1000);
0512:                assertServiceAndClusterView(ch1, 1, 1);
0513:
0514:                c1_repl.clear();
0515:
0516:                ch1_repl.connect("bla");
0517:                assertTrue(ch1_repl.isOpen());
0518:                assertTrue(ch1_repl.isConnected());
0519:                assertServiceAndClusterView(ch1_repl, 2, 2);
0520:                Util.sleep(300);
0521:                assertServiceAndClusterView(ch1, 2, 2);
0522:
0523:                assertEquals("cache has to be empty initially", 0, c1_repl
0524:                        .size());
0525:
0526:                rc = ch1_repl.getState(null, 5000);
0527:                System.out.println("state transfer: " + rc);
0528:                Util.sleep(500);
0529:
0530:                System.out.println("c1_repl: " + c1_repl);
0531:                assertEquals("initial state should have been transferred", 4,
0532:                        c1_repl.size());
0533:
0534:                assertEquals(new Long(322649), c1.get("id"));
0535:                assertEquals(new Long(322649), c1_repl.get("id"));
0536:
0537:                assertEquals("biking", c1.get("hobbies"));
0538:                assertEquals("biking", c1_repl.get("hobbies"));
0539:
0540:                assertEquals("Centurion", c1.get("bike"));
0541:                assertEquals("Centurion", c1_repl.get("bike"));
0542:
0543:                // Now see what happens if we reconnect the first channel
0544:                // But first, add another MuxChannel on that JChannel
0545:                // just so it remains coordinator (test that it doesn't
0546:                // ask for state from itself)
0547:                ch2 = factory.createMultiplexerChannel(
0548:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0549:                setCorrectPortRange(ch2);
0550:                assertTrue(ch2.isOpen());
0551:                assertFalse(ch2.isConnected());
0552:                assertServiceAndClusterView(ch1, 2, 2);
0553:                assertServiceAndClusterView(ch1_repl, 2, 2);
0554:
0555:                ch1.disconnect();
0556:                //sleep a bit and thus let asynch VIEW to propagate to other channel
0557:                Util.sleep(500);
0558:                assertTrue(ch1.isOpen());
0559:                assertFalse(ch1.isConnected());
0560:                assertServiceAndClusterView(ch1_repl, 1, 1);
0561:                assertTrue(ch2.isOpen());
0562:                assertFalse(ch2.isConnected());
0563:
0564:                c1.clear();
0565:
0566:                ch1.connect("bla");
0567:                assertTrue(ch1.isOpen());
0568:                assertTrue(ch1.isConnected());
0569:                assertServiceAndClusterView(ch1, 2, 2);
0570:                Util.sleep(500);
0571:                assertServiceAndClusterView(ch1_repl, 2, 2);
0572:                assertTrue(ch2.isOpen());
0573:                assertFalse(ch2.isConnected());
0574:
0575:                assertEquals("cache has to be empty initially", 0, c1.size());
0576:
0577:                rc = ch1.getState(null, 5000);
0578:                System.out.println("state transfer: " + rc);
0579:                Util.sleep(500);
0580:
0581:                System.out.println("c1: " + c1);
0582:                assertEquals("initial state should have been transferred", 4,
0583:                        c1.size());
0584:
0585:                assertEquals(new Long(322649), c1.get("id"));
0586:                assertEquals(new Long(322649), c1_repl.get("id"));
0587:
0588:                assertEquals("biking", c1.get("hobbies"));
0589:                assertEquals("biking", c1_repl.get("hobbies"));
0590:
0591:                assertEquals("Centurion", c1.get("bike"));
0592:                assertEquals("Centurion", c1_repl.get("bike"));
0593:            }
0594:
0595:            private void assertServiceAndClusterView(Channel ch,
0596:                    int num_service_view_mbrs, int num_cluster_view_mbrs) {
0597:                View service_view, cluster_view;
0598:                service_view = ch.getView();
0599:                cluster_view = ((MuxChannel) ch).getClusterView();
0600:
0601:                String msg = "cluster view=" + cluster_view + ", service view="
0602:                        + service_view;
0603:
0604:                assertNotNull(service_view);
0605:                assertNotNull(cluster_view);
0606:
0607:                assertEquals(msg, num_service_view_mbrs, service_view.size());
0608:                assertEquals(msg, num_cluster_view_mbrs, cluster_view.size());
0609:            }
0610:
0611:            public void testStateTransferFromSelfWithRegularChannel()
0612:                    throws Exception {
0613:                JChannel ch = new JChannel();
0614:                ch.connect("X");
0615:                try {
0616:                    boolean rc = ch.getState(null, 2000);
0617:                    assertFalse("getState() on singleton should return false",
0618:                            rc);
0619:                } finally {
0620:                    ch.close();
0621:                }
0622:            }
0623:
0624:            public void testStateTransferFromSelf() throws Exception {
0625:                ch1 = factory.createMultiplexerChannel(
0626:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0627:                ch1.connect("bla");
0628:                boolean rc = ch1.getState(null, 2000);
0629:                assertFalse("getState() on singleton should return false", rc);
0630:                ch2 = factory.createMultiplexerChannel(
0631:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0632:                ch2.connect("foo");
0633:                rc = ch2.getState(null, 2000);
0634:                assertFalse("getState() on singleton should return false", rc);
0635:            }
0636:
0637:            public void testAdditionalData() throws Exception {
0638:                byte[] additional_data = new byte[] { 'b', 'e', 'l', 'a' };
0639:                ch1 = factory.createMultiplexerChannel(
0640:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0641:                Map m = new HashMap(1);
0642:                m.put("additional_data", additional_data);
0643:                ch1.down(new Event(Event.CONFIG, m));
0644:                ch1.connect("bla");
0645:                IpAddress local_addr = (IpAddress) ch1.getLocalAddress();
0646:                assertNotNull(local_addr);
0647:                byte[] tmp = local_addr.getAdditionalData();
0648:                assertNotNull(tmp);
0649:                assertEquals(tmp, additional_data);
0650:
0651:                ch2 = factory.createMultiplexerChannel(
0652:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0653:                ch2.connect("foo");
0654:                local_addr = (IpAddress) ch2.getLocalAddress();
0655:                assertNotNull(local_addr);
0656:                tmp = local_addr.getAdditionalData();
0657:                assertNotNull(tmp);
0658:                assertEquals(tmp, additional_data);
0659:            }
0660:
0661:            public void testAdditionalData2() throws Exception {
0662:                byte[] additional_data = new byte[] { 'b', 'e', 'l', 'a' };
0663:                byte[] additional_data2 = new byte[] { 'm', 'i', 'c', 'h', 'i' };
0664:                ch1 = factory.createMultiplexerChannel(
0665:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0666:                ch1.connect("bla");
0667:                IpAddress local_addr = (IpAddress) ch1.getLocalAddress();
0668:                assertNotNull(local_addr);
0669:                byte[] tmp = local_addr.getAdditionalData();
0670:                assertNull(tmp);
0671:
0672:                ch2 = factory.createMultiplexerChannel(
0673:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0674:                Map m = new HashMap(1);
0675:                m.put("additional_data", additional_data);
0676:                ch2.down(new Event(Event.CONFIG, m));
0677:                ch2.connect("foo");
0678:                local_addr = (IpAddress) ch2.getLocalAddress();
0679:                assertNotNull(local_addr);
0680:                tmp = local_addr.getAdditionalData();
0681:                assertNotNull(tmp);
0682:                assertEquals(tmp, additional_data);
0683:
0684:                local_addr = (IpAddress) ch1.getLocalAddress();
0685:                assertNotNull(local_addr);
0686:                tmp = local_addr.getAdditionalData();
0687:                assertNotNull(tmp);
0688:                assertEquals(tmp, additional_data);
0689:
0690:                m.clear();
0691:                m.put("additional_data", additional_data2);
0692:                ch2.down(new Event(Event.CONFIG, m));
0693:                local_addr = (IpAddress) ch2.getLocalAddress();
0694:                assertNotNull(local_addr);
0695:                tmp = local_addr.getAdditionalData();
0696:                assertNotNull(tmp);
0697:                assertEquals(tmp, additional_data2);
0698:                assertFalse(Arrays.equals(tmp, additional_data));
0699:            }
0700:
0701:            public void testGetSubstates() throws Exception {
0702:                ch1 = factory.createMultiplexerChannel(
0703:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0704:                ch1.connect("bla");
0705:                c1 = new ExtendedCache(ch1, "cache-1");
0706:                assertEquals("cache has to be empty initially", 0, c1.size());
0707:
0708:                ch2 = factory.createMultiplexerChannel(
0709:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0710:                ch2.connect("bla");
0711:                c2 = new ExtendedCache(ch2, "cache-2");
0712:                assertEquals("cache has to be empty initially", 0, c2.size());
0713:
0714:                for (int i = 0; i < 10; i++) {
0715:                    c1.put(new Integer(i), new Integer(i));
0716:                    c2.put(new Integer(i), new Integer(i));
0717:                }
0718:
0719:                ch1_repl = factory2.createMultiplexerChannel(
0720:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0721:                ch2_repl = factory2.createMultiplexerChannel(
0722:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0723:                ch1_repl.connect("bla");
0724:                c1_repl = new ExtendedCache(ch1_repl, "cache-1-repl");
0725:                boolean rc = ch1_repl.getState(null, "odd", 5000);
0726:                System.out.println("state transfer: " + rc);
0727:
0728:                ch2_repl.connect("bla");
0729:                c2_repl = new ExtendedCache(ch2_repl, "cache-2-repl");
0730:                rc = ch2_repl.getState(null, "even", 5000);
0731:                System.out.println("state transfer: " + rc);
0732:                Util.sleep(500);
0733:
0734:                System.out.println("Caches after state transfers:");
0735:                System.out.println("c1: " + c1);
0736:                System.out.println("c2: " + c2);
0737:
0738:                System.out
0739:                        .println("c1_repl (removed odd substate): " + c1_repl);
0740:                System.out.println("c2_repl (removed even substate): "
0741:                        + c2_repl);
0742:
0743:                assertEquals(5, c1_repl.size());
0744:                assertEquals(5, c2_repl.size());
0745:
0746:                _testEvenNumbersPresent(c1_repl);
0747:                _testOddNumbersPresent(c2_repl);
0748:            }
0749:
0750:            private void _testEvenNumbersPresent(Cache c) {
0751:                Integer[] evens = new Integer[] { new Integer(0),
0752:                        new Integer(2), new Integer(4), new Integer(6),
0753:                        new Integer(8) };
0754:                _testNumbersPresent(c, evens);
0755:
0756:            }
0757:
0758:            private void _testOddNumbersPresent(Cache c) {
0759:                Integer[] odds = new Integer[] { new Integer(1),
0760:                        new Integer(3), new Integer(5), new Integer(7),
0761:                        new Integer(9) };
0762:                _testNumbersPresent(c, odds);
0763:            }
0764:
0765:            private void _testNumbersPresent(Cache c, Integer[] numbers) {
0766:                int len = numbers.length;
0767:                assertEquals(len, c.size());
0768:                for (int i = 0; i < numbers.length; i++) {
0769:                    Integer number = numbers[i];
0770:                    assertEquals(number, c.get(number));
0771:                }
0772:            }
0773:
0774:            public void testGetSubstatesMultipleTimes() throws Exception {
0775:                ch1 = factory.createMultiplexerChannel(
0776:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0777:                ch1.connect("bla");
0778:                c1 = new ExtendedCache(ch1, "cache-1");
0779:                assertEquals("cache has to be empty initially", 0, c1.size());
0780:
0781:                ch2 = factory.createMultiplexerChannel(
0782:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0783:                ch2.connect("bla");
0784:                c2 = new ExtendedCache(ch2, "cache-2");
0785:                assertEquals("cache has to be empty initially", 0, c2.size());
0786:
0787:                for (int i = 0; i < 10; i++) {
0788:                    c1.put(new Integer(i), new Integer(i));
0789:                    c2.put(new Integer(i), new Integer(i));
0790:                }
0791:
0792:                ch1_repl = factory2.createMultiplexerChannel(
0793:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c1");
0794:                ch2_repl = factory2.createMultiplexerChannel(
0795:                        MUX_CHANNEL_CONFIG_STACK_NAME, "c2");
0796:                ch1_repl.connect("bla");
0797:                c1_repl = new ExtendedCache(ch1_repl, "cache-1-repl");
0798:                boolean rc = ch1_repl.getState(null, "odd", 5000);
0799:                System.out.println("state transfer: " + rc);
0800:
0801:                ch2_repl.connect("bla");
0802:                c2_repl = new ExtendedCache(ch2_repl, "cache-2-repl");
0803:                rc = ch2_repl.getState(null, "even", 5000);
0804:                System.out.println("state transfer: " + rc);
0805:                Util.sleep(500);
0806:                _testOddNumbersPresent(c2_repl);
0807:
0808:                System.out.println("Caches after state transfers:");
0809:                System.out.println("c1: " + c1);
0810:                System.out.println("c2: " + c2);
0811:                System.out
0812:                        .println("c1_repl (removed odd substate): " + c1_repl);
0813:                System.out.println("c2_repl (removed even substate): "
0814:                        + c2_repl);
0815:
0816:                assertEquals(5, c2_repl.size());
0817:                rc = ch2_repl.getState(null, "odd", 5000);
0818:                Util.sleep(500);
0819:                System.out
0820:                        .println("c2_repl (removed odd substate): " + c2_repl);
0821:                _testEvenNumbersPresent(c2_repl);
0822:
0823:                assertEquals(5, c2_repl.size());
0824:                rc = ch2_repl.getState(null, "even", 5000);
0825:                Util.sleep(500);
0826:                System.out.println("c2_repl (removed even substate): "
0827:                        + c2_repl);
0828:                _testOddNumbersPresent(c2_repl);
0829:
0830:                assertEquals(5, c2_repl.size());
0831:                rc = ch2_repl.getState(null, "odd", 5000);
0832:                Util.sleep(500);
0833:                System.out
0834:                        .println("c2_repl (removed odd substate): " + c2_repl);
0835:                _testEvenNumbersPresent(c2_repl);
0836:            }
0837:
0838:            public static Test suite() {
0839:                return new TestSuite(MultiplexerTest.class);
0840:            }
0841:
0842:            public static void main(String[] args) {
0843:                junit.textui.TestRunner.run(MultiplexerTest.suite());
0844:            }
0845:
0846:            private static class Cache extends ExtendedReceiverAdapter {
0847:                final Map data = new HashMap();
0848:                Channel ch;
0849:                String name;
0850:
0851:                public Cache(Channel ch, String name) {
0852:                    this .ch = ch;
0853:                    this .name = name;
0854:                    this .ch.setReceiver(this );
0855:                }
0856:
0857:                protected Object get(Object key) {
0858:                    synchronized (data) {
0859:                        return data.get(key);
0860:                    }
0861:                }
0862:
0863:                protected void put(Object key, Object val) throws Exception {
0864:                    Object[] buf = new Object[2];
0865:                    buf[0] = key;
0866:                    buf[1] = val;
0867:                    Message msg = new Message(null, null, buf);
0868:                    ch.send(msg);
0869:                }
0870:
0871:                protected int size() {
0872:                    synchronized (data) {
0873:                        return data.size();
0874:                    }
0875:                }
0876:
0877:                public void receive(Message msg) {
0878:                    Object[] modification = (Object[]) msg.getObject();
0879:                    Object key = modification[0];
0880:                    Object val = modification[1];
0881:                    synchronized (data) {
0882:                        // System.out.println("****** [" + name + "] received PUT(" + key + ", " + val + ") " + " from " + msg.getSrc() + " *******");
0883:                        data.put(key, val);
0884:                    }
0885:                }
0886:
0887:                public byte[] getState() {
0888:                    byte[] state = null;
0889:                    synchronized (data) {
0890:                        try {
0891:                            state = Util.objectToByteBuffer(data);
0892:                        } catch (Exception e) {
0893:                            e.printStackTrace();
0894:                            return null;
0895:                        }
0896:                    }
0897:                    return state;
0898:                }
0899:
0900:                public byte[] getState(String state_id) {
0901:                    return getState();
0902:                }
0903:
0904:                public void setState(byte[] state) {
0905:                    Map m;
0906:                    try {
0907:                        m = (Map) Util.objectFromByteBuffer(state);
0908:                        synchronized (data) {
0909:                            data.clear();
0910:                            data.putAll(m);
0911:                        }
0912:                    } catch (Exception e) {
0913:                        e.printStackTrace();
0914:                    }
0915:                }
0916:
0917:                public void setState(String state_id, byte[] state) {
0918:                    setState(state);
0919:                }
0920:
0921:                public void getState(OutputStream ostream) {
0922:                    ObjectOutputStream oos = null;
0923:                    try {
0924:                        oos = new ObjectOutputStream(ostream);
0925:                        synchronized (data) {
0926:                            oos.writeObject(data);
0927:                        }
0928:                        oos.flush();
0929:                    } catch (IOException e) {
0930:                    } finally {
0931:                        try {
0932:                            if (oos != null)
0933:                                oos.close();
0934:                        } catch (IOException e) {
0935:                            System.err.println(e);
0936:                        }
0937:                    }
0938:                }
0939:
0940:                public void getState(String state_id, OutputStream ostream) {
0941:                    getState(ostream);
0942:                }
0943:
0944:                public void setState(InputStream istream) {
0945:                    ObjectInputStream ois = null;
0946:                    try {
0947:                        ois = new ObjectInputStream(istream);
0948:                        Map m = (Map) ois.readObject();
0949:                        synchronized (data) {
0950:                            data.clear();
0951:                            data.putAll(m);
0952:                        }
0953:
0954:                    } catch (Exception e) {
0955:                    } finally {
0956:                        try {
0957:                            if (ois != null)
0958:                                ois.close();
0959:                        } catch (IOException e) {
0960:                            System.err.println(e);
0961:                        }
0962:                    }
0963:                }
0964:
0965:                public void setState(String state_id, InputStream istream) {
0966:                    setState(istream);
0967:                }
0968:
0969:                public void clear() {
0970:                    synchronized (data) {
0971:                        data.clear();
0972:                    }
0973:                }
0974:
0975:                public void viewAccepted(View new_view) {
0976:                    log("view is " + new_view);
0977:                }
0978:
0979:                public String toString() {
0980:                    return data.toString();
0981:                }
0982:
0983:                private void log(String msg) {
0984:                    System.out.println("-- [" + name + "] " + msg);
0985:                }
0986:
0987:            }
0988:
0989:            static class ExtendedCache extends Cache {
0990:
0991:                public ExtendedCache(Channel ch, String name) {
0992:                    super (ch, name);
0993:                }
0994:
0995:                public byte[] getState(String state_id) {
0996:                    Map copy = null;
0997:                    synchronized (data) {
0998:                        copy = new HashMap(data);
0999:                    }
1000:                    for (Iterator it = copy.keySet().iterator(); it.hasNext();) {
1001:                        Integer key = (Integer) it.next();
1002:                        if (state_id.equals("odd") && key.intValue() % 2 != 0)
1003:                            it.remove();
1004:                        else if (state_id.equals("even")
1005:                                && key.intValue() % 2 == 0)
1006:                            it.remove();
1007:                    }
1008:                    try {
1009:                        return Util.objectToByteBuffer(copy);
1010:                    } catch (Exception e) {
1011:                        e.printStackTrace();
1012:                        return null;
1013:                    }
1014:                }
1015:
1016:                public void getState(String state_id, OutputStream os) {
1017:                    Map copy = null;
1018:                    synchronized (data) {
1019:                        copy = new HashMap(data);
1020:                    }
1021:                    for (Iterator it = copy.keySet().iterator(); it.hasNext();) {
1022:                        Integer key = (Integer) it.next();
1023:                        if (state_id.equals("odd") && key.intValue() % 2 != 0)
1024:                            it.remove();
1025:                        else if (state_id.equals("even")
1026:                                && key.intValue() % 2 == 0)
1027:                            it.remove();
1028:                    }
1029:                    ObjectOutputStream oos = null;
1030:                    try {
1031:                        oos = new ObjectOutputStream(os);
1032:                        oos.writeObject(copy);
1033:                        oos.flush();
1034:                    } catch (IOException e) {
1035:                    } finally {
1036:                        try {
1037:                            if (oos != null)
1038:                                oos.close();
1039:                        } catch (IOException e) {
1040:                            System.err.println(e);
1041:                        }
1042:                    }
1043:                }
1044:
1045:                public void setState(String state_id, InputStream is) {
1046:                    setState(is);
1047:                }
1048:
1049:                public void setState(String state_id, byte[] state) {
1050:                    setState(state);
1051:                }
1052:
1053:                public String toString() {
1054:                    synchronized (data) {
1055:                        Set keys = new TreeSet(data.keySet());
1056:                        StringBuffer sb = new StringBuffer();
1057:                        for (Iterator it = keys.iterator(); it.hasNext();) {
1058:                            Object o = it.next();
1059:                            sb.append(o).append("=").append(data.get(o))
1060:                                    .append(" ");
1061:                        }
1062:                        return sb.toString();
1063:                    }
1064:                }
1065:            }
1066:
1067:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.