Source Code Cross Referenced for DefaultRemoteNotificationServerHandler.java in  » JMX » mx4j » mx4j » remote » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » JMX » mx4j » mx4j.remote 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright (C) The MX4J Contributors.
003:         * All rights reserved.
004:         *
005:         * This software is distributed under the terms of the MX4J License version 1.0.
006:         * See the terms of the MX4J License in the documentation provided with this software.
007:         */
008:
009:        package mx4j.remote;
010:
011:        import java.io.IOException;
012:        import java.util.Arrays;
013:        import java.util.HashMap;
014:        import java.util.LinkedList;
015:        import java.util.List;
016:        import java.util.Map;
017:
018:        import javax.management.Notification;
019:        import javax.management.NotificationFilter;
020:        import javax.management.NotificationListener;
021:        import javax.management.ObjectName;
022:        import javax.management.remote.NotificationResult;
023:        import javax.management.remote.TargetedNotification;
024:
025:        import mx4j.log.Log;
026:        import mx4j.log.Logger;
027:
028:        /**
029:         * Base implementation of the RemoteNotificationServerHandler interface.
030:         *
031:         * @version $Revision: 1.12 $
032:         */
033:        public class DefaultRemoteNotificationServerHandler implements 
034:                RemoteNotificationServerHandler {
035:            private static int listenerID;
036:
037:            private final NotificationListener listener;
038:            private final Map tuples = new HashMap();
039:            private final NotificationBuffer buffer;
040:            private volatile boolean closed;
041:
042:            /**
043:             * Creates a new remote notification server handler.
044:             *
045:             * @param environment Contains environment variables used to configure this handler
046:             * @see MX4JRemoteConstants#NOTIFICATION_BUFFER_CAPACITY
047:             * @see MX4JRemoteConstants#NOTIFICATION_PURGE_DISTANCE
048:             */
049:            public DefaultRemoteNotificationServerHandler(Map environment) {
050:                listener = new ServerListener();
051:                buffer = new NotificationBuffer(environment);
052:            }
053:
054:            public Integer generateListenerID(ObjectName name,
055:                    NotificationFilter filter) {
056:                synchronized (DefaultRemoteNotificationServerHandler.class) {
057:                    return new Integer(++listenerID);
058:                }
059:            }
060:
061:            public NotificationListener getServerNotificationListener() {
062:                return listener;
063:            }
064:
065:            public void addNotificationListener(Integer id,
066:                    NotificationTuple tuple) {
067:                if (closed)
068:                    return;
069:                synchronized (tuples) {
070:                    tuples.put(id, tuple);
071:                }
072:            }
073:
074:            public NotificationTuple removeNotificationListener(Integer id) {
075:                if (closed)
076:                    return null;
077:                synchronized (tuples) {
078:                    return (NotificationTuple) tuples.remove(id);
079:                }
080:            }
081:
082:            public NotificationResult fetchNotifications(long sequenceNumber,
083:                    int maxNotifications, long timeout) throws IOException {
084:                if (closed)
085:                    throw new IOException(
086:                            "RemoteNotificationServerHandler is closed");
087:                return buffer.getNotifications(sequenceNumber,
088:                        maxNotifications, timeout);
089:            }
090:
091:            public NotificationTuple[] close() {
092:                Logger logger = getLogger();
093:                closed = true;
094:                stopWaitingForNotifications(buffer);
095:                synchronized (tuples) {
096:                    NotificationTuple[] result = (NotificationTuple[]) tuples
097:                            .values().toArray(
098:                                    new NotificationTuple[tuples.size()]);
099:                    tuples.clear();
100:                    if (logger.isEnabledFor(Logger.DEBUG))
101:                        logger
102:                                .debug("RemoteNotificationServerHandler closed, returning: "
103:                                        + Arrays.asList(result));
104:                    return result;
105:                }
106:            }
107:
108:            /**
109:             * When a connection is closed, it may be possible that a client RMI call is waiting in
110:             * {@link #waitForNotifications}, so here we wake it up, letting the thread return to the
111:             * client and free resources on client's side.
112:             *
113:             * @param lock The object on which {@link #notifyAll} should be called
114:             */
115:            private void stopWaitingForNotifications(Object lock) {
116:                synchronized (lock) {
117:                    lock.notifyAll();
118:                }
119:            }
120:
121:            /**
122:             * Called when there are no notifications to send to the client.
123:             * It is guaranteed that no notification can be added before this method waits on the given lock.
124:             * It should wait on the given lock for the specified timeout, and return true
125:             * to send notifications (if no notifications arrived, an empty notification array
126:             * will be returned to the client), or false if no notifications should be sent to
127:             * the client.
128:             *
129:             * @param lock    The object on which {@link #wait} should be called
130:             * @param timeout The amount of time to wait (guaranteed to be strictly greater than 0)
131:             */
132:            protected boolean waitForNotifications(Object lock, long timeout) {
133:                Logger logger = getLogger();
134:                long start = 0;
135:                if (logger.isEnabledFor(Logger.DEBUG)) {
136:                    logger
137:                            .debug("Waiting for notifications " + timeout
138:                                    + " ms");
139:                    start = System.currentTimeMillis();
140:                }
141:
142:                synchronized (lock) {
143:                    try {
144:                        lock.wait(timeout);
145:                    } catch (InterruptedException x) {
146:                        Thread.currentThread().interrupt();
147:                    }
148:                }
149:
150:                if (logger.isEnabledFor(Logger.DEBUG)) {
151:                    long elapsed = System.currentTimeMillis() - start;
152:                    logger.debug("Waited for notifications " + elapsed + " ms");
153:                }
154:
155:                return true;
156:            }
157:
158:            /**
159:             * This method filters the given notification array and returns a possibly smaller array containing
160:             * only notifications that passed successfully the filtering.
161:             * Default behavior is no filtering, but subclasses may choose to change this bahavior.
162:             * For example, for RMI, one can assure that all notifications are truly serializable, and log those
163:             * that are not.
164:             */
165:            protected TargetedNotification[] filterNotifications(
166:                    TargetedNotification[] notifications) {
167:                return notifications;
168:            }
169:
170:            private void addNotification(Integer id, Notification notification) {
171:                buffer.add(new TargetedNotification(notification, id));
172:            }
173:
174:            protected Logger getLogger() {
175:                return Log.getLogger(getClass().getName());
176:            }
177:
178:            private class ServerListener implements  NotificationListener {
179:                public void handleNotification(Notification notification,
180:                        Object handback) {
181:                    Integer id = (Integer) handback;
182:                    addNotification(id, notification);
183:                }
184:            }
185:
186:            private class NotificationBuffer {
187:                private final List notifications = new LinkedList();
188:                private int maxCapacity;
189:                private int purgeDistance;
190:                private long firstSequence;
191:                private long lastSequence;
192:                private long lowestExpectedSequence = -1;
193:
194:                private NotificationBuffer(Map environment) {
195:                    if (environment != null) {
196:                        try {
197:                            Integer maxCapacityInteger = (Integer) environment
198:                                    .get(MX4JRemoteConstants.NOTIFICATION_BUFFER_CAPACITY);
199:                            if (maxCapacityInteger != null)
200:                                maxCapacity = maxCapacityInteger.intValue();
201:                        } catch (Exception ignored) {
202:                        }
203:
204:                        try {
205:                            Integer purgeDistanceInteger = (Integer) environment
206:                                    .get(MX4JRemoteConstants.NOTIFICATION_PURGE_DISTANCE);
207:                            if (purgeDistanceInteger != null)
208:                                purgeDistance = purgeDistanceInteger.intValue();
209:                        } catch (Exception ignored) {
210:                        }
211:                    }
212:                    if (maxCapacity <= 0)
213:                        maxCapacity = 1024;
214:                    if (purgeDistance <= 0)
215:                        purgeDistance = 128;
216:                }
217:
218:                private int getSize() {
219:                    synchronized (this ) {
220:                        return notifications.size();
221:                    }
222:                }
223:
224:                private void add(TargetedNotification notification) {
225:                    Logger logger = getLogger();
226:                    synchronized (this ) {
227:                        if (notifications.size() == maxCapacity) {
228:                            if (logger.isEnabledFor(Logger.DEBUG))
229:                                logger.debug("Notification buffer full: "
230:                                        + this );
231:                            removeRange(0, 1);
232:                        }
233:                        notifications.add(notification);
234:                        ++lastSequence;
235:                        if (logger.isEnabledFor(Logger.DEBUG))
236:                            logger.debug("Notification added to buffer: "
237:                                    + this );
238:                        notifyAll();
239:                    }
240:                }
241:
242:                private void removeRange(int start, int end) {
243:                    synchronized (this ) {
244:                        notifications.subList(start, end).clear();
245:                        firstSequence += end - start;
246:                    }
247:                }
248:
249:                private long getFirstSequenceNumber() {
250:                    synchronized (this ) {
251:                        return firstSequence;
252:                    }
253:                }
254:
255:                private long getLastSequenceNumber() {
256:                    synchronized (this ) {
257:                        return lastSequence;
258:                    }
259:                }
260:
261:                private NotificationResult getNotifications(
262:                        long sequenceNumber, int maxNotifications, long timeout) {
263:                    Logger logger = getLogger();
264:                    synchronized (this ) {
265:                        NotificationResult result = null;
266:                        int size = 0;
267:                        if (sequenceNumber < 0) {
268:                            // We loose the notifications between addNotificationListener() and fetchNotifications(), but c'est la vie.
269:                            long sequence = getLastSequenceNumber();
270:                            size = new Long(sequence + 1).intValue();
271:                            result = new NotificationResult(
272:                                    getFirstSequenceNumber(), sequence,
273:                                    new TargetedNotification[0]);
274:                            if (lowestExpectedSequence < 0)
275:                                lowestExpectedSequence = sequence;
276:                            if (logger.isEnabledFor(Logger.DEBUG))
277:                                logger.debug("First fetchNotification call: "
278:                                        + this  + ", returning " + result);
279:                        } else {
280:                            long firstSequence = getFirstSequenceNumber();
281:
282:                            int losts = 0;
283:                            int start = new Long(sequenceNumber - firstSequence)
284:                                    .intValue();
285:                            // In the time between 2 fetches the buffer may have overflew, so that start < 0.
286:                            // It simply mean that we send the first notification we have (start = 0),
287:                            // and the client will emit a notification lost event.
288:                            if (start < 0) {
289:                                losts = -start;
290:                                start = 0;
291:                            }
292:
293:                            List sublist = null;
294:                            boolean send = false;
295:                            while (size == 0) {
296:                                int end = notifications.size();
297:                                if (end - start > maxNotifications)
298:                                    end = start + maxNotifications;
299:
300:                                sublist = notifications.subList(start, end);
301:                                size = sublist.size();
302:
303:                                if (closed || send)
304:                                    break;
305:
306:                                if (size == 0) {
307:                                    if (timeout <= 0)
308:                                        break;
309:                                    if (logger.isEnabledFor(Logger.DEBUG))
310:                                        logger
311:                                                .debug("No notifications to send, waiting "
312:                                                        + timeout + " ms");
313:
314:                                    // We wait for notifications to arrive. Since we release the lock on the buffer
315:                                    // other threads can modify it. To avoid ConcurrentModificationException we compute
316:                                    // again the sublist by coming up back to the while statement
317:                                    send = waitForNotifications(this , timeout);
318:                                }
319:                            }
320:
321:                            TargetedNotification[] notifications = (TargetedNotification[]) sublist
322:                                    .toArray(new TargetedNotification[size]);
323:                            notifications = filterNotifications(notifications);
324:                            result = new NotificationResult(firstSequence,
325:                                    sequenceNumber + losts + size,
326:                                    notifications);
327:                            if (logger.isEnabledFor(Logger.DEBUG))
328:                                logger
329:                                        .debug("Non-first fetchNotification call: "
330:                                                + this 
331:                                                + ", returning "
332:                                                + result);
333:
334:                            int purged = purgeNotifications(sequenceNumber,
335:                                    size);
336:                            if (logger.isEnabledFor(Logger.DEBUG))
337:                                logger.debug("Purged " + purged
338:                                        + " notifications: " + this );
339:                        }
340:                        return result;
341:                    }
342:                }
343:
344:                private int purgeNotifications(long sequenceNumber, int size) {
345:                    // Record the lowest expected sequence number sent by the client.
346:                    // New clients will always have an initial big sequence number
347:                    // (they're initialized with getLastSequenceNumber()), while old
348:                    // clients can have lesser sequence numbers.
349:                    // Here we record the lesser of these sequence numbers, that is the
350:                    // sequence number of the oldest notification any client may ever ask.
351:                    // This way we can purge old notifications that have already been
352:                    // delivered to clients.
353:
354:                    // The worst case is when a client has a long interval between fetchNotifications()
355:                    // calls, and another client has a short interval. The lowestExpectedSequence will
356:                    // grow with the second client, until a purge happens, so the first client can
357:                    // loose notifications. By tuning appropriately the purgeDistance and the interval
358:                    // between fetchNotifications() calls, it should never happen.
359:
360:                    int result = 0;
361:                    synchronized (this ) {
362:                        if (sequenceNumber <= lowestExpectedSequence) {
363:                            long lowest = Math.min(lowestExpectedSequence,
364:                                    sequenceNumber);
365:
366:                            long firstSequence = getFirstSequenceNumber();
367:                            if (lowest - firstSequence > purgeDistance) {
368:                                // Purge only half of the old notifications, for safety
369:                                int purgeSize = purgeDistance >> 1;
370:                                removeRange(0, purgeSize);
371:                                result = purgeSize;
372:                            }
373:
374:                            long expected = Math.max(sequenceNumber + size,
375:                                    firstSequence);
376:                            lowestExpectedSequence = expected;
377:                        }
378:                    }
379:                    return result;
380:                }
381:
382:                public String toString() {
383:                    StringBuffer buffer = new StringBuffer(
384:                            "NotificationBuffer@");
385:                    buffer.append(Integer.toHexString(hashCode())).append("[");
386:                    buffer.append("first=").append(getFirstSequenceNumber())
387:                            .append(", ");
388:                    buffer.append("last=").append(getLastSequenceNumber())
389:                            .append(", ");
390:                    buffer.append("size=").append(getSize()).append(", ");
391:                    buffer.append("lowestExpected=").append(
392:                            lowestExpectedSequence).append(", ");
393:                    buffer.append("maxCapacity=").append(maxCapacity).append(
394:                            ", ");
395:                    buffer.append("purgeDistance=").append(purgeDistance)
396:                            .append("]");
397:                    return buffer.toString();
398:                }
399:            }
400:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.