001: /*
002: * JacORB - a free Java ORB
003: *
004: * Copyright (C) 1997-2004 Gerald Brose.
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Library General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Library General Public License for more details.
015: *
016: * You should have received a copy of the GNU Library General Public
017: * License along with this library; if not, write to the Free
018: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
019: */
020:
021: package org.jacorb.orb.giop;
022:
023: import java.util.HashMap;
024: import java.util.Iterator;
025: import java.util.Map;
026:
027: import org.apache.avalon.framework.logger.Logger;
028: import org.jacorb.config.Configuration;
029: import org.jacorb.orb.ParsedIOR;
030: import org.jacorb.util.ObjectUtil;
031: import org.omg.CONV_FRAME.CodeSetComponent;
032: import org.omg.CONV_FRAME.CodeSetComponentInfo;
033:
034: /**
035: * @author Nicolas Noffke
036: * @version $Id: ClientConnection.java,v 1.63 2006/07/26 11:05:21 nick.cross Exp $
037: */
038: public class ClientConnection implements ReplyListener,
039: ConnectionListener {
040: private final GIOPConnection connection;
041: private final org.omg.CORBA.ORB orb;
042:
043: private final Map replies;
044:
045: /**
046: * <code>sasContexts</code> is used to support for SAS Stateful contexts.
047: */
048: private final Map sasContexts;
049: private static long last_client_context_id = 0;
050:
051: /**
052: * <code>client_count</code> denotes how many clients use this connection.
053: */
054: private int client_count = 0;
055:
056: /**
057: * <code>id_count</code> is used to generate request ids.
058: */
059: private int id_count = 0;
060:
061: private final ClientConnectionManager conn_mg;
062:
063: private final boolean client_initiated;
064:
065: private final String info;
066:
067: /**
068: * <code>gracefulStreamClose</code> indicates if the stream has been closed
069: * gracefully, i.e. by a CloseConnection message. This will trigger a
070: * remarshaling of all pending messages.
071: */
072: private boolean gracefulStreamClose;
073:
074: /**
075: * <code>registeredProfile</code> indicates the profile that was used for
076: * registering with the ClientConnectionManager. In case of BiDirIIOP it is
077: * NOT equal to the transports profile.
078: */
079: private final org.omg.ETF.Profile registeredProfile;
080:
081: /**
082: * <code>logger</code> is the logger for this object.
083: */
084: private final Logger logger;
085:
086: /**
087: * <code>ignoreComponentInfo</code> defaults to off. If jacorb.codeset
088: * is turned on then it will NOT ignore component profiles. If
089: * jacorb.codeset is turned off then this allows it to ignore codeset profiles
090: * in an IOR.
091: */
092: private final boolean ignoreComponentInfo;
093:
094: public ClientConnection(GIOPConnection connection,
095: org.omg.CORBA.ORB orb, ClientConnectionManager conn_mg,
096: org.omg.ETF.Profile registeredProfile,
097: boolean client_initiated) {
098: this .connection = connection;
099: this .orb = orb;
100: this .conn_mg = conn_mg;
101: this .registeredProfile = registeredProfile;
102: this .info = registeredProfile.toString();
103: this .client_initiated = client_initiated;
104:
105: final Configuration configuration = ((org.jacorb.orb.ORB) orb)
106: .getConfiguration();
107: logger = configuration.getNamedLogger("jacorb.giop.conn");
108:
109: ignoreComponentInfo = !(configuration.getAttributeAsBoolean(
110: "jacorb.codeset", true));
111:
112: //For BiDirGIOP, the connection initiator may only generate
113: //even valued request ids, and the other side odd valued
114: //request ids. Therefore, we always step the counter by 2, so
115: //we always get only odd or even ids depending on the counters
116: //initial value.
117: if (!client_initiated) {
118: id_count = 1;
119: }
120:
121: connection.setReplyListener(this );
122: connection.setConnectionListener(this );
123:
124: replies = new HashMap();
125: sasContexts = new HashMap();
126: }
127:
128: public final GIOPConnection getGIOPConnection() {
129: return connection;
130: }
131:
132: /**
133: * Get the profile that was used for registering with the
134: * ClientConnectionManager. In case of BiDirIIOP it is NOT equal
135: * to the transports profile.
136: */
137: public org.omg.ETF.Profile getRegisteredProfile() {
138: return registeredProfile;
139: }
140:
141: public void setCodeSet(ParsedIOR pior) {
142: if (isTCSNegotiated()) {
143: //if already negotiated, do nothing
144: return;
145: }
146:
147: //if the other side only talks GIOP 1.0, don't send a codeset
148: //context and don't try again
149: if (pior.getEffectiveProfile().version().minor == 0) {
150: connection.markTCSNegotiated();
151: return;
152: }
153:
154: int tcs = -1;
155: int tcsw = -1;
156:
157: CodeSetComponentInfo info = pior.getCodeSetComponentInfo();
158: if (info == null || ignoreComponentInfo) {
159: logger
160: .debug("No CodeSetComponentInfo in IOR. Will use default CodeSets");
161:
162: //If we can't find matching codesets, we still mark the
163: //GIOPConnection as negotiated, so the following requests
164: //will not always try to select a codeset again.
165:
166: /* ******
167: until the ETF spec is ammended to include components within
168: the base Profile type, then this is going to be problem. So
169: rather than not setting the codeset component, we should
170: pick reasonable default values and send those.
171: */
172:
173: connection.markTCSNegotiated();
174: return;
175: } else {
176: tcs = CodeSet.selectTCS(info);
177: tcsw = CodeSet.selectTCSW(info);
178: }
179:
180: if (tcs == -1 || tcsw == -1) {
181: if (logger.isDebugEnabled()) {
182: CodeSetComponent original = info.ForCharData;
183:
184: logger
185: .debug("Attempted to negotiate with target codeset "
186: + CodeSet
187: .csName(original.native_code_set)
188: + " to match with "
189: + CodeSet.csName(CodeSet
190: .getTCSDefault()));
191: logger
192: .debug("Target has "
193: + (original.conversion_code_sets == null ? 0
194: : original.conversion_code_sets.length)
195: + " conversion codesets and native has "
196: + CodeSet.csName(CodeSet
197: .getConversionDefault()));
198: logger.debug("Was negotiating with IOR "
199: + pior.getIORString());
200: }
201: //if no matching codesets can be found, an exception is
202: //thrown
203: throw new org.omg.CORBA.CODESET_INCOMPATIBLE(
204: "WARNING: CodeSet negotiation failed! No matching "
205: + ((tcs == -1) ? "normal" : "wide")
206: + " CodeSet found");
207: }
208:
209: connection.setCodeSets(tcs, tcsw);
210:
211: if (logger.isDebugEnabled()) {
212: logger.debug("Successfully negotiated Codesets. Using "
213: + CodeSet.csName(tcs) + " as TCS and "
214: + CodeSet.csName(tcsw) + " as TCSW");
215: }
216:
217: }
218:
219: public boolean isTCSNegotiated() {
220: return connection.isTCSNegotiated();
221: }
222:
223: public int getTCS() {
224: return connection.getTCS();
225: }
226:
227: public int getTCSW() {
228: return connection.getTCSW();
229: }
230:
231: public synchronized int getId() {
232: int result = id_count;
233:
234: //if odd or even is determined by the starting value of
235: //id_count
236: id_count += 2;
237:
238: return result;
239: }
240:
241: /**
242: * Increments the number of clients.
243: */
244: public synchronized void incClients() {
245: client_count++;
246: }
247:
248: /**
249: * This method decrements the number of clients.
250: *
251: * @return a <code>boolean</code> value, true if client_count is zero.
252: */
253: public synchronized boolean decClients() {
254: boolean result = false;
255:
256: client_count--;
257:
258: if (client_count == 0) {
259: result = true;
260: }
261: return result;
262: }
263:
264: /**
265: * Returns the number of clients currently using this connection.
266: */
267: public int numClients() {
268: return client_count;
269: }
270:
271: public boolean isClientInitiated() {
272: return client_initiated;
273: }
274:
275: /**
276: * The request_id parameter is only used, if response_expected.
277: */
278: public void sendRequest(MessageOutputStream outputStream,
279: ReplyPlaceholder placeholder, int request_id,
280: boolean response_expected) {
281: Integer key = ObjectUtil.newInteger(request_id);
282:
283: synchronized (replies) {
284: replies.put(key, placeholder);
285: }
286:
287: try {
288: sendRequest(outputStream, response_expected);
289: } catch (org.omg.CORBA.SystemException e) {
290: //remove reply receiver from list
291: //because there will be no response to this request
292: synchronized (replies) {
293: replies.remove(key);
294: }
295: throw e;
296: }
297: }
298:
299: public void sendRequest(MessageOutputStream outputStream,
300: boolean response_expected) {
301: try {
302: connection.sendRequest(outputStream, response_expected);
303: } catch (java.io.IOException e) {
304: if (logger.isDebugEnabled()) {
305: logger.debug("IOException", e);
306: }
307:
308: throw new org.omg.CORBA.COMM_FAILURE(0,
309: org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE);
310: }
311: }
312:
313: /**
314: * called from Delegate/ConnectionManagement etc.
315: */
316:
317: public void close() {
318: connection.close();
319: }
320:
321: /**
322: * Operations from ReplyListener
323: */
324:
325: public void replyReceived(byte[] reply, GIOPConnection connection) {
326: connection.decPendingMessages();
327:
328: Integer key = ObjectUtil.newInteger(Messages
329: .getRequestId(reply));
330:
331: ReplyPlaceholder placeholder = null;
332:
333: synchronized (replies) {
334: placeholder = (ReplyPlaceholder) replies.remove(key);
335: }
336:
337: if (placeholder != null) {
338: ReplyInputStream ris = new ReplyInputStream(orb, reply);
339: ris.setCodeSet(this .getTCS(), this .getTCSW());
340: //this will unblock the waiting thread
341: placeholder.replyReceived(ris);
342: } else {
343: if (logger.isWarnEnabled()) {
344: logger.warn("Received reply for unknown request id: "
345: + key);
346: }
347: }
348: }
349:
350: public void locateReplyReceived(byte[] reply,
351: GIOPConnection connection) {
352: connection.decPendingMessages();
353:
354: Integer key = ObjectUtil.newInteger(Messages
355: .getRequestId(reply));
356:
357: ReplyPlaceholder placeholder = null;
358:
359: synchronized (replies) {
360: placeholder = (ReplyPlaceholder) replies.remove(key);
361: }
362:
363: if (placeholder != null) {
364: //this will unblock the waiting thread
365: placeholder.replyReceived(new LocateReplyInputStream(orb,
366: reply));
367: } else {
368: if (logger.isWarnEnabled()) {
369: logger.warn("Received reply for unknown request id: "
370: + key);
371: }
372: }
373: }
374:
375: /**
376: * Received a CloseConnection message. Remarshal all pending
377: * messages. The close mechanism will be invoked separately by the
378: * actual closing of the Transport and will trigger the
379: * remarshaling.
380: */
381: public void closeConnectionReceived(byte[] close_conn,
382: GIOPConnection connection) {
383: if (logger.isInfoEnabled()) {
384: logger.info("Received CloseConnection on "
385: + connection.toString());
386: }
387:
388: if (client_initiated) {
389: gracefulStreamClose = true;
390: ((ClientGIOPConnection) connection).closeAllowReopen();
391:
392: //since this is run on the message receptor thread itself, it
393: //will not try to read again after returning, because it just
394: //closed the transport itself. Therefore, no exception goes
395: //back up into the GIOPConnection, where streamClosed() will
396: //be called. Ergo, we need to call streamClosed() ourselves.
397: streamClosed();
398: }
399: }
400:
401: /**
402: * Operations from ConnectionListener
403: * used for upcalls from GIOPConnection
404: */
405:
406: public void connectionClosed() {
407: if (!client_initiated) {
408: //if this is a server side BiDir connection, it will stay
409: //pooled in the ClientConnectionManager even if no Delegate is
410: //associated with it. Therefore, it has to be removed when
411: //the underlying connection closed.
412:
413: conn_mg.removeConnection(this );
414: }
415:
416: streamClosed();
417: }
418:
419: /**
420: * the transport has been
421: * removed underneath the GIOP layer
422: */
423:
424: public void streamClosed() {
425: synchronized (replies) {
426: if (replies.size() > 0) {
427: if (gracefulStreamClose) {
428: if (logger.isDebugEnabled()) {
429: logger.debug("Stream closed. Will remarshal "
430: + replies.size() + " messages");
431: }
432: } else {
433: if (logger.isWarnEnabled()) {
434: logger
435: .warn("Abnormal connection termination. Lost "
436: + replies.size()
437: + " outstanding replie(s)!");
438: }
439: }
440:
441: Iterator entries = replies.values().iterator();
442: ReplyPlaceholder placeholder;
443:
444: while (entries.hasNext()) {
445: placeholder = (ReplyPlaceholder) entries.next();
446:
447: if (gracefulStreamClose) {
448: placeholder.retry();
449: } else {
450: placeholder.cancel();
451: }
452: entries.remove();
453: }
454: }
455: }
456:
457: gracefulStreamClose = false;
458: }
459:
460: public org.omg.ETF.Profile get_server_profile() {
461: return connection.getTransport().get_server_profile();
462: }
463:
464: public long cacheSASContext(byte[] client_authentication_token) {
465: long client_context_id = 0;
466: String key = new String(client_authentication_token);
467: synchronized (sasContexts) {
468: if (!sasContexts.containsKey(key)) {
469: // new context
470: client_context_id = ++last_client_context_id;
471: sasContexts.put(key, new Long(client_context_id));
472: client_context_id = -client_context_id;
473: } else {
474: // reuse cached context
475: client_context_id = ((Long) sasContexts.get(key))
476: .longValue();
477: }
478: }
479: return client_context_id;
480: }
481:
482: public long purgeSASContext(long client_context_id) {
483: synchronized (sasContexts) {
484: Iterator entries = sasContexts.keySet().iterator();
485: while (entries.hasNext()) {
486: Object key = entries.next();
487: if (((Long) sasContexts.get(key)).longValue() != client_context_id) {
488: continue;
489: }
490: entries.remove();
491: break;
492: }
493: }
494: return client_context_id;
495: }
496: }// ClientConnection
|