001: /**
002: * Copyright 2003-2007 Luck Consulting Pty Ltd
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */package net.sf.ehcache.distribution;
016:
017: import net.sf.ehcache.CacheException;
018: import net.sf.ehcache.CacheManager;
019: import net.sf.ehcache.Ehcache;
020: import net.sf.ehcache.Status;
021: import net.sf.ehcache.event.CacheEventListener;
022: import org.apache.commons.logging.Log;
023: import org.apache.commons.logging.LogFactory;
024:
025: import java.io.IOException;
026: import java.net.InetAddress;
027: import java.net.ServerSocket;
028: import java.net.UnknownHostException;
029: import java.rmi.Naming;
030: import java.rmi.NotBoundException;
031: import java.rmi.Remote;
032: import java.rmi.RemoteException;
033: import java.rmi.registry.LocateRegistry;
034: import java.rmi.registry.Registry;
035: import java.rmi.server.ExportException;
036: import java.rmi.server.UnicastRemoteObject;
037: import java.util.ArrayList;
038: import java.util.HashMap;
039: import java.util.Iterator;
040: import java.util.List;
041: import java.util.Map;
042: import java.util.Set;
043:
044: /**
045: * A cache server which exposes available cache operations remotely through RMI.
046: * <p/>
047: * It acts as a Decorator to a Cache. It holds an instance of cache, which is a local cache it talks to.
048: * <p/>
049: * This class could specify a security manager with code like:
050: * <pre>
051: * if (System.getSecurityManager() == null) {
052: * System.setSecurityManager(new RMISecurityManager());
053: * }
054: * </pre>
055: * Doing so would require the addition of <code>grant</code> statements in the <code>java.policy</code> file.
056: * <p/>
057: * Per the JDK documentation: "If no security manager is specified no class loading, by RMI clients or servers, is allowed,
058: * aside from what can be found in the local CLASSPATH." The classpath of each instance of this class should have
059: * all required classes to enable distribution, so no remote classloading is required or desirable. Accordingly,
060: * no security manager is set and there are no special JVM configuration requirements.
061: * <p/>
062: * This class opens a ServerSocket. The dispose method should be called for orderly closure of that socket. This class
063: * has a shutdown hook which calls dispose() as a convenience feature for developers.
064: *
065: * @author Greg Luck
066: * @version $Id: RMICacheManagerPeerListener.java 519 2007-07-27 07:11:45Z gregluck $
067: */
068: public class RMICacheManagerPeerListener implements
069: CacheManagerPeerListener {
070:
071: private static final Log LOG = LogFactory
072: .getLog(RMICacheManagerPeerListener.class.getName());
073: private static final int MINIMUM_SENSIBLE_TIMEOUT = 200;
074: private static final int NAMING_UNBIND_RETRY_INTERVAL = 400;
075: private static final int NAMING_UNBIND_MAX_RETRIES = 10;
076:
077: /**
078: * The cache peers. The value is an RMICachePeer.
079: */
080: protected final Map cachePeers = new HashMap();
081:
082: /**
083: * status.
084: */
085: protected Status status;
086:
087: /**
088: * The RMI listener port
089: */
090: protected Integer port;
091:
092: private Registry registry;
093: private boolean registryCreated;
094: private final String hostName;
095:
096: private CacheManager cacheManager;
097: private Integer socketTimeoutMillis;
098:
099: /**
100: * Constructor with full arguments.
101: *
102: * @param hostName may be null, in which case the hostName will be looked up. Machines with multiple
103: * interfaces should specify this if they do not want it to be the default NIC.
104: * @param port a port in the range 1025 - 65536
105: * @param cacheManager the CacheManager this listener belongs to
106: * @param socketTimeoutMillis TCP/IP Socket timeout when waiting on response
107: */
108: public RMICacheManagerPeerListener(String hostName, Integer port,
109: CacheManager cacheManager, Integer socketTimeoutMillis)
110: throws UnknownHostException {
111:
112: status = Status.STATUS_UNINITIALISED;
113:
114: if (hostName != null && hostName.length() != 0) {
115: this .hostName = hostName;
116: if (hostName.equals("localhost")) {
117: LOG
118: .warn("Explicitly setting the listener hostname to 'localhost' is not recommended. "
119: + "It will only work if all CacheManager peers are on the same machine.");
120: }
121: } else {
122: this .hostName = calculateHostAddress();
123: }
124: if (port == null || port.intValue() == 0) {
125: assignFreePort(false);
126: } else {
127: this .port = port;
128: }
129: this .cacheManager = cacheManager;
130: if (socketTimeoutMillis == null
131: || socketTimeoutMillis.intValue() < MINIMUM_SENSIBLE_TIMEOUT) {
132: throw new IllegalArgumentException(
133: "socketTimoutMillis must be a reasonable value greater than 200ms");
134: }
135: this .socketTimeoutMillis = socketTimeoutMillis;
136:
137: }
138:
139: /**
140: * Assigns a free port to be the listener port.
141: *
142: * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED}
143: */
144: protected void assignFreePort(boolean forced)
145: throws IllegalStateException {
146: if (status != Status.STATUS_UNINITIALISED) {
147: throw new IllegalStateException(
148: "Cannot change the port of an already started listener.");
149: }
150: this .port = new Integer(this .getFreePort());
151: if (forced) {
152: LOG
153: .warn("Resolving RMI port conflict by automatically using a free TCP/IP port to listen on: "
154: + this .port);
155: } else {
156: LOG
157: .debug("Automatically finding a free TCP/IP port to listen on: "
158: + this .port);
159: }
160: }
161:
162: /**
163: * Calculates the host address as the default NICs IP address
164: *
165: * @throws UnknownHostException
166: */
167: protected String calculateHostAddress() throws UnknownHostException {
168: return InetAddress.getLocalHost().getHostAddress();
169: }
170:
171: /**
172: * Gets a free server socket port.
173: *
174: * @return a number in the range 1025 - 65536 that was free at the time this method was executed
175: * @throws IllegalArgumentException
176: */
177: protected int getFreePort() throws IllegalArgumentException {
178: ServerSocket serverSocket = null;
179: try {
180: serverSocket = new ServerSocket(0);
181: return serverSocket.getLocalPort();
182: } catch (IOException e) {
183: throw new IllegalArgumentException(
184: "Could not acquire a free port number.");
185: } finally {
186: if (serverSocket != null && !serverSocket.isClosed()) {
187: try {
188: serverSocket.close();
189: } catch (Exception e) {
190: LOG.debug("Error closing ServerSocket: "
191: + e.getMessage());
192: }
193: }
194: }
195: }
196:
197: /**
198: * {@inheritDoc}
199: */
200: public void init() throws CacheException {
201: RMICachePeer rmiCachePeer = null;
202: try {
203: startRegistry();
204: int counter = 0;
205: populateListOfRemoteCachePeers();
206: synchronized (cachePeers) {
207: for (Iterator iterator = cachePeers.values().iterator(); iterator
208: .hasNext();) {
209: rmiCachePeer = (RMICachePeer) iterator.next();
210: bind(rmiCachePeer.getUrl(), rmiCachePeer);
211: counter++;
212: }
213: }
214: LOG
215: .debug(counter
216: + " RMICachePeers bound in registry for RMI listener");
217: status = Status.STATUS_ALIVE;
218: } catch (Exception e) {
219: String url = null;
220: if (rmiCachePeer != null) {
221: url = rmiCachePeer.getUrl();
222: }
223:
224: throw new CacheException(
225: "Problem starting listener for RMICachePeer " + url
226: + ". Initial cause was " + e.getMessage(),
227: e);
228: }
229: }
230:
231: /**
232: * Bind a cache peer
233: *
234: * @param rmiCachePeer
235: */
236: protected void bind(String peerName, RMICachePeer rmiCachePeer)
237: throws Exception {
238: Naming.rebind(peerName, rmiCachePeer);
239: }
240:
241: /**
242: * Returns a list of bound objects.
243: * <p/>
244: * This should match the list of cachePeers i.e. they should always be bound
245: *
246: * @return a list of String representations of <code>RMICachePeer</code> objects
247: */
248: protected String[] listBoundRMICachePeers() throws CacheException {
249: try {
250: return registry.list();
251: } catch (RemoteException e) {
252: throw new CacheException("Unable to list cache peers "
253: + e.getMessage());
254: }
255: }
256:
257: /**
258: * Returns a reference to the remote object.
259: *
260: * @param name the name of the cache e.g. <code>sampleCache1</code>
261: */
262: protected Remote lookupPeer(String name) throws CacheException {
263: try {
264: return registry.lookup(name);
265: } catch (Exception e) {
266: throw new CacheException(
267: "Unable to lookup peer for replicated cache "
268: + name + " " + e.getMessage());
269: }
270: }
271:
272: /**
273: * Should be called on init because this is one of the last things that should happen on CacheManager startup.
274: */
275: protected void populateListOfRemoteCachePeers()
276: throws RemoteException {
277: String[] names = cacheManager.getCacheNames();
278: for (int i = 0; i < names.length; i++) {
279: String name = names[i];
280: Ehcache cache = cacheManager.getEhcache(name);
281: synchronized (cachePeers) {
282: if (cachePeers.get(name) == null) {
283: if (isDistributed(cache)) {
284: RMICachePeer peer = new RMICachePeer(cache,
285: hostName, port, socketTimeoutMillis);
286: cachePeers.put(name, peer);
287: }
288: }
289: }
290: }
291:
292: }
293:
294: /**
295: * Determine if the given cache is distributed.
296: *
297: * @param cache the cache to check
298: * @return true if a <code>CacheReplicator</code> is found in the listeners
299: */
300: protected boolean isDistributed(Ehcache cache) {
301: Set listeners = cache.getCacheEventNotificationService()
302: .getCacheEventListeners();
303: for (Iterator iterator = listeners.iterator(); iterator
304: .hasNext();) {
305: CacheEventListener cacheEventListener = (CacheEventListener) iterator
306: .next();
307: if (cacheEventListener instanceof CacheReplicator) {
308: return true;
309: }
310: }
311: return false;
312: }
313:
314: /**
315: * Start the rmiregistry.
316: * <p/>
317: * The alternative is to use the <code>rmiregistry</code> binary, in which case:
318: * <ol/>
319: * <li>rmiregistry running
320: * <li>-Djava.rmi.server.codebase="file:///Users/gluck/work/ehcache/build/classes/ file:///Users/gluck/work/ehcache/lib/commons-logging-1.0.4.jar"
321: * </ol>
322: *
323: * @throws RemoteException
324: */
325: protected void startRegistry() throws RemoteException {
326: try {
327: registry = LocateRegistry.getRegistry(port.intValue());
328: try {
329: registry.list();
330: } catch (RemoteException e) {
331: //may not be created. Let's create it.
332: registry = LocateRegistry.createRegistry(port
333: .intValue());
334: registryCreated = true;
335: }
336: } catch (ExportException exception) {
337: LOG.fatal("Exception starting RMI registry. Error was "
338: + exception.getMessage(), exception);
339: }
340: }
341:
342: /**
343: * Stop the rmiregistry if it was started by this class.
344: *
345: * @throws RemoteException
346: */
347: protected void stopRegistry() throws RemoteException {
348: if (registryCreated) {
349: // the unexportObject call must be done on the Registry object returned
350: // by createRegistry not by getRegistry, a NoSuchObjectException is
351: // thrown otherwise
352: boolean success = UnicastRemoteObject.unexportObject(
353: registry, true);
354: if (success) {
355: LOG.debug("rmiregistry unexported.");
356: } else {
357: LOG.warn("Could not unexport rmiregistry.");
358: }
359: }
360: }
361:
362: /**
363: * Stop the listener. It
364: * <ul>
365: * <li>unbinds the objects from the registry
366: * <li>unexports Remote objects
367: * </ul>
368: */
369: public void dispose() throws CacheException {
370: try {
371: int counter = 0;
372: synchronized (cachePeers) {
373: for (Iterator iterator = cachePeers.values().iterator(); iterator
374: .hasNext();) {
375: RMICachePeer rmiCachePeer = (RMICachePeer) iterator
376: .next();
377: disposeRMICachePeer(rmiCachePeer);
378: counter++;
379: }
380: stopRegistry();
381: }
382: LOG
383: .debug(counter
384: + " RMICachePeers unbound from registry in RMI listener");
385: status = Status.STATUS_SHUTDOWN;
386: } catch (Exception e) {
387: throw new CacheException(
388: "Problem unbinding remote cache peers. Initial cause was "
389: + e.getMessage(), e);
390: }
391: }
392:
393: /**
394: * A template method to dispose an individual RMICachePeer. This consists of:
395: * <ol>
396: * <li>Unbinding the peer from the naming service
397: * <li>Unexporting the peer
398: * </ol>
399: * Override to specialise behaviour
400: *
401: * @param rmiCachePeer the cache peer to dispose of
402: * @throws Exception thrown if something goes wrong
403: */
404: protected void disposeRMICachePeer(RMICachePeer rmiCachePeer)
405: throws Exception {
406: unbind(rmiCachePeer);
407: }
408:
409: /**
410: * Unbinds an RMICachePeer and unexports it.
411: * <p/>
412: * We unbind from the registry first before unexporting.
413: * Unbinding first removes the very small possibility of a client
414: * getting the object from the registry while we are trying to unexport it.
415: * <p/>
416: * This method may take up to 4 seconds to complete, if we are having trouble
417: * unexporting the peer.
418: *
419: * @param rmiCachePeer the bound and exported cache peer
420: * @throws Exception
421: */
422: protected void unbind(RMICachePeer rmiCachePeer) throws Exception {
423: String url = rmiCachePeer.getUrl();
424: try {
425: Naming.unbind(url);
426: } catch (NotBoundException e) {
427: LOG.warn(url + " not bound therefore not unbinding.");
428: }
429: // Try to gracefully unexport before forcing it.
430: boolean unexported = UnicastRemoteObject.unexportObject(
431: rmiCachePeer, false);
432: for (int count = 1; (count < NAMING_UNBIND_MAX_RETRIES)
433: && !unexported; count++) {
434: try {
435: Thread.sleep(NAMING_UNBIND_RETRY_INTERVAL);
436: } catch (InterruptedException ie) {
437: // break out of the unexportObject loop
438: break;
439: }
440: unexported = UnicastRemoteObject.unexportObject(
441: rmiCachePeer, false);
442: }
443:
444: // If we still haven't been able to unexport, force the unexport
445: // as a last resort.
446: if (!unexported) {
447: if (!UnicastRemoteObject.unexportObject(rmiCachePeer, true)) {
448: LOG.warn("Unable to unexport rmiCachePeer: "
449: + rmiCachePeer.getUrl() + ". Skipping.");
450: }
451: }
452: }
453:
454: /**
455: * All of the caches which are listening for remote changes.
456: *
457: * @return a list of <code>RMICachePeer</code> objects. The list if not live
458: */
459: public List getBoundCachePeers() {
460: List cachePeerList = new ArrayList();
461: synchronized (cachePeers) {
462: for (Iterator iterator = cachePeers.values().iterator(); iterator
463: .hasNext();) {
464: RMICachePeer rmiCachePeer = (RMICachePeer) iterator
465: .next();
466: cachePeerList.add(rmiCachePeer);
467: }
468: }
469: return cachePeerList;
470: }
471:
472: /**
473: * Returns the listener status.
474: */
475: public Status getStatus() {
476: return status;
477: }
478:
479: /**
480: * A listener will normally have a resource that only one instance can use at the same time,
481: * such as a port. This identifier is used to tell if it is unique and will not conflict with an
482: * existing instance using the resource.
483: *
484: * @return a String identifier for the resource
485: */
486: public String getUniqueResourceIdentifier() {
487: return "RMI listener port: " + port;
488: }
489:
490: /**
491: * If a conflict is detected in unique resource use, this method signals the listener to attempt
492: * automatic resolution of the resource conflict.
493: *
494: * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED}
495: */
496: public void attemptResolutionOfUniqueResourceConflict()
497: throws IllegalStateException, CacheException {
498: assignFreePort(true);
499: }
500:
501: /**
502: * Called immediately after a cache has been added and activated.
503: * <p/>
504: * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized
505: * method on CacheManager from this method will cause a deadlock.
506: * <p/>
507: * Note that activation will also cause a CacheEventListener status change notification from
508: * {@link net.sf.ehcache.Status#STATUS_UNINITIALISED} to {@link net.sf.ehcache.Status#STATUS_ALIVE}. Care should be
509: * taken on processing that notification because:
510: * <ul>
511: * <li>the cache will not yet be accessible from the CacheManager.
512: * <li>the addCaches methods whih cause this notification are synchronized on the CacheManager. An attempt to call
513: * {@link net.sf.ehcache.CacheManager#getCache(String)} will cause a deadlock.
514: * </ul>
515: * The calling method will block until this method returns.
516: * <p/>
517: * Repopulates the list of cache peers and rebinds the list.
518: * This method should be called if a cache is dynamically added
519: *
520: * @param cacheName the name of the <code>Cache</code> the operation relates to
521: * @see net.sf.ehcache.event.CacheEventListener
522: */
523: public void notifyCacheAdded(String cacheName)
524: throws CacheException {
525:
526: if (LOG.isDebugEnabled()) {
527: LOG.debug("Adding " + cacheName + " to RMI listener");
528: }
529:
530: //Don't add if exists.
531: synchronized (cachePeers) {
532: if (cachePeers.get(cacheName) != null) {
533: return;
534: }
535: }
536:
537: Ehcache cache = cacheManager.getEhcache(cacheName);
538: if (isDistributed(cache)) {
539: RMICachePeer rmiCachePeer = null;
540: String url = null;
541: try {
542: rmiCachePeer = new RMICachePeer(cache, hostName, port,
543: socketTimeoutMillis);
544: url = rmiCachePeer.getUrl();
545: bind(url, rmiCachePeer);
546: } catch (Exception e) {
547: throw new CacheException(
548: "Problem starting listener for RMICachePeer "
549: + url + ". Initial cause was "
550: + e.getMessage(), e);
551: }
552:
553: synchronized (cachePeers) {
554: cachePeers.put(cacheName, rmiCachePeer);
555: }
556:
557: }
558: if (LOG.isDebugEnabled()) {
559: LOG
560: .debug(cachePeers.size()
561: + " RMICachePeers bound in registry for RMI listener");
562: }
563: }
564:
565: /**
566: * Called immediately after a cache has been disposed and removed. The calling method will block until
567: * this method returns.
568: * <p/>
569: * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized
570: * method on CacheManager from this method will cause a deadlock.
571: * <p/>
572: * Note that a {@link net.sf.ehcache.event.CacheEventListener} status changed will also be triggered. Any attempt from that notification
573: * to access CacheManager will also result in a deadlock.
574: *
575: * @param cacheName the name of the <code>Cache</code> the operation relates to
576: */
577: public void notifyCacheRemoved(String cacheName) {
578:
579: if (LOG.isDebugEnabled()) {
580: LOG.debug("Removing " + cacheName + " from RMI listener");
581: }
582:
583: //don't remove if already removed.
584: synchronized (cachePeers) {
585: if (cachePeers.get(cacheName) == null) {
586: return;
587: }
588: }
589:
590: RMICachePeer rmiCachePeer;
591: synchronized (cachePeers) {
592: rmiCachePeer = (RMICachePeer) cachePeers.remove(cacheName);
593: }
594: String url = null;
595: try {
596: unbind(rmiCachePeer);
597: } catch (Exception e) {
598: throw new CacheException("Error removing Cache Peer " + url
599: + " from listener. Message was: " + e.getMessage(),
600: e);
601: }
602:
603: if (LOG.isDebugEnabled()) {
604: LOG
605: .debug(cachePeers.size()
606: + " RMICachePeers bound in registry for RMI listener");
607: }
608: }
609:
610: /**
611: * Package local method for testing
612: */
613: void addCachePeer(String name, RMICachePeer peer) {
614: synchronized (cachePeers) {
615: cachePeers.put(name, peer);
616:
617: }
618: }
619: }
|