001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.bio;
019:
020: import java.io.IOException;
021: import java.util.HashMap;
022: import java.util.Map;
023:
024: import org.apache.catalina.tribes.ChannelException;
025: import org.apache.catalina.tribes.ChannelMessage;
026: import org.apache.catalina.tribes.Member;
027: import org.apache.catalina.tribes.io.ChannelData;
028: import org.apache.catalina.tribes.io.XByteBuffer;
029: import org.apache.catalina.tribes.transport.MultiPointSender;
030: import org.apache.catalina.tribes.transport.AbstractSender;
031: import org.apache.catalina.tribes.Channel;
032:
033: /**
034: *
035: * @author Filip Hanik
036: * @version $Revision: 532800 $ $Date: 2007-04-26 18:52:29 +0200 (jeu., 26 avr. 2007) $
037: *
038: */
039: public class MultipointBioSender extends AbstractSender implements
040: MultiPointSender {
041: public MultipointBioSender() {
042: }
043:
044: protected long selectTimeout = 1000;
045: protected HashMap bioSenders = new HashMap();
046:
047: public synchronized void sendMessage(Member[] destination,
048: ChannelMessage msg) throws ChannelException {
049: byte[] data = XByteBuffer.createDataPackage((ChannelData) msg);
050: BioSender[] senders = setupForSend(destination);
051: ChannelException cx = null;
052: for (int i = 0; i < senders.length; i++) {
053: try {
054: senders[i]
055: .sendMessage(
056: data,
057: (msg.getOptions() & Channel.SEND_OPTIONS_USE_ACK) == Channel.SEND_OPTIONS_USE_ACK);
058: } catch (Exception x) {
059: if (cx == null)
060: cx = new ChannelException(x);
061: cx.addFaultyMember(destination[i], x);
062: }
063: }
064: if (cx != null)
065: throw cx;
066: }
067:
068: protected BioSender[] setupForSend(Member[] destination)
069: throws ChannelException {
070: ChannelException cx = null;
071: BioSender[] result = new BioSender[destination.length];
072: for (int i = 0; i < destination.length; i++) {
073: try {
074: BioSender sender = (BioSender) bioSenders
075: .get(destination[i]);
076: if (sender == null) {
077: sender = new BioSender();
078: sender.transferProperties(this , sender);
079: sender.setDestination(destination[i]);
080: bioSenders.put(destination[i], sender);
081: }
082: result[i] = sender;
083: if (!result[i].isConnected())
084: result[i].connect();
085: result[i].keepalive();
086: } catch (Exception x) {
087: if (cx == null)
088: cx = new ChannelException(x);
089: cx.addFaultyMember(destination[i], x);
090: }
091: }
092: if (cx != null)
093: throw cx;
094: else
095: return result;
096: }
097:
098: public void connect() throws IOException {
099: //do nothing, we connect on demand
100: setConnected(true);
101: }
102:
103: private synchronized void close() throws ChannelException {
104: ChannelException x = null;
105: Object[] members = bioSenders.keySet().toArray();
106: for (int i = 0; i < members.length; i++) {
107: Member mbr = (Member) members[i];
108: try {
109: BioSender sender = (BioSender) bioSenders.get(mbr);
110: sender.disconnect();
111: } catch (Exception e) {
112: if (x == null)
113: x = new ChannelException(e);
114: x.addFaultyMember(mbr, e);
115: }
116: bioSenders.remove(mbr);
117: }
118: if (x != null)
119: throw x;
120: }
121:
122: public void add(Member member) {
123:
124: }
125:
126: public void remove(Member member) {
127: //disconnect senders
128: BioSender sender = (BioSender) bioSenders.remove(member);
129: if (sender != null)
130: sender.disconnect();
131: }
132:
133: public synchronized void disconnect() {
134: try {
135: close();
136: } catch (Exception x) {
137: }
138: setConnected(false);
139: }
140:
141: public void finalize() {
142: try {
143: disconnect();
144: } catch (Exception ignore) {
145: }
146: }
147:
148: public boolean keepalive() {
149: //throw new UnsupportedOperationException("Method ParallelBioSender.checkKeepAlive() not implemented");
150: boolean result = false;
151: Map.Entry[] entries = (Map.Entry[]) bioSenders.entrySet()
152: .toArray(new Map.Entry[bioSenders.size()]);
153: for (int i = 0; i < entries.length; i++) {
154: BioSender sender = (BioSender) entries[i].getValue();
155: if (sender.keepalive()) {
156: bioSenders.remove(entries[i].getKey());
157: }
158: }
159: return result;
160: }
161:
162: }
|