Source Code Cross Referenced for GIOPConnection.java in  » Collaboration » JacORB » org » jacorb » orb » giop » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Collaboration » JacORB » org.jacorb.orb.giop 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         *        JacORB - a free Java ORB
003:         *
004:         *   Copyright (C) 1997-2004 Gerald Brose.
005:         *
006:         *   This library is free software; you can redistribute it and/or
007:         *   modify it under the terms of the GNU Library General Public
008:         *   License as published by the Free Software Foundation; either
009:         *   version 2 of the License, or (at your option) any later version.
010:         *
011:         *   This library is distributed in the hope that it will be useful,
012:         *   but WITHOUT ANY WARRANTY; without even the implied warranty of
013:         *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
014:         *   Library General Public License for more details.
015:         *
016:         *   You should have received a copy of the GNU Library General Public
017:         *   License along with this library; if not, write to the Free
018:         *   Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
019:         */
020:
021:        package org.jacorb.orb.giop;
022:
023:        import org.apache.avalon.framework.logger.Logger;
024:        import org.apache.avalon.framework.configuration.*;
025:
026:        import java.io.*;
027:        import java.util.*;
028:
029:        import org.omg.GIOP.*;
030:        import org.omg.CORBA.NO_IMPLEMENT;
031:        import org.omg.CORBA.CompletionStatus;
032:        import org.omg.ETF.*;
033:
034:        import org.jacorb.orb.SystemExceptionHelper;
035:        import org.jacorb.orb.BufferManager;
036:        import org.jacorb.orb.iiop.*;
037:        import org.jacorb.util.*;
038:
039:        /**
040:         * GIOPConnection.java
041:         *
042:         * Created: Sun Aug 12 21:30:48 2002
043:         *
044:         * Configuration parameters:<br>
045:         * 
046:         * jacorb.debug.dump_incoming_messages=[on|off],        default=off<br>
047:         * jacorb.connection.client.connect_timeout=N,          default=90000<br>
048:         * jacorb.connection.statistics_providers={classnames}, default=(empty)<br>
049:         * 
050:         * @author Nicolas Noffke
051:         * @version $Id: GIOPConnection.java,v 1.66 2007/02/06 19:27:28 andre.spiegel Exp $
052:         */
053:
054:        public abstract class GIOPConnection extends java.io.OutputStream {
055:            /**
056:             * Profile describing the remote endpoint of this connection.
057:             */
058:            protected final org.omg.ETF.Profile profile;
059:            protected org.omg.ETF.Connection transport = null;
060:
061:            private RequestListener request_listener = null;
062:            private ReplyListener reply_listener = null;
063:            protected ConnectionListener connection_listener = null;
064:
065:            protected Object connect_sync = new Object();
066:
067:            private boolean writer_active = false;
068:            private final Object write_sync = new Object();
069:
070:            private org.jacorb.config.Configuration configuration;
071:            protected Logger logger;
072:
073:            /*
074:             * Connection OSF character formats.
075:             */
076:            private int tcs = CodeSet.getTCSDefault();
077:            private int tcsw = CodeSet.getTCSWDefault();
078:
079:            private boolean tcs_negotiated = false;
080:
081:            //map request id (Integer) to ByteArrayInputStream
082:            private final Map fragments = new HashMap();
083:            private final BufferManager buf_mg;
084:
085:            private boolean dump_incoming = false;
086:            private long timeout = 0;
087:
088:            private final BufferHolder msg_header = new BufferHolder(
089:                    new byte[Messages.MSG_HEADER_SIZE]);
090:
091:            private final BufferHolder inbuf = new BufferHolder();
092:
093:            //// support for SAS Stateful contexts
094:            //private Hashtable sasContexts = null;
095:
096:            // provide cubbyholes for other layers to store connection persistent data
097:            private static int cubby_count = 0;
098:            private Object[] cubbyholes = null;
099:
100:            // the no. of outstanding messages (requests/replies)
101:            // pending_messages refers only to expected replies, to be sent
102:            // in response to two-way requests. pending_write refers to messages
103:            // that are outbound but have not yet been sent. These could be one-way
104:            // or two-way requests, or they could be replies being sent out of a
105:            // server. There will typically be only one pending write.
106:            private int pending_messages = 0;
107:            private int pending_write = 0;
108:
109:            protected boolean discard_messages = false;
110:
111:            //used to lock the section where we got a message, but it isn't
112:            //yet decided, if this might need a reply, i.e. set the transport
113:            //busy
114:            protected Object pendingUndecidedSync = new Object();
115:
116:            //stop listening for messages
117:            protected boolean do_close = false;
118:
119:            protected StatisticsProvider statistics_provider = null;
120:            protected StatisticsProviderAdapter statistics_provider_adapter = null;
121:
/**
 * Creates a GIOP connection from its collaborators.
 *
 * All arguments are stored as given. If a statistics provider is
 * supplied it is additionally wrapped in a
 * <code>StatisticsProviderAdapter</code>. The buffer manager singleton is
 * looked up and the cubbyhole array is sized by the current value of the
 * static <code>cubby_count</code>.
 *
 * @param profile             profile describing the remote endpoint
 * @param transport           transport stored for this connection
 * @param request_listener    initial request listener
 * @param reply_listener      initial reply listener
 * @param statistics_provider statistics provider, may be null
 */
public GIOPConnection(org.omg.ETF.Profile profile,
        org.omg.ETF.Connection transport,
        RequestListener request_listener,
        ReplyListener reply_listener,
        StatisticsProvider statistics_provider) {
    super();

    // endpoint and transport
    this.profile = profile;
    this.transport = transport;

    // message listeners
    this.request_listener = request_listener;
    this.reply_listener = reply_listener;

    // statistics: keep the raw provider and, if present, an adapter for it
    this.statistics_provider = statistics_provider;
    if (statistics_provider != null) {
        this.statistics_provider_adapter =
                new StatisticsProviderAdapter(statistics_provider);
    }

    this.buf_mg = BufferManager.getInstance();

    // storage for connection-persistent data of other layers
    this.cubbyholes = new Object[cubby_count];
}
145:
146:            public void configure(Configuration configuration)
147:                    throws ConfigurationException {
148:                this .configuration = (org.jacorb.config.Configuration) configuration;
149:                logger = this .configuration.getNamedLogger("jacorb.giop.conn");
150:                dump_incoming = configuration.getAttribute(
151:                        "jacorb.debug.dump_incoming_messages", "off").equals(
152:                        "on");
153:                timeout = configuration.getAttributeAsInteger(
154:                        "jacorb.connection.client.connect_timeout", 90000);
155:
156:                List statsProviderClassNames = this .configuration
157:                        .getAttributeList("jacorb.connection.statistics_providers");
158:
159:                for (Iterator iter = statsProviderClassNames.iterator(); iter
160:                        .hasNext();) {
161:                    String className = (String) iter.next();
162:                    try {
163:                        Class iclass = ObjectUtil.classForName(className);
164:
165:                        this .statistics_provider_adapter = new StatisticsProviderAdapter(
166:                                (StatisticsProvider) iclass.newInstance(),
167:                                statistics_provider_adapter);
168:                    } catch (Exception e) {
169:                        if (logger.isErrorEnabled()) {
170:                            logger
171:                                    .error("Unable to create class from property "
172:                                            + ">jacorb.connection.statistics_provider_class<: "
173:                                            + e.toString());
174:                        }
175:                    }
176:                }
177:            }
178:
179:            public final void setCodeSets(int TCS, int TCSW) {
180:                this .tcs = TCS;
181:                this .tcsw = TCSW;
182:            }
183:
184:            public final int getTCS() {
185:                return tcs;
186:            }
187:
188:            public final int getTCSW() {
189:                return tcsw;
190:            }
191:
192:            public final void markTCSNegotiated() {
193:                tcs_negotiated = true;
194:            }
195:
196:            public final boolean isTCSNegotiated() {
197:                return tcs_negotiated;
198:            }
199:
200:            /**
201:             * Get the value of request_listener.
202:             * @return value of request_listener.
203:             */
204:            protected final synchronized RequestListener getRequestListener() {
205:                return request_listener;
206:            }
207:
208:            /**
209:             * Set the value of request_listener.
210:             * @param listener  Value to assign to request_listener.
211:             */
212:            public final synchronized void setRequestListener(
213:                    RequestListener listener) {
214:                this .request_listener = listener;
215:            }
216:
217:            /**
218:             * Get the value of reply_listener.
219:             * @return value of reply_listener.
220:             */
221:            private final synchronized ReplyListener getReplyListener() {
222:                return reply_listener;
223:            }
224:
225:            /**
226:             * Set the value of reply_listener.
227:             * @param listener  Value to assign to reply_listener.
228:             */
229:            public final synchronized void setReplyListener(
230:                    ReplyListener listener) {
231:                this .reply_listener = listener;
232:            }
233:
234:            public final void setConnectionListener(
235:                    ConnectionListener connection_listener) {
236:                this .connection_listener = connection_listener;
237:            }
238:
239:            public final org.omg.ETF.Connection getTransport() {
240:                synchronized (connect_sync) {
241:                    return transport;
242:                }
243:            }
244:
245:            private boolean waitUntilConnected() {
246:                synchronized (connect_sync) {
247:                    while (!transport.is_connected() && !do_close) {
248:                        if (logger.isDebugEnabled()) {
249:                            logger.debug(this .toString()
250:                                    + ": will wait until connected");
251:                        }
252:
253:                        try {
254:                            connect_sync.wait();
255:                        } catch (InterruptedException ie) {
256:                        }
257:                    }
258:                    return !do_close;
259:                }
260:            }
261:
262:            /**
263:             * Called by this.getMessage() to signal that the attempt to
264:             * read a message resulted in a timeout.  This method is implemented
265:             * differently on the client and server side.
266:             */
267:            protected abstract void readTimedOut();
268:
269:            /**
270:             * Called by this.getMessage() to signal that the underlying transport
271:             * was closed while attempting to read a message.  This method is
272:             * implemented differently on the client and server side.
273:             */
274:            protected abstract void streamClosed();
275:
276:            /**
277:             * Read a GIOP message from the stream. This will first try to
278:             * read in the fixed-length GIOP message header to determine the
279:             * message size, and the read the rest. It also checks the leading
280:             * four magic bytes of the message header. This method <b>is not
281:             * thread safe<b> and only expected to be called by a single
282:             * thread.
283:             *
284:             * @return a GIOP message or null.
285:             */
286:            private byte[] getMessage() {
287:                //Wait until the actual socket connection is established. This
288:                //is necessary for the client side, so opening up a new
289:                //connection can be delayed until the first message is to be
290:                //sent.
291:                if (!waitUntilConnected()) {
292:                    return null;
293:                }
294:
295:                try {
296:                    transport.read(msg_header, 0, Messages.MSG_HEADER_SIZE,
297:                            Messages.MSG_HEADER_SIZE, 0);
298:                } catch (org.omg.CORBA.TRANSIENT ex) {
299:                    return null;
300:                } catch (org.omg.CORBA.COMM_FAILURE ex) {
301:                    if (logger.isDebugEnabled()) {
302:                        logger.debug(this .toString()
303:                                + ": getMessage() -- COMM_FAILURE");
304:                    }
305:                    this .streamClosed();
306:                    return null;
307:                } catch (org.omg.CORBA.TIMEOUT ex) {
308:                    if (logger.isDebugEnabled()) {
309:                        logger.debug(this .toString()
310:                                + ": getMessage() -- TIMEOUT");
311:                    }
312:                    this .readTimedOut();
313:                    return null;
314:                }
315:
316:                byte[] header = msg_header.value;
317:
318:                //(minimally) decode GIOP message header. Main checks should
319:                //be done one layer above.
320:
321:                if (Messages.matchGIOPMagic(header)) {
322:                    //determine message size
323:                    int msg_size = Messages.getMsgSize(header);
324:
325:                    if (msg_size < 0) {
326:                        if (logger.isErrorEnabled()) {
327:                            logger.error("Negative GIOP message size ("
328:                                    + msg_size + ") in " + this .toString());
329:                        }
330:
331:                        if (logger.isDebugEnabled()) {
332:                            logger
333:                                    .debug("TCP_IP_GIOPTransport.getMessage() with header: \n"
334:                                            + new String(header)
335:                                            + "\nsize : "
336:                                            + Messages.MSG_HEADER_SIZE
337:                                            + ", in " + this .toString());
338:                        }
339:
340:                        return null;
341:                    }
342:
343:                    //get a large enough buffer from the pool
344:                    inbuf.value = buf_mg.getBuffer(msg_size
345:                            + Messages.MSG_HEADER_SIZE);
346:
347:                    //copy header
348:                    System.arraycopy(header, 0, inbuf.value, 0,
349:                            Messages.MSG_HEADER_SIZE);
350:
351:                    try {
352:                        transport.read(inbuf, Messages.MSG_HEADER_SIZE,
353:                                msg_size, msg_size, 0);
354:                    } catch (org.omg.CORBA.COMM_FAILURE ex) {
355:                        if (logger.isErrorEnabled()) {
356:                            logger.error("Failed to read GIOP message in "
357:                                    + this .toString(), ex);
358:                        }
359:                        return null;
360:                    }
361:
362:                    if (dump_incoming) {
363:                        if (logger.isInfoEnabled()) {
364:                            logger
365:                                    .info(this .toString()
366:                                            + " BufferDump:\n"
367:                                            + ObjectUtil
368:                                                    .bufToString(
369:                                                            inbuf.value,
370:                                                            0,
371:                                                            msg_size
372:                                                                    + Messages.MSG_HEADER_SIZE));
373:                        }
374:                    }
375:
376:                    if (getStatisticsProviderAdapter() != null) {
377:                        getStatisticsProviderAdapter().messageReceived(
378:                                msg_size + Messages.MSG_HEADER_SIZE);
379:                    }
380:
381:                    //this is the "good" exit point.
382:                    return inbuf.value;
383:                }
384:
385:                if (logger.isDebugEnabled()) {
386:                    logger.debug(this .toString()
387:                            + " getMessage(), invalid header read: "
388:                            + ObjectUtil.bufToString(msg_header.value, 0, 4));
389:                }
390:
391:                if (logger.isErrorEnabled()) {
392:                    logger.error("Failed to read GIOP message in "
393:                            + this .toString()
394:                            + ", incorrect magic number --> connection closed");
395:                }
396:
397:                // close transport connection, there is nearly no chance to sync
398:                // with peer on this connection again
399:                close();
400:
401:                // notify GIOPConnectionManager of close
402:                this .streamClosed();
403:
404:                return null;
405:            }
406:
407:            public final void receiveMessages() throws IOException {
408:                while (true) {
409:                    byte[] message = getMessage();
410:
411:                    if (message == null) {
412:                        if (do_close) {
413:                            return;
414:                        }
415:
416:                        continue;
417:                    }
418:
419:                    synchronized (pendingUndecidedSync) {
420:                        if (discard_messages) {
421:                            buf_mg.returnBuffer(message);
422:                            continue;
423:                        }
424:
425:                        //check major version
426:                        if (Messages.getGIOPMajor(message) != 1) {
427:                            if (logger.isErrorEnabled()) {
428:                                logger
429:                                        .error("Invalid GIOP major version encountered: "
430:                                                + Messages
431:                                                        .getGIOPMajor(message)
432:                                                + ", in " + this .toString());
433:                            }
434:
435:                            buf_mg.returnBuffer(message);
436:                            continue;
437:                        }
438:
439:                        int msg_type = Messages.getMsgType(message);
440:
441:                        if (msg_type == MsgType_1_1._Fragment) {
442:                            //GIOP 1.0 messages aren't allowed to be fragmented
443:                            if (Messages.getGIOPMinor(message) == 0) {
444:                                if (logger.isWarnEnabled()) {
445:                                    logger
446:                                            .warn("Received a GIOP 1.0 message of type Fragment"
447:                                                    + " in " + this .toString());
448:                                }
449:
450:                                final MessageOutputStream out = new MessageOutputStream();
451:
452:                                try {
453:                                    out.writeGIOPMsgHeader(
454:                                            MsgType_1_1._MessageError, 0);
455:                                    out.insertMsgSize();
456:                                    sendMessage(out);
457:                                    buf_mg.returnBuffer(message);
458:                                } finally {
459:                                    out.close();
460:                                }
461:                                continue;
462:                            }
463:
464:                            //GIOP 1.1 Fragmented messages currently not supported
465:                            if (Messages.getGIOPMinor(message) == 1) {
466:                                if (logger.isWarnEnabled()) {
467:                                    logger
468:                                            .warn("Received a GIOP 1.1 Fragment message"
469:                                                    + " in " + this .toString());
470:                                }
471:
472:                                //Can't return a message in this case, because
473:                                //GIOP 1.1 fragments don't have request
474:                                //ids. Therefore, just discard.
475:                                buf_mg.returnBuffer(message);
476:
477:                                continue;
478:                            }
479:
480:                            //for now, only GIOP 1.2 from here on
481:
482:                            Integer request_id = ObjectUtil.newInteger(Messages
483:                                    .getRequestId(message));
484:
485:                            //sanity check
486:                            if (!fragments.containsKey(request_id)) {
487:                                if (logger.isErrorEnabled()) {
488:                                    logger
489:                                            .error("No previous Fragment to this one in "
490:                                                    + this .toString());
491:                                }
492:
493:                                //Drop this one and continue
494:                                buf_mg.returnBuffer(message);
495:
496:                                continue;
497:                            }
498:
499:                            ByteArrayOutputStream b_out = (ByteArrayOutputStream) fragments
500:                                    .get(request_id);
501:
502:                            //add the message contents to stream (discarding the
503:                            //GIOP message header and the request id ulong of the
504:                            //Fragment header)
505:                            b_out.write(message, Messages.MSG_HEADER_SIZE + 4,
506:                                    Messages.getMsgSize(message) - 4);
507:
508:                            if (Messages.moreFragmentsFollow(message)) {
509:                                //more to follow, so don't hand over to processing
510:                                buf_mg.returnBuffer(message);
511:                                continue;
512:                            }
513:
514:                            buf_mg.returnBuffer(message);
515:
516:                            //silently replace the original message buffer and type
517:                            message = b_out.toByteArray();
518:                            msg_type = Messages.getMsgType(message);
519:
520:                            fragments.remove(request_id);
521:                        } else if (Messages.moreFragmentsFollow(message)) {
522:                            //GIOP 1.0 messages aren't allowed to be fragmented
523:                            if (Messages.getGIOPMinor(message) == 0) {
524:                                if (logger.isWarnEnabled()) {
525:                                    logger
526:                                            .warn("Received a GIOP 1.0 message "
527:                                                    + "with the \"more fragments follow\""
528:                                                    + "bits set in "
529:                                                    + this .toString());
530:                                }
531:
532:                                MessageOutputStream out = new MessageOutputStream();
533:                                out.writeGIOPMsgHeader(
534:                                        MsgType_1_1._MessageError, 0);
535:                                out.insertMsgSize();
536:                                sendMessage(out);
537:                                buf_mg.returnBuffer(message);
538:
539:                                continue;
540:                            }
541:
542:                            //If GIOP 1.1, only Request and Reply messages may be fragmented
543:                            if (Messages.getGIOPMinor(message) == 1) {
544:                                if (msg_type != MsgType_1_1._Request
545:                                        && msg_type != MsgType_1_1._Reply) {
546:                                    if (logger.isWarnEnabled()) {
547:                                        logger
548:                                                .warn("Received a GIOP 1.1 message of type "
549:                                                        + msg_type
550:                                                        + " with the "
551:                                                        + ""
552:                                                        + "\"more fragments follow\" bits set"
553:                                                        + " in "
554:                                                        + this .toString());
555:                                    }
556:
557:                                    MessageOutputStream out = new MessageOutputStream();
558:                                    out.writeGIOPMsgHeader(
559:                                            MsgType_1_1._MessageError, 1);
560:                                    out.insertMsgSize();
561:                                    sendMessage(out);
562:                                    buf_mg.returnBuffer(message);
563:
564:                                    continue;
565:                                }
566:
567:                                //GIOP 1.1 Fragmented messages currently not supported
568:                                if (logger.isWarnEnabled()) {
569:                                    logger
570:                                            .warn("Received a fragmented GIOP 1.1 message"
571:                                                    + " in " + this .toString());
572:                                }
573:
574:                                int giop_minor = Messages.getGIOPMinor(message);
575:
576:                                ReplyOutputStream out = new ReplyOutputStream(
577:                                        Messages.getRequestId(message),
578:                                        ReplyStatusType_1_2.SYSTEM_EXCEPTION,
579:                                        giop_minor, false, logger);//no locate reply
580:
581:                                SystemExceptionHelper.write(out,
582:                                        new NO_IMPLEMENT(0,
583:                                                CompletionStatus.COMPLETED_NO));
584:
585:                                sendMessage(out);
586:                                buf_mg.returnBuffer(message);
587:
588:                                continue;
589:                            }
590:
591:                            //check, that only the correct message types are fragmented
592:                            if (msg_type == MsgType_1_1._CancelRequest
593:                                    || msg_type == MsgType_1_1._CloseConnection
594:                                    || msg_type == MsgType_1_1._CancelRequest) {
595:                                if (logger.isWarnEnabled()) {
596:                                    logger
597:                                            .warn("Received a GIOP message of type "
598:                                                    + msg_type
599:                                                    + " with the \"more fragments follow\" bits set, "
600:                                                    + "but this message type isn't allowed to be "
601:                                                    + "fragmented, in "
602:                                                    + this .toString());
603:                                }
604:
605:                                MessageOutputStream out = new MessageOutputStream();
606:                                out.writeGIOPMsgHeader(
607:                                        MsgType_1_1._MessageError, 1);
608:                                out.insertMsgSize();
609:                                sendMessage(out);
610:                                buf_mg.returnBuffer(message);
611:
612:                                continue;
613:                            }
614:
615:                            //if we're here, it's the first part of a fragmented message
616:                            Integer request_id = new Integer(Messages
617:                                    .getRequestId(message)); // NOPMD
618:
619:                            //sanity check
620:                            if (fragments.containsKey(request_id)) {
621:                                if (logger.isErrorEnabled()) {
622:                                    logger
623:                                            .error("Received a message of type "
624:                                                    + msg_type
625:                                                    + " with the more fragments follow bit set,"
626:                                                    + " but there is already an fragmented,"
627:                                                    + " incomplete message with the same request id ("
628:                                                    + request_id
629:                                                    + ", in "
630:                                                    + this .toString());
631:                                }
632:
633:                                //Drop this one and continue
634:                                buf_mg.returnBuffer(message);
635:
636:                                continue;
637:                            }
638:
639:                            //create new stream and add to table
640:                            ByteArrayOutputStream b_out = new ByteArrayOutputStream();
641:                            fragments.put(request_id, b_out);
642:
643:                            //add the message contents to stream
644:                            b_out.write(message, 0, Messages.MSG_HEADER_SIZE
645:                                    + Messages.getMsgSize(message));
646:
647:                            buf_mg.returnBuffer(message);
648:
649:                            //This message isn't yet complete
650:                            continue;
651:                        }
652:
653:                        switch (msg_type) {
654:                        case MsgType_1_1._Request: {
655:                            getRequestListener().requestReceived(message, this );
656:
657:                            break;
658:                        }
659:                        case MsgType_1_1._Reply: {
660:                            getReplyListener().replyReceived(message, this );
661:
662:                            break;
663:                        }
664:                        case MsgType_1_1._CancelRequest: {
665:                            getRequestListener().cancelRequestReceived(message,
666:                                    this );
667:
668:                            break;
669:                        }
670:                        case MsgType_1_1._LocateRequest: {
671:                            getRequestListener().locateRequestReceived(message,
672:                                    this );
673:
674:                            break;
675:                        }
676:                        case MsgType_1_1._LocateReply: {
677:                            getReplyListener().locateReplyReceived(message,
678:                                    this );
679:
680:                            break;
681:                        }
682:                        case MsgType_1_1._CloseConnection: {
683:                            getReplyListener().closeConnectionReceived(message,
684:                                    this );
685:
686:                            break;
687:                        }
688:                        case MsgType_1_1._MessageError: {
689:                            break;
690:                        }
691:                        case MsgType_1_1._Fragment: {
692:                            //currently not reached
693:                            break;
694:                        }
695:                        default: {
696:                            if (logger.isErrorEnabled()) {
697:                                logger
698:                                        .error("Received message with unknown message type "
699:                                                + msg_type
700:                                                + ", in "
701:                                                + this .toString());
702:                            }
703:                            buf_mg.returnBuffer(message);
704:                        }
705:                        }
706:                    }//synchronized( pendingUndecidedSync )
707:                }
708:            }
709:
710:            protected final void getWriteLock() {
711:                synchronized (write_sync) {
712:                    while (writer_active) {
713:                        try {
714:                            write_sync.wait();
715:                        } catch (InterruptedException e) {
716:                        }
717:                    }
718:
719:                    writer_active = true;
720:                }
721:            }
722:
723:            protected final void releaseWriteLock() {
724:                synchronized (write_sync) {
725:                    writer_active = false;
726:
727:                    write_sync.notifyAll();
728:                }
729:            }
730:
731:            private final synchronized void incPendingWrite() {
732:                ++pending_write;
733:            }
734:
735:            private final synchronized void decPendingWrite() {
736:                --pending_write;
737:            }
738:
739:            public final synchronized void incPendingMessages() {
740:                ++pending_messages;
741:            }
742:
743:            public final synchronized void decPendingMessages() {
744:                --pending_messages;
745:            }
746:
747:            public final synchronized boolean hasPendingMessages() {
748:                return (pending_messages != 0) || (pending_write != 0);
749:            }
750:
751:            /**
752:             * write (a fragment of) the message (passes it on to the wire)
753:             */
754:
755:            public final void write(byte[] fragment, int start, int size) {
756:                transport.write(false, false, fragment, start, size, 0);
757:
758:                if (getStatisticsProviderAdapter() != null) {
759:                    getStatisticsProviderAdapter().messageChunkSent(size);
760:                }
761:            }
762:
763:            /* pro forma implementations of io.OutputStream methods */
764:
765:            public final void write(int value) throws java.io.IOException {
766:                throw new org.omg.CORBA.NO_IMPLEMENT();
767:            }
768:
769:            public final void write(byte[] value) throws java.io.IOException {
770:                throw new org.omg.CORBA.NO_IMPLEMENT();
771:            }
772:
773:            public final void flush() throws java.io.IOException {
774:                throw new org.omg.CORBA.NO_IMPLEMENT();
775:            }
776:
777:            public final void sendRequest(MessageOutputStream out,
778:                    boolean expect_reply) throws IOException {
779:                if (expect_reply) {
780:                    incPendingMessages();
781:                }
782:
783:                sendMessage(out);
784:            }
785:
786:            public final void sendReply(MessageOutputStream out)
787:                    throws IOException {
788:                decPendingMessages();
789:
790:                sendMessage(out);
791:            }
792:
793:            private final void sendMessage(MessageOutputStream out)
794:                    throws IOException {
795:                try {
796:                    incPendingWrite();
797:                    getWriteLock();
798:                    if (!transport.is_connected()) {
799:                        tcs_negotiated = false;
800:
801:                        if (logger.isDebugEnabled()) {
802:                            logger.debug(this .toString()
803:                                    + ": sendMessage() -- opening transport");
804:                        }
805:
806:                        synchronized (connect_sync) {
807:                            try {
808:                                transport.connect(profile, timeout);
809:                                connect_sync.notifyAll();
810:                            } catch (RuntimeException ex) {
811:                                if (logger.isDebugEnabled()) {
812:                                    logger
813:                                            .debug(this .toString()
814:                                                    + ": sendMessage() -- failed to open transport");
815:                                }
816:                                throw ex;
817:                            }
818:                        }
819:
820:                    }
821:
822:                    out.write_to(this );
823:
824:                    transport.flush();
825:
826:                    if (getStatisticsProviderAdapter() != null) {
827:                        getStatisticsProviderAdapter().flushed();
828:                    }
829:                } catch (org.omg.CORBA.COMM_FAILURE e) {
830:                    if (logger.isErrorEnabled()) {
831:                        logger.error(
832:                                "Failed to write GIOP message due to COMM_FAILURE, in "
833:                                        + this .toString(), e);
834:                    }
835:                    if (!do_close) {
836:                        if (logger.isErrorEnabled()) {
837:                            logger
838:                                    .error("GIOP connection closed due to errors during "
839:                                            + "sendMessage(), in "
840:                                            + this .toString());
841:                        }
842:                        //release write lock to prevent dead locks to 
843:                        //reader thread which might try to close this socket too
844:                        //concurrently (unfortunately write lock is requested during streamClosed())
845:                        releaseWriteLock();
846:                        //close transport connection, there is nearly no chance to sync with
847:                        //peer on this connection again
848:                        close();
849:                        //signal GIOPConnectionManager to throw this connection away
850:                        this .streamClosed();
851:                    }
852:                    throw e;
853:                } finally {
854:                    decPendingWrite();
855:                    releaseWriteLock();
856:                }
857:            }
858:
859:            public final boolean isSSL() {
860:                if (transport instanceof  IIOPConnection) {
861:                    return ((IIOPConnection) transport).isSSL();
862:                }
863:                return false;
864:            }
865:
866:            public void close() {
867:                if (logger.isDebugEnabled()) {
868:                    logger.debug(this .toString() + ": close()");
869:                }
870:
871:                synchronized (connect_sync) {
872:                    if (connection_listener != null) {
873:                        connection_listener.connectionClosed();
874:                    }
875:
876:                    transport.close();
877:                    do_close = true;
878:                    connect_sync.notifyAll();
879:                }
880:            }
881:
882:            /**
883:             * Get an instance of StatisticsProvider derivative, for 
884:             * updating the transport usage statistics.
885:             */
886:            protected final StatisticsProviderAdapter getStatisticsProviderAdapter() {
887:                return statistics_provider_adapter;
888:            }
889:
890:            /**
891:             * Get the statistics provider for transport usage statistics 
892:             * that can be used in conjunction with the SelectionStrategy. 
893:             * This is a special-case provider, usually supplied by, and 
894:             * known to, the concrete SelectionStrategy. To actually update 
895:             * the usage stats use getStatisticsProviderAdapter()
896:             */
897:            public final StatisticsProvider getStatisticsProvider() {
898:                return statistics_provider;
899:            }
900:
901:            /**
902:             * Return the StatissticsProvider, given the cardinality number
903:             * @param no
904:             * @return
905:             */
906:            public StatisticsProvider getStatisticsProvider(int no) {
907:                if (statistics_provider_adapter == null)
908:                    return null;
909:
910:                return statistics_provider_adapter.find(no);
911:            }
912:
913:            /*
914:              class CachedContext
915:              {
916:              public byte[] client_authentication_token;
917:              public EstablishContext msg;
918:              CachedContext(byte[] client_authentication_token, EstablishContext msg)
919:              {
920:              this.client_authentication_token = client_authentication_token;
921:              this.msg = msg;
922:              }
923:              }
924:
925:              public void cacheSASContext(long client_context_id, byte[] client_authentication_token, EstablishContext msg)
926:              {
927:              synchronized ( sasContexts )
928:              {
929:              sasContexts.put(new Long(client_context_id), new CachedContext(client_authentication_token, msg));
930:              }
931:              }
932:
933:              public void purgeSASContext(long client_context_id)
934:              {
935:              synchronized ( sasContexts )
936:              {
937:              sasContexts.remove(new Long(client_context_id));
938:              }
939:              }
940:
941:              public byte[] getSASContext(long client_context_id)
942:              {
943:              Long key = new Long(client_context_id);
944:              synchronized (sasContexts)
945:              {
946:              if (!sasContexts.containsKey(key)) return null;
947:              return ((CachedContext)sasContexts.get(key)).client_authentication_token;
948:              }
949:              }
950:
951:              public EstablishContext getSASContextMsg(long client_context_id)
952:              {
953:              Long key = new Long(client_context_id);
954:              synchronized (sasContexts)
955:              {
956:              if (!sasContexts.containsKey(key)) return null;
957:              return ((CachedContext)sasContexts.get(key)).msg;
958:              }
959:              }
960:             */
961:
962:            // provide cubbyholes for data
963:            public static int allocate_cubby_id() {
964:                return cubby_count++;
965:            }
966:
967:            public Object get_cubby(int id) {
968:                if (id < 0 || id >= cubby_count) {
969:                    if (logger.isErrorEnabled()) {
970:                        logger.error("Get bad cubby id " + id + " (max="
971:                                + cubby_count + "), in " + this .toString());
972:                    }
973:                    return null;
974:                }
975:                return cubbyholes[id];
976:            }
977:
978:            public void set_cubby(int id, Object obj) {
979:                if (id < 0 || id >= cubby_count) {
980:                    if (logger.isErrorEnabled()) {
981:                        logger.error("Set bad cubby id " + id + " (max="
982:                                + cubby_count + "), in " + this .toString());
983:                    }
984:                    return;
985:                }
986:                cubbyholes[id] = obj;
987:            }
988:
989:            /*default (or, package-level) access*/
990:            org.omg.ETF.Profile getProfile() {
991:                return profile;
992:            }
993:
994:        }// GIOPConnection
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.