001: /**
002: * Copyright 2003-2007 Luck Consulting Pty Ltd
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */package net.sf.ehcache.distribution;
016:
017: import net.sf.ehcache.CacheException;
018: import net.sf.ehcache.CacheManager;
019: import net.sf.ehcache.Ehcache;
020: import org.apache.commons.logging.Log;
021: import org.apache.commons.logging.LogFactory;
022:
023: import java.io.IOException;
024: import java.net.InetAddress;
025: import java.rmi.NotBoundException;
026: import java.util.ArrayList;
027: import java.util.Date;
028: import java.util.Iterator;
029: import java.util.List;
030:
031: /**
032: * A peer provider which discovers peers using Multicast.
033: * <p/>
034: * Hosts can be in three different levels of conformance with the Multicast specification (RFC1112), according to the requirements they meet.
035: * <ol>
036: * <li>Level 0 is the "no support for IP Multicasting" level. Lots of hosts and routers in the Internet are in this state,
037: * as multicast support is not mandatory in IPv4 (it is, however, in IPv6).
038: * Not too much explanation is needed here: hosts in this level can neither send nor receive multicast packets.
039: * They must ignore the ones sent by other multicast capable hosts.
040: * <li>Level 1 is the "support for sending but not receiving multicast IP datagrams" level.
041: * Thus, note that it is not necessary to join a multicast group to be able to send datagrams to it.
042: * Very few additions are needed in the IP module to make a "Level 0" host "Level 1-compliant".
043: * <li>Level 2 is the "full support for IP multicasting" level.
044: * Level 2 hosts must be able to both send and receive multicast traffic.
045: * They must know the way to join and leave multicast groups and to propagate this information to multicast routers.
046: * Thus, they must include an Internet Group Management Protocol (IGMP) implementation in their TCP/IP stack.
047: * </ol>
048: * <p/>
049: * The list of CachePeers is maintained via heartbeats. rmiUrls are looked up using RMI and converted to CachePeers on
050: * registration. On lookup any stale references are removed.
051: *
052: * @author Greg Luck
053: * @version $Id: MulticastRMICacheManagerPeerProvider.java 519 2007-07-27 07:11:45Z gregluck $
054: */
055: public final class MulticastRMICacheManagerPeerProvider extends
056: RMICacheManagerPeerProvider implements CacheManagerPeerProvider {
057:
058: /**
059: * One second, in ms
060: */
061: protected static final int SHORT_DELAY = 100;
062:
063: private static final Log LOG = LogFactory
064: .getLog(MulticastRMICacheManagerPeerProvider.class
065: .getName());
066:
067: private final MulticastKeepaliveHeartbeatReceiver heartBeatReceiver;
068: private final MulticastKeepaliveHeartbeatSender heartBeatSender;
069:
070: /**
071: * Creates and starts a multicast peer provider
072: *
073: * @param groupMulticastAddress 224.0.0.1 to 239.255.255.255 e.g. 230.0.0.1
074: * @param groupMulticastPort 1025 to 65536 e.g. 4446
075: */
076: public MulticastRMICacheManagerPeerProvider(
077: CacheManager cacheManager,
078: InetAddress groupMulticastAddress,
079: Integer groupMulticastPort, Integer timeToLive) {
080: super (cacheManager);
081: heartBeatReceiver = new MulticastKeepaliveHeartbeatReceiver(
082: this , groupMulticastAddress, groupMulticastPort);
083: heartBeatSender = new MulticastKeepaliveHeartbeatSender(
084: cacheManager, groupMulticastAddress,
085: groupMulticastPort, timeToLive);
086: }
087:
088: /**
089: * {@inheritDoc}
090: */
091: public final void init() throws CacheException {
092: try {
093: heartBeatReceiver.init();
094: heartBeatSender.init();
095: } catch (IOException exception) {
096: LOG.error("Error starting heartbeat. Error was: "
097: + exception.getMessage(), exception);
098: throw new CacheException(exception.getMessage());
099: }
100: }
101:
102: /**
103: * Register a new peer, but only if the peer is new, otherwise the last seen timestamp is updated.
104: * <p/>
105: * This method is thread-safe. It relies on peerUrls being a synchronizedMap
106: *
107: * @param rmiUrl
108: */
109: public final void registerPeer(String rmiUrl) {
110: try {
111: CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls
112: .get(rmiUrl);
113: if (cachePeerEntry == null || stale(cachePeerEntry.date)) {
114: //can take seconds if there is a problem
115: CachePeer cachePeer = lookupRemoteCachePeer(rmiUrl);
116: cachePeerEntry = new CachePeerEntry(cachePeer,
117: new Date());
118: //synchronized due to peerUrls being a synchronizedMap
119: peerUrls.put(rmiUrl, cachePeerEntry);
120: } else {
121: cachePeerEntry.date = new Date();
122: }
123: } catch (IOException e) {
124: if (LOG.isDebugEnabled()) {
125: LOG.debug("Unable to lookup remote cache peer for "
126: + rmiUrl
127: + ". Removing from peer list. Cause was: "
128: + e.getMessage());
129: }
130: unregisterPeer(rmiUrl);
131: } catch (NotBoundException e) {
132: peerUrls.remove(rmiUrl);
133: if (LOG.isDebugEnabled()) {
134: LOG.debug("Unable to lookup remote cache peer for "
135: + rmiUrl
136: + ". Removing from peer list. Cause was: "
137: + e.getMessage());
138: }
139: } catch (Throwable t) {
140: LOG
141: .error("Unable to lookup remote cache peer for "
142: + rmiUrl
143: + ". Cause was not due to an IOException or NotBoundException which will occur in normal operation:"
144: + " " + t.getMessage());
145: }
146: }
147:
148: /**
149: * @return a list of {@link CachePeer} peers, excluding the local peer.
150: */
151: public final synchronized List listRemoteCachePeers(Ehcache cache)
152: throws CacheException {
153: List remoteCachePeers = new ArrayList();
154: List staleList = new ArrayList();
155: synchronized (peerUrls) {
156: for (Iterator iterator = peerUrls.keySet().iterator(); iterator
157: .hasNext();) {
158: String rmiUrl = (String) iterator.next();
159: String rmiUrlCacheName = extractCacheName(rmiUrl);
160: try {
161: if (!rmiUrlCacheName.equals(cache.getName())) {
162: continue;
163: }
164: CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls
165: .get(rmiUrl);
166: Date date = cachePeerEntry.date;
167: if (!stale(date)) {
168: CachePeer cachePeer = cachePeerEntry.cachePeer;
169: remoteCachePeers.add(cachePeer);
170: } else {
171: if (LOG.isDebugEnabled()) {
172: LOG
173: .debug("rmiUrl "
174: + rmiUrl
175: + " is stale. Either the remote peer is shutdown or the "
176: + "network connectivity has been interrupted. Will be removed from list of remote cache peers");
177: }
178: staleList.add(rmiUrl);
179: }
180: } catch (Exception exception) {
181: LOG.error(exception.getMessage(), exception);
182: throw new CacheException(
183: "Unable to list remote cache peers. Error was "
184: + exception.getMessage());
185: }
186: }
187: //Must remove entries after we have finished iterating over them
188: for (int i = 0; i < staleList.size(); i++) {
189: String rmiUrl = (String) staleList.get(i);
190: peerUrls.remove(rmiUrl);
191: }
192: }
193: return remoteCachePeers;
194: }
195:
196: /**
197: * Shutdown the heartbeat
198: */
199: public final void dispose() {
200: heartBeatSender.dispose();
201: heartBeatReceiver.dispose();
202: }
203:
204: /**
205: * Time for a cluster to form. This varies considerably, depending on the implementation.
206: *
207: * @return the time in ms, for a cluster to form
208: */
209: public long getTimeForClusterToForm() {
210: return getStaleTime();
211: }
212:
213: /**
214: * The time after which an unrefreshed peer provider entry is considered stale.
215: */
216: protected long getStaleTime() {
217: return MulticastKeepaliveHeartbeatSender.getHeartBeatInterval()
218: * 2 + SHORT_DELAY;
219: }
220:
221: /**
222: * Whether the entry should be considered stale.
223: * This will depend on the type of RMICacheManagerPeerProvider.
224: * This method should be overridden for implementations that go stale based on date
225: *
226: * @param date the date the entry was created
227: * @return true if stale
228: */
229: protected final boolean stale(Date date) {
230: long now = System.currentTimeMillis();
231: return date.getTime() < (now - getStaleTime());
232: }
233:
234: /**
235: * Entry containing a looked up CachePeer and date
236: */
237: protected static final class CachePeerEntry {
238:
239: private final CachePeer cachePeer;
240: private Date date;
241:
242: /**
243: * Constructor
244: *
245: * @param cachePeer the cache peer part of this entry
246: * @param date the date part of this entry
247: */
248: public CachePeerEntry(CachePeer cachePeer, Date date) {
249: this .cachePeer = cachePeer;
250: this .date = date;
251: }
252:
253: /**
254: * @return the cache peer part of this entry
255: */
256: public final CachePeer getCachePeer() {
257: return cachePeer;
258: }
259:
260: /**
261: * @return the date part of this entry
262: */
263: public final Date getDate() {
264: return date;
265: }
266:
267: }
268:
269: /**
270: * @return the MulticastKeepaliveHeartbeatReceiver
271: */
272: public MulticastKeepaliveHeartbeatReceiver getHeartBeatReceiver() {
273: return heartBeatReceiver;
274: }
275:
276: /**
277: * @return the MulticastKeepaliveHeartbeatSender
278: */
279: public MulticastKeepaliveHeartbeatSender getHeartBeatSender() {
280: return heartBeatSender;
281: }
282: }
|