001: /*
002: * Copyright (C) The MX4J Contributors.
003: * All rights reserved.
004: *
005: * This software is distributed under the terms of the MX4J License version 1.0.
006: * See the terms of the MX4J License in the documentation provided with this software.
007: */
008:
009: package mx4j.remote;
010:
011: import java.io.IOException;
012: import java.util.ArrayList;
013: import java.util.Arrays;
014: import java.util.HashMap;
015: import java.util.Iterator;
016: import java.util.LinkedList;
017: import java.util.List;
018: import java.util.Map;
019: import javax.management.Notification;
020: import javax.management.NotificationFilter;
021: import javax.management.NotificationListener;
022: import javax.management.remote.NotificationResult;
023: import javax.management.remote.TargetedNotification;
024:
025: import mx4j.log.Log;
026: import mx4j.log.Logger;
027:
028: /**
029: * Base implementation of the RemoteNotificationClientHandler interface.
030: *
031: * @version $Revision: 1.7 $
032: */
033: public abstract class AbstractRemoteNotificationClientHandler implements
034: RemoteNotificationClientHandler {
035: private static int fetcherID;
036: private static int delivererID;
037:
038: private final ConnectionNotificationEmitter emitter;
039: private final HeartBeat heartbeat;
040: private final Map tuples = new HashMap();
041: private NotificationFetcherThread fetcherThread;
042: private NotificationDelivererThread delivererThread;
043:
044: /**
045: * Creates a new remote notification client-side handler.
046: * It uses an emitter, an heartbeat and an environment to perform the job.
047: * All 3 can be null, but the corrispondent methods must be overridden
048: *
049: * @param emitter The NotificationEmitter that emits connection failures notifications
050: * @param heartbeat The heart beat is used to get the retry parameters in case of connection failure
051: * @param environment Contains environment variables used to configure this handler
052: * @see MX4JRemoteConstants#FETCH_NOTIFICATIONS_MAX_NUMBER
053: * @see MX4JRemoteConstants#FETCH_NOTIFICATIONS_SLEEP
054: * @see MX4JRemoteConstants#FETCH_NOTIFICATIONS_TIMEOUT
055: * @see #sendConnectionNotificationLost
056: * @see #getMaxRetries
057: * @see #getRetryPeriod
058: */
059: protected AbstractRemoteNotificationClientHandler(
060: ConnectionNotificationEmitter emitter, HeartBeat heartbeat,
061: Map environment) {
062: this .emitter = emitter;
063: this .heartbeat = heartbeat;
064: this .fetcherThread = new NotificationFetcherThread(environment);
065: this .delivererThread = new NotificationDelivererThread(
066: environment);
067: }
068:
069: /**
070: * Returns whether this client handler is fetching notifications or not.
071: *
072: * @see #start
073: * @see #stop
074: */
075: public boolean isActive() {
076: return fetcherThread.isActive();
077: }
078:
079: public void start() {
080: if (isActive())
081: return;
082: delivererThread.start();
083: fetcherThread.start();
084: }
085:
086: public void stop() {
087: if (!isActive())
088: return;
089: fetcherThread.stop();
090: delivererThread.stop();
091: synchronized (tuples) {
092: tuples.clear();
093: }
094: }
095:
096: private synchronized static int getFetcherID() {
097: return ++fetcherID;
098: }
099:
100: private synchronized static int getDelivererID() {
101: return ++delivererID;
102: }
103:
104: public boolean contains(NotificationTuple tuple) {
105: synchronized (tuples) {
106: return tuples.containsValue(tuple);
107: }
108: }
109:
110: public void addNotificationListener(Integer id,
111: NotificationTuple tuple) {
112: if (!isActive())
113: start();
114:
115: synchronized (tuples) {
116: tuples.put(id, tuple);
117: }
118:
119: Logger logger = getLogger();
120: if (logger.isEnabledFor(Logger.DEBUG))
121: logger.debug("Adding remote NotificationListener " + tuple);
122: }
123:
124: public Integer[] getNotificationListeners(NotificationTuple tuple) {
125: synchronized (tuples) {
126: ArrayList ids = new ArrayList();
127: for (Iterator i = tuples.entrySet().iterator(); i.hasNext();) {
128: Map.Entry entry = (Map.Entry) i.next();
129: if (entry.getValue().equals(tuple))
130: ids.add(entry.getKey());
131: }
132: if (ids.size() > 0)
133: return (Integer[]) ids.toArray(new Integer[ids.size()]);
134: }
135: return null;
136: }
137:
138: public Integer getNotificationListener(NotificationTuple tuple) {
139: synchronized (tuples) {
140: for (Iterator i = tuples.entrySet().iterator(); i.hasNext();) {
141: Map.Entry entry = (Map.Entry) i.next();
142: if (entry.getValue().equals(tuple))
143: return (Integer) entry.getKey();
144: }
145: }
146: return null;
147: }
148:
149: public void removeNotificationListeners(Integer[] ids) {
150: Logger logger = getLogger();
151: synchronized (tuples) {
152: for (int i = 0; i < ids.length; ++i) {
153: Integer id = ids[i];
154: NotificationTuple tuple = (NotificationTuple) tuples
155: .remove(id);
156: if (tuple != null && logger.isEnabledFor(Logger.DEBUG))
157: logger
158: .debug("Removing remote NotificationListener "
159: + tuple);
160: }
161: }
162: }
163:
164: /**
165: * Calls the server side to fetch notifications.
166: */
167: protected abstract NotificationResult fetchNotifications(
168: long sequence, int maxNumber, long timeout)
169: throws IOException;
170:
171: /**
172: * Returns the period between two retries if the connection with the server side fails.
173: * This implementation returns the heartbeat pulse period, but can be overridden.
174: *
175: * @see #getMaxRetries
176: * @see #AbstractRemoteNotificationClientHandler
177: */
178: protected long getRetryPeriod() {
179: return heartbeat.getPulsePeriod();
180: }
181:
182: /**
183: * Returns the maximum number of attempts that should be made before declaring a connection
184: * failed.
185: * This implementation returns the heartbeat max retries, but can be overridden.
186: *
187: * @see #getRetryPeriod
188: * @see #AbstractRemoteNotificationClientHandler
189: */
190: protected int getMaxRetries() {
191: return heartbeat.getMaxRetries();
192: }
193:
194: /**
195: * Sends the {@link javax.management.remote.JMXConnectionNotification#NOTIFS_LOST} notification
196: * using the emitter passed to {@link AbstractRemoteNotificationClientHandler}
197: */
198: protected void sendConnectionNotificationLost(long number) {
199: emitter.sendConnectionNotificationLost(number);
200: }
201:
202: protected int getNotificationsCount() {
203: return delivererThread.getNotificationsCount();
204: }
205:
206: private int deliverNotifications(
207: TargetedNotification[] notifications) {
208: return delivererThread.addNotifications(notifications);
209: }
210:
211: private void sendNotification(TargetedNotification notification) {
212: NotificationTuple tuple = null;
213: synchronized (tuples) {
214: tuple = (NotificationTuple) tuples.get(notification
215: .getListenerID());
216: }
217:
218: // It may be possible that a notification arrived after the client already removed the listener
219: if (tuple == null)
220: return;
221:
222: Notification notif = notification.getNotification();
223:
224: Logger logger = getLogger();
225:
226: if (tuple.getInvokeFilter()) {
227: // Invoke the filter on client side
228: NotificationFilter filter = tuple.getNotificationFilter();
229: if (logger.isEnabledFor(Logger.DEBUG))
230: logger.debug("Filtering notification " + notif
231: + ", filter = " + filter);
232: if (filter != null) {
233: try {
234: boolean deliver = filter
235: .isNotificationEnabled(notif);
236: if (!deliver)
237: return;
238: } catch (Throwable x) {
239: logger.warn(
240: "Throwable caught from isNotificationEnabled, filter = "
241: + filter, x);
242: // And go on quietly
243: }
244: }
245: }
246:
247: if (logger.isEnabledFor(Logger.DEBUG))
248: logger.debug("Sending Notification " + notif
249: + ", listener info is " + tuple);
250:
251: NotificationListener listener = tuple.getNotificationListener();
252:
253: try {
254: listener.handleNotification(notif, tuple.getHandback());
255: } catch (Throwable x) {
256: logger.warn(
257: "Throwable caught from handleNotification, listener = "
258: + listener, x);
259: // And return quietly
260: }
261: }
262:
263: protected Logger getLogger() {
264: return Log.getLogger(getClass().getName());
265: }
266:
267: private class NotificationFetcherThread implements Runnable {
268: private long sequenceNumber;
269: private volatile boolean active;
270: private Thread thread;
271: private long timeout;
272: private int maxNumber;
273: private long sleep;
274:
275: private NotificationFetcherThread(Map environment) {
276: // Default server timeout is one minute
277: timeout = 60 * 1000;
278: // At most 25 notifications at time
279: maxNumber = 25;
280: // By default we don't sleep and we call the server again.
281: sleep = 0;
282: if (environment != null) {
283: try {
284: timeout = ((Long) environment
285: .get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_TIMEOUT))
286: .longValue();
287: } catch (Exception ignored) {
288: }
289: try {
290: maxNumber = ((Integer) environment
291: .get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_MAX_NUMBER))
292: .intValue();
293: } catch (Exception ignored) {
294: }
295: try {
296: sleep = ((Integer) environment
297: .get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_SLEEP))
298: .intValue();
299: } catch (Exception ignored) {
300: }
301: }
302: }
303:
304: private synchronized long getSequenceNumber() {
305: return sequenceNumber;
306: }
307:
308: private synchronized void setSequenceNumber(long sequenceNumber) {
309: this .sequenceNumber = sequenceNumber;
310: }
311:
312: private boolean isActive() {
313: return active;
314: }
315:
316: private synchronized void start() {
317: active = true;
318: // Initialized to a negative value for the first fetchNotification call
319: sequenceNumber = -1;
320: thread = new Thread(this , "Notification Fetcher #"
321: + getFetcherID());
322: thread.setDaemon(true);
323: thread.start();
324: }
325:
326: private synchronized void stop() {
327: active = false;
328: thread.interrupt();
329: }
330:
331: public void run() {
332: Logger logger = getLogger();
333: try {
334: while (isActive() && !thread.isInterrupted()) {
335: try {
336: long sequence = getSequenceNumber();
337: NotificationResult result = fetchNotifications(
338: sequence, maxNumber, timeout);
339: if (logger.isEnabledFor(Logger.DEBUG))
340: logger.debug("Fetched Notifications: "
341: + result);
342:
343: long sleepTime = sleep;
344: if (result != null) {
345: long nextSequence = result
346: .getNextSequenceNumber();
347: TargetedNotification[] targeted = result
348: .getTargetedNotifications();
349: int targetedLength = targeted == null ? 0
350: : targeted.length;
351: boolean notifsFilteredByServer = sequence >= 0 ? nextSequence
352: - sequence != targetedLength
353: : false;
354: boolean notifsLostByServer = sequence >= 0
355: && result
356: .getEarliestSequenceNumber() > sequence;
357: if (notifsFilteredByServer) {
358: // We lost some notification
359: sendConnectionNotificationLost(nextSequence
360: - sequence - targetedLength);
361: }
362: if (notifsLostByServer) {
363: // We lost some notification
364: sendConnectionNotificationLost(result
365: .getEarliestSequenceNumber()
366: - sequence);
367: }
368:
369: setSequenceNumber(nextSequence);
370: int delivered = deliverNotifications(targeted);
371: if (delivered < targetedLength) {
372: // We lost some notification
373: sendConnectionNotificationLost(targetedLength
374: - delivered);
375: }
376:
377: // If we got a maxNumber of notifications, probably the server has more to send, don't sleep
378: if (targeted != null
379: && targeted.length == maxNumber)
380: sleepTime = 0;
381: }
382:
383: if (sleepTime > 0)
384: Thread.sleep(sleepTime);
385: } catch (IOException x) {
386: if (logger.isEnabledFor(Logger.DEBUG))
387: logger
388: .debug(
389: "Caught IOException from fetchNotifications",
390: x);
391: break;
392: } catch (InterruptedException x) {
393: Thread.currentThread().interrupt();
394: break;
395: } catch (Throwable x) {
396: if (logger.isEnabledFor(Logger.WARN))
397: logger
398: .warn(
399: "Caught an unexpected exception",
400: x);
401: }
402: }
403: } finally {
404: AbstractRemoteNotificationClientHandler.this .stop();
405: if (logger.isEnabledFor(Logger.DEBUG))
406: logger.debug(thread.getName() + " Thread exited");
407: }
408: }
409:
410: /**
411: * Fetches notifications from the server side in a separate thread.
412: * Since it involves a remote call, IOExceptions must be handled carefully.
413: * If the connection fails for any reason, the thread will be a sleep and then
414: * retry for a configurable number of times.
415: * If the connection is really lost, the thread will exit.
416: */
417: private NotificationResult fetchNotifications(long sequence,
418: int maxNumber, long timeout) throws IOException,
419: InterruptedException {
420: Logger logger = getLogger();
421: int retries = 0;
422: while (true) {
423: if (logger.isEnabledFor(Logger.DEBUG))
424: logger.debug("Fetching notifications, sequence is "
425: + sequence + ", timeout is " + timeout);
426: try {
427: return AbstractRemoteNotificationClientHandler.this
428: .fetchNotifications(sequence, maxNumber,
429: timeout);
430: } catch (IOException x) {
431: if (logger.isEnabledFor(Logger.DEBUG))
432: logger.debug(
433: "Could not fetch notifications, sleeping "
434: + getRetryPeriod()
435: + " and trying "
436: + (getMaxRetries() - retries)
437: + " more times", x);
438: Thread.sleep(getRetryPeriod());
439: if (retries++ == getMaxRetries())
440: throw x;
441: }
442: }
443: }
444: }
445:
446: private class NotificationDelivererThread implements Runnable {
447: private final List notificationQueue = new LinkedList();
448: private int capacity;
449: private volatile boolean active;
450: private Thread thread;
451:
452: private NotificationDelivererThread(Map environment) {
453: if (environment != null) {
454: Object size = environment
455: .get(MX4JRemoteConstants.NOTIFICATION_QUEUE_CAPACITY);
456: if (size instanceof Integer) {
457: capacity = ((Integer) size).intValue();
458: if (capacity < 0)
459: capacity = 0;
460: }
461: }
462: }
463:
464: private int addNotifications(
465: TargetedNotification[] notifications) {
466: if (notifications == null || notifications.length == 0)
467: return 0;
468:
469: List notifs = Arrays.asList(notifications);
470:
471: Logger logger = getLogger();
472: if (logger.isEnabledFor(Logger.DEBUG))
473: logger.debug("Enqueuing notifications for delivery: "
474: + notifs);
475:
476: synchronized (this ) {
477: int size = notifs.size();
478: int added = size;
479: if (capacity > 0) {
480: int room = capacity - notificationQueue.size();
481: if (room < size) {
482: added = room;
483: if (logger.isEnabledFor(Logger.DEBUG))
484: logger
485: .debug("Notification queue is full, enqueued "
486: + room
487: + " notifications out of "
488: + size
489: + ", exceeding will be lost");
490: }
491: notificationQueue.addAll(notifs.subList(0, added));
492: } else {
493: notificationQueue.addAll(notifs);
494: }
495: notifyAll();
496: return added;
497: }
498: }
499:
500: private boolean isActive() {
501: return active;
502: }
503:
504: private synchronized void start() {
505: active = true;
506: notificationQueue.clear();
507: thread = new Thread(this , "Notification Deliverer #"
508: + getDelivererID());
509: thread.setDaemon(true);
510: thread.start();
511: }
512:
513: private synchronized void stop() {
514: active = false;
515: thread.interrupt();
516: }
517:
518: public void run() {
519: Logger logger = getLogger();
520: try {
521: while (isActive() && !thread.isInterrupted()) {
522: try {
523: TargetedNotification notification = null;
524: synchronized (this ) {
525: while (notificationQueue.isEmpty())
526: wait();
527: notification = (TargetedNotification) notificationQueue
528: .remove(0);
529: }
530: sendNotification(notification);
531: } catch (InterruptedException x) {
532: Thread.currentThread().interrupt();
533: break;
534: } catch (Throwable x) {
535: if (logger.isEnabledFor(Logger.WARN))
536: logger
537: .warn(
538: "Caught an unexpected exception",
539: x);
540: }
541: }
542: } finally {
543: active = false;
544: if (logger.isEnabledFor(Logger.DEBUG))
545: logger.debug(thread.getName() + " Thread exited");
546: }
547: }
548:
549: private int getNotificationsCount() {
550: synchronized (this) {
551: return notificationQueue.size();
552: }
553: }
554: }
555: }
|