Source Code Cross Referenced for ChannelTestBase.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » tests » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.tests 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.tests;
002:
003:        import java.io.InputStream;
004:        import java.io.OutputStream;
005:        import java.lang.management.ManagementFactory;
006:        import java.lang.management.ThreadInfo;
007:        import java.lang.management.ThreadMXBean;
008:        import java.lang.reflect.Constructor;
009:        import java.util.ArrayList;
010:        import java.util.Collection;
011:        import java.util.HashMap;
012:        import java.util.Iterator;
013:        import java.util.List;
014:        import java.util.Map;
015:        import java.util.Random;
016:        import java.util.StringTokenizer;
017:
018:        import junit.framework.TestCase;
019:        import junit.framework.TestSuite;
020:
021:        import org.apache.commons.logging.Log;
022:        import org.apache.commons.logging.LogFactory;
023:        import org.jgroups.Address;
024:        import org.jgroups.Channel;
025:        import org.jgroups.ChannelException;
026:        import org.jgroups.ExtendedReceiver;
027:        import org.jgroups.JChannel;
028:        import org.jgroups.JChannelFactory;
029:        import org.jgroups.Message;
030:        import org.jgroups.View;
031:        import org.jgroups.blocks.RpcDispatcher;
032:        import org.jgroups.mux.MuxChannel;
033:        import org.jgroups.util.Util;
034:
035:        import EDU.oswego.cs.dl.util.concurrent.Semaphore;
036:
037:        /**
038:         * 
039:         * @author Bela Ban
040:         * @author Vladimir Blagojevic
041:         * @author <a href="mailto:brian.stansberry@jboss.com">Brian Stansberry</a>
042:         * @version $Revision$
043:         */
044:        public class ChannelTestBase extends TestCase {
045:
// NOTE(review): TEST_CASES, ANT_PROPERTY and DELIMITER are not referenced in the
// visible part of this file; presumably they drive test selection further down - confirm.
046:            private static final String TEST_CASES = "tests";
047:            private static final String ANT_PROPERTY = "${tests}";
048:            private static final String DELIMITER = ",";
049:
// Shared random number generator, visible to subclasses.
050:            protected final static Random RANDOM = new Random();
051:
// Code point immediately before 'A' (65). The name generators in this class
// increment before converting to a character, so the first generated name is "A".
052:            private static final int LETTER_A = 64;
053:
// Fallback used by getMuxFactoryCount() when the "mux.factorycount" system property is unset.
054:            protected static String DEFAULT_MUX_FACTORY_COUNT = "4";
055:
// Configuration for plain channels; setUp() overrides it from the "channel.conf" system property.
056:            protected static String CHANNEL_CONFIG = "udp.xml";
057:
// Multiplexer configuration; setUp() overrides it from the "mux.conf" system property.
058:            protected static String MUX_CHANNEL_CONFIG = "stacks.xml";
059:
// Stack name used for mux channels; setUp() overrides it from the "mux.conf.stack" system property.
060:            protected static String MUX_CHANNEL_CONFIG_STACK_NAME = "udp";
061:
// Thread.activeCount() snapshot taken in setUp() and compared again in tearDown().
062:            protected int active_threads = 0;
063:
// Factories created in setUp() when mux channels are enabled; null otherwise.
064:            protected JChannelFactory muxFactory[] = null;
065:
// Textual listing of the threads active at setUp() time, printed by tearDown() on a mismatch.
066:            protected String thread_dump = null;
067:
// Code point of the most recently generated channel name; reset to LETTER_A in setUp().
068:            protected int currentChannelGeneratedName = LETTER_A;
069:
// Logger named after the concrete (sub)class of this test.
070:            protected final Log log = LogFactory.getLog(this .getClass());
071:
/**
 * Creates a test case without a name by delegating to the no-argument
 * superclass constructor.
 */
public ChannelTestBase() {
    super();
}
075:
/**
 * Creates a test case with the given name.
 *
 * @param name name handed to the superclass constructor
 */
public ChannelTestBase(String name) {
    super(name);
}
079:
080:            protected void setUp() throws Exception {
081:                super .setUp();
082:                MUX_CHANNEL_CONFIG = System.getProperty("mux.conf",
083:                        MUX_CHANNEL_CONFIG);
084:                MUX_CHANNEL_CONFIG_STACK_NAME = System.getProperty(
085:                        "mux.conf.stack", MUX_CHANNEL_CONFIG_STACK_NAME);
086:                CHANNEL_CONFIG = System.getProperty("channel.conf",
087:                        CHANNEL_CONFIG);
088:
089:                currentChannelGeneratedName = LETTER_A;
090:
091:                if (isMuxChannelUsed()) {
092:                    muxFactory = new JChannelFactory[getMuxFactoryCount()];
093:
094:                    for (int i = 0; i < muxFactory.length; i++) {
095:                        muxFactory[i] = new JChannelFactory();
096:                        muxFactory[i].setMultiplexerConfig(MUX_CHANNEL_CONFIG);
097:                    }
098:                }
099:
100:                if (shouldCompareThreadCount()) {
101:                    active_threads = Thread.activeCount();
102:                    thread_dump = "active threads before (" + active_threads
103:                            + "):\n" + Util.activeThreads();
104:                }
105:            }
106:
107:            protected void tearDown() throws Exception {
108:                super .tearDown();
109:
110:                if (isMuxChannelUsed()) {
111:                    for (int i = 0; i < muxFactory.length; i++) {
112:                        muxFactory[i].destroy();
113:                    }
114:                }
115:
116:                Util.sleep(500); // remove this in 2.5 !
117:
118:                if (shouldCompareThreadCount()) {
119:                    int current_active_threads = Thread.activeCount();
120:
121:                    String msg = "";
122:                    if (active_threads != current_active_threads) {
123:                        System.out.println(thread_dump);
124:                        System.out.println("active threads after ("
125:                                + current_active_threads + "):\n"
126:                                + Util.activeThreads());
127:                        msg = "active threads:\n" + dumpThreads();
128:                    }
129:                    assertEquals(msg, active_threads, current_active_threads);
130:                }
131:            }
132:
133:            /**
134:             * Returns an array of mux application/service names with a guarantee that: 
135:             * <p>
136:             * - there are no application/service name collissions on top of one channel 
137:             * (i.e cannot have two application/service(s) with the same name on top of one channel)
138:             * <p>
139:             * - each generated application/service name is guaranteed to have a corresponding 
140:             * pair application/service with the same name on another channel     
141:             * 
142:             * @param muxApplicationstPerChannelCount
143:             * @return
144:             */
145:            protected String[] createMuxApplicationNames(
146:                    int muxApplicationstPerChannelCount) {
147:                return createMuxApplicationNames(
148:                        muxApplicationstPerChannelCount, getMuxFactoryCount());
149:            }
150:
151:            /**
152:             * Returns an array of mux application/service names with a guarantee that: 
153:             * <p>
154:             * - there are no application/service name collissions on top of one channel 
155:             * (i.e cannot have two application/service(s) with the same name on top of one channel)
156:             * <p>
157:             * - each generated application/service name is guaranteed to have a corresponding 
158:             * pair application/service with the same name on another channel     
159:             * 
160:             * @param muxApplicationstPerChannelCount
161:             * @param muxFactoryCount how many mux factories should be used (has to be less than getMuxFactoryCount())
162:             * @return array of mux application id's represented as String objects
163:             */
164:            protected String[] createMuxApplicationNames(
165:                    int muxApplicationstPerChannelCount, int muxFactoryCount) {
166:                if (muxFactoryCount > getMuxFactoryCount()) {
167:                    throw new IllegalArgumentException(
168:                            "Parameter muxFactoryCount hs to be less than or equal to getMuxFactoryCount()");
169:                }
170:
171:                int startLetter = LETTER_A;
172:                String names[] = null;
173:                int totalMuxAppCount = muxFactoryCount
174:                        * muxApplicationstPerChannelCount;
175:                names = new String[totalMuxAppCount];
176:
177:                boolean pickNextLetter = false;
178:                for (int i = 0; i < totalMuxAppCount; i++) {
179:                    pickNextLetter = (i % muxFactoryCount == 0) ? true : false;
180:                    if (pickNextLetter) {
181:                        startLetter++;
182:                    }
183:                    names[i] = Character.toString((char) startLetter);
184:                }
185:                return names;
186:            }
187:
188:            /**
189:             * Returns channel name as String next in alphabetic sequence since getNextChannelName()
190:             * has been called last. Sequence is restarted to letter "A" after each setUp call.
191:             * 
192:             * @return
193:             */
194:            protected String getNextChannelName() {
195:                return Character.toString((char) ++currentChannelGeneratedName);
196:            }
197:
198:            protected String[] createApplicationNames(int applicationCount) {
199:                String names[] = new String[applicationCount];
200:                for (int i = 0; i < applicationCount; i++) {
201:                    names[i] = getNextChannelName();
202:                }
203:                return names;
204:            }
205:
206:            protected Channel createChannel(Object id) throws Exception {
207:                Channel c = null;
208:                if (isMuxChannelUsed()) {
209:                    for (int i = 0; i < muxFactory.length; i++) {
210:                        if (!muxFactory[i].hasMuxChannel(
211:                                MUX_CHANNEL_CONFIG_STACK_NAME, id.toString())) {
212:                            c = new DefaultMuxChannelTestFactory(muxFactory[i])
213:                                    .createChannel(id);
214:                            return c;
215:                        }
216:                    }
217:
218:                    throw new Exception(
219:                            "Cannot create mux channel with id "
220:                                    + id
221:                                    + " since all currently used channels have already registered service with that id");
222:                } else {
223:                    c = new DefaultChannelTestFactory().createChannel(id);
224:                }
225:                return c;
226:            }
227:
228:            protected Channel createChannel() throws Exception {
229:                return createChannel("A");
230:            }
231:
232:            /**
233:             * Default channel factory used in junit tests
234:             *
235:             */
236:            protected class DefaultChannelTestFactory implements 
237:                    ChannelTestFactory {
238:                public Channel createChannel(Object id) throws Exception {
239:                    return createChannel(CHANNEL_CONFIG, useBlocking());
240:                }
241:
242:                protected Channel createChannel(String configFile,
243:                        boolean useBlocking) throws Exception {
244:                    HashMap channelOptions = new HashMap();
245:                    channelOptions
246:                            .put(new Integer(Channel.BLOCK), Boolean.TRUE);
247:                    return createChannel(configFile, channelOptions);
248:                }
249:
250:                protected Channel createChannel(String configFile,
251:                        Map channelOptions) throws Exception {
252:                    Channel ch = null;
253:                    log.info("Using configuration file " + configFile);
254:                    ch = new JChannel(configFile);
255:                    for (Iterator iter = channelOptions.keySet().iterator(); iter
256:                            .hasNext();) {
257:                        Integer key = (Integer) iter.next();
258:                        Object value = channelOptions.get(key);
259:                        ch.setOpt(key.intValue(), value);
260:                    }
261:                    return ch;
262:                }
263:            }
264:
265:            /**
266:             * Default channel factory used in junit tests
267:             *
268:             */
269:            public class DefaultMuxChannelTestFactory implements 
270:                    ChannelTestFactory {
271:                JChannelFactory f = null;
272:
273:                public DefaultMuxChannelTestFactory(JChannelFactory f) {
274:                    this .f = f;
275:                }
276:
277:                public Channel createChannel(Object id) throws Exception {
278:                    Channel c = f.createMultiplexerChannel(
279:                            MUX_CHANNEL_CONFIG_STACK_NAME, id.toString());
280:                    if (useBlocking()) {
281:                        c.setOpt(Channel.BLOCK, Boolean.TRUE);
282:                    }
283:                    Address address = c.getLocalAddress();
284:                    String append = "[" + id + "]" + " using "
285:                            + MUX_CHANNEL_CONFIG + ",stack "
286:                            + MUX_CHANNEL_CONFIG_STACK_NAME;
287:                    if (address == null) {
288:                        log.info("Created unconnected mux channel " + append);
289:                    } else {
290:                        log.info("Created mux channel " + address + append);
291:                    }
292:                    return c;
293:                }
294:            }
295:
296:            public class NextAvailableMuxChannelTestFactory implements 
297:                    ChannelTestFactory {
298:                public Channel createChannel(Object id) throws Exception {
299:                    return ChannelTestBase.this .createChannel(id);
300:                }
301:            }
302:
303:            /**
304:             * Decouples channel creation for junit tests
305:             *
306:             */
307:            protected interface ChannelTestFactory {
308:                public Channel createChannel(Object id) throws Exception;
309:            }
310:
311:            /**
312:             * Base class for all aplications using channel
313:             *   
314:             *
315:             */
316:            protected abstract class ChannelApplication implements  Runnable,
317:                    MemberRetrievable {
318:                protected Channel channel;
319:
320:                protected Thread thread;
321:
322:                protected Throwable exception;
323:
324:                protected String name;
325:
326:                public ChannelApplication(String name, JChannelFactory f)
327:                        throws Exception {
328:                    if (f == null) {
329:                        createChannel(name, new DefaultChannelTestFactory());
330:                    } else {
331:                        createChannel(name, new DefaultMuxChannelTestFactory(f));
332:                    }
333:                }
334:
335:                /**
336:                 * Creates a unconnected channel and assigns a name to it.
337:                 * 
338:                 * @param name name of this channel
339:                 * @param factory factory to create Channel
340:                 * @throws ChannelException
341:                 */
342:                public ChannelApplication(String name,
343:                        ChannelTestFactory factory) throws Exception {
344:                    createChannel(name, factory);
345:                }
346:
347:                private void createChannel(String name,
348:                        ChannelTestFactory factory) throws Exception {
349:                    this .name = name;
350:                    channel = factory.createChannel(name);
351:                }
352:
353:                /**
354:                 * Method allowing implementation of specific test application level logic      
355:                 * @throws Exception
356:                 */
357:                protected abstract void useChannel() throws Exception;
358:
359:                public void run() {
360:                    try {
361:                        useChannel();
362:                    } catch (Exception e) {
363:                        log.error(name + ": " + e.getLocalizedMessage(), e);
364:
365:                        // Save it for the test to check
366:                        exception = e;
367:                    }
368:                }
369:
370:                public List getMembers() {
371:                    List result = null;
372:                    View v = channel.getView();
373:                    if (v != null) {
374:                        result = v.getMembers();
375:                    }
376:                    return result;
377:                }
378:
379:                public boolean isUsingMuxChannel() {
380:                    return channel instanceof  MuxChannel;
381:                }
382:
383:                public Address getLocalAddress() {
384:                    return channel.getLocalAddress();
385:                }
386:
387:                public void start() {
388:                    thread = new Thread(this , getName());
389:                    thread.start();
390:                    Address a = getLocalAddress();
391:                    boolean connected = a != null ? true : false;
392:                    if (connected) {
393:                        log.info("Thread for channel " + a + "[" + getName()
394:                                + "] started");
395:                    } else {
396:                        log.info("Thread for channel [" + getName()
397:                                + "] started");
398:                    }
399:                }
400:
401:                public void setChannel(Channel ch) {
402:                    this .channel = ch;
403:                }
404:
405:                public Channel getChannel() {
406:                    return channel;
407:                }
408:
409:                public String getName() {
410:                    return name;
411:                }
412:
413:                public void cleanup() {
414:                    if (thread != null && thread.isAlive()) {
415:                        thread.interrupt();
416:                    }
417:                    Address a = getLocalAddress();
418:                    boolean connected = a != null ? true : false;
419:                    if (connected) {
420:                        log
421:                                .info("Closing channel " + a + "[" + getName()
422:                                        + "]");
423:                    } else {
424:                        log.info("Closing channel [" + getName() + "]");
425:                    }
426:                    channel.close();
427:                }
428:            }
429:
430:            protected abstract class PushChannelApplication extends
431:                    ChannelApplication implements  ExtendedReceiver {
432:                RpcDispatcher dispatcher;
433:
434:                public PushChannelApplication(String name) throws Exception {
435:                    this (name, new DefaultChannelTestFactory(), false);
436:                }
437:
438:                public PushChannelApplication(String name, JChannelFactory f)
439:                        throws Exception {
440:                    this (name, new DefaultMuxChannelTestFactory(f), false);
441:                }
442:
443:                public PushChannelApplication(String name, boolean useDispatcher)
444:                        throws Exception {
445:                    this (name, new DefaultChannelTestFactory(), useDispatcher);
446:                }
447:
448:                public PushChannelApplication(String name,
449:                        ChannelTestFactory factory, boolean useDispatcher)
450:                        throws Exception {
451:                    super (name, factory);
452:                    if (useDispatcher) {
453:                        dispatcher = new RpcDispatcher(channel, this , this ,
454:                                this );
455:                    } else {
456:                        channel.setReceiver(this );
457:                    }
458:                }
459:
460:                public RpcDispatcher getDispatcher() {
461:                    return dispatcher;
462:                }
463:
464:                public boolean hasDispatcher() {
465:                    return dispatcher != null;
466:                }
467:
468:                public void block() {
469:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
470:                            + "] in blocking");
471:                }
472:
473:                public byte[] getState() {
474:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
475:                            + "] ");
476:                    return null;
477:                }
478:
479:                public void getState(OutputStream ostream) {
480:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
481:                            + "]");
482:                }
483:
484:                public byte[] getState(String state_id) {
485:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
486:                            + " state id =" + state_id);
487:                    return null;
488:                }
489:
490:                public void getState(String state_id, OutputStream ostream) {
491:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
492:                            + "] state id =" + state_id);
493:                }
494:
495:                public void receive(Message msg) {
496:                }
497:
498:                public void setState(byte[] state) {
499:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
500:                            + "] ");
501:                }
502:
503:                public void setState(InputStream istream) {
504:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
505:                            + "]");
506:                }
507:
508:                public void setState(String state_id, byte[] state) {
509:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
510:                            + "] state id =" + state_id + ", state size is "
511:                            + state.length);
512:                }
513:
514:                public void setState(String state_id, InputStream istream) {
515:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
516:                            + "] state id " + state_id);
517:                }
518:
519:                public void suspect(Address suspected_mbr) {
520:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
521:                            + "] suspecting " + suspected_mbr);
522:                }
523:
524:                public void unblock() {
525:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
526:                            + "] unblocking");
527:                }
528:
529:                public void viewAccepted(View new_view) {
530:                    log.debug("Channel " + getLocalAddress() + "[" + getName()
531:                            + "] accepted view " + new_view);
532:                }
533:            }
534:
535:            /**
536:             * Channel with semaphore allows application to go through fine-grained synchronous step control.
537:             * <p> 
538:             * PushChannelApplicationWithSemaphore application will not proceed to useChannel() 
539:             * until it acquires permit from semphore. After useChannel() completes the acquired 
540:             * permit will be released.  Test driver should control how semaphore tickets are given 
541:             * and acquired.
542:             *
543:             */
544:            protected abstract class PushChannelApplicationWithSemaphore extends
545:                    PushChannelApplication {
546:                protected Semaphore semaphore;
547:
548:                public PushChannelApplicationWithSemaphore(String name,
549:                        ChannelTestFactory factory, Semaphore semaphore,
550:                        boolean useDispatcher) throws Exception {
551:                    super (name, factory, useDispatcher);
552:                    this .semaphore = semaphore;
553:                }
554:
555:                protected PushChannelApplicationWithSemaphore(String name,
556:                        Semaphore semaphore) throws Exception {
557:                    this (name, new DefaultChannelTestFactory(), semaphore,
558:                            false);
559:                }
560:
561:                protected PushChannelApplicationWithSemaphore(String name,
562:                        JChannelFactory f, Semaphore semaphore)
563:                        throws Exception {
564:                    this (name, new DefaultMuxChannelTestFactory(f), semaphore,
565:                            false);
566:                }
567:
568:                protected PushChannelApplicationWithSemaphore(String name,
569:                        Semaphore semaphore, boolean useDispatcher)
570:                        throws Exception {
571:                    this (name, new DefaultChannelTestFactory(), semaphore,
572:                            useDispatcher);
573:                }
574:
575:                public void run() {
576:                    boolean acquired = false;
577:                    try {
578:                        acquired = semaphore.attempt(60000);
579:                        if (!acquired) {
580:                            throw new Exception(name
581:                                    + " cannot acquire semaphore");
582:                        }
583:
584:                        useChannel();
585:                    } catch (Exception e) {
586:                        log.error(name + ": " + e.getLocalizedMessage(), e);
587:                        // Save it for the test to check
588:                        exception = e;
589:                    } finally {
590:                        if (acquired) {
591:                            semaphore.release();
592:                        }
593:                    }
594:                }
595:            }
596:
597:            protected interface MemberRetrievable {
598:                public List getMembers();
599:
600:                public Address getLocalAddress();
601:            }
602:
603:            /**
604:             * Returns true if JVM has been started with mux.on system property 
605:             * set to true, false otherwise. 
606:             * 
607:             * @return
608:             */
609:            protected boolean isMuxChannelUsed() {
610:                return Boolean.valueOf(System.getProperty("mux.on", "false"))
611:                        .booleanValue();
612:            }
613:
614:            /**
615:             * Returns true if JVM has been started with threadcount system property 
616:             * set to true, false otherwise. 
617:             * 
618:             * @return
619:             */
620:            protected boolean shouldCompareThreadCount() {
621:                return Boolean.valueOf(
622:                        System.getProperty("threadcount", "false"))
623:                        .booleanValue();
624:            }
625:
626:            /**
627:             * Returns value of mux.factorycount system property has been set, otherwise returns 
628:             * DEFAULT_MUX_FACTORY_COUNT. 
629:             * 
630:             * @return
631:             */
632:            protected int getMuxFactoryCount() {
633:                return Integer.parseInt(System.getProperty("mux.factorycount",
634:                        DEFAULT_MUX_FACTORY_COUNT));
635:            }
636:
637:            /**
638:             * Returns true if JVM has been started with useBlocking system property 
639:             * set to true, false otherwise. 
640:             * 
641:             * @return
642:             */
643:            protected boolean useBlocking() {
644:                return Boolean.valueOf(
645:                        System.getProperty("useBlocking", "false"))
646:                        .booleanValue();
647:            }
648:
649:            /**
650:             * Checks each channel in the parameter array to see if it has the 
651:             * exact same view as other channels in an array.    
652:             */
653:            public static boolean areViewsComplete(
654:                    MemberRetrievable[] channels, int memberCount) {
655:                for (int i = 0; i < memberCount; i++) {
656:                    if (!isViewComplete(channels[i], memberCount)) {
657:                        return false;
658:                    }
659:                }
660:
661:                return true;
662:            }
663:
664:            /**
665:             * Loops, continually calling {@link #areViewsComplete(MemberRetrievable[])}
666:             * until it either returns true or <code>timeout</code> ms have elapsed.
667:             *
668:             * @param channels  channels which must all have consistent views
669:             * @param timeout max number of ms to loop
670:             * @throws RuntimeException if <code>timeout</code> ms have elapse without
671:             *                          all channels having the same number of members.
672:             */
673:            public static void blockUntilViewsReceived(
674:                    MemberRetrievable[] channels, long timeout) {
675:                blockUntilViewsReceived(channels, channels.length, timeout);
676:            }
677:
678:            public static void blockUntilViewsReceived(Collection channels,
679:                    long timeout) {
680:                blockUntilViewsReceived(channels, channels.size(), timeout);
681:            }
682:
683:            /**
684:             * Loops, continually calling {@link #areViewsComplete(MemberRetrievable[])}
685:             * until it either returns true or <code>timeout</code> ms have elapsed.
686:             *
687:             * @param channels  channels which must all have consistent views
688:             * @param timeout max number of ms to loop
689:             * @throws RuntimeException if <code>timeout</code> ms have elapse without
690:             *                          all channels having the same number of members.
691:             */
692:            public static void blockUntilViewsReceived(
693:                    MemberRetrievable[] channels, int count, long timeout) {
694:                long failTime = System.currentTimeMillis() + timeout;
695:
696:                while (System.currentTimeMillis() < failTime) {
697:                    sleepThread(100);
698:                    if (areViewsComplete(channels, count)) {
699:                        return;
700:                    }
701:                }
702:
703:                throw new RuntimeException(
704:                        "timed out before caches had complete views");
705:            }
706:
707:            public static void blockUntilViewsReceived(Collection channels,
708:                    int count, long timeout) {
709:                long failTime = System.currentTimeMillis() + timeout;
710:
711:                while (System.currentTimeMillis() < failTime) {
712:                    sleepThread(100);
713:                    if (areViewsComplete((MemberRetrievable[]) channels
714:                            .toArray(new MemberRetrievable[channels.size()]),
715:                            count)) {
716:                        return;
717:                    }
718:                }
719:
720:                throw new RuntimeException(
721:                        "timed out before caches had complete views");
722:            }
723:
724:            public static boolean isViewComplete(MemberRetrievable channel,
725:                    int memberCount) {
726:
727:                List members = channel.getMembers();
728:                if (members == null || memberCount > members.size()) {
729:                    return false;
730:                } else if (memberCount < members.size()) {
731:                    // This is an exceptional condition
732:                    StringBuffer sb = new StringBuffer("Channel at address ");
733:                    sb.append(channel.getLocalAddress());
734:                    sb.append(" had ");
735:                    sb.append(members.size());
736:                    sb.append(" members; expecting ");
737:                    sb.append(memberCount);
738:                    sb.append(". Members were (");
739:                    for (int j = 0; j < members.size(); j++) {
740:                        if (j > 0) {
741:                            sb.append(", ");
742:                        }
743:                        sb.append(members.get(j));
744:                    }
745:                    sb.append(')');
746:
747:                    throw new IllegalStateException(sb.toString());
748:                }
749:
750:                return true;
751:            }
752:
753:            public static void takeAllPermits(Semaphore semaphore, int count) {
754:                for (int i = 0; i < count; i++) {
755:                    try {
756:                        semaphore.acquire();
757:                    } catch (InterruptedException e) {
758:                        //not interested
759:                        e.printStackTrace();
760:                    }
761:                }
762:            }
763:
764:            public static void acquireSemaphore(Semaphore semaphore,
765:                    long timeout, int count) throws Exception {
766:                for (int i = 0; i < count; i++) {
767:                    boolean acquired = false;
768:                    try {
769:                        acquired = semaphore.attempt(timeout);
770:                    } catch (InterruptedException e) {
771:                        //not interested but print it
772:                        e.printStackTrace();
773:                    }
774:                    if (!acquired)
775:                        throw new Exception("Failed to acquire semaphore");
776:                }
777:            }
778:
779:            public static void sleepRandom(int maxTime) {
780:                sleepThread(RANDOM.nextInt(maxTime));
781:            }
782:
783:            /**
784:             * Puts the current thread to sleep for the desired number of ms, suppressing
785:             * any exceptions.
786:             *
787:             * @param sleeptime number of ms to sleep
788:             */
789:            public static void sleepThread(long sleeptime) {
790:                try {
791:                    Thread.sleep(sleeptime);
792:                } catch (InterruptedException ie) {
793:                }
794:            }
795:
796:            /* CAUTION: JDK 5 specific code */
797:            private String dumpThreads() {
798:                StringBuffer sb = new StringBuffer();
799:                ThreadMXBean bean = ManagementFactory.getThreadMXBean();
800:                long[] ids = bean.getAllThreadIds();
801:                ThreadInfo[] threads = bean.getThreadInfo(ids, 20);
802:                for (int i = 0; i < threads.length; i++) {
803:                    ThreadInfo info = threads[i];
804:                    if (info == null)
805:                        continue;
806:                    sb.append(info.getThreadName()).append(":\n");
807:                    StackTraceElement[] stack_trace = info.getStackTrace();
808:                    for (int j = 0; j < stack_trace.length; j++) {
809:                        StackTraceElement el = stack_trace[j];
810:                        sb.append("at ").append(el.getClassName()).append(".")
811:                                .append(el.getMethodName());
812:                        sb.append("(").append(el.getFileName()).append(":")
813:                                .append(el.getLineNumber()).append(")");
814:                        sb.append("\n");
815:                    }
816:                    sb.append("\n\n");
817:                }
818:                return sb.toString();
819:            }
820:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.