001: /**
002: * Copyright 2003-2007 Luck Consulting Pty Ltd
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */package net.sf.ehcache.distribution;
016:
017: import net.sf.ehcache.CacheManager;
018: import org.apache.commons.logging.Log;
019: import org.apache.commons.logging.LogFactory;
020:
021: import java.io.IOException;
022: import java.net.DatagramPacket;
023: import java.net.InetAddress;
024: import java.net.MulticastSocket;
025: import java.rmi.RemoteException;
026: import java.util.Set;
027: import java.util.Collections;
028: import java.util.HashSet;
029: import java.util.StringTokenizer;
030: import java.util.List;
031:
032: import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
033: import edu.emory.mathcs.backport.java.util.concurrent.Executors;
034:
035: /**
036: * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
037: * <p/>
038: * Our own multicast heartbeats are ignored.
039: *
040: * @author Greg Luck
041: * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 556 2007-10-29 02:06:30Z gregluck $
042: */
043: public final class MulticastKeepaliveHeartbeatReceiver {
044: private static final Log LOG = LogFactory
045: .getLog(MulticastKeepaliveHeartbeatReceiver.class.getName());
046:
047: private ExecutorService processingThreadPool;
048: private Set rmiUrlsProcessingQueue = Collections
049: .synchronizedSet(new HashSet());
050: private final InetAddress groupMulticastAddress;
051: private final Integer groupMulticastPort;
052: private MulticastReceiverThread receiverThread;
053: private MulticastSocket socket;
054: private boolean stopped;
055: private final MulticastRMICacheManagerPeerProvider peerProvider;
056:
057: /**
058: * Constructor.
059: *
060: * @param peerProvider
061: * @param multicastAddress
062: * @param multicastPort
063: */
064: public MulticastKeepaliveHeartbeatReceiver(
065: MulticastRMICacheManagerPeerProvider peerProvider,
066: InetAddress multicastAddress, Integer multicastPort) {
067: this .peerProvider = peerProvider;
068: this .groupMulticastAddress = multicastAddress;
069: this .groupMulticastPort = multicastPort;
070: }
071:
072: /**
073: * Start.
074: *
075: * @throws IOException
076: */
077: final void init() throws IOException {
078: socket = new MulticastSocket(groupMulticastPort.intValue());
079: socket.joinGroup(groupMulticastAddress);
080: receiverThread = new MulticastReceiverThread();
081: receiverThread.start();
082: processingThreadPool = Executors.newCachedThreadPool();
083: }
084:
085: /**
086: * Shutdown the heartbeat.
087: */
088: public final void dispose() {
089: LOG.debug("dispose called");
090: processingThreadPool.shutdownNow();
091: stopped = true;
092: receiverThread.interrupt();
093: }
094:
095: /**
096: * A multicast receiver which continously receives heartbeats.
097: */
098: private final class MulticastReceiverThread extends Thread {
099:
100: /**
101: * Constructor
102: */
103: public MulticastReceiverThread() {
104: super ("Multicast Heartbeat Receiver Thread");
105: setDaemon(true);
106: }
107:
108: public final void run() {
109: byte[] buf = new byte[PayloadUtil.MTU];
110: try {
111: while (!stopped) {
112: DatagramPacket packet = new DatagramPacket(buf,
113: buf.length);
114: try {
115: socket.receive(packet);
116: byte[] payload = packet.getData();
117: processPayload(payload);
118:
119: } catch (IOException e) {
120: if (!stopped) {
121: LOG.error("Error receiving heartbeat. "
122: + e.getMessage()
123: + ". Initial cause was "
124: + e.getMessage(), e);
125: }
126: }
127: }
128: } catch (Throwable t) {
129: LOG
130: .error("Multicast receiver thread caught throwable. Cause was "
131: + t.getMessage() + ". Continuing...");
132: }
133: }
134:
135: private void processPayload(byte[] compressedPayload) {
136: byte[] payload = PayloadUtil.ungzip(compressedPayload);
137: String rmiUrls = new String(payload);
138: if (self(rmiUrls)) {
139: return;
140: }
141: rmiUrls = rmiUrls.trim();
142: if (LOG.isTraceEnabled()) {
143: LOG.trace("rmiUrls received " + rmiUrls);
144: }
145: processRmiUrls(rmiUrls);
146: }
147:
148: /**
149: * This method forks a new executor to process the received heartbeat in a thread pool.
150: * That way each remote cache manager cannot interfere with others.
151: * <p/>
152: * In the worst case, we have as many concurrent threads as remote cache managers.
153: *
154: * @param rmiUrls
155: */
156: private void processRmiUrls(final String rmiUrls) {
157: if (rmiUrlsProcessingQueue.contains(rmiUrls)) {
158: if (LOG.isDebugEnabled()) {
159: LOG
160: .debug("We are already processing these rmiUrls. Another heartbeat came before we finished: "
161: + rmiUrls);
162: }
163: return;
164: }
165:
166: if (processingThreadPool == null) {
167: return;
168: }
169:
170: processingThreadPool.execute(new Runnable() {
171: public void run() {
172: try {
173: // Add the rmiUrls we are processing.
174: rmiUrlsProcessingQueue.add(rmiUrls);
175: for (StringTokenizer stringTokenizer = new StringTokenizer(
176: rmiUrls, PayloadUtil.URL_DELIMITER); stringTokenizer
177: .hasMoreTokens();) {
178: if (stopped) {
179: return;
180: }
181: String rmiUrl = stringTokenizer.nextToken();
182: registerNotification(rmiUrl);
183: if (!peerProvider.peerUrls
184: .containsKey(rmiUrl)) {
185: if (LOG.isDebugEnabled()) {
186: LOG
187: .debug("Aborting processing of rmiUrls since failed to add rmiUrl: "
188: + rmiUrl);
189: }
190: return;
191: }
192: }
193: } finally {
194: // Remove the rmiUrls we just processed
195: rmiUrlsProcessingQueue.remove(rmiUrls);
196: }
197: }
198: });
199: }
200:
201: /**
202: * @param rmiUrls
203: * @return true if our own hostname and listener port are found in the list. This then means we have
204: * caught our onw multicast, and should be ignored.
205: */
206: private boolean self(String rmiUrls) {
207: CacheManager cacheManager = peerProvider.getCacheManager();
208: CacheManagerPeerListener cacheManagerPeerListener = cacheManager
209: .getCachePeerListener();
210: if (cacheManagerPeerListener == null) {
211: return false;
212: }
213: List boundCachePeers = cacheManagerPeerListener
214: .getBoundCachePeers();
215: if (boundCachePeers == null || boundCachePeers.size() == 0) {
216: return false;
217: }
218: CachePeer peer = (CachePeer) boundCachePeers.get(0);
219: String cacheManagerUrlBase = null;
220: try {
221: cacheManagerUrlBase = peer.getUrlBase();
222: } catch (RemoteException e) {
223: LOG.error("Error geting url base");
224: }
225: int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase);
226: return baseUrlMatch != -1;
227: }
228:
229: private void registerNotification(String rmiUrl) {
230: peerProvider.registerPeer(rmiUrl);
231: }
232:
233: /**
234: * {@inheritDoc}
235: */
236: public final void interrupt() {
237: try {
238: socket.leaveGroup(groupMulticastAddress);
239: } catch (IOException e) {
240: LOG.error("Error leaving group");
241: }
242: socket.close();
243: super.interrupt();
244: }
245: }
246:
247: }
|