Source Code Cross Referenced for FlushTest.java in » Net » JGroups-2.4.1-sp3 » org » jgroups » tests » Java Source Code / Java Documentation — Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.tests 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.tests;
002:
003:        import java.io.IOException;
004:        import java.io.InputStream;
005:        import java.io.OutputStream;
006:        import java.util.ArrayList;
007:        import java.util.Collection;
008:        import java.util.Collections;
009:        import java.util.HashMap;
010:        import java.util.Iterator;
011:        import java.util.LinkedList;
012:        import java.util.List;
013:        import java.util.Map;
014:        import java.util.Properties;
015:
016:        import junit.framework.Test;
017:        import junit.framework.TestSuite;
018:
019:        import org.jgroups.Address;
020:        import org.jgroups.BlockEvent;
021:        import org.jgroups.Channel;
022:        import org.jgroups.ChannelException;
023:        import org.jgroups.Event;
024:        import org.jgroups.ExtendedReceiverAdapter;
025:        import org.jgroups.GetStateEvent;
026:        import org.jgroups.JChannel;
027:        import org.jgroups.JChannelFactory;
028:        import org.jgroups.Message;
029:        import org.jgroups.SetStateEvent;
030:        import org.jgroups.UnblockEvent;
031:        import org.jgroups.View;
032:        import org.jgroups.mux.MuxChannel;
033:        import org.jgroups.stack.Protocol;
034:        import org.jgroups.util.Util;
035:
036:        import EDU.oswego.cs.dl.util.concurrent.Semaphore;
037:
038:        /**
039:         * Tests the FLUSH protocol, requires flush-udp.xml in ./conf to be present and configured to use FLUSH
040:         * @author Bela Ban
041:         * @version $Id: FlushTest.java,v 1.16.2.2 2007/04/25 15:24:21 vlada Exp $
042:         */
043:        public class FlushTest extends ChannelTestBase {
044:            public FlushTest() {
045:                super ();
046:                // TODO Auto-generated constructor stub
047:            }
048:
049:            public FlushTest(String name) {
050:                super (name);
051:                // TODO Auto-generated constructor stub
052:            }
053:
054:            Channel c1, c2, c3;
055:
056:            static final String CONFIG = "flush-udp.xml";
057:
058:            public void setUp() throws Exception {
059:                super .setUp();
060:                CHANNEL_CONFIG = System.getProperty("channel.conf.flush",
061:                        "flush-udp.xml");
062:            }
063:
064:            public void tearDown() throws Exception {
065:                if (c3 != null) {
066:                    c3.close();
067:                    assertFalse(c3.isOpen());
068:                    assertFalse(c3.isConnected());
069:                    c3 = null;
070:                }
071:
072:                if (c2 != null) {
073:                    c2.close();
074:                    assertFalse(c2.isOpen());
075:                    assertFalse(c2.isConnected());
076:                    c2 = null;
077:                }
078:
079:                if (c1 != null) {
080:                    c1.close();
081:                    assertFalse(c1.isOpen());
082:                    assertFalse(c1.isConnected());
083:                    c1 = null;
084:                }
085:
086:                Util.sleep(500);
087:                super .tearDown();
088:            }
089:
090:            public boolean useBlocking() {
091:                return true;
092:            }
093:
094:            public void testSingleChannel() throws Exception {
095:                Semaphore s = new Semaphore(1);
096:                FlushTestReceiver receivers[] = new FlushTestReceiver[] { new FlushTestReceiver(
097:                        "c1", s, false) };
098:                receivers[0].start();
099:                s.release(1);
100:
101:                //Make sure everyone is in sync
102:                blockUntilViewsReceived(receivers, 60000);
103:
104:                // Sleep to ensure the threads get all the semaphore tickets
105:                sleepThread(1000);
106:
107:                // Reacquire the semaphore tickets; when we have them all
108:                // we know the threads are done         
109:                try {
110:                    acquireSemaphore(s, 60000, 1);
111:                } catch (Exception e) {
112:                    e.printStackTrace();
113:                } finally {
114:                    receivers[0].cleanup();
115:                    sleepThread(1000);
116:                }
117:
118:                checkEventSequence(receivers[0], false);
119:
120:            }
121:
122:            /**
123:             * Tests issue #1 in http://jira.jboss.com/jira/browse/JGRP-335
124:             */
125:            public void testJoinFollowedByUnicast() throws ChannelException {
126:                c1 = createChannel();
127:                c1.setReceiver(new SimpleReplier(c1, true));
128:                c1.connect("test");
129:
130:                Address target = c1.getLocalAddress();
131:                Message unicast_msg = new Message(target);
132:
133:                c2 = createChannel();
134:                c2.setReceiver(new SimpleReplier(c2, false));
135:                c2.connect("test");
136:
137:                // now send unicast, this might block as described in the case
138:                c2.send(unicast_msg);
139:                // if we don't get here this means we'd time out
140:            }
141:
142:            /**
143:             * Tests issue #2 in http://jira.jboss.com/jira/browse/JGRP-335
144:             */
145:            public void testStateTransferFollowedByUnicast()
146:                    throws ChannelException {
147:                c1 = createChannel();
148:                c1.setReceiver(new SimpleReplier(c1, true));
149:                c1.connect("test");
150:
151:                Address target = c1.getLocalAddress();
152:                Message unicast_msg = new Message(target);
153:
154:                c2 = createChannel();
155:                c2.setReceiver(new SimpleReplier(c2, false));
156:                c2.connect("test");
157:
158:                // Util.sleep(100);
159:                log.info("\n** Getting the state **");
160:                c2.getState(null, 10000);
161:                // now send unicast, this might block as described in the case
162:                c2.send(unicast_msg);
163:            }
164:
165:            /**
166:             * Tests emition of block/unblock/get|set state events in both mux and bare 
167:             * channel mode. In mux mode this test creates getFactoryCount() real channels 
168:             * and creates only one mux application on top of each channel. In bare 
169:             * channel mode 4 real channels are created.
170:             * 
171:             */
172:            public void testBlockingNoStateTransfer() {
173:                String[] names = null;
174:                if (isMuxChannelUsed()) {
175:                    names = createMuxApplicationNames(1);
176:                    testChannels(names, false, getMuxFactoryCount());
177:                } else {
178:                    names = createApplicationNames(4);
179:                    testChannels(names, false, 4);
180:                }
181:            }
182:
183:            /**
184:             * Tests emition of block/unblock/get|set state events in mux mode. In this 
185:             * test all mux applications share the same "real" channel. This test runs 
186:             * only when mux.on=true. This test does not take into account 
187:             * -Dmux.factorycount parameter.
188:             * 
189:             */
190:            public void testBlockingSharedMuxFactory() {
191:                String[] names = null;
192:                int muxFactoryCount = 1;
193:                if (isMuxChannelUsed()) {
194:                    names = createMuxApplicationNames(4, muxFactoryCount);
195:                    testChannels(names, muxFactoryCount, false,
196:                            new ChannelAssertable(1));
197:                }
198:            }
199:
200:            /**
201:             * Tests emition of block/unblock/get|set state events in mux mode. In this 
202:             * test there will be exactly two real channels created where each real 
203:             * channel has two mux applications on top of it. This test runs 
204:             * only when mux.on=true. This test does not take into account 
205:             * -Dmux.factorycount parameter.
206:             * 
207:             */
208:            public void testBlockingUnsharedMuxFactoryMultipleService() {
209:                String[] names = null;
210:                int muxFactoryCount = 2;
211:                if (isMuxChannelUsed()) {
212:                    names = createMuxApplicationNames(2, muxFactoryCount);
213:                    testChannels(names, muxFactoryCount, false,
214:                            new ChannelAssertable(2));
215:                }
216:            }
217:
218:            /**
219:             * Tests emition of block/unblock/set|get state events for both 
220:             * mux and bare channel depending on mux.on parameter. In mux mode there 
221:             * will be only one mux channel for each "real" channel created and the 
222:             * number of real channels created is getMuxFactoryCount().
223:             * 
224:             */
225:            public void testBlockingWithStateTransfer() {
226:                String[] names = null;
227:                if (isMuxChannelUsed()) {
228:                    names = createMuxApplicationNames(1);
229:                    testChannels(names, true, getMuxFactoryCount());
230:                } else {
231:                    names = createApplicationNames(4);
232:                    testChannels(names, true, 4);
233:                }
234:            }
235:
236:            /**
237:             * Tests emition of block/unblock/set|get state events in mux mode setup 
238:             * where each "real" channel has two mux service on top of it. The 
239:             * number of real channels created is getMuxFactoryCount(). This test runs 
240:             * only when mux.on=true.
241:             * 
242:             */
243:            public void testBlockingWithStateTransferAndMultipleServiceMuxChannel() {
244:                String[] names = null;
245:                if (isMuxChannelUsed()) {
246:                    names = createMuxApplicationNames(2);
247:                    testChannels(names, true, getMuxFactoryCount());
248:                }
249:            }
250:
251:            private void testChannels(String names[], int muxFactoryCount,
252:                    boolean useTransfer, Assertable a) {
253:                int count = names.length;
254:
255:                ArrayList channels = new ArrayList(count);
256:                try {
257:                    // Create a semaphore and take all its permits
258:                    Semaphore semaphore = new Semaphore(count);
259:                    takeAllPermits(semaphore, count);
260:
261:                    // Create channels and their threads that will block on the semaphore        
262:                    for (int i = 0; i < count; i++) {
263:                        FlushTestReceiver channel = null;
264:                        if (isMuxChannelUsed()) {
265:                            channel = new FlushTestReceiver(names[i],
266:                                    muxFactory[i % muxFactoryCount], semaphore,
267:                                    useTransfer);
268:                        } else {
269:                            channel = new FlushTestReceiver(names[i],
270:                                    semaphore, useTransfer);
271:                        }
272:                        channels.add(channel);
273:
274:                        // Release one ticket at a time to allow the thread to start working                           
275:                        channel.start();
276:                        if (!useTransfer) {
277:                            semaphore.release(1);
278:                        }
279:                        sleepThread(2000);
280:                    }
281:
282:                    if (isMuxChannelUsed()) {
283:                        blockUntilViewsReceived(channels, muxFactoryCount,
284:                                60000);
285:                    } else {
286:                        blockUntilViewsReceived(channels, 60000);
287:                    }
288:
289:                    //if state transfer is used release all at once
290:                    //clear all channels of view events
291:                    if (useTransfer) {
292:                        for (Iterator iter = channels.iterator(); iter
293:                                .hasNext();) {
294:                            FlushTestReceiver app = (FlushTestReceiver) iter
295:                                    .next();
296:                            app.clear();
297:
298:                        }
299:                        semaphore.release(count);
300:                    }
301:
302:                    // Sleep to ensure the threads get all the semaphore tickets
303:                    sleepThread(1000);
304:
305:                    // Reacquire the semaphore tickets; when we have them all
306:                    // we know the threads are done         
307:                    acquireSemaphore(semaphore, 60000, count);
308:
309:                    //do general asserts about channels
310:                    a.verify(channels);
311:
312:                    //kill random member
313:                    FlushTestReceiver randomRecv = (FlushTestReceiver) channels
314:                            .remove(RANDOM.nextInt(count));
315:                    log.info("Closing random member " + randomRecv.getName()
316:                            + " at " + randomRecv.getLocalAddress());
317:                    ChannelCloseAssertable closeAssert = new ChannelCloseAssertable(
318:                            randomRecv);
319:                    randomRecv.cleanup();
320:
321:                    //let the view propagate and verify related asserts
322:                    sleepThread(5000);
323:                    closeAssert.verify(channels);
324:
325:                    //verify block/unblock/view/get|set state sequence              
326:
327:                    for (Iterator iter = channels.iterator(); iter.hasNext();) {
328:                        FlushTestReceiver receiver = (FlushTestReceiver) iter
329:                                .next();
330:                        if (useTransfer) {
331:                            checkEventStateTransferSequence(receiver);
332:                        } else {
333:                            checkEventSequence(receiver, isMuxChannelUsed());
334:                        }
335:                    }
336:                } catch (Exception ex) {
337:                    log.warn("Exception encountered during test", ex);
338:                    fail("Exception encountered during test execution");
339:                } finally {
340:                    for (Iterator iter = channels.iterator(); iter.hasNext();) {
341:                        FlushTestReceiver app = (FlushTestReceiver) iter.next();
342:                        app.cleanup();
343:                        sleepThread(500);
344:                    }
345:                }
346:            }
347:
348:            public void testChannels(String names[], boolean useTransfer,
349:                    int viewSize) {
350:                testChannels(names, getMuxFactoryCount(), useTransfer,
351:                        new ChannelAssertable(viewSize));
352:            }
353:
354:            private class ChannelCloseAssertable implements  Assertable {
355:                ChannelApplication app;
356:                View viewBeforeClose;
357:                Address appAddress;
358:                String muxId;
359:
360:                public ChannelCloseAssertable(ChannelApplication app) {
361:                    this .app = app;
362:                    this .viewBeforeClose = app.getChannel().getView();
363:                    appAddress = app.getChannel().getLocalAddress();
364:                    if (app.isUsingMuxChannel()) {
365:                        MuxChannel mch = (MuxChannel) app.getChannel();
366:                        muxId = mch.getId();
367:                    }
368:                }
369:
370:                public void verify(Object verifiable) {
371:                    Collection channels = (Collection) verifiable;
372:                    Channel ch = app.getChannel();
373:                    assertFalse("Channel open", ch.isOpen());
374:                    assertFalse("Chnanel connected", ch.isConnected());
375:
376:                    //if this channel had more than one member then verify that 
377:                    //the other member does not have departed member in its view
378:                    if (viewBeforeClose.getMembers().size() > 1) {
379:                        for (Iterator iter = channels.iterator(); iter
380:                                .hasNext();) {
381:                            FlushTestReceiver receiver = (FlushTestReceiver) iter
382:                                    .next();
383:                            Channel channel = receiver.getChannel();
384:                            boolean pairServiceFound = (receiver
385:                                    .isUsingMuxChannel() && muxId
386:                                    .equals(((MuxChannel) channel).getId()));
387:                            if (pairServiceFound
388:                                    || !receiver.isUsingMuxChannel()) {
389:                                assertTrue("Removed from view, address "
390:                                        + appAddress + " view is "
391:                                        + channel.getView(), !channel.getView()
392:                                        .getMembers().contains(appAddress));
393:                            }
394:                        }
395:                    }
396:                }
397:            }
398:
399:            private class ChannelAssertable implements  Assertable {
400:                int expectedViewSize = 0;
401:
402:                public ChannelAssertable(int expectedViewSize) {
403:                    this .expectedViewSize = expectedViewSize;
404:                }
405:
406:                public void verify(Object verifiable) {
407:                    Collection channels = (Collection) verifiable;
408:                    for (Iterator iter = channels.iterator(); iter.hasNext();) {
409:                        FlushTestReceiver receiver = (FlushTestReceiver) iter
410:                                .next();
411:                        Channel ch = receiver.getChannel();
412:                        assertEquals("Correct view", ch.getView().getMembers()
413:                                .size(), expectedViewSize);
414:                        assertTrue("Channel open", ch.isOpen());
415:                        assertTrue("Chnanel connected", ch.isConnected());
416:                        assertNotNull("Valid address ", ch.getLocalAddress());
417:                        assertTrue("Address included in view ", ch.getView()
418:                                .getMembers().contains(ch.getLocalAddress()));
419:                        assertNotNull("Valid cluster name ", ch
420:                                .getClusterName());
421:                    }
422:
423:                    //verify views for pair services created on top of different "real" channels
424:                    if (expectedViewSize > 1 && isMuxChannelUsed()) {
425:                        for (Iterator iter = channels.iterator(); iter
426:                                .hasNext();) {
427:                            FlushTestReceiver receiver = (FlushTestReceiver) iter
428:                                    .next();
429:                            MuxChannel ch = (MuxChannel) receiver.getChannel();
430:                            int servicePairs = 1;
431:                            for (Iterator it = channels.iterator(); it
432:                                    .hasNext();) {
433:                                FlushTestReceiver receiver2 = (FlushTestReceiver) it
434:                                        .next();
435:                                MuxChannel ch2 = (MuxChannel) receiver2
436:                                        .getChannel();
437:                                if (ch.getId().equals(ch2.getId())
438:                                        && !ch.getLocalAddress().equals(
439:                                                ch2.getLocalAddress())) {
440:                                    assertEquals(
441:                                            "Correct view for service pair", ch
442:                                                    .getView(), ch2.getView());
443:                                    assertTrue("Presence in view", ch.getView()
444:                                            .getMembers().contains(
445:                                                    ch.getLocalAddress()));
446:                                    assertTrue("Presence in view", ch.getView()
447:                                            .getMembers().contains(
448:                                                    ch2.getLocalAddress()));
449:                                    assertTrue("Presence in view", ch2
450:                                            .getView().getMembers().contains(
451:                                                    ch2.getLocalAddress()));
452:                                    assertTrue("Presence in view", ch2
453:                                            .getView().getMembers().contains(
454:                                                    ch.getLocalAddress()));
455:                                    servicePairs++;
456:                                }
457:                            }
458:                            assertEquals("Correct service count",
459:                                    expectedViewSize, servicePairs);
460:                        }
461:                    }
462:                }
463:            }
464:
465:            private void checkEventSequence(FlushTestReceiver receiver,
466:                    boolean isMuxUsed) {
467:                List events = receiver.getEvents();
468:                String eventString = "[" + receiver.getName() + "|"
469:                        + receiver.getLocalAddress() + ",events:" + events;
470:                log.info(eventString);
471:                assertNotNull(events);
472:                int size = events.size();
473:                for (int i = 0; i < size; i++) {
474:                    Object event = events.get(i);
475:                    if (event instanceof  BlockEvent) {
476:                        if (i + 1 < size) {
477:                            Object ev = events.get(i + 1);
478:                            if (isMuxUsed) {
479:                                assertTrue(
480:                                        "After Block should be View or Unblock"
481:                                                + eventString,
482:                                        ev instanceof  View
483:                                                || ev instanceof  UnblockEvent);
484:                            } else {
485:                                assertTrue("After Block should be View "
486:                                        + eventString,
487:                                        events.get(i + 1) instanceof  View);
488:                            }
489:                        }
490:                        if (i != 0) {
491:                            assertTrue("Before Block should be Unblock "
492:                                    + eventString,
493:                                    events.get(i - 1) instanceof  UnblockEvent);
494:                        }
495:                    }
496:                    if (event instanceof  View) {
497:                        if (i + 1 < size) {
498:                            assertTrue("After View should be Unblock "
499:                                    + eventString,
500:                                    events.get(i + 1) instanceof  UnblockEvent);
501:                        }
502:                        assertTrue(
503:                                "Before View should be Block " + eventString,
504:                                events.get(i - 1) instanceof  BlockEvent);
505:                    }
506:                    if (event instanceof  UnblockEvent) {
507:                        if (i + 1 < size) {
508:                            assertTrue("After UnBlock should be Block "
509:                                    + eventString,
510:                                    events.get(i + 1) instanceof  BlockEvent);
511:                        }
512:
513:                        Object ev = events.get(i - 1);
514:                        if (isMuxUsed) {
515:                            assertTrue("Before UnBlock should be View or Block"
516:                                    + eventString, ev instanceof  View
517:                                    || ev instanceof  BlockEvent);
518:                        } else {
519:                            assertTrue("Before UnBlock should be View "
520:                                    + eventString,
521:                                    events.get(i - 1) instanceof  View);
522:                        }
523:                    }
524:                }
525:                receiver.clear();
526:            }
527:
528:            private void checkEventStateTransferSequence(
529:                    FlushTestReceiver receiver) {
530:                List events = receiver.getEvents();
531:                String eventString = "[" + receiver.getName() + ",events:"
532:                        + events;
533:                log.info(eventString);
534:                assertNotNull(events);
535:                int size = events.size();
536:                for (int i = 0; i < size; i++) {
537:                    Object event = events.get(i);
538:                    if (event instanceof  BlockEvent) {
539:                        if (i + 1 < size) {
540:                            Object o = events.get(i + 1);
541:                            assertTrue(
542:                                    "After Block should be state|unblock|view"
543:                                            + eventString,
544:                                    o instanceof  SetStateEvent
545:                                            || o instanceof  GetStateEvent
546:                                            || o instanceof  UnblockEvent
547:                                            || o instanceof  View);
548:                        } else if (i != 0) {
549:                            Object o = events.get(i + 1);
550:                            assertTrue(
551:                                    "Before Block should be state or Unblock "
552:                                            + eventString,
553:                                    o instanceof  SetStateEvent
554:                                            || o instanceof  GetStateEvent
555:                                            || o instanceof  UnblockEvent);
556:                        }
557:                    }
558:                    if (event instanceof  SetStateEvent
559:                            || event instanceof  GetStateEvent) {
560:                        if (i + 1 < size) {
561:                            assertTrue("After state should be Unblock "
562:                                    + eventString,
563:                                    events.get(i + 1) instanceof  UnblockEvent);
564:                        }
565:                        assertTrue("Before state should be Block "
566:                                + eventString,
567:                                events.get(i - 1) instanceof  BlockEvent);
568:                    }
569:
570:                    if (event instanceof  UnblockEvent) {
571:                        if (i + 1 < size) {
572:                            assertTrue("After UnBlock should be Block "
573:                                    + eventString,
574:                                    events.get(i + 1) instanceof  BlockEvent);
575:                        } else {
576:                            Object o = events.get(size - 2);
577:                            assertTrue(
578:                                    "Before UnBlock should be block|state|view  "
579:                                            + eventString,
580:                                    o instanceof  SetStateEvent
581:                                            || o instanceof  GetStateEvent
582:                                            || o instanceof  BlockEvent
583:                                            || o instanceof  View);
584:                        }
585:                    }
586:
587:                }
588:                receiver.clear();
589:            }
590:
591:            protected Channel createChannel() throws ChannelException {
592:                Channel ret = new JChannel(CHANNEL_CONFIG);
593:                ret.setOpt(Channel.BLOCK, Boolean.TRUE);
594:                Protocol flush = ((JChannel) ret).getProtocolStack()
595:                        .findProtocol("FLUSH");
596:                if (flush != null) {
597:                    Properties p = new Properties();
598:                    p.setProperty("timeout", "0");
599:                    flush.setProperties(p);
600:
601:                    // send timeout up and down the stack, so other protocols can use the same value too
602:                    Map map = new HashMap();
603:                    map.put("flush_timeout", new Long(0));
604:                    flush.passUp(new Event(Event.CONFIG, map));
605:                    flush.passDown(new Event(Event.CONFIG, map));
606:                }
607:                return ret;
608:            }
609:
610:            private interface Assertable {
611:                public void verify(Object verifiable);
612:            }
613:
614:            private class FlushTestReceiver extends
615:                    PushChannelApplicationWithSemaphore {
616:                List events;
617:
618:                boolean shouldFetchState;
619:
620:                protected FlushTestReceiver(String name, Semaphore semaphore,
621:                        boolean shouldFetchState) throws Exception {
622:                    super (name, semaphore);
623:                    this .shouldFetchState = shouldFetchState;
624:                    events = Collections.synchronizedList(new LinkedList());
625:                    channel.connect("test");
626:                }
627:
628:                protected FlushTestReceiver(String name,
629:                        JChannelFactory factory, Semaphore semaphore,
630:                        boolean shouldFetchState) throws Exception {
631:                    super (name, factory, semaphore);
632:                    this .shouldFetchState = shouldFetchState;
633:                    events = Collections.synchronizedList(new LinkedList());
634:                    channel.connect("test");
635:                }
636:
637:                public void clear() {
638:                    events.clear();
639:                }
640:
641:                public List getEvents() {
642:                    return new LinkedList(events);
643:                }
644:
645:                public void block() {
646:                    events.add(new BlockEvent());
647:                }
648:
649:                public void unblock() {
650:                    events.add(new UnblockEvent());
651:                }
652:
653:                public void viewAccepted(View new_view) {
654:                    events.add(new_view);
655:                }
656:
657:                public byte[] getState() {
658:                    events.add(new GetStateEvent(null, null));
659:                    return new byte[] { 'b', 'e', 'l', 'a' };
660:                }
661:
662:                public void setState(byte[] state) {
663:                    events.add(new SetStateEvent(null, null));
664:                }
665:
666:                public void getState(OutputStream ostream) {
667:                    events.add(new GetStateEvent(null, null));
668:                    byte[] payload = new byte[] { 'b', 'e', 'l', 'a' };
669:                    try {
670:                        ostream.write(payload);
671:                    } catch (IOException e) {
672:                        e.printStackTrace();
673:                    } finally {
674:                        Util.close(ostream);
675:                    }
676:                }
677:
678:                public void setState(InputStream istream) {
679:                    events.add(new SetStateEvent(null, null));
680:                    byte[] payload = new byte[4];
681:                    try {
682:                        istream.read(payload);
683:                    } catch (IOException e) {
684:                        e.printStackTrace();
685:                    } finally {
686:                        Util.close(istream);
687:                    }
688:                }
689:
690:                protected void useChannel() throws Exception {
691:                    if (shouldFetchState) {
692:                        channel.getState(null, 25000);
693:                    }
694:                }
695:            }
696:
697:            private class SimpleReplier extends ExtendedReceiverAdapter {
698:                Channel channel;
699:
700:                boolean handle_requests = false;
701:
702:                public SimpleReplier(Channel channel, boolean handle_requests) {
703:                    this .channel = channel;
704:                    this .handle_requests = handle_requests;
705:                }
706:
707:                public void receive(Message msg) {
708:                    Message reply = new Message(msg.getSrc());
709:                    try {
710:                        log.info("-- MySimpleReplier["
711:                                + channel.getLocalAddress()
712:                                + "]: received message from " + msg.getSrc());
713:                        if (handle_requests) {
714:                            log.info(", sending reply");
715:                            channel.send(reply);
716:                        } else
717:                            System.out.println("\n");
718:                    } catch (Exception e) {
719:                        e.printStackTrace();
720:                    }
721:                }
722:
723:                public void viewAccepted(View new_view) {
724:                    log.info("-- MySimpleReplier[" + channel.getLocalAddress()
725:                            + "]: viewAccepted(" + new_view + ")");
726:                }
727:
728:                public void block() {
729:                    log.info("-- MySimpleReplier[" + channel.getLocalAddress()
730:                            + "]: block()");
731:                }
732:
733:                public void unblock() {
734:                    log.info("-- MySimpleReplier[" + channel.getLocalAddress()
735:                            + "]: unblock()");
736:                }
737:            }
738:
739:            public static Test suite() {
740:                return new TestSuite(FlushTest.class);
741:            }
742:
743:            public static void main(String[] args) {
744:                junit.textui.TestRunner.run(FlushTest.suite());
745:            }
746:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.