001: /*
002: * JacORB - a free Java ORB
003: *
004: * Copyright (C) 1997-2004 Gerald Brose.
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Library General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Library General Public License for more details.
015: *
016: * You should have received a copy of the GNU Library General Public
017: * License along with this library; if not, write to the Free
018: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
019: */
020:
021: package org.jacorb.orb.giop;
022:
023: import org.apache.avalon.framework.logger.Logger;
024: import org.apache.avalon.framework.configuration.*;
025:
026: import java.io.*;
027: import java.util.*;
028:
029: import org.omg.GIOP.*;
030: import org.omg.CORBA.NO_IMPLEMENT;
031: import org.omg.CORBA.CompletionStatus;
032: import org.omg.ETF.*;
033:
034: import org.jacorb.orb.SystemExceptionHelper;
035: import org.jacorb.orb.BufferManager;
036: import org.jacorb.orb.iiop.*;
037: import org.jacorb.util.*;
038:
039: /**
040: * GIOPConnection.java
041: *
042: * Created: Sun Aug 12 21:30:48 2002
043: *
044: * Configuration parameters:<br>
045: *
046: * jacorb.debug.dump_incoming_messages=[on|off], default=off<br>
047: * jacorb.connection.client.connect_timeout=N, default=0<br>
048: * jacorb.connection.statistics_providers={classnames}, default=(empty)<br>
049: *
050: * @author Nicolas Noffke
051: * @version $Id: GIOPConnection.java,v 1.66 2007/02/06 19:27:28 andre.spiegel Exp $
052: */
053:
054: public abstract class GIOPConnection extends java.io.OutputStream {
055: /**
056: * Profile describing the remote endpoint of this connection.
057: */
058: protected final org.omg.ETF.Profile profile;
059: protected org.omg.ETF.Connection transport = null;
060:
061: private RequestListener request_listener = null;
062: private ReplyListener reply_listener = null;
063: protected ConnectionListener connection_listener = null;
064:
065: protected Object connect_sync = new Object();
066:
067: private boolean writer_active = false;
068: private final Object write_sync = new Object();
069:
070: private org.jacorb.config.Configuration configuration;
071: protected Logger logger;
072:
073: /*
074: * Connection OSF character formats.
075: */
076: private int tcs = CodeSet.getTCSDefault();
077: private int tcsw = CodeSet.getTCSWDefault();
078:
079: private boolean tcs_negotiated = false;
080:
081: //map request id (Integer) to ByteArrayInputStream
082: private final Map fragments = new HashMap();
083: private final BufferManager buf_mg;
084:
085: private boolean dump_incoming = false;
086: private long timeout = 0;
087:
088: private final BufferHolder msg_header = new BufferHolder(
089: new byte[Messages.MSG_HEADER_SIZE]);
090:
091: private final BufferHolder inbuf = new BufferHolder();
092:
093: //// support for SAS Stateful contexts
094: //private Hashtable sasContexts = null;
095:
096: // provide cubbyholes for other layers to store connection persistent data
097: private static int cubby_count = 0;
098: private Object[] cubbyholes = null;
099:
100: // the no. of outstanding messages (requests/replies)
101: // pending_messages refers only to expected replies, to be sent
102: // in response to two-way requests. pending_write refers to messages
103: // that are outbound but have not yet been sent. These could be one-way
104: // or two-way requests, or they could be replies being sent out of a
105: // server. There will typicially be only one pending write.
106: private int pending_messages = 0;
107: private int pending_write = 0;
108:
109: protected boolean discard_messages = false;
110:
111: //used to lock the section where we got a message, but it isn't
112: //yet decided, if this might need a reply, i.e. set the transport
113: //busy
114: protected Object pendingUndecidedSync = new Object();
115:
116: //stop listening for messages
117: protected boolean do_close = false;
118:
119: protected StatisticsProvider statistics_provider = null;
120: protected StatisticsProviderAdapter statistics_provider_adapter = null;
121:
122: public GIOPConnection(org.omg.ETF.Profile profile,
123: org.omg.ETF.Connection transport,
124: RequestListener request_listener,
125: ReplyListener reply_listener,
126: StatisticsProvider statistics_provider) {
127: super ();
128:
129: this .profile = profile;
130: this .transport = transport;
131: this .request_listener = request_listener;
132: this .reply_listener = reply_listener;
133: this .statistics_provider = statistics_provider;
134:
135: if (statistics_provider != null)
136: this .statistics_provider_adapter = new StatisticsProviderAdapter(
137: statistics_provider);
138:
139: this .buf_mg = BufferManager.getInstance();
140: //sasContexts = new Hashtable();
141:
142: this .cubbyholes = new Object[cubby_count];
143:
144: }
145:
146: public void configure(Configuration configuration)
147: throws ConfigurationException {
148: this .configuration = (org.jacorb.config.Configuration) configuration;
149: logger = this .configuration.getNamedLogger("jacorb.giop.conn");
150: dump_incoming = configuration.getAttribute(
151: "jacorb.debug.dump_incoming_messages", "off").equals(
152: "on");
153: timeout = configuration.getAttributeAsInteger(
154: "jacorb.connection.client.connect_timeout", 90000);
155:
156: List statsProviderClassNames = this .configuration
157: .getAttributeList("jacorb.connection.statistics_providers");
158:
159: for (Iterator iter = statsProviderClassNames.iterator(); iter
160: .hasNext();) {
161: String className = (String) iter.next();
162: try {
163: Class iclass = ObjectUtil.classForName(className);
164:
165: this .statistics_provider_adapter = new StatisticsProviderAdapter(
166: (StatisticsProvider) iclass.newInstance(),
167: statistics_provider_adapter);
168: } catch (Exception e) {
169: if (logger.isErrorEnabled()) {
170: logger
171: .error("Unable to create class from property "
172: + ">jacorb.connection.statistics_provider_class<: "
173: + e.toString());
174: }
175: }
176: }
177: }
178:
179: public final void setCodeSets(int TCS, int TCSW) {
180: this .tcs = TCS;
181: this .tcsw = TCSW;
182: }
183:
184: public final int getTCS() {
185: return tcs;
186: }
187:
188: public final int getTCSW() {
189: return tcsw;
190: }
191:
192: public final void markTCSNegotiated() {
193: tcs_negotiated = true;
194: }
195:
196: public final boolean isTCSNegotiated() {
197: return tcs_negotiated;
198: }
199:
200: /**
201: * Get the value of request_listener.
202: * @return value of request_listener.
203: */
204: protected final synchronized RequestListener getRequestListener() {
205: return request_listener;
206: }
207:
208: /**
209: * Set the value of request_listener.
210: * @param listener Value to assign to request_listener.
211: */
212: public final synchronized void setRequestListener(
213: RequestListener listener) {
214: this .request_listener = listener;
215: }
216:
217: /**
218: * Get the value of reply_listener.
219: * @return value of reply_listener.
220: */
221: private final synchronized ReplyListener getReplyListener() {
222: return reply_listener;
223: }
224:
225: /**
226: * Set the value of reply_listener.
227: * @param listener Value to assign to reply_listener.
228: */
229: public final synchronized void setReplyListener(
230: ReplyListener listener) {
231: this .reply_listener = listener;
232: }
233:
234: public final void setConnectionListener(
235: ConnectionListener connection_listener) {
236: this .connection_listener = connection_listener;
237: }
238:
239: public final org.omg.ETF.Connection getTransport() {
240: synchronized (connect_sync) {
241: return transport;
242: }
243: }
244:
245: private boolean waitUntilConnected() {
246: synchronized (connect_sync) {
247: while (!transport.is_connected() && !do_close) {
248: if (logger.isDebugEnabled()) {
249: logger.debug(this .toString()
250: + ": will wait until connected");
251: }
252:
253: try {
254: connect_sync.wait();
255: } catch (InterruptedException ie) {
256: }
257: }
258: return !do_close;
259: }
260: }
261:
262: /**
263: * Called by this.getMessage() to signal that the attempt to
264: * read a message resulted in a timeout. This method is implemented
265: * differently on the client and server side.
266: */
267: protected abstract void readTimedOut();
268:
269: /**
270: * Called by this.getMessage() to signal that the underlying transport
271: * was closed while attempting to read a message. This method is
272: * implemented differently on the client and server side.
273: */
274: protected abstract void streamClosed();
275:
276: /**
277: * Read a GIOP message from the stream. This will first try to
278: * read in the fixed-length GIOP message header to determine the
279: * message size, and the read the rest. It also checks the leading
280: * four magic bytes of the message header. This method <b>is not
281: * thread safe<b> and only expected to be called by a single
282: * thread.
283: *
284: * @return a GIOP message or null.
285: */
286: private byte[] getMessage() {
287: //Wait until the actual socket connection is established. This
288: //is necessary for the client side, so opening up a new
289: //connection can be delayed until the first message is to be
290: //sent.
291: if (!waitUntilConnected()) {
292: return null;
293: }
294:
295: try {
296: transport.read(msg_header, 0, Messages.MSG_HEADER_SIZE,
297: Messages.MSG_HEADER_SIZE, 0);
298: } catch (org.omg.CORBA.TRANSIENT ex) {
299: return null;
300: } catch (org.omg.CORBA.COMM_FAILURE ex) {
301: if (logger.isDebugEnabled()) {
302: logger.debug(this .toString()
303: + ": getMessage() -- COMM_FAILURE");
304: }
305: this .streamClosed();
306: return null;
307: } catch (org.omg.CORBA.TIMEOUT ex) {
308: if (logger.isDebugEnabled()) {
309: logger.debug(this .toString()
310: + ": getMessage() -- TIMEOUT");
311: }
312: this .readTimedOut();
313: return null;
314: }
315:
316: byte[] header = msg_header.value;
317:
318: //(minimally) decode GIOP message header. Main checks should
319: //be done one layer above.
320:
321: if (Messages.matchGIOPMagic(header)) {
322: //determine message size
323: int msg_size = Messages.getMsgSize(header);
324:
325: if (msg_size < 0) {
326: if (logger.isErrorEnabled()) {
327: logger.error("Negative GIOP message size ("
328: + msg_size + ") in " + this .toString());
329: }
330:
331: if (logger.isDebugEnabled()) {
332: logger
333: .debug("TCP_IP_GIOPTransport.getMessage() with header: \n"
334: + new String(header)
335: + "\nsize : "
336: + Messages.MSG_HEADER_SIZE
337: + ", in " + this .toString());
338: }
339:
340: return null;
341: }
342:
343: //get a large enough buffer from the pool
344: inbuf.value = buf_mg.getBuffer(msg_size
345: + Messages.MSG_HEADER_SIZE);
346:
347: //copy header
348: System.arraycopy(header, 0, inbuf.value, 0,
349: Messages.MSG_HEADER_SIZE);
350:
351: try {
352: transport.read(inbuf, Messages.MSG_HEADER_SIZE,
353: msg_size, msg_size, 0);
354: } catch (org.omg.CORBA.COMM_FAILURE ex) {
355: if (logger.isErrorEnabled()) {
356: logger.error("Failed to read GIOP message in "
357: + this .toString(), ex);
358: }
359: return null;
360: }
361:
362: if (dump_incoming) {
363: if (logger.isInfoEnabled()) {
364: logger
365: .info(this .toString()
366: + " BufferDump:\n"
367: + ObjectUtil
368: .bufToString(
369: inbuf.value,
370: 0,
371: msg_size
372: + Messages.MSG_HEADER_SIZE));
373: }
374: }
375:
376: if (getStatisticsProviderAdapter() != null) {
377: getStatisticsProviderAdapter().messageReceived(
378: msg_size + Messages.MSG_HEADER_SIZE);
379: }
380:
381: //this is the "good" exit point.
382: return inbuf.value;
383: }
384:
385: if (logger.isDebugEnabled()) {
386: logger.debug(this .toString()
387: + " getMessage(), invalid header read: "
388: + ObjectUtil.bufToString(msg_header.value, 0, 4));
389: }
390:
391: if (logger.isErrorEnabled()) {
392: logger.error("Failed to read GIOP message in "
393: + this .toString()
394: + ", incorrect magic number --> connection closed");
395: }
396:
397: // close transport connection, there is nearly no chance to sync
398: // with peer on this connection again
399: close();
400:
401: // notify GIOPConnectionManager of close
402: this .streamClosed();
403:
404: return null;
405: }
406:
407: public final void receiveMessages() throws IOException {
408: while (true) {
409: byte[] message = getMessage();
410:
411: if (message == null) {
412: if (do_close) {
413: return;
414: }
415:
416: continue;
417: }
418:
419: synchronized (pendingUndecidedSync) {
420: if (discard_messages) {
421: buf_mg.returnBuffer(message);
422: continue;
423: }
424:
425: //check major version
426: if (Messages.getGIOPMajor(message) != 1) {
427: if (logger.isErrorEnabled()) {
428: logger
429: .error("Invalid GIOP major version encountered: "
430: + Messages
431: .getGIOPMajor(message)
432: + ", in " + this .toString());
433: }
434:
435: buf_mg.returnBuffer(message);
436: continue;
437: }
438:
439: int msg_type = Messages.getMsgType(message);
440:
441: if (msg_type == MsgType_1_1._Fragment) {
442: //GIOP 1.0 messages aren't allowed to be fragmented
443: if (Messages.getGIOPMinor(message) == 0) {
444: if (logger.isWarnEnabled()) {
445: logger
446: .warn("Received a GIOP 1.0 message of type Fragment"
447: + " in " + this .toString());
448: }
449:
450: final MessageOutputStream out = new MessageOutputStream();
451:
452: try {
453: out.writeGIOPMsgHeader(
454: MsgType_1_1._MessageError, 0);
455: out.insertMsgSize();
456: sendMessage(out);
457: buf_mg.returnBuffer(message);
458: } finally {
459: out.close();
460: }
461: continue;
462: }
463:
464: //GIOP 1.1 Fragmented messages currently not supported
465: if (Messages.getGIOPMinor(message) == 1) {
466: if (logger.isWarnEnabled()) {
467: logger
468: .warn("Received a GIOP 1.1 Fragment message"
469: + " in " + this .toString());
470: }
471:
472: //Can't return a message in this case, because
473: //GIOP 1.1 fragments don't have request
474: //ids. Therefore, just discard.
475: buf_mg.returnBuffer(message);
476:
477: continue;
478: }
479:
480: //for now, only GIOP 1.2 from here on
481:
482: Integer request_id = ObjectUtil.newInteger(Messages
483: .getRequestId(message));
484:
485: //sanity check
486: if (!fragments.containsKey(request_id)) {
487: if (logger.isErrorEnabled()) {
488: logger
489: .error("No previous Fragment to this one in "
490: + this .toString());
491: }
492:
493: //Drop this one and continue
494: buf_mg.returnBuffer(message);
495:
496: continue;
497: }
498:
499: ByteArrayOutputStream b_out = (ByteArrayOutputStream) fragments
500: .get(request_id);
501:
502: //add the message contents to stream (discarding the
503: //GIOP message header and the request id ulong of the
504: //Fragment header)
505: b_out.write(message, Messages.MSG_HEADER_SIZE + 4,
506: Messages.getMsgSize(message) - 4);
507:
508: if (Messages.moreFragmentsFollow(message)) {
509: //more to follow, so don't hand over to processing
510: buf_mg.returnBuffer(message);
511: continue;
512: }
513:
514: buf_mg.returnBuffer(message);
515:
516: //silently replace the original message buffer and type
517: message = b_out.toByteArray();
518: msg_type = Messages.getMsgType(message);
519:
520: fragments.remove(request_id);
521: } else if (Messages.moreFragmentsFollow(message)) {
522: //GIOP 1.0 messages aren't allowed to be fragmented
523: if (Messages.getGIOPMinor(message) == 0) {
524: if (logger.isWarnEnabled()) {
525: logger
526: .warn("Received a GIOP 1.0 message "
527: + "with the \"more fragments follow\""
528: + "bits set in "
529: + this .toString());
530: }
531:
532: MessageOutputStream out = new MessageOutputStream();
533: out.writeGIOPMsgHeader(
534: MsgType_1_1._MessageError, 0);
535: out.insertMsgSize();
536: sendMessage(out);
537: buf_mg.returnBuffer(message);
538:
539: continue;
540: }
541:
542: //If GIOP 1.1, only Request and Reply messages may be fragmented
543: if (Messages.getGIOPMinor(message) == 1) {
544: if (msg_type != MsgType_1_1._Request
545: && msg_type != MsgType_1_1._Reply) {
546: if (logger.isWarnEnabled()) {
547: logger
548: .warn("Received a GIOP 1.1 message of type "
549: + msg_type
550: + " with the "
551: + ""
552: + "\"more fragments follow\" bits set"
553: + " in "
554: + this .toString());
555: }
556:
557: MessageOutputStream out = new MessageOutputStream();
558: out.writeGIOPMsgHeader(
559: MsgType_1_1._MessageError, 1);
560: out.insertMsgSize();
561: sendMessage(out);
562: buf_mg.returnBuffer(message);
563:
564: continue;
565: }
566:
567: //GIOP 1.1 Fragmented messages currently not supported
568: if (logger.isWarnEnabled()) {
569: logger
570: .warn("Received a fragmented GIOP 1.1 message"
571: + " in " + this .toString());
572: }
573:
574: int giop_minor = Messages.getGIOPMinor(message);
575:
576: ReplyOutputStream out = new ReplyOutputStream(
577: Messages.getRequestId(message),
578: ReplyStatusType_1_2.SYSTEM_EXCEPTION,
579: giop_minor, false, logger);//no locate reply
580:
581: SystemExceptionHelper.write(out,
582: new NO_IMPLEMENT(0,
583: CompletionStatus.COMPLETED_NO));
584:
585: sendMessage(out);
586: buf_mg.returnBuffer(message);
587:
588: continue;
589: }
590:
591: //check, that only the correct message types are fragmented
592: if (msg_type == MsgType_1_1._CancelRequest
593: || msg_type == MsgType_1_1._CloseConnection
594: || msg_type == MsgType_1_1._CancelRequest) {
595: if (logger.isWarnEnabled()) {
596: logger
597: .warn("Received a GIOP message of type "
598: + msg_type
599: + " with the \"more fragments follow\" bits set, "
600: + "but this message type isn't allowed to be "
601: + "fragmented, in "
602: + this .toString());
603: }
604:
605: MessageOutputStream out = new MessageOutputStream();
606: out.writeGIOPMsgHeader(
607: MsgType_1_1._MessageError, 1);
608: out.insertMsgSize();
609: sendMessage(out);
610: buf_mg.returnBuffer(message);
611:
612: continue;
613: }
614:
615: //if we're here, it's the first part of a fragmented message
616: Integer request_id = new Integer(Messages
617: .getRequestId(message)); // NOPMD
618:
619: //sanity check
620: if (fragments.containsKey(request_id)) {
621: if (logger.isErrorEnabled()) {
622: logger
623: .error("Received a message of type "
624: + msg_type
625: + " with the more fragments follow bit set,"
626: + " but there is already an fragmented,"
627: + " incomplete message with the same request id ("
628: + request_id
629: + ", in "
630: + this .toString());
631: }
632:
633: //Drop this one and continue
634: buf_mg.returnBuffer(message);
635:
636: continue;
637: }
638:
639: //create new stream and add to table
640: ByteArrayOutputStream b_out = new ByteArrayOutputStream();
641: fragments.put(request_id, b_out);
642:
643: //add the message contents to stream
644: b_out.write(message, 0, Messages.MSG_HEADER_SIZE
645: + Messages.getMsgSize(message));
646:
647: buf_mg.returnBuffer(message);
648:
649: //This message isn't yet complete
650: continue;
651: }
652:
653: switch (msg_type) {
654: case MsgType_1_1._Request: {
655: getRequestListener().requestReceived(message, this );
656:
657: break;
658: }
659: case MsgType_1_1._Reply: {
660: getReplyListener().replyReceived(message, this );
661:
662: break;
663: }
664: case MsgType_1_1._CancelRequest: {
665: getRequestListener().cancelRequestReceived(message,
666: this );
667:
668: break;
669: }
670: case MsgType_1_1._LocateRequest: {
671: getRequestListener().locateRequestReceived(message,
672: this );
673:
674: break;
675: }
676: case MsgType_1_1._LocateReply: {
677: getReplyListener().locateReplyReceived(message,
678: this );
679:
680: break;
681: }
682: case MsgType_1_1._CloseConnection: {
683: getReplyListener().closeConnectionReceived(message,
684: this );
685:
686: break;
687: }
688: case MsgType_1_1._MessageError: {
689: break;
690: }
691: case MsgType_1_1._Fragment: {
692: //currently not reached
693: break;
694: }
695: default: {
696: if (logger.isErrorEnabled()) {
697: logger
698: .error("Received message with unknown message type "
699: + msg_type
700: + ", in "
701: + this .toString());
702: }
703: buf_mg.returnBuffer(message);
704: }
705: }
706: }//synchronized( pendingUndecidedSync )
707: }
708: }
709:
710: protected final void getWriteLock() {
711: synchronized (write_sync) {
712: while (writer_active) {
713: try {
714: write_sync.wait();
715: } catch (InterruptedException e) {
716: }
717: }
718:
719: writer_active = true;
720: }
721: }
722:
723: protected final void releaseWriteLock() {
724: synchronized (write_sync) {
725: writer_active = false;
726:
727: write_sync.notifyAll();
728: }
729: }
730:
731: private final synchronized void incPendingWrite() {
732: ++pending_write;
733: }
734:
735: private final synchronized void decPendingWrite() {
736: --pending_write;
737: }
738:
739: public final synchronized void incPendingMessages() {
740: ++pending_messages;
741: }
742:
743: public final synchronized void decPendingMessages() {
744: --pending_messages;
745: }
746:
747: public final synchronized boolean hasPendingMessages() {
748: return (pending_messages != 0) || (pending_write != 0);
749: }
750:
751: /**
752: * write (a fragment of) the message (passes it on to the wire)
753: */
754:
755: public final void write(byte[] fragment, int start, int size) {
756: transport.write(false, false, fragment, start, size, 0);
757:
758: if (getStatisticsProviderAdapter() != null) {
759: getStatisticsProviderAdapter().messageChunkSent(size);
760: }
761: }
762:
763: /* pro forma implementations of io.OutputStream methods */
764:
765: public final void write(int value) throws java.io.IOException {
766: throw new org.omg.CORBA.NO_IMPLEMENT();
767: }
768:
769: public final void write(byte[] value) throws java.io.IOException {
770: throw new org.omg.CORBA.NO_IMPLEMENT();
771: }
772:
773: public final void flush() throws java.io.IOException {
774: throw new org.omg.CORBA.NO_IMPLEMENT();
775: }
776:
777: public final void sendRequest(MessageOutputStream out,
778: boolean expect_reply) throws IOException {
779: if (expect_reply) {
780: incPendingMessages();
781: }
782:
783: sendMessage(out);
784: }
785:
786: public final void sendReply(MessageOutputStream out)
787: throws IOException {
788: decPendingMessages();
789:
790: sendMessage(out);
791: }
792:
793: private final void sendMessage(MessageOutputStream out)
794: throws IOException {
795: try {
796: incPendingWrite();
797: getWriteLock();
798: if (!transport.is_connected()) {
799: tcs_negotiated = false;
800:
801: if (logger.isDebugEnabled()) {
802: logger.debug(this .toString()
803: + ": sendMessage() -- opening transport");
804: }
805:
806: synchronized (connect_sync) {
807: try {
808: transport.connect(profile, timeout);
809: connect_sync.notifyAll();
810: } catch (RuntimeException ex) {
811: if (logger.isDebugEnabled()) {
812: logger
813: .debug(this .toString()
814: + ": sendMessage() -- failed to open transport");
815: }
816: throw ex;
817: }
818: }
819:
820: }
821:
822: out.write_to(this );
823:
824: transport.flush();
825:
826: if (getStatisticsProviderAdapter() != null) {
827: getStatisticsProviderAdapter().flushed();
828: }
829: } catch (org.omg.CORBA.COMM_FAILURE e) {
830: if (logger.isErrorEnabled()) {
831: logger.error(
832: "Failed to write GIOP message due to COMM_FAILURE, in "
833: + this .toString(), e);
834: }
835: if (!do_close) {
836: if (logger.isErrorEnabled()) {
837: logger
838: .error("GIOP connection closed due to errors during "
839: + "sendMessage(), in "
840: + this .toString());
841: }
842: //release write lock to prevent dead locks to
843: //reader thread which might try to close this socket too
844: //concurrently (unfortunately write lock is requested during streamClosed())
845: releaseWriteLock();
846: //close transport connection, there is nearly no chance to sync with
847: //peer on this connection again
848: close();
849: //signal GIOPConnectionManager to throw this connection away
850: this .streamClosed();
851: }
852: throw e;
853: } finally {
854: decPendingWrite();
855: releaseWriteLock();
856: }
857: }
858:
859: public final boolean isSSL() {
860: if (transport instanceof IIOPConnection) {
861: return ((IIOPConnection) transport).isSSL();
862: }
863: return false;
864: }
865:
866: public void close() {
867: if (logger.isDebugEnabled()) {
868: logger.debug(this .toString() + ": close()");
869: }
870:
871: synchronized (connect_sync) {
872: if (connection_listener != null) {
873: connection_listener.connectionClosed();
874: }
875:
876: transport.close();
877: do_close = true;
878: connect_sync.notifyAll();
879: }
880: }
881:
882: /**
883: * Get an instance of StatisticsProvider derivative, for
884: * updating the transport usage statistics.
885: */
886: protected final StatisticsProviderAdapter getStatisticsProviderAdapter() {
887: return statistics_provider_adapter;
888: }
889:
890: /**
891: * Get the statistics provider for transport usage statistics
892: * that can be used in conjunction with the SelectionStrategy.
893: * This is a special-case provider, usually supplied by, and
894: * known to, the concrete SelectionStrategy. To actually update
895: * the usage stats use getStatisticsProviderAdapter()
896: */
897: public final StatisticsProvider getStatisticsProvider() {
898: return statistics_provider;
899: }
900:
901: /**
902: * Return the StatissticsProvider, given the cardinality number
903: * @param no
904: * @return
905: */
906: public StatisticsProvider getStatisticsProvider(int no) {
907: if (statistics_provider_adapter == null)
908: return null;
909:
910: return statistics_provider_adapter.find(no);
911: }
912:
913: /*
914: class CachedContext
915: {
916: public byte[] client_authentication_token;
917: public EstablishContext msg;
918: CachedContext(byte[] client_authentication_token, EstablishContext msg)
919: {
920: this.client_authentication_token = client_authentication_token;
921: this.msg = msg;
922: }
923: }
924:
925: public void cacheSASContext(long client_context_id, byte[] client_authentication_token, EstablishContext msg)
926: {
927: synchronized ( sasContexts )
928: {
929: sasContexts.put(new Long(client_context_id), new CachedContext(client_authentication_token, msg));
930: }
931: }
932:
933: public void purgeSASContext(long client_context_id)
934: {
935: synchronized ( sasContexts )
936: {
937: sasContexts.remove(new Long(client_context_id));
938: }
939: }
940:
941: public byte[] getSASContext(long client_context_id)
942: {
943: Long key = new Long(client_context_id);
944: synchronized (sasContexts)
945: {
946: if (!sasContexts.containsKey(key)) return null;
947: return ((CachedContext)sasContexts.get(key)).client_authentication_token;
948: }
949: }
950:
951: public EstablishContext getSASContextMsg(long client_context_id)
952: {
953: Long key = new Long(client_context_id);
954: synchronized (sasContexts)
955: {
956: if (!sasContexts.containsKey(key)) return null;
957: return ((CachedContext)sasContexts.get(key)).msg;
958: }
959: }
960: */
961:
962: // provide cubbyholes for data
963: public static int allocate_cubby_id() {
964: return cubby_count++;
965: }
966:
967: public Object get_cubby(int id) {
968: if (id < 0 || id >= cubby_count) {
969: if (logger.isErrorEnabled()) {
970: logger.error("Get bad cubby id " + id + " (max="
971: + cubby_count + "), in " + this .toString());
972: }
973: return null;
974: }
975: return cubbyholes[id];
976: }
977:
978: public void set_cubby(int id, Object obj) {
979: if (id < 0 || id >= cubby_count) {
980: if (logger.isErrorEnabled()) {
981: logger.error("Set bad cubby id " + id + " (max="
982: + cubby_count + "), in " + this .toString());
983: }
984: return;
985: }
986: cubbyholes[id] = obj;
987: }
988:
989: /*default (or, package-level) access*/
990: org.omg.ETF.Profile getProfile() {
991: return profile;
992: }
993:
994: }// GIOPConnection
|