Source code for JUMPMessageDispatcherImpl.java (6.0 JDK Modules » j2me » com.sun.jumpimpl.process)

/*
 * %W% %E%
 *
 * Copyright  1990-2006 Sun Microsystems, Inc. All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License version
 * 2 only, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is
 * included at /legal/license.txt).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this work; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
 * Clara, CA 95054 or visit www.sun.com if you need additional
 * information or have any questions.
 */

package com.sun.jumpimpl.process;

import com.sun.jump.message.JUMPMessage;
import com.sun.jump.message.JUMPMessageHandler;
import com.sun.jump.message.JUMPMessageDispatcher;
import com.sun.jump.message.JUMPMessageDispatcherTypeException;
import com.sun.jump.message.JUMPTimedOutException;
import com.sun.jump.message.JUMPUnblockedException;

import com.sun.jump.os.JUMPOSInterface;
import com.sun.jumpimpl.os.JUMPMessageQueueInterfaceImpl;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;

/**
 * A generic JUMPMessageDispatcher implementation.
 */
public class JUMPMessageDispatcherImpl implements JUMPMessageDispatcher {
    // A JUMPMessageDispatcherImpl has one Listener for each
    // messageType with a registered handler.
    // JUMPMessageDispatcherImpl and Listener are by necessity
    // somewhat intertwined, as explained here.
    // JUMPMessageDispatcherImpl.registerHandler() creates Listeners
    // on demand and adds them to listeners.  Listeners are not removed
    // by cancelRegistration(); instead, Listeners remove themselves
    // and exit some time after all their handlers have been canceled
    // and no other handlers have been registered.  This ensures that
    // at most one thread is ever listening for any messageType, and
    // that no message that has a registered handler will be dropped
    // since there will always be a Listener running for that
    // messageType.  Both JUMPMessageDispatcherImpl and Listener
    // synchronize on lock while accessing listeners.  Additionally,
    // Listener synchronizes on lock when accessing Listener.handlers.
    // It could synchronize on itself, but in most cases we already
    // need to synchronize on lock, so using lock for everything is
    // simpler.  We never block while holding lock and there shouldn't
    // be much if any contention for it.

    // A JUMPMessageDispatcherImpl has one DirectRegistration for each
    // messageType with at least one outstanding registration.  We
    // must be careful to unreserve a messageType only when it is no
    // longer in use and can no longer be used, otherwise the
    // low-level code could use memory that it has freed, which would
    // be very un-Java.  With the messageType -> DirectRegistration
    // mapping required by the API, we need to keep a use count and do
    // other gymnastics to ensure this.

    private static final JUMPMessageQueueInterfaceImpl jumpMessageQueueInterfaceImpl = (JUMPMessageQueueInterfaceImpl) JUMPOSInterface
            .getInstance().getQueueInterface();

    private static JUMPMessageDispatcherImpl INSTANCE = null;

    // directRegistrations maps String messageType to DirectRegistration.
    // Guarded by lock.
    // Invariant: If there is a mapping from messageType to a
    // DirectRegistration, then at least one registration is still
    // outstanding for the messageType, otherwise no registrations
    // are outstanding.

    private final Map directRegistrations = new HashMap();

    // listeners maps String messageType to Listener.
    // Guarded by lock.
    // Invariant: If there is a mapping from messageType to a Listener,
    // then the Listener is active, otherwise there is no Listener.

    private final Map listeners = new HashMap();

    // lock guards both directRegistrations and listeners.  We need
    // one lock so we can tell whether a messageType is registered one
    // way or the other without races.

    private final Object lock = new Object();

    public static synchronized JUMPMessageDispatcherImpl getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new JUMPMessageDispatcherImpl();
        }
        return INSTANCE;
    }

    /**
     * Construction allowed only by getInstance().
     */
    private JUMPMessageDispatcherImpl() {
    }

    public Object registerDirect(String messageType)
            throws JUMPMessageDispatcherTypeException, IOException {
        DirectRegistration directRegistration;
        synchronized (lock) {
            if (listeners.containsKey(messageType)) {
                throw new JUMPMessageDispatcherTypeException("Type "
                        + messageType
                        + " already registered with handlers");
            }

            directRegistration = getDirectRegistration(messageType);
            directRegistration.incrementUseCount();
        }

        return new DirectRegistrationToken(directRegistration);
    }

    // Externally synchronized on lock.
    private DirectRegistration getDirectRegistration(String messageType)
            throws IOException {
        DirectRegistration directRegistration = (DirectRegistration) directRegistrations
                .get(messageType);

        if (directRegistration == null) {
            directRegistration = new DirectRegistration(messageType);

            // Be careful to maintain our invariant (and free
            // resources) even on OutOfMemoryError, etc.

            boolean success = false;
            try {
                directRegistrations
                        .put(messageType, directRegistration);
                success = true;
            } finally {
                if (!success) {
                    // Free OS resources.
                    directRegistration.close();
                }
            }
        }

        return directRegistration;
    }

    public JUMPMessage waitForMessage(String messageType, long timeout)
            throws JUMPMessageDispatcherTypeException,
            JUMPTimedOutException, IOException {
        DirectRegistration directRegistration;
        synchronized (lock) {
            directRegistration = (DirectRegistration) directRegistrations
                    .get(messageType);
            if (directRegistration == null) {
                throw new JUMPMessageDispatcherTypeException("Type "
                        + messageType
                        + " not registered for direct listening");
            }
            directRegistration.incrementUseCount();
        }

        try {
            return doWaitForMessage(messageType, timeout);
        } finally {
            directRegistration.decrementUseCountMaybeClose();
        }
    }

    /**
     * @throws JUMPTimedOutException
     * @throws JUMPUnblockedException
     * @throws IOException
     */
    private JUMPMessage doWaitForMessage(String messageType,
            long timeout) throws JUMPTimedOutException, IOException {
        byte[] raw = jumpMessageQueueInterfaceImpl.receiveMessage(
                messageType, timeout);
        return new MessageImpl.Message(raw);
    }

    /**
     * NOTE: the handler will be called in an arbitrary thread.  Use
     * appropriate synchronization.  Handlers may be called in an
     * arbitrary order.  If a handler is registered multiple times, it
     * will be called a corresponding number of times for each
     * message, and must be canceled a corresponding number of times.
     */
    public Object registerHandler(String messageType,
            JUMPMessageHandler handler)
            throws JUMPMessageDispatcherTypeException, IOException {
        if (messageType == null) {
            throw new NullPointerException("messageType can't be null");
        }
        if (handler == null) {
            throw new NullPointerException("handler can't be null");
        }

        Listener listener;
        synchronized (lock) {
            if (directRegistrations.containsKey(messageType)) {
                throw new JUMPMessageDispatcherTypeException("Type "
                        + messageType
                        + " already registered for direct listening");
            }

            listener = getListener(messageType);

            // Add the handler while synchronized on lock so that a
            // new Listener won't exit before the handler is added.
            // If this fails it's ok, the Listener will exit soon if no
            // other handlers are registered for it.

            listener.addHandler(handler);
        }

        return new HandlerRegistrationToken(listener, handler);
    }

    // Externally synchronized on lock.
    private Listener getListener(String messageType) throws IOException {
        Listener listener = (Listener) listeners.get(messageType);
        if (listener == null) {
            listener = new Listener(messageType);

            // Be careful to maintain our invariant (and free
            // resources) even on OutOfMemoryError, etc.

            boolean success = false;
            try {
                listeners.put(messageType, listener);
                listener.start();
                success = true;
            } finally {
                if (!success) {
                    // Free OS resources.
                    listener.close();
                    // Remove listener from the Map.  This is ok even
                    // if it was never added.
                    listeners.remove(messageType);
                }
            }
        }
        return listener;
    }

    public void cancelRegistration(Object registrationToken)
            throws IOException {
        ((RegistrationToken) registrationToken).cancelRegistration();
    }

    private interface RegistrationToken {
        void cancelRegistration() throws IOException;
    }

    private static class HandlerRegistrationToken implements
            RegistrationToken {
        private final Listener listener;
        private final JUMPMessageHandler handler;

        // Don't allow cancelRegistration() to be called twice.
        private boolean canceled = false;

        public HandlerRegistrationToken(Listener listener,
                JUMPMessageHandler handler) {
            this.listener = listener;
            this.handler = handler;
        }

        public void cancelRegistration() throws IOException {
            synchronized (this) {
                if (canceled) {
                    throw new IllegalStateException(
                            "Registration has already been canceled.");
                }
                canceled = true;
            }

            listener.removeHandler(handler);
        }
    }

    private static class DirectRegistrationToken implements
            RegistrationToken {
        private final DirectRegistration directRegistration;

        // Don't allow cancelRegistration() to be called twice.
        private boolean canceled = false;

        public DirectRegistrationToken(
                DirectRegistration directRegistration) {
            this.directRegistration = directRegistration;
        }

        public void cancelRegistration() {
            synchronized (this) {
                if (canceled) {
                    throw new IllegalStateException(
                            "Registration has already been canceled.");
                }
                canceled = true;
            }
            directRegistration.decrementUseCountMaybeClose();
        }
    }

    private class DirectRegistration {
        private final String messageType;

        // useCount is incremented for every direct registration of
        // messageType and when a message receive begins, and
        // decremented when the registration is canceled or a message
        // read is finished.  When the count falls to zero, the
        // low-level resources are freed, and the directRegistrations
        // mapping is removed, therefore this DirectRegistration can
        // never be used again to access the (freed) low-level
        // resources.

        private int useCount = 0;

        public DirectRegistration(String messageType)
                throws IOException {
            this.messageType = messageType;
            // Make sure we've got a receive queue for the messageType.
            jumpMessageQueueInterfaceImpl.reserve(messageType);
        }

        // Externally synchronized on lock.
        public void incrementUseCount() {
            useCount++;
        }

        public void decrementUseCountMaybeClose() {
            synchronized (lock) {
                useCount--;
                if (useCount == 0) {
                    close();
                    directRegistrations.remove(messageType);
                }
            }
        }

        public void close() {
            // Tell the low-level code we're done with the message queue.
            jumpMessageQueueInterfaceImpl.unreserve(messageType);
        }
    }

    /*
     * Frequently asked questions:
     * 1. Why doesn't Listener extend Thread?  Extending Thread would
     *    put lots of unnecessary and inappropriate methods into its
     *    API.  It should keep control over those things to itself.
     * 2. How about implementing Runnable then?  The fact that Listener
     *    uses a Thread and/or Runnable is an implementation detail
     *    and shouldn't be exposed in its API.  The inner class
     *    implementing Runnable keeps the implementation private.
     *    Only those methods intended to be called from outside the
     *    class itself are public.
     * 3. How can we get the thread to exit when it's blocking in
     *    JUMPMessageReceiveQueue.receiveMessage()?
     *    There are three choices:
     *    1. Make JUMPMessageReceiveQueue.receiveMessage() interruptible,
     *       and interrupt the thread.  We probably don't want to go there.
     *    2. Send a message that the thread will see and exit on.
     *       This isn't as easy as it sounds since sending messages
     *       may fail, e.g., if the Listener is processing messages
     *       slowly and its queue has filled up.  But in that case
     *       it should exit after reading one of the "real" messages
     *       whether our sentinel is sent/received or not.
     *    3. Periodically time out and check for exit.  We do this,
     *       it's simple and effective and doesn't need any extra low-level
     *       support such as interrupt handling, although it doesn't stop
     *       the thread immediately, and requires the thread to wake up
     *       periodically.
     */
    private class Listener {
        // Guarded by lock.
        private final List handlers = new ArrayList();

        private final String messageType;

        public Listener(String messageType) throws IOException {
            this.messageType = messageType;
            // Make sure we've got a receive queue for the messageType.
            jumpMessageQueueInterfaceImpl.reserve(messageType);
        }

        // Externally synchronized on lock.
        public void addHandler(JUMPMessageHandler handler) {
            handlers.add(handler);
        }

        public void removeHandler(JUMPMessageHandler handler)
                throws IOException {
            synchronized (lock) {
                handlers.remove(handler);
                if (handlers.isEmpty()) {
                    // Wake up the listening thread so it can exit if
                    // it finds handlers is still empty.
                    jumpMessageQueueInterfaceImpl.unblock(messageType);
                }
            }
        }

        public void start() {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        listen();
                    } finally {
                        close();
                    }
                }
            });
            thread.setName(this.getClass().getName() + ": "
                    + messageType);
            thread.setDaemon(true);
            thread.start();
        }

        public void close() {
            // Tell the low-level code we're done with the message queue.
            jumpMessageQueueInterfaceImpl.unreserve(messageType);
        }

        private void listen() {
            // FIXME We should either log Errors and RuntimeExceptions
            // and continue, or cleanup and make sure they're thrown.
            while (true) {
                try {
                    JUMPMessage msg = doWaitForMessage(messageType, 0L);
                    dispatchMessage(msg);
                } catch (JUMPUnblockedException e) {
                    // This is normal.  It's time to check for exit.
                } catch (JUMPTimedOutException e) {
                    // This shouldn't happen.  Handle like IOException.
                } catch (IOException e) {
                    // Unexpected exception.
                    e.printStackTrace();
                }
                synchronized (lock) {
                    if (handlers.isEmpty()) {
                        // Remove ourselves from the map and exit.
                        listeners.remove(messageType);
                        break;
                    }
                }
            }
        }

        // NOTE: Handlers should not be called while holding lock,
        // since that can lead to inadvertent deadlocks.  However,
        // calling them with lock released means a handler can still
        // be called after it has been removed.
        // This is a generally accepted hazard of patterns like this.

        private void dispatchMessage(JUMPMessage msg) {
            JUMPMessageHandler[] handlersSnapshot;

            // Get a snapshot with the lock held.

            synchronized (lock) {
                handlersSnapshot = (JUMPMessageHandler[]) handlers
                        .toArray(new JUMPMessageHandler[handlers.size()]);
            }

            // Call handlers with the lock released.

            for (int i = 0; i < handlersSnapshot.length; i++) {
                JUMPMessageHandler handler = handlersSnapshot[i];
                try {
                    handler.handleMessage(msg);
                } catch (RuntimeException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
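
The dispatchMessage() note above describes copying the handler list while holding lock and then calling the handlers with lock released. The following is a minimal, self-contained sketch of that pattern only, not the JUMP implementation: SnapshotDispatchSketch, its Handler interface, and the String message type are hypothetical stand-ins for Listener, JUMPMessageHandler, and JUMPMessage, and no message queue is involved.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotDispatchSketch {
    /** Hypothetical stand-in for JUMPMessageHandler. */
    public interface Handler {
        void handleMessage(String msg);
    }

    private final Object lock = new Object();

    // Guarded by lock.
    private final List<Handler> handlers = new ArrayList<Handler>();

    public void addHandler(Handler handler) {
        synchronized (lock) {
            handlers.add(handler);
        }
    }

    public void removeHandler(Handler handler) {
        synchronized (lock) {
            handlers.remove(handler);
        }
    }

    /** Dispatches msg and returns the number of handlers invoked. */
    public int dispatch(String msg) {
        Handler[] snapshot;

        // Copy the handler list while holding the lock.
        synchronized (lock) {
            snapshot = handlers.toArray(new Handler[handlers.size()]);
        }

        // Call handlers with the lock released, so a handler may
        // add or remove handlers without deadlock or a
        // ConcurrentModificationException.
        int called = 0;
        for (int i = 0; i < snapshot.length; i++) {
            try {
                snapshot[i].handleMessage(msg);
            } catch (RuntimeException e) {
                // One failing handler must not stop the others.
                e.printStackTrace();
            }
            called++;
        }
        return called;
    }

    public static void main(String[] args) {
        final SnapshotDispatchSketch dispatcher = new SnapshotDispatchSketch();
        final List<String> seen = new ArrayList<String>();

        // A handler that cancels itself from inside handleMessage().
        dispatcher.addHandler(new Handler() {
            public void handleMessage(String msg) {
                seen.add("oneShot:" + msg);
                dispatcher.removeHandler(this);
            }
        });
        dispatcher.addHandler(new Handler() {
            public void handleMessage(String msg) {
                seen.add("steady:" + msg);
            }
        });

        dispatcher.dispatch("a");
        dispatcher.dispatch("b");
        System.out.println(seen); // prints [oneShot:a, steady:a, steady:b]
    }
}
```

The trade-off is the one the source comment names: because the snapshot is taken before the calls, a handler removed by another thread after the copy can still receive the message currently being dispatched.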