Source Code Cross Referenced for RMICacheReplicatorTest.java in  » Cache » ehcache » net » sf » ehcache » distribution » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Cache » ehcache » net.sf.ehcache.distribution 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         *  Copyright 2003-2007 Luck Consulting Pty Ltd
0003:         *
0004:         *  Licensed under the Apache License, Version 2.0 (the "License");
0005:         *  you may not use this file except in compliance with the License.
0006:         *  You may obtain a copy of the License at
0007:         *
0008:         *      http://www.apache.org/licenses/LICENSE-2.0
0009:         *
0010:         *  Unless required by applicable law or agreed to in writing, software
0011:         *  distributed under the License is distributed on an "AS IS" BASIS,
0012:         *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013:         *  See the License for the specific language governing permissions and
0014:         *  limitations under the License.
0015:         */package net.sf.ehcache.distribution;
0016:
0017:        import junit.framework.AssertionFailedError;
0018:        import net.sf.ehcache.AbstractCacheTest;
0019:        import net.sf.ehcache.Cache;
0020:        import net.sf.ehcache.CacheException;
0021:        import net.sf.ehcache.CacheManager;
0022:        import net.sf.ehcache.Ehcache;
0023:        import net.sf.ehcache.Element;
0024:        import net.sf.ehcache.StopWatch;
0025:        import net.sf.ehcache.ThreadKiller;
0026:        import net.sf.ehcache.management.ManagementService;
0027:        import net.sf.ehcache.event.CountingCacheEventListener;
0028:        import org.apache.commons.logging.Log;
0029:        import org.apache.commons.logging.LogFactory;
0030:
0031:        import java.io.IOException;
0032:        import java.io.Serializable;
0033:        import java.rmi.RemoteException;
0034:        import java.util.ArrayList;
0035:        import java.util.Arrays;
0036:        import java.util.Date;
0037:        import java.util.List;
0038:        import java.util.Random;
0039:
0040:        /**
0041:         * Tests replication of Cache events
0042:         * <p/>
0043:         * Note these tests need a live network interface running in multicast mode to work
0044:         * <p/>
0045:         * If running involving RMIAsynchronousCacheReplicator individually the test will fail because
0046:         * the VM will gobble up the SoftReferences rather than allocating more memory. Uncomment the
0047:         * forceVMGrowth() method usage in setup.
0048:         *
0049:         * @author Greg Luck
0050:         * @version $Id: RMICacheReplicatorTest.java 575 2008-01-30 07:22:04Z gregluck $
0051:         */
0052:        public class RMICacheReplicatorTest extends AbstractCacheTest {
0053:
0054:            /**
0055:             * A value to represent replicate asynchronously
0056:             */
0057:            protected static final boolean ASYNCHRONOUS = true;
0058:
0059:            /**
0060:             * A value to represent replicate synchronously
0061:             */
0062:            protected static final boolean SYNCHRONOUS = false;
0063:
0064:            private static final Log LOG = LogFactory
0065:                    .getLog(RMICacheReplicatorTest.class.getName());
0066:
0067:            /**
0068:             * CacheManager 1 in the cluster
0069:             */
0070:            protected CacheManager manager1;
0071:            /**
0072:             * CacheManager 2 in the cluster
0073:             */
0074:            protected CacheManager manager2;
0075:            /**
0076:             * CacheManager 3 in the cluster
0077:             */
0078:            protected CacheManager manager3;
0079:            /**
0080:             * CacheManager 4 in the cluster
0081:             */
0082:            protected CacheManager manager4;
0083:            /**
0084:             * CacheManager 5 in the cluster
0085:             */
0086:            protected CacheManager manager5;
0087:            /**
0088:             * CacheManager 6 in the cluster
0089:             */
0090:            protected CacheManager manager6;
0091:
0092:            /**
0093:             * The name of the cache under test
0094:             */
0095:            protected String cacheName = "sampleCache1";
0096:            /**
0097:             * CacheManager 1 of 2s cache being replicated
0098:             */
0099:            protected Ehcache cache1;
0100:
0101:            /**
0102:             * CacheManager 2 of 2s cache being replicated
0103:             */
0104:            protected Ehcache cache2;
0105:
0106:            /**
0107:             * Allows setup to be the same
0108:             */
0109:            protected String cacheNameBase = "ehcache-distributed";
0110:
0111:            /**
0112:             * {@inheritDoc}
0113:             * Sets up two caches: cache1 is local. cache2 is to be receive updates
0114:             *
0115:             * @throws Exception
0116:             */
0117:            protected void setUp() throws Exception {
0118:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0119:                    return;
0120:                }
0121:
0122:                //Required to get SoftReference tests to pass. The VM clean up SoftReferences rather than allocating
0123:                // memory to -Xmx!
0124:                //forceVMGrowth();
0125:                //System.gc();
0126:                MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
0127:
0128:                CountingCacheEventListener.resetCounters();
0129:                manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0130:                        + "distribution/ehcache-distributed1.xml");
0131:                manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0132:                        + "distribution/ehcache-distributed2.xml");
0133:                manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0134:                        + "distribution/ehcache-distributed3.xml");
0135:                manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0136:                        + "distribution/ehcache-distributed4.xml");
0137:                manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0138:                        + "distribution/ehcache-distributed5.xml");
0139:
0140:                //manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-jndi6.xml");
0141:
0142:                //allow cluster to be established
0143:                Thread.sleep(1020);
0144:
0145:                cache1 = manager1.getCache(cacheName);
0146:                cache1.removeAll();
0147:
0148:                cache2 = manager2.getCache(cacheName);
0149:                cache2.removeAll();
0150:
0151:                //enable distributed removeAlls to finish
0152:                waitForProgagate();
0153:
0154:            }
0155:
0156:            /**
0157:             * {@inheritDoc}
0158:             *
0159:             * @throws Exception
0160:             */
0161:            protected void tearDown() throws Exception {
0162:
0163:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0164:                    return;
0165:                }
0166:
0167:                if (manager1 != null) {
0168:                    manager1.shutdown();
0169:                }
0170:                if (manager2 != null) {
0171:                    manager2.shutdown();
0172:                }
0173:                if (manager3 != null) {
0174:                    manager3.shutdown();
0175:                }
0176:                if (manager4 != null) {
0177:                    manager4.shutdown();
0178:                }
0179:                if (manager5 != null) {
0180:                    manager5.shutdown();
0181:                }
0182:                if (manager6 != null) {
0183:                    manager6.shutdown();
0184:                }
0185:                Thread.sleep(5000);
0186:
0187:                List threads = JVMUtil.enumerateThreads();
0188:                for (int i = 0; i < threads.size(); i++) {
0189:                    Thread thread = (Thread) threads.get(i);
0190:                    if (thread.getName().equals("Replication Thread")) {
0191:                        fail("There should not be any replication threads running after shutdown");
0192:                    }
0193:                }
0194:
0195:            }
0196:
0197:            /**
0198:             * 5 cache managers should means that each cache has four remote peers
0199:             */
0200:            public void testRemoteCachePeersEqualsNumberOfCacheManagersInCluster() {
0201:
0202:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0203:                    return;
0204:                }
0205:
0206:                CacheManagerPeerProvider provider = manager1
0207:                        .getCachePeerProvider();
0208:                List remotePeersOfCache1 = provider
0209:                        .listRemoteCachePeers(cache1);
0210:                assertEquals(4, remotePeersOfCache1.size());
0211:            }
0212:
0213:            /**
0214:             * Does a new cache manager in the cluster get detected?
0215:             */
0216:            public void testRemoteCachePeersDetectsNewCacheManager()
0217:                    throws InterruptedException {
0218:
0219:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0220:                    return;
0221:                }
0222:
0223:                CacheManagerPeerProvider provider = manager1
0224:                        .getCachePeerProvider();
0225:                List remotePeersOfCache1 = provider
0226:                        .listRemoteCachePeers(cache1);
0227:                assertEquals(4, remotePeersOfCache1.size());
0228:
0229:                //Add new CacheManager to cluster
0230:                manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0231:                        + "distribution/ehcache-distributed6.xml");
0232:
0233:                //Allow detection to occur
0234:                Thread.sleep(10020);
0235:
0236:                remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
0237:                assertEquals(5, remotePeersOfCache1.size());
0238:            }
0239:
0240:            /**
0241:             * Does a down cache manager in the cluster get removed?
0242:             */
0243:            public void testRemoteCachePeersDetectsDownCacheManager()
0244:                    throws InterruptedException {
0245:
0246:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0247:                    return;
0248:                }
0249:
0250:                CacheManagerPeerProvider provider = manager1
0251:                        .getCachePeerProvider();
0252:                List remotePeersOfCache1 = provider
0253:                        .listRemoteCachePeers(cache1);
0254:                assertEquals(4, remotePeersOfCache1.size());
0255:
0256:                //Drop a CacheManager from the cluster
0257:                manager5.shutdown();
0258:
0259:                //Allow change detection to occur. Heartbeat 1 second and is not stale until 5000
0260:                Thread.sleep(11020);
0261:                remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
0262:
0263:                assertEquals(3, remotePeersOfCache1.size());
0264:            }
0265:
0266:            /**
0267:             * Does a down cache manager in the cluster get removed?
0268:             */
0269:            public void testRemoteCachePeersDetectsDownCacheManagerSlow()
0270:                    throws InterruptedException {
0271:
0272:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0273:                    return;
0274:                }
0275:
0276:                try {
0277:                    CacheManagerPeerProvider provider = manager1
0278:                            .getCachePeerProvider();
0279:                    List remotePeersOfCache1 = provider
0280:                            .listRemoteCachePeers(cache1);
0281:                    assertEquals(4, remotePeersOfCache1.size());
0282:
0283:                    MulticastKeepaliveHeartbeatSender
0284:                            .setHeartBeatInterval(2000);
0285:                    Thread.sleep(2000);
0286:
0287:                    //Drop a CacheManager from the cluster
0288:                    manager5.shutdown();
0289:
0290:                    //Insufficient time for it to timeout
0291:                    remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
0292:                    assertEquals(4, remotePeersOfCache1.size());
0293:                } finally {
0294:                    MulticastKeepaliveHeartbeatSender
0295:                            .setHeartBeatInterval(1000);
0296:                    Thread.sleep(2000);
0297:                }
0298:
0299:            }
0300:
0301:            /**
0302:             * Tests put and remove initiated from cache1 in a cluster
0303:             * <p/>
0304:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0305:             */
0306:            public void testPutProgagatesFromAndToEveryCacheManagerAndCache()
0307:                    throws CacheException, InterruptedException {
0308:
0309:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0310:                    return;
0311:                }
0312:
0313:                //Put
0314:                String[] cacheNames = manager1.getCacheNames();
0315:                int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
0316:                Arrays.sort(cacheNames);
0317:                for (int i = 0; i < cacheNames.length; i++) {
0318:                    String name = cacheNames[i];
0319:                    manager1.getCache(name).put(
0320:                            new Element("" + i, new Integer(i)));
0321:                    //Add some non serializable elements that should not get propagated
0322:                    manager1.getCache(name).put(
0323:                            new Element("nonSerializable" + i, new Object()));
0324:                }
0325:
0326:                waitForProgagate();
0327:
0328:                int count2 = 0;
0329:                int count3 = 0;
0330:                int count4 = 0;
0331:                int count5 = 0;
0332:                for (int i = 0; i < cacheNames.length; i++) {
0333:                    String name = cacheNames[i];
0334:                    Element element2 = manager2.getCache(name).get("" + i);
0335:                    if (element2 != null) {
0336:                        count2++;
0337:                    }
0338:                    Element nonSerializableElement2 = manager2.getCache(name)
0339:                            .get("nonSerializable" + i);
0340:                    if (nonSerializableElement2 != null) {
0341:                        count2++;
0342:                    }
0343:                    Element element3 = manager3.getCache(name).get("" + i);
0344:                    if (element3 != null) {
0345:                        count3++;
0346:                    }
0347:                    Element element4 = manager4.getCache(name).get("" + i);
0348:                    if (element4 != null) {
0349:                        count4++;
0350:                    }
0351:                    Element element5 = manager5.getCache(name).get("" + i);
0352:                    if (element5 != null) {
0353:                        count5++;
0354:                    }
0355:                }
0356:                assertEquals(numberOfCaches, count2);
0357:                assertEquals(numberOfCaches, count3);
0358:                assertEquals(numberOfCaches, count4);
0359:                assertEquals(numberOfCaches, count5);
0360:
0361:            }
0362:
0363:            /**
0364:             * Tests what happens when a CacheManager in the cluster comes and goes. In ehcache-1.2.4 this would cause the new RMI CachePeers in the CacheManager to
0365:             * be permanently corrupt.
0366:             */
0367:            public void testPutProgagatesFromAndToEveryCacheManagerAndCacheDirty()
0368:                    throws CacheException, InterruptedException {
0369:
0370:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0371:                    return;
0372:                }
0373:
0374:                manager3.shutdown();
0375:
0376:                Thread.sleep(11020);
0377:
0378:                manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR
0379:                        + "distribution/ehcache-distributed3.xml");
0380:                Thread.sleep(11020);
0381:
0382:                //Put
0383:                String[] cacheNames = manager1.getCacheNames();
0384:                int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
0385:                Arrays.sort(cacheNames);
0386:                for (int i = 0; i < cacheNames.length; i++) {
0387:                    String name = cacheNames[i];
0388:                    manager1.getCache(name).put(
0389:                            new Element("" + i, new Integer(i)));
0390:                    //Add some non serializable elements that should not get propagated
0391:                    manager1.getCache(name).put(
0392:                            new Element("nonSerializable" + i, new Object()));
0393:                }
0394:
0395:                waitForProgagate();
0396:
0397:                int count2 = 0;
0398:                int count3 = 0;
0399:                int count4 = 0;
0400:                int count5 = 0;
0401:                for (int i = 0; i < cacheNames.length; i++) {
0402:                    String name = cacheNames[i];
0403:                    Element element2 = manager2.getCache(name).get("" + i);
0404:                    if (element2 != null) {
0405:                        count2++;
0406:                    }
0407:                    Element nonSerializableElement2 = manager2.getCache(name)
0408:                            .get("nonSerializable" + i);
0409:                    if (nonSerializableElement2 != null) {
0410:                        count2++;
0411:                    }
0412:                    Element element3 = manager3.getCache(name).get("" + i);
0413:                    if (element3 != null) {
0414:                        count3++;
0415:                    }
0416:                    Element element4 = manager4.getCache(name).get("" + i);
0417:                    if (element4 != null) {
0418:                        count4++;
0419:                    }
0420:                    Element element5 = manager5.getCache(name).get("" + i);
0421:                    if (element5 != null) {
0422:                        count5++;
0423:                    }
0424:                }
0425:                assertEquals(numberOfCaches, count2);
0426:                assertEquals(numberOfCaches, count3);
0427:                assertEquals(numberOfCaches, count4);
0428:                assertEquals(numberOfCaches, count5);
0429:
0430:            }
0431:
0432:            /**
0433:             * Enables long stabilty runs using replication to be done.
0434:             * <p/>
0435:             * This test has been run in a profile for 15 hours without any observed issues.
0436:             *
0437:             * @throws InterruptedException
0438:             */
0439:            public void manualStabilityTest() throws InterruptedException {
0440:                forceVMGrowth();
0441:
0442:                ManagementService.registerMBeans(manager3, createMBeanServer(),
0443:                        true, true, true, true);
0444:                while (true) {
0445:                    testBigPutsProgagatesAsynchronous();
0446:                }
0447:            }
0448:
0449:            /**
0450:             * Non JUnit invocation of stability test to get cleaner run
0451:             *
0452:             * @param args
0453:             * @throws InterruptedException
0454:             */
0455:            public static void main(String[] args) throws Exception {
0456:                RMICacheReplicatorTest replicatorTest = new RMICacheReplicatorTest();
0457:                replicatorTest.setUp();
0458:                replicatorTest.manualStabilityTest();
0459:            }
0460:
0461:            /**
0462:             * The number of caches there should be.
0463:             */
0464:            protected int getNumberOfReplicatingCachesInCacheManager() {
0465:                return 55;
0466:            }
0467:
0468:            /**
0469:             * Performance and capacity tests.
0470:             * <p/>
0471:             * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
0472:             * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
0473:             * it has fully received.
0474:             * <p/>
0475:             * r37 and earlier - initial implementation
0476:             * 38 seconds to get all notifications with 6 peers, 2000 Elements and 400 byte payload
0477:             * 18 seconds to get all notifications with 2 peers, 2000 Elements and 400 byte payload
0478:             * 40 seconds to get all notifications with 2 peers, 2000 Elements and 10k payload
0479:             * 22 seconds to get all notifications with 2 peers, 2000 Elements and 1k payload
0480:             * 26 seconds to get all notifications with 2 peers, 200 Elements and 100k payload
0481:             * <p/>
0482:             * r38 - RMI stub lookup on registration rather than at each lookup. Saves quite a few lookups. Also change to 5 second heartbeat
0483:             * 38 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (1 second heartbeat)
0484:             * 16 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
0485:             * 13 seconds to get 2000 notifications with 2 peers, Elements with 400 byte payload
0486:             * <p/>
0487:             * r39 - Batching asyn replicator. Send all queued messages in one RMI call once per second.
0488:             * 2 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
0489:             */
0490:            public void testBigPutsProgagatesAsynchronous()
0491:                    throws CacheException, InterruptedException {
0492:
0493:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0494:                    return;
0495:                }
0496:
0497:                //Give everything a chance to startup
0498:                //Thread.sleep(10000);
0499:                StopWatch stopWatch = new StopWatch();
0500:                Integer index = null;
0501:                for (int i = 0; i < 2; i++) {
0502:                    for (int j = 0; j < 1000; j++) {
0503:                        index = new Integer(((1000 * i) + j));
0504:                        cache1
0505:                                .put(new Element(
0506:                                        index,
0507:                                        "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0508:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0509:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0510:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0511:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0512:                    }
0513:
0514:                }
0515:                long elapsed = stopWatch.getElapsedTime();
0516:                long putTime = ((elapsed / 1000));
0517:                LOG.info("Put Elapsed time: " + putTime);
0518:                //assertTrue(putTime < 8);
0519:
0520:                assertEquals(2000, cache1.getSize());
0521:
0522:                Thread.sleep(2000);
0523:                assertEquals(2000, manager2.getCache("sampleCache1").getSize());
0524:                assertEquals(2000, manager3.getCache("sampleCache1").getSize());
0525:                assertEquals(2000, manager4.getCache("sampleCache1").getSize());
0526:                assertEquals(2000, manager5.getCache("sampleCache1").getSize());
0527:
0528:                CountingCacheEventListener.resetCounters();
0529:
0530:            }
0531:
0532:            /**
0533:             * Performance and capacity tests.
0534:             * <p/>
0535:             */
0536:            public void testBootstrap() throws CacheException,
0537:                    InterruptedException, RemoteException {
0538:
0539:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0540:                    return;
0541:                }
0542:
0543:                //load up some data
0544:                StopWatch stopWatch = new StopWatch();
0545:                Integer index = null;
0546:                for (int i = 0; i < 2; i++) {
0547:                    for (int j = 0; j < 1000; j++) {
0548:                        index = new Integer(((1000 * i) + j));
0549:                        cache1
0550:                                .put(new Element(
0551:                                        index,
0552:                                        "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0553:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0554:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0555:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0556:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0557:                    }
0558:
0559:                }
0560:                long elapsed = stopWatch.getElapsedTime();
0561:                long putTime = ((elapsed / 1000));
0562:                LOG.info("Put Elapsed time: " + putTime);
0563:
0564:                assertEquals(2000, cache1.getSize());
0565:
0566:                Thread.sleep(7000);
0567:                assertEquals(2000, manager2.getCache("sampleCache1").getSize());
0568:                assertEquals(2000, manager3.getCache("sampleCache1").getSize());
0569:                assertEquals(2000, manager4.getCache("sampleCache1").getSize());
0570:                assertEquals(2000, manager5.getCache("sampleCache1").getSize());
0571:
0572:                //now test bootstrap
0573:                manager1.addCache("bootStrapResults");
0574:                Cache cache = manager1.getCache("bootStrapResults");
0575:                List cachePeers = manager1.getCacheManagerPeerProvider()
0576:                        .listRemoteCachePeers(cache1);
0577:                CachePeer cachePeer = (CachePeer) cachePeers.get(0);
0578:
0579:                List keys = cachePeer.getKeys();
0580:                assertEquals(2000, keys.size());
0581:
0582:                Element firstElement = cachePeer.getQuiet((Serializable) keys
0583:                        .get(0));
0584:                long size = firstElement.getSerializedSize();
0585:                assertEquals(574, size);
0586:
0587:                int chunkSize = (int) (5000000 / size);
0588:
0589:                List requestChunk = new ArrayList();
0590:                for (int i = 0; i < keys.size(); i++) {
0591:                    Serializable serializable = (Serializable) keys.get(i);
0592:                    requestChunk.add(serializable);
0593:                    if (requestChunk.size() == chunkSize) {
0594:                        fetchAndPutElements(cache, requestChunk, cachePeer);
0595:                        requestChunk.clear();
0596:                    }
0597:                }
0598:                //get leftovers
0599:                fetchAndPutElements(cache, requestChunk, cachePeer);
0600:
0601:                assertEquals(keys.size(), cache.getSize());
0602:
0603:            }
0604:
0605:            private void fetchAndPutElements(Ehcache cache, List requestChunk,
0606:                    CachePeer cachePeer) throws RemoteException {
0607:                List receivedChunk = cachePeer.getElements(requestChunk);
0608:                for (int i = 0; i < receivedChunk.size(); i++) {
0609:                    Element element = (Element) receivedChunk.get(i);
0610:                    assertNotNull(element);
0611:                    cache.put(element, true);
0612:                }
0613:
0614:            }
0615:
0616:            /**
0617:             * Drive everything to point of breakage within a 64MB VM.
0618:             */
0619:            public void xTestHugePutsBreaksAsynchronous()
0620:                    throws CacheException, InterruptedException {
0621:
0622:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0623:                    return;
0624:                }
0625:
0626:                //Give everything a chance to startup
0627:                StopWatch stopWatch = new StopWatch();
0628:                Integer index = null;
0629:                for (int i = 0; i < 500; i++) {
0630:                    for (int j = 0; j < 1000; j++) {
0631:                        index = new Integer(((1000 * i) + j));
0632:                        cache1
0633:                                .put(new Element(
0634:                                        index,
0635:                                        "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0636:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0637:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0638:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0639:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0640:                    }
0641:
0642:                }
0643:                long elapsed = stopWatch.getElapsedTime();
0644:                long putTime = ((elapsed / 1000));
0645:                LOG.info("Put Elapsed time: " + putTime);
0646:                //assertTrue(putTime < 8);
0647:
0648:                assertEquals(100000, cache1.getSize());
0649:
0650:                Thread.sleep(100000);
0651:                assertEquals(20000, manager2.getCache("sampleCache1").getSize());
0652:                assertEquals(20000, manager3.getCache("sampleCache1").getSize());
0653:                assertEquals(20000, manager4.getCache("sampleCache1").getSize());
0654:                assertEquals(20000, manager5.getCache("sampleCache1").getSize());
0655:
0656:            }
0657:
0658:            /**
0659:             * Performance and capacity tests.
0660:             * <p/>
0661:             * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
0662:             * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
0663:             * it has fully received.
0664:             * <p/>
0665:             * 4 seconds to get all remove notifications with 6 peers, 5000 Elements and 400 byte payload
0666:             */
0667:            public void testBigRemovesProgagatesAsynchronous()
0668:                    throws CacheException, InterruptedException {
0669:
0670:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0671:                    return;
0672:                }
0673:
0674:                //Give everything a chance to startup
0675:                Integer index = null;
0676:                for (int i = 0; i < 5; i++) {
0677:                    for (int j = 0; j < 1000; j++) {
0678:                        index = new Integer(((1000 * i) + j));
0679:                        cache1
0680:                                .put(new Element(
0681:                                        index,
0682:                                        "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0683:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0684:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0685:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0686:                                                + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0687:                    }
0688:
0689:                }
0690:                Thread.sleep(8000);
0691:                assertEquals(5000, cache1.getSize());
0692:                assertEquals(5000, manager2.getCache("sampleCache1").getSize());
0693:                assertEquals(5000, manager3.getCache("sampleCache1").getSize());
0694:                assertEquals(5000, manager4.getCache("sampleCache1").getSize());
0695:                assertEquals(5000, manager5.getCache("sampleCache1").getSize());
0696:
0697:                //Let the disk stores catch up before the next stage of the test
0698:                Thread.sleep(2000);
0699:
0700:                StopWatch stopWatch = new StopWatch();
0701:
0702:                for (int i = 0; i < 5; i++) {
0703:                    for (int j = 0; j < 1000; j++) {
0704:                        index = new Integer(((1000 * i) + j));
0705:                        cache1.remove(index);
0706:                    }
0707:                }
0708:
0709:                int timeForPropagate = 10000;
0710:
0711:                Thread.sleep(timeForPropagate);
0712:                assertEquals(0, cache1.getSize());
0713:                assertEquals(0, manager2.getCache("sampleCache1").getSize());
0714:                assertEquals(0, manager3.getCache("sampleCache1").getSize());
0715:                assertEquals(0, manager4.getCache("sampleCache1").getSize());
0716:                assertEquals(0, manager5.getCache("sampleCache1").getSize());
0717:
0718:                LOG.info("Remove Elapsed time: " + timeForPropagate);
0719:
0720:            }
0721:
0722:            /**
0723:             * Performance and capacity tests.
0724:             * <p/>
0725:             * 5 seconds to send all notifications synchronously with 5 peers, 2000 Elements and 400 byte payload
0726:             * The numbers given below are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
0727:             * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
0728:             * it has fully received.
0729:             */
0730:            public void testBigPutsProgagatesSynchronous()
0731:                    throws CacheException, InterruptedException {
0732:
0733:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0734:                    return;
0735:                }
0736:
0737:                //Give everything a chance to startup
0738:                StopWatch stopWatch = new StopWatch();
0739:                Integer index;
0740:                for (int i = 0; i < 2; i++) {
0741:                    for (int j = 0; j < 1000; j++) {
0742:                        index = new Integer(((1000 * i) + j));
0743:                        manager1
0744:                                .getCache("sampleCache3")
0745:                                .put(
0746:                                        new Element(
0747:                                                index,
0748:                                                "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0749:                                                        + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0750:                                                        + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0751:                                                        + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
0752:                                                        + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
0753:                    }
0754:
0755:                }
0756:                long elapsed = stopWatch.getElapsedTime();
0757:                long putTime = ((elapsed / 1000));
0758:                LOG.info("Put and Propagate Synchronously Elapsed time: "
0759:                        + putTime + " seconds");
0760:
0761:                assertEquals(2000, manager1.getCache("sampleCache3").getSize());
0762:                assertEquals(2000, manager2.getCache("sampleCache3").getSize());
0763:                assertEquals(2000, manager3.getCache("sampleCache3").getSize());
0764:                assertEquals(2000, manager4.getCache("sampleCache3").getSize());
0765:                assertEquals(2000, manager5.getCache("sampleCache3").getSize());
0766:
0767:            }
0768:
0769:            /**
0770:             * manager1 adds a replicating cache, then manager2 and so on. Then we remove one. Does everything work as expected?
0771:             */
0772:            public void testPutWithNewCacheAddedProgressively()
0773:                    throws InterruptedException {
0774:
0775:                manager1.addCache("progressiveAddCache");
0776:                manager2.addCache("progressiveAddCache");
0777:
0778:                //The cluster will not have formed yet, so it will fail
0779:                try {
0780:                    putTest(manager1.getCache("progressiveAddCache"), manager2
0781:                            .getCache("progressiveAddCache"), ASYNCHRONOUS);
0782:                    fail();
0783:                } catch (AssertionFailedError e) {
0784:                    //expected
0785:                }
0786:
0787:                //The cluster will now have formed yet, so it will succeed
0788:                putTest(manager1.getCache("progressiveAddCache"), manager2
0789:                        .getCache("progressiveAddCache"), ASYNCHRONOUS);
0790:
0791:                Cache secondCache = manager2.getCache("progressiveAddCache");
0792:
0793:                //The second peer disappears. The test will fail.
0794:                manager2.removeCache("progressiveAddCache");
0795:                try {
0796:                    putTest(manager1.getCache("progressiveAddCache"),
0797:                            secondCache, ASYNCHRONOUS);
0798:                    fail();
0799:                } catch (IllegalStateException e) {
0800:                    //The second cache will not alive. Expected. But no other exception is caught and this will otherwise fail.
0801:
0802:                }
0803:
0804:            }
0805:
0806:            /**
0807:             * Test various cache configurations for cache1 - explicit setting of:
0808:             * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
0809:             */
0810:            public void testPutWithExplicitReplicationConfig()
0811:                    throws InterruptedException {
0812:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0813:                    return;
0814:                }
0815:                putTest(manager1.getCache("sampleCache1"), manager2
0816:                        .getCache("sampleCache1"), ASYNCHRONOUS);
0817:            }
0818:
0819:            /**
0820:             * Test various cache configurations for cache1 - explicit setting of:
0821:             * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
0822:             */
0823:            public void testPutWithThreadKiller() throws InterruptedException {
0824:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0825:                    return;
0826:                }
0827:                putTestWithThreadKiller(manager1.getCache("sampleCache1"),
0828:                        manager2.getCache("sampleCache1"), ASYNCHRONOUS);
0829:            }
0830:
0831:            /**
0832:             * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
0833:             * of a remote event by a CachePeer.
0834:             */
0835:            public void testRemotelyReceivedPutNotifiesCountingListener()
0836:                    throws InterruptedException {
0837:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0838:                    return;
0839:                }
0840:                putTest(manager1.getCache("sampleCache1"), manager2
0841:                        .getCache("sampleCache1"), ASYNCHRONOUS);
0842:                assertEquals(1, CountingCacheEventListener.getCacheElementsPut(
0843:                        manager1.getCache("sampleCache1")).size());
0844:                assertEquals(1, CountingCacheEventListener.getCacheElementsPut(
0845:                        manager2.getCache("sampleCache1")).size());
0846:
0847:            }
0848:
0849:            /**
0850:             * Test various cache configurations for cache1 - explicit setting of:
0851:             * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
0852:             */
0853:            public void testPutWithExplicitReplicationSynchronousConfig()
0854:                    throws InterruptedException {
0855:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0856:                    return;
0857:                }
0858:                putTest(manager1.getCache("sampleCache3"), manager2
0859:                        .getCache("sampleCache3"), SYNCHRONOUS);
0860:            }
0861:
0862:            /**
0863:             * Test put replicated for cache4 - no properties.
0864:             * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
0865:             */
0866:            public void testPutWithEmptyReplicationPropertiesConfig()
0867:                    throws InterruptedException {
0868:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0869:                    return;
0870:                }
0871:                putTest(manager1.getCache("sampleCache4"), manager2
0872:                        .getCache("sampleCache4"), ASYNCHRONOUS);
0873:            }
0874:
0875:            /**
0876:             * Test put replicated for cache4 - missing replicatePuts property.
0877:             * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
0878:             * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
0879:             */
0880:            public void testPutWithOneMissingReplicationPropertyConfig()
0881:                    throws InterruptedException {
0882:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0883:                    return;
0884:                }
0885:                putTest(manager1.getCache("sampleCache5"), manager2
0886:                        .getCache("sampleCache5"), ASYNCHRONOUS);
0887:            }
0888:
0889:            /**
0890:             * Tests put and remove initiated from cache1 in a cluster
0891:             * <p/>
0892:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0893:             */
0894:            public void putTest(Ehcache fromCache, Ehcache toCache,
0895:                    boolean asynchronous) throws CacheException,
0896:                    InterruptedException {
0897:
0898:                Serializable key = new Date();
0899:                Serializable value = new Date();
0900:                Element sourceElement = new Element(key, value);
0901:
0902:                //Put
0903:                fromCache.put(sourceElement);
0904:                int i = 0;
0905:
0906:                if (asynchronous) {
0907:                    waitForProgagate();
0908:                }
0909:
0910:                //Should have been replicated to toCache.
0911:                Element deliveredElement = toCache.get(key);
0912:                assertEquals(sourceElement, deliveredElement);
0913:
0914:            }
0915:
0916:            /**
0917:             * Tests put and remove initiated from cache1 in a cluster
0918:             * <p/>
0919:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0920:             */
0921:            public void putTestWithThreadKiller(Ehcache fromCache,
0922:                    Ehcache toCache, boolean asynchronous)
0923:                    throws CacheException, InterruptedException {
0924:
0925:                fromCache.put(new Element("thread killer", new ThreadKiller()));
0926:                if (asynchronous) {
0927:                    waitForProgagate();
0928:                }
0929:
0930:                Serializable key = new Date();
0931:                Serializable value = new Date();
0932:                Element sourceElement = new Element(key, value);
0933:
0934:                //Put
0935:                fromCache.put(sourceElement);
0936:
0937:                if (asynchronous) {
0938:                    waitForProgagate();
0939:                }
0940:
0941:                //Should have been replicated to toCache.
0942:                Element deliveredElement = toCache.get(key);
0943:                assertEquals(sourceElement, deliveredElement);
0944:
0945:            }
0946:
0947:            /**
0948:             * Checks that a put received from a remote cache notifies any registered listeners.
0949:             * <p/>
0950:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
0951:             */
0952:            public void testRemotePutNotificationGetsToOtherListeners()
0953:                    throws CacheException, InterruptedException {
0954:
0955:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
0956:                    return;
0957:                }
0958:
0959:                Serializable key = new Date();
0960:                Serializable value = new Date();
0961:                Element element1 = new Element(key, value);
0962:
0963:                //Put
0964:                cache1.put(new Element("1", new Date()));
0965:                cache1.put(new Element("2", new Date()));
0966:                cache1.put(new Element("3", new Date()));
0967:
0968:                //Nonserializable and non deliverable put
0969:                Object nonSerializableObject = new Object();
0970:                cache1.put(new Element(nonSerializableObject, new Object()));
0971:
0972:                waitForProgagate();
0973:
0974:                //local initiating cache's counting listener should have been notified
0975:                assertEquals(4, CountingCacheEventListener.getCacheElementsPut(
0976:                        cache1).size());
0977:                //remote receiving caches' counting listener should have been notified
0978:                assertEquals(3, CountingCacheEventListener.getCacheElementsPut(
0979:                        cache2).size());
0980:
0981:                //Update
0982:                cache1.put(new Element("1", new Date()));
0983:                cache1.put(new Element("2", new Date()));
0984:                cache1.put(new Element("3", new Date()));
0985:
0986:                //Nonserializable and non deliverable put
0987:                cache1.put(new Element(nonSerializableObject, new Object()));
0988:
0989:                waitForProgagate();
0990:
0991:                //local initiating cache's counting listener should have been notified
0992:                assertEquals(4, CountingCacheEventListener
0993:                        .getCacheElementsUpdated(cache1).size());
0994:                //remote receiving caches' counting listener should have been notified
0995:                assertEquals(3, CountingCacheEventListener
0996:                        .getCacheElementsUpdated(cache2).size());
0997:
0998:                //Remove
0999:                cache1.remove("1");
1000:                cache1.remove("2");
1001:                cache1.remove("3");
1002:                cache1.remove(nonSerializableObject);
1003:
1004:                waitForProgagate();
1005:
1006:                //local initiating cache's counting listener should have been notified
1007:                assertEquals(4, CountingCacheEventListener
1008:                        .getCacheElementsRemoved(cache1).size());
1009:                //remote receiving caches' counting listener should have been notified
1010:                assertEquals(3, CountingCacheEventListener
1011:                        .getCacheElementsRemoved(cache2).size());
1012:
1013:            }
1014:
1015:            /**
1016:             * Test various cache configurations for cache1 - explicit setting of:
1017:             * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1018:             */
1019:            public void testRemoveWithExplicitReplicationConfig()
1020:                    throws InterruptedException {
1021:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1022:                    return;
1023:                }
1024:                removeTest(manager1.getCache("sampleCache1"), manager2
1025:                        .getCache("sampleCache1"), ASYNCHRONOUS);
1026:            }
1027:
1028:            /**
1029:             * Test various cache configurations for cache1 - explicit setting of:
1030:             * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1031:             */
1032:            public void testRemoveWithExplicitReplicationSynchronousConfig()
1033:                    throws InterruptedException {
1034:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1035:                    return;
1036:                }
1037:                removeTest(manager1.getCache("sampleCache3"), manager2
1038:                        .getCache("sampleCache3"), SYNCHRONOUS);
1039:            }
1040:
1041:            /**
1042:             * Test put replicated for cache4 - no properties.
1043:             * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
1044:             */
1045:            public void testRemoveWithEmptyReplicationPropertiesConfig()
1046:                    throws InterruptedException {
1047:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1048:                    return;
1049:                }
1050:                removeTest(manager1.getCache("sampleCache4"), manager2
1051:                        .getCache("sampleCache4"), ASYNCHRONOUS);
1052:            }
1053:
1054:            /**
1055:             * Tests put and remove initiated from a cache to another cache in a cluster
1056:             * <p/>
1057:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1058:             */
1059:            public void removeTest(Ehcache fromCache, Ehcache toCache,
1060:                    boolean asynchronous) throws CacheException,
1061:                    InterruptedException {
1062:
1063:                Serializable key = new Date();
1064:                Serializable value = new Date();
1065:                Element element1 = new Element(key, value);
1066:
1067:                //Put
1068:                fromCache.put(element1);
1069:
1070:                if (asynchronous) {
1071:                    waitForProgagate();
1072:                }
1073:
1074:                //Should have been replicated to cache2.
1075:                Element element2 = toCache.get(key);
1076:                assertEquals(element1, element2);
1077:
1078:                //Remove
1079:                fromCache.remove(key);
1080:                if (asynchronous) {
1081:                    waitForProgagate();
1082:                }
1083:
1084:                //Should have been replicated to cache2.
1085:                element2 = toCache.get(key);
1086:                assertNull(element2);
1087:
1088:            }
1089:
1090:            /**
1091:             * test removeAll sync
1092:             */
1093:            public void testRemoveAllAsynchronous() throws Exception {
1094:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1095:                    return;
1096:                }
1097:                removeAllTest(manager1.getCache("sampleCache1"), manager2
1098:                        .getCache("sampleCache1"), ASYNCHRONOUS);
1099:            }
1100:
1101:            /**
1102:             * test removeAll async
1103:             */
1104:            public void testRemoveAllSynchronous() throws Exception {
1105:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1106:                    return;
1107:                }
1108:                removeAllTest(manager1.getCache("sampleCache3"), manager2
1109:                        .getCache("sampleCache3"), SYNCHRONOUS);
1110:            }
1111:
1112:            /**
1113:             * Tests removeAll initiated from a cache to another cache in a cluster
1114:             * <p/>
1115:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1116:             */
1117:            public void removeAllTest(Ehcache fromCache, Ehcache toCache,
1118:                    boolean asynchronous) throws Exception {
1119:
1120:                //removeAll is distributed. Stop it colliding with the rest of the test
1121:                waitForProgagate();
1122:
1123:                Serializable key = new Date();
1124:                Serializable value = new Date();
1125:                Element element1 = new Element(key, value);
1126:
1127:                //Put
1128:                fromCache.put(element1);
1129:
1130:                if (asynchronous) {
1131:                    waitForProgagate();
1132:                }
1133:
1134:                //Should have been replicated to cache2.
1135:                Element element2 = toCache.get(key);
1136:                assertEquals(element1, element2);
1137:
1138:                //Remove
1139:                fromCache.removeAll();
1140:                if (asynchronous) {
1141:                    waitForProgagate();
1142:                }
1143:
1144:                //Should have been replicated to cache2.
1145:                element2 = toCache.get(key);
1146:                assertNull(element2);
1147:                assertEquals(0, toCache.getSize());
1148:
1149:            }
1150:
1151:            /**
1152:             * Test various cache configurations for cache1 - explicit setting of:
1153:             * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1154:             */
1155:            public void testUpdateWithExplicitReplicationConfig()
1156:                    throws Exception {
1157:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1158:                    return;
1159:                }
1160:                updateViaCopyTest(manager1.getCache("sampleCache1"), manager2
1161:                        .getCache("sampleCache1"), ASYNCHRONOUS);
1162:            }
1163:
1164:            /**
1165:             * Test various cache configurations for cache1 - explicit setting of:
1166:             * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1167:             */
1168:            public void testUpdateWithExplicitReplicationSynchronousConfig()
1169:                    throws Exception {
1170:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1171:                    return;
1172:                }
1173:                updateViaCopyTest(manager1.getCache("sampleCache3"), manager2
1174:                        .getCache("sampleCache3"), SYNCHRONOUS);
1175:            }
1176:
1177:            /**
1178:             * Test put replicated for cache4 - no properties.
1179:             * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
1180:             */
1181:            public void testUpdateWithEmptyReplicationPropertiesConfig()
1182:                    throws Exception {
1183:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1184:                    return;
1185:                }
1186:                updateViaCopyTest(manager1.getCache("sampleCache4"), manager2
1187:                        .getCache("sampleCache4"), ASYNCHRONOUS);
1188:            }
1189:
1190:            /**
1191:             * Tests put and update through copy initiated from cache1 in a cluster
1192:             * <p/>
1193:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1194:             */
1195:            public void updateViaCopyTest(Ehcache fromCache, Ehcache toCache,
1196:                    boolean asynchronous) throws Exception {
1197:
1198:                fromCache.removeAll();
1199:                toCache.removeAll();
1200:
1201:                //removeAll is distributed. Stop it colliding with the rest of the test
1202:                waitForProgagate();
1203:
1204:                Serializable key = new Date();
1205:                Serializable value = new Date();
1206:                Element element1 = new Element(key, value);
1207:
1208:                //Put
1209:                fromCache.put(element1);
1210:                if (asynchronous) {
1211:                    waitForProgagate();
1212:                }
1213:
1214:                //Should have been replicated to cache2.
1215:                Element element2 = toCache.get(key);
1216:                assertEquals(element1, element2);
1217:
1218:                //Update
1219:                Element updatedElement1 = new Element(key, new Date());
1220:
1221:                fromCache.put(updatedElement1);
1222:                if (asynchronous) {
1223:                    waitForProgagate();
1224:                }
1225:
1226:                //Should have been replicated to cache2.
1227:                Element receivedUpdatedElement2 = toCache.get(key);
1228:                assertEquals(updatedElement1, receivedUpdatedElement2);
1229:
1230:            }
1231:
1232:            /**
1233:             * Tests put and update through invalidation initiated from cache1 in a cluster
1234:             * <p/>
1235:             * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1236:             */
1237:            public void testUpdateViaInvalidate() throws CacheException,
1238:                    InterruptedException, IOException {
1239:
1240:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1241:                    return;
1242:                }
1243:
1244:                cache1 = manager1.getCache("sampleCache2");
1245:                cache1.removeAll();
1246:
1247:                cache2 = manager2.getCache("sampleCache2");
1248:                cache2.removeAll();
1249:
1250:                //removeAll is distributed. Stop it colliding with the rest of the test
1251:                waitForProgagate();
1252:
1253:                String key = "1";
1254:                Serializable value = new Date();
1255:                Element element1 = new Element(key, value);
1256:
1257:                //Put
1258:                cache1.put(element1);
1259:                waitForProgagate();
1260:
1261:                //Should have been replicated to cache2.
1262:                Element element2 = cache2.get(key);
1263:                assertEquals(element1, element2);
1264:
1265:                //Update
1266:                cache1.put(element1);
1267:                waitForProgagate();
1268:                waitForProgagate();
1269:                waitForProgagate();
1270:
1271:                //Should have been removed in cache2.
1272:                element2 = cache2.get(key);
1273:                assertNull(element2);
1274:
1275:            }
1276:
1277:            /**
1278:             * What happens when two cache instances replicate to each other and a change is initiated
1279:             */
1280:            public void testInfiniteNotificationsLoop()
1281:                    throws InterruptedException {
1282:
1283:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1284:                    return;
1285:                }
1286:
1287:                Serializable key = "1";
1288:                Serializable value = new Date();
1289:                Element element = new Element(key, value);
1290:
1291:                //Put
1292:                cache1.put(element);
1293:                waitForProgagate();
1294:
1295:                //Should have been replicated to cache2.
1296:                Element element2 = cache2.get(key);
1297:                assertEquals(element, element2);
1298:
1299:                //Remove
1300:                cache1.remove(key);
1301:                assertNull(cache1.get(key));
1302:
1303:                //Should have been replicated to cache2.
1304:                waitForProgagate();
1305:                element2 = cache2.get(key);
1306:                assertNull(element2);
1307:
1308:                //Put into 2
1309:                Element element3 = new Element("3", "ddsfds");
1310:                cache2.put(element3);
1311:                waitForProgagate();
1312:                Element element4 = cache2.get("3");
1313:                assertEquals(element3, element4);
1314:
1315:            }
1316:
1317:            /**
1318:             * Shows result of perf problem and fix in flushReplicationQueue
1319:             * <p/>
1320:             * Behaviour before change:
1321:             * <p/>
1322:             * INFO: Items written: 10381
1323:             * Oct 29, 2007 11:40:04 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1324:             * INFO: Items written: 29712
1325:             * Oct 29, 2007 11:40:57 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1326:             * INFO: Items written: 1
1327:             * Oct 29, 2007 11:40:58 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1328:             * INFO: Items written: 32354
1329:             * Oct 29, 2007 11:42:34 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1330:             * INFO: Items written: 322
1331:             * Oct 29, 2007 11:42:35 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1332:             * INFO: Items written: 41909
1333:             * <p/>
1334:             * Behaviour after change:
1335:             * INFO: Items written: 26356
1336:             * Oct 29, 2007 11:44:39 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1337:             * INFO: Items written: 33656
1338:             * Oct 29, 2007 11:44:40 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1339:             * INFO: Items written: 32234
1340:             * Oct 29, 2007 11:44:42 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1341:             * INFO: Items written: 38677
1342:             * Oct 29, 2007 11:44:43 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1343:             * INFO: Items written: 43418
1344:             * Oct 29, 2007 11:44:44 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1345:             * INFO: Items written: 31277
1346:             * Oct 29, 2007 11:44:45 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1347:             * INFO: Items written: 27769
1348:             * Oct 29, 2007 11:44:46 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1349:             * INFO: Items written: 29596
1350:             * Oct 29, 2007 11:44:47 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1351:             * INFO: Items written: 17142
1352:             * Oct 29, 2007 11:44:48 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1353:             * INFO: Items written: 14775
1354:             * Oct 29, 2007 11:44:49 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1355:             * INFO: Items written: 4088
1356:             * Oct 29, 2007 11:44:51 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1357:             * INFO: Items written: 5492
1358:             * Oct 29, 2007 11:44:52 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1359:             * INFO: Items written: 10188
1360:             *
1361:             * Also no pauses noted.
1362:             */
1363:            public void testReplicatePerf() throws InterruptedException {
1364:
1365:                if (manager2 != null) {
1366:                    manager2.shutdown();
1367:                }
1368:                if (manager3 != null) {
1369:                    manager3.shutdown();
1370:                }
1371:                if (manager4 != null) {
1372:                    manager4.shutdown();
1373:                }
1374:                if (manager5 != null) {
1375:                    manager5.shutdown();
1376:                }
1377:                if (manager6 != null) {
1378:                    manager6.shutdown();
1379:                }
1380:
1381:                //wait for cluster to drop back to just one: manager1
1382:                waitForProgagate();
1383:                waitForProgagate();
1384:
1385:                long start = System.currentTimeMillis();
1386:                final String keyBase = Long.toString(start);
1387:                int count = 0;
1388:
1389:                for (int i = 0; i < 100000; i++) {
1390:                    final String key = keyBase + ':'
1391:                            + Integer.toString((int) (Math.random() * 1000.0));
1392:                    cache1.put(new Element(key, "My Test"));
1393:                    cache1.get(key);
1394:                    cache1.remove(key);
1395:                    count++;
1396:
1397:                    final long end = System.currentTimeMillis();
1398:                    if (end - start >= 1000) {
1399:                        start = end;
1400:                        LOG.info("Items written: " + count);
1401:                        //make sure it does not choke
1402:                        assertTrue(count > 1000);
1403:                        count = 0;
1404:                    }
1405:                }
1406:            }
1407:
1408:            /**
1409:             * Need to wait for async
1410:             *
1411:             * @throws InterruptedException
1412:             */
1413:            protected void waitForProgagate() throws InterruptedException {
1414:                Thread.sleep(2000);
1415:            }
1416:
1417:            /**
1418:             * Need to wait for async
1419:             *
1420:             * @throws InterruptedException
1421:             */
1422:            protected void waitForSlowProgagate() throws InterruptedException {
1423:                Thread.sleep(6000);
1424:            }
1425:
1426:            /**
1427:             * Distributed operations create extra scope for deadlock.
1428:             * This test checks whether a distributed deadlock scenario exists for synchronous replication
1429:             * of each distributed operation all at once.
1430:             * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1431:             * and multi process safe.
1432:             * <p/>
1433:             * Carefully tailored to exercise:
1434:             * <ol>
1435:             * <li>overflow to disk. We put in 20 things and the memory size is 10
1436:             * <li>each peer is working on the same set of keys thus maximising contention
1437:             * <li>we do puts, gets and removes to explore all the execution paths
1438:             * </ol>
1439:             * If a deadlock occurs, processing will stop until a SocketTimeout exception is thrown and
1440:             * the deadlock will be released.
1441:             */
1442:            public void testCacheOperationsSynchronousMultiThreaded()
1443:                    throws Exception, InterruptedException {
1444:
1445:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1446:                    return;
1447:                }
1448:
1449:                // Run a set of threads, that attempt to fetch the elements
1450:                final List executables = new ArrayList();
1451:
1452:                executables
1453:                        .add(new ClusterExecutable(manager1, "sampleCache3"));
1454:                executables
1455:                        .add(new ClusterExecutable(manager2, "sampleCache3"));
1456:                executables
1457:                        .add(new ClusterExecutable(manager3, "sampleCache3"));
1458:
1459:                runThreads(executables);
1460:            }
1461:
1462:            /**
1463:             * Distributed operations create extra scope for deadlock.
1464:             * This test checks whether a distributed deadlock scenario exists for asynchronous replication
1465:             * of each distributed operation all at once.
1466:             * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1467:             * and multi process safe.
1468:             * It uses sampleCache2, which is configured to be asynchronous
1469:             * <p/>
1470:             * Carefully tailored to exercise:
1471:             * <ol>
1472:             * <li>overflow to disk. We put in 20 things and the memory size is 10
1473:             * <li>each peer is working on the same set of keys thus maximising contention
1474:             * <li>we do puts, gets and removes to explore all the execution paths
1475:             * </ol>
1476:             */
1477:            public void testCacheOperationsAynchronousMultiThreaded()
1478:                    throws Exception, InterruptedException {
1479:
1480:                if (JVMUtil.isSingleRMIRegistryPerVM()) {
1481:                    return;
1482:                }
1483:
1484:                // Run a set of threads, that attempt to fetch the elements
1485:                final List executables = new ArrayList();
1486:
1487:                executables
1488:                        .add(new ClusterExecutable(manager1, "sampleCache2"));
1489:                executables
1490:                        .add(new ClusterExecutable(manager2, "sampleCache2"));
1491:                executables
1492:                        .add(new ClusterExecutable(manager3, "sampleCache2"));
1493:
1494:                runThreads(executables);
1495:            }
1496:
1497:            /**
1498:             * An Exececutable which allows the CacheManager to be set
1499:             */
1500:            class ClusterExecutable implements  Executable {
1501:
1502:                private CacheManager manager;
1503:                private String cacheName;
1504:
1505:                /**
1506:                 * Construct with CacheManager
1507:                 *
1508:                 * @param manager
1509:                 */
1510:                public ClusterExecutable(CacheManager manager, String cacheName) {
1511:                    this .manager = manager;
1512:                    this .cacheName = cacheName;
1513:                }
1514:
1515:                /**
1516:                 * Execute
1517:                 *
1518:                 * @throws Exception
1519:                 */
1520:                public void execute() throws Exception {
1521:                    Random random = new Random();
1522:
1523:                    for (int i = 0; i < 20; i++) {
1524:                        Integer key = new Integer((i));
1525:                        int operationSelector = random.nextInt(4);
1526:                        Cache cache = manager.getCache(cacheName);
1527:                        if (operationSelector == 100) {
1528:                            cache.get(key);
1529:                            if (LOG.isDebugEnabled()) {
1530:                                LOG.debug(cache.getGuid() + ": get " + key);
1531:                            }
1532:                        } else if (operationSelector == 100) {
1533:                            cache.remove(key);
1534:                            if (LOG.isDebugEnabled()) {
1535:                                LOG.debug(cache.getGuid() + ": remove " + key);
1536:                            }
1537:                        } else if (operationSelector == 2) {
1538:                            cache
1539:                                    .put(new Element(
1540:                                            key,
1541:                                            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1542:                                                    + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1543:                                                    + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1544:                                                    + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1545:                                                    + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
1546:                            if (LOG.isDebugEnabled()) {
1547:                                LOG.debug(cache.getGuid() + ": put " + key);
1548:                            }
1549:                        } else {
1550:                            //every twelfth time 1/4 * 1/3 = 1/12
1551:                            if (random.nextInt(3) == 1) {
1552:                                LOG.debug("cache.removeAll()");
1553:                                cache.removeAll();
1554:                            }
1555:                        }
1556:                    }
1557:
1558:                }
1559:            }
1560:
1561:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.