Source Code Cross Referenced for JdbcMessage.java in  » EJB-Server-resin-3.1.5 » resin » com » caucho » jms » jdbc » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server resin 3.1.5 » resin » com.caucho.jms.jdbc 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003:         *
004:         * This file is part of Resin(R) Open Source
005:         *
006:         * Each copy or derived work must preserve the copyright notice and this
007:         * notice unmodified.
008:         *
009:         * Resin Open Source is free software; you can redistribute it and/or modify
010:         * it under the terms of the GNU General Public License as published by
011:         * the Free Software Foundation; either version 2 of the License, or
012:         * (at your option) any later version.
013:         *
014:         * Resin Open Source is distributed in the hope that it will be useful,
015:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
016:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017:         * of NON-INFRINGEMENT.  See the GNU General Public License for more
018:         * details.
019:         *
020:         * You should have received a copy of the GNU General Public License
021:         * along with Resin Open Source; if not, write to the
022:         *
023:         *   Free Software Foundation, Inc.
024:         *   59 Temple Place, Suite 330
025:         *   Boston, MA 02111-1307  USA
026:         *
027:         * @author Scott Ferguson
028:         */
029:
030:        package com.caucho.jms.jdbc;
031:
032:        import com.caucho.config.ConfigException;
033:        import com.caucho.jdbc.JdbcMetaData;
034:        import com.caucho.jdbc.OracleMetaData;
035:        import com.caucho.jms.JMSExceptionWrapper;
036:        import com.caucho.jms.message.BytesMessageImpl;
037:        import com.caucho.jms.message.MapMessageImpl;
038:        import com.caucho.jms.message.MessageImpl;
039:        import com.caucho.jms.message.ObjectMessageImpl;
040:        import com.caucho.jms.message.StreamMessageImpl;
041:        import com.caucho.jms.message.TextMessageImpl;
042:        import com.caucho.jms.selector.Selector;
043:        import com.caucho.util.CharBuffer;
044:        import com.caucho.util.L10N;
045:        import com.caucho.vfs.*;
046:
047:        import javax.jms.*;
048:        import javax.sql.DataSource;
049:        import java.io.EOFException;
050:        import java.io.IOException;
051:        import java.io.InputStream;
052:        import java.io.ObjectInputStream;
053:        import java.io.ObjectOutputStream;
054:        import java.sql.Connection;
055:        import java.sql.PreparedStatement;
056:        import java.sql.ResultSet;
057:        import java.sql.SQLException;
058:        import java.sql.Statement;
059:        import java.sql.Types;
060:        import java.util.Enumeration;
061:        import java.util.logging.Level;
062:        import java.util.logging.Logger;
063:
064:        /**
065:         * Represents a JDBC message.
066:         */
067:        public class JdbcMessage {
068:            static final Logger log = Logger.getLogger(JdbcMessage.class
069:                    .getName()); // class-wide logger
070:            static final L10N L = new L10N(JdbcMessage.class); // localized message helper
071:            // Codes written to / read from the msg_type column to pick the body decoder.
072:            private static final int MESSAGE = 0;
073:            private static final int TEXT = 1;
074:            private static final int BYTES = 2;
075:            private static final int STREAM = 3;
076:            private static final int OBJECT = 4;
077:            private static final int MAP = 5;
078:            // Source of the table name, data source and SQL dialect metadata.
079:            private final JdbcManager _jdbcManager;
080:            private DataSource _dataSource; // assigned in init()
081:            // Table name is assigned in init(); the sequence name stays null when the database supports identity columns.
082:            private String _messageTable;
083:            private String _messageSequence;
084:            // True when the manager's metadata is an OracleMetaData; enables the LOB storage clauses in init().
085:            private boolean _isOracle;
086:
087:            public JdbcMessage(JdbcManager jdbcManager) {
088:                _jdbcManager = jdbcManager;
089:            }
090:
091:            /**
092:             * Initializes the JdbcMessage
093:             */
094:            public void init() throws ConfigException, SQLException {
095:                _messageTable = _jdbcManager.getMessageTable();
096:                _dataSource = _jdbcManager.getDataSource();
097:
098:                JdbcMetaData metaData = _jdbcManager.getMetaData();
099:
100:                _isOracle = metaData instanceof  OracleMetaData;
101:
102:                String longType = _jdbcManager.getLongType();
103:                String identity = longType + " PRIMARY KEY";
104:
105:                if (metaData.supportsIdentity())
106:                    identity = metaData.createIdentitySQL(identity);
107:                else
108:                    _messageSequence = _messageTable + "_cseq";
109:
110:                Connection conn = _dataSource.getConnection();
111:                try {
112:                    Statement stmt = conn.createStatement();
113:                    String sql = "SELECT 1 FROM " + _messageTable
114:                            + " WHERE 1=0";
115:
116:                    try {
117:                        ResultSet rs = stmt.executeQuery(sql);
118:                        rs.next();
119:                        rs.close();
120:                        stmt.close();
121:
122:                        return;
123:                    } catch (SQLException e) {
124:                        log.finest(e.toString());
125:                    }
126:
127:                    String blob = _jdbcManager.getBlob();
128:
129:                    log.info(L.l("creating JMS message table {0}",
130:                            _messageTable));
131:
132:                    sql = ("CREATE TABLE " + _messageTable + " (" + "  m_id "
133:                            + identity + "," + "  queue INTEGER NOT NULL,"
134:                            + "  conn VARCHAR(255)," + "  consumer " + longType
135:                            + "," + "  delivered INTEGER NOT NULL,"
136:                            + "  msg_type INTEGER NOT NULL," + "  expire "
137:                            + longType + " NOT NULL," + "  header " + blob
138:                            + "," + "  body " + blob + ")");
139:
140:                    if (_isOracle) {
141:                        String extent = "";
142:
143:                        if (_jdbcManager.getTablespace() != null) {
144:                            extent = " tablespace "
145:                                    + _jdbcManager.getTablespace();
146:                        }
147:
148:                        // oracle recommends using retention (over pctversion) for performance
149:                        // Oracle will keep deleted lobs for the retention time before
150:                        // releasing them (e.g. 900 seconds)
151:                        sql += (" LOB(header) STORE AS (cache retention"
152:                                + extent + ")");
153:                        sql += (" LOB(body) STORE AS (cache retention" + extent + ")");
154:                    }
155:
156:                    stmt.executeUpdate(sql);
157:
158:                    if (_messageSequence != null) {
159:                        stmt.executeUpdate(metaData.createSequenceSQL(
160:                                _messageSequence, 1));
161:                    }
162:                } finally {
163:                    conn.close();
164:                }
165:            }
166:
167:            /**
168:             * Sends the message to the queue.
169:             */
170:            public long send(Message message, int queue, long expireTime)
171:                    throws SQLException, IOException, JMSException {
172:                if (log.isLoggable(Level.FINE))
173:                    log.fine("jms jdbc queue:" + queue + " send message");
174:
175:                TempStream header = new TempStream();
176:                header.openWrite();
177:
178:                WriteStream ws = new WriteStream(header);
179:                writeMessageHeader(ws, message);
180:                ws.close();
181:
182:                TempStream body = null;
183:
184:                int type = MESSAGE;
185:
186:                if (message instanceof  TextMessage) {
187:                    TextMessage text = (TextMessage) message;
188:
189:                    type = TEXT;
190:
191:                    if (text.getText() != null) {
192:                        body = new TempStream();
193:                        body.openWrite();
194:
195:                        ws = new WriteStream(body);
196:                        ws.setEncoding("UTF-8");
197:                        ws.print(text.getText());
198:                        ws.close();
199:                    }
200:                } else if (message instanceof  BytesMessage) {
201:                    BytesMessage bytes = (BytesMessage) message;
202:
203:                    type = BYTES;
204:
205:                    body = writeBytes(bytes);
206:                } else if (message instanceof  StreamMessage) {
207:                    StreamMessage stream = (StreamMessage) message;
208:
209:                    type = STREAM;
210:
211:                    body = writeStream(stream);
212:                } else if (message instanceof  ObjectMessage) {
213:                    ObjectMessage obj = (ObjectMessage) message;
214:
215:                    type = OBJECT;
216:
217:                    body = writeObject(obj);
218:                } else if (message instanceof  MapMessage) {
219:                    MapMessage obj = (MapMessage) message;
220:
221:                    type = MAP;
222:
223:                    body = writeMap(obj);
224:                }
225:
226:                Connection conn = _dataSource.getConnection();
227:                try {
228:                    String sql;
229:
230:                    if (_messageSequence != null) {
231:                        sql = _jdbcManager.getMetaData().selectSequenceSQL(
232:                                _messageSequence);
233:
234:                        PreparedStatement pstmt = conn.prepareStatement(sql);
235:                        ;
236:
237:                        long mId = -1;
238:
239:                        ResultSet rs = pstmt.executeQuery();
240:                        if (rs.next())
241:                            mId = rs.getLong(1);
242:                        else
243:                            throw new RuntimeException("can't create message");
244:
245:                        sql = ("INSERT INTO "
246:                                + _messageTable
247:                                + "(m_id, queue, msg_type, expire, delivered, header, body) " + "VALUES (?,?,?,?,0,?,?)");
248:
249:                        pstmt = conn.prepareStatement(sql);
250:
251:                        int i = 1;
252:                        pstmt.setLong(i++, mId);
253:                        pstmt.setInt(i++, queue);
254:                        pstmt.setInt(i++, type);
255:                        pstmt.setLong(i++, expireTime);
256:
257:                        if (header.getLength() > 0)
258:                            pstmt.setBinaryStream(i++, header.openRead(),
259:                                    header.getLength());
260:                        else
261:                            pstmt.setNull(i++, Types.BINARY);
262:
263:                        if (body != null)
264:                            pstmt.setBinaryStream(i++, body.openRead(), body
265:                                    .getLength());
266:                        else
267:                            pstmt.setString(i++, "");
268:
269:                        pstmt.executeUpdate();
270:                    } else {
271:                        sql = ("INSERT INTO "
272:                                + _messageTable
273:                                + "(queue, msg_type, expire, delivered, header, body) " + "VALUES (?,?,?,0,?,?)");
274:                        PreparedStatement pstmt;
275:
276:                        pstmt = conn.prepareStatement(sql);
277:
278:                        int i = 1;
279:                        pstmt.setInt(i++, queue);
280:                        pstmt.setInt(i++, type);
281:                        pstmt.setLong(i++, expireTime);
282:                        pstmt.setBinaryStream(i++, header.openRead(), header
283:                                .getLength());
284:
285:                        if (body != null)
286:                            pstmt.setBinaryStream(i++, body.openRead(), body
287:                                    .getLength());
288:                        else
289:                            pstmt.setString(i++, "");
290:
291:                        pstmt.executeUpdate();
292:                    }
293:
294:                    return 0;
295:                } finally {
296:                    conn.close();
297:                }
298:            }
299:
300:            /**
301:             * Receives a message from the queue.
302:             */
303:            MessageImpl receive(int queue, int session) throws SQLException,
304:                    IOException, JMSException {
305:                long minId = -1;
306:
307:                Connection conn = _dataSource.getConnection();
308:                try {
309:                    String sql = ("SELECT m_id, msg_type, delivered, body, header"
310:                            + " FROM "
311:                            + _messageTable
312:                            + " WHERE ?<id AND queue=? AND consumer IS NULL" + " ORDER BY id");
313:
314:                    PreparedStatement selectStmt = conn.prepareStatement(sql);
315:
316:                    sql = ("UPDATE " + _messageTable
317:                            + " SET consumer=?, delivered=1 " + "WHERE m_id=? AND consumer IS NULL");
318:
319:                    PreparedStatement updateStmt = conn.prepareStatement(sql);
320:
321:                    long id = -1;
322:                    while (true) {
323:                        id = -1;
324:
325:                        selectStmt.setLong(1, minId);
326:                        selectStmt.setInt(2, queue);
327:
328:                        MessageImpl msg = null;
329:
330:                        ResultSet rs = selectStmt.executeQuery();
331:                        while (rs.next()) {
332:                            id = rs.getLong(1);
333:
334:                            minId = id;
335:
336:                            msg = readMessage(rs);
337:                        }
338:
339:                        rs.close();
340:
341:                        if (msg == null)
342:                            return null;
343:
344:                        updateStmt.setInt(1, session);
345:                        updateStmt.setLong(2, id);
346:
347:                        int updateCount = updateStmt.executeUpdate();
348:
349:                        if (updateCount == 1)
350:                            return msg;
351:                        else if (log.isLoggable(Level.FINE)) {
352:                            log.fine("JdbcMessageQueue[" + queue
353:                                    + "] can't update received message " + id
354:                                    + " for session " + session + ".");
355:                        }
356:                    }
357:                } finally {
358:                    conn.close();
359:                }
360:            }
361:
362:            /**
363:             * Acknowledges all received messages from the session.
364:             */
365:            void acknowledge(int session) throws SQLException {
366:                Connection conn = _dataSource.getConnection();
367:
368:                try {
369:                    String sql = ("DELETE FROM " + _messageTable + " " + "WHERE consumer=?");
370:
371:                    PreparedStatement pstmt;
372:                    pstmt = conn.prepareStatement(sql);
373:
374:                    pstmt.setInt(1, session);
375:
376:                    pstmt.executeUpdate();
377:
378:                    pstmt.close();
379:                } finally {
380:                    conn.close();
381:                }
382:            }
383:
384:            /**
385:             * Reads the message from the result stream.
386:             */
387:            MessageImpl readMessage(ResultSet rs) throws SQLException,
388:                    IOException, JMSException {
389:                int msgType = rs.getInt(2);
390:                boolean redelivered = rs.getInt(3) == 1;
391:
392:                MessageImpl msg;
393:
394:                switch (msgType) {
395:                case TEXT: {
396:                    InputStream is = rs.getBinaryStream(4);
397:
398:                    try {
399:                        msg = readTextMessage(is);
400:                    } finally {
401:                        if (is != null)
402:                            is.close();
403:                    }
404:                    break;
405:                }
406:
407:                case BYTES: {
408:                    InputStream is = rs.getBinaryStream(4);
409:
410:                    try {
411:                        msg = readBytesMessage(is);
412:                    } finally {
413:                        if (is != null)
414:                            is.close();
415:                    }
416:                    break;
417:                }
418:
419:                case STREAM: {
420:                    InputStream is = rs.getBinaryStream(4);
421:
422:                    try {
423:                        msg = readStreamMessage(is);
424:                    } finally {
425:                        if (is != null)
426:                            is.close();
427:                    }
428:                    break;
429:                }
430:
431:                case OBJECT: {
432:                    InputStream is = rs.getBinaryStream(4);
433:
434:                    try {
435:                        msg = readObjectMessage(is);
436:                    } finally {
437:                        if (is != null)
438:                            is.close();
439:                    }
440:                    break;
441:                }
442:
443:                case MAP: {
444:                    InputStream is = rs.getBinaryStream(4);
445:
446:                    try {
447:                        msg = readMapMessage(is);
448:                    } finally {
449:                        if (is != null)
450:                            is.close();
451:                    }
452:                    break;
453:                }
454:
455:                case MESSAGE:
456:                default: {
457:                    msg = new MessageImpl();
458:                    break;
459:                }
460:                }
461:
462:                InputStream is = rs.getBinaryStream(5);
463:
464:                if (is != null) {
465:                    try {
466:                        readMessageHeader(is, msg);
467:                    } finally {
468:                        is.close();
469:                    }
470:                }
471:
472:                msg.setJMSRedelivered(redelivered);
473:
474:                return msg;
475:            }
476:
477:            /**
478:             * Writes the message header for a Resin message.
479:             */
480:            private void writeMessageHeader(WriteStream ws, Message msg)
481:                    throws IOException, JMSException {
482:                Enumeration names = msg.getPropertyNames();
483:                CharBuffer cb = new CharBuffer();
484:
485:                while (names.hasMoreElements()) {
486:                    String name = (String) names.nextElement();
487:                    writeValue(ws, cb, name);
488:
489:                    String value = msg.getStringProperty(name);
490:                    writeValue(ws, cb, value);
491:                }
492:            }
493:
494:            /**
495:             * Writes a value to the output stream.
496:             */
497:            private void writeValue(WriteStream ws, CharBuffer cb, Object value)
498:                    throws IOException {
499:                if (value == null)
500:                    ws.write('N');
501:                else {
502:                    cb.clear();
503:                    cb.append(value);
504:                    int length = cb.length();
505:                    char[] buf = cb.getBuffer();
506:
507:                    ws.write('S');
508:                    ws.write(length >> 24);
509:                    ws.write(length >> 16);
510:                    ws.write(length >> 8);
511:                    ws.write(length);
512:
513:                    for (int i = 0; i < length; i++) {
514:                        int ch = buf[i];
515:
516:                        ws.write(ch >> 8);
517:                        ws.write(ch);
518:                    }
519:                }
520:            }
521:
522:            /**
523:             * Writes the bytes message.
524:             */
525:            private TempStream writeBytes(BytesMessage bytes)
526:                    throws IOException, JMSException {
527:                TempStream body = new TempStream();
528:                body.openWrite();
529:
530:                WriteStream ws = new WriteStream(body);
531:
532:                int data;
533:                //bytes.reset();
534:
535:                TempBuffer tb = TempBuffer.allocate();
536:                byte[] buffer = tb.getBuffer();
537:                int len;
538:
539:                while ((len = bytes.readBytes(buffer, buffer.length)) >= 0) {
540:                    ws.write(buffer, 0, len);
541:                }
542:
543:                TempBuffer.free(tb);
544:                tb = null;
545:
546:                ws.close();
547:
548:                return body;
549:            }
550:
551:            /**
552:             * Writes the stream message.
553:             */
554:            private TempStream writeStream(StreamMessage stream)
555:                    throws IOException, JMSException {
556:                TempStream body = new TempStream();
557:                body.openWrite();
558:
559:                WriteStream ws = new WriteStream(body);
560:                ObjectOutputStream out = new ObjectOutputStream(ws);
561:
562:                try {
563:                    while (true) {
564:                        Object data = stream.readObject();
565:
566:                        out.writeObject(data);
567:                    }
568:                } catch (MessageEOFException e) {
569:                }
570:
571:                out.close();
572:                ws.close();
573:
574:                return body;
575:            }
576:
577:            /**
578:             * Writes the object message.
579:             */
580:            private TempStream writeObject(ObjectMessage obj)
581:                    throws IOException, JMSException {
582:                TempStream body = new TempStream();
583:                body.openWrite();
584:
585:                WriteStream ws = new WriteStream(body);
586:                ObjectOutputStream out = new ObjectOutputStream(ws);
587:
588:                out.writeObject(obj.getObject());
589:
590:                out.close();
591:                ws.close();
592:
593:                return body;
594:            }
595:
596:            /**
597:             * Writes the map message.
598:             */
599:            private TempStream writeMap(MapMessage map) throws IOException,
600:                    JMSException {
601:                TempStream body = new TempStream();
602:                body.openWrite();
603:
604:                WriteStream ws = new WriteStream(body);
605:                ObjectOutputStream out = new ObjectOutputStream(ws);
606:
607:                try {
608:                    Enumeration e = map.getMapNames();
609:                    while (e.hasMoreElements()) {
610:                        String name = (String) e.nextElement();
611:                        out.writeUTF(name);
612:
613:                        Object data = map.getObject(name);
614:                        out.writeObject(data);
615:                    }
616:                } catch (MessageEOFException e) {
617:                }
618:
619:                out.close();
620:                ws.close();
621:
622:                return body;
623:            }
624:
625:            /**
626:             * Writes the message header for a Resin message.
627:             */
628:            private void readMessageHeader(InputStream is, Message msg)
629:                    throws IOException, JMSException {
630:                CharBuffer cb = new CharBuffer();
631:
632:                int type;
633:
634:                while ((type = is.read()) > 0) {
635:                    String name = (String) readValue(is, type, cb);
636:                    Object value = readValue(is, is.read(), cb);
637:
638:                    msg.setObjectProperty(name, value);
639:                }
640:            }
641:
642:            /**
643:             * Writes the message header for a Resin message.
644:             */
645:            private TextMessageImpl readTextMessage(InputStream is)
646:                    throws IOException, JMSException {
647:                TextMessageImpl text = new TextMessageImpl();
648:
649:                if (is == null)
650:                    return text;
651:
652:                ByteToChar byteToChar = ByteToChar.create();
653:
654:                int ch;
655:
656:                byteToChar.setEncoding("UTF-8");
657:                while ((ch = is.read()) >= 0) {
658:                    byteToChar.addByte(ch);
659:                }
660:
661:                text.setText(byteToChar.getConvertedString());
662:
663:                return text;
664:            }
665:
666:            /**
667:             * Reads a bytes message.
668:             */
669:            private BytesMessageImpl readBytesMessage(InputStream is)
670:                    throws IOException, JMSException {
671:                BytesMessageImpl bytes = new BytesMessageImpl();
672:
673:                if (is == null) {
674:                    bytes.reset();
675:
676:                    return bytes;
677:                }
678:
679:                int data;
680:
681:                while ((data = is.read()) >= 0) {
682:                    bytes.writeByte((byte) data);
683:                }
684:
685:                bytes.reset();
686:
687:                return bytes;
688:            }
689:
690:            /**
691:             * Reads a stream message.
692:             */
693:            private StreamMessageImpl readStreamMessage(InputStream is)
694:                    throws IOException, JMSException {
695:                StreamMessageImpl stream = new StreamMessageImpl();
696:
697:                if (is == null)
698:                    return stream;
699:
700:                ObjectInputStream in = new ContextLoaderObjectInputStream(is);
701:
702:                try {
703:                    while (true) {
704:                        Object obj = in.readObject();
705:
706:                        stream.writeObject(obj);
707:                    }
708:                } catch (EOFException e) {
709:                } catch (Exception e) {
710:                    throw new JMSExceptionWrapper(e);
711:                }
712:
713:                in.close();
714:
715:                stream.reset();
716:
717:                return stream;
718:            }
719:
720:            /**
721:             * Reads a map message.
722:             */
723:            private MapMessageImpl readMapMessage(InputStream is)
724:                    throws IOException, JMSException {
725:                MapMessageImpl map = new MapMessageImpl();
726:
727:                if (is == null)
728:                    return map;
729:
730:                ObjectInputStream in = new ContextLoaderObjectInputStream(is);
731:
732:                try {
733:                    while (true) {
734:                        String name = in.readUTF();
735:                        Object obj = in.readObject();
736:
737:                        map.setObject(name, obj);
738:                    }
739:                } catch (EOFException e) {
740:                } catch (Exception e) {
741:                    throw new JMSExceptionWrapper(e);
742:                }
743:
744:                in.close();
745:
746:                return map;
747:            }
748:
749:            /**
750:             * Reads an object message.
751:             */
752:            private ObjectMessageImpl readObjectMessage(InputStream is)
753:                    throws IOException, JMSException {
754:                ObjectMessageImpl msg = new ObjectMessageImpl();
755:
756:                if (is == null)
757:                    return msg;
758:
759:                ObjectInputStream in = new ContextLoaderObjectInputStream(is);
760:
761:                try {
762:                    Object obj = in.readObject();
763:                    msg.setObject((java.io.Serializable) obj);
764:                } catch (IOException e) {
765:                    throw e;
766:                } catch (Exception e) {
767:                    throw new JMSExceptionWrapper(e);
768:                }
769:
770:                in.close();
771:
772:                return msg;
773:            }
774:
775:            /**
776:             * Writes a value to the output stream.
777:             */
778:            private Object readValue(InputStream is, int type, CharBuffer cb)
779:                    throws IOException {
780:                switch (type) {
781:                case 'N':
782:                    return null;
783:                case 'S': {
784:                    cb.clear();
785:                    int length = readInt(is);
786:
787:                    for (int i = 0; i < length; i++) {
788:                        char ch = (char) ((is.read() << 8) + is.read());
789:
790:                        cb.append(ch);
791:                    }
792:
793:                    return cb.toString();
794:                }
795:                default:
796:                    throw new IOException(L.l("unknown header type"));
797:                }
798:            }
799:
800:            /**
801:             * Reads an integer value.
802:             */
803:            private int readInt(InputStream is) throws IOException {
804:                return ((is.read() << 24) + (is.read() << 16)
805:                        + (is.read() << 8) + (is.read()));
806:            }
807:
808:            /**
809:             * Stub: ignores the selector and always returns false; nothing is removed.
810:             */
811:            private boolean hasMessage(Selector selector) throws JMSException {
812:                return false;
813:            }
814:
815:            /**
816:             * Returns a printable description of the form JdbcMessage[tableName].
817:             */
818:            public String toString() {
819:                return "JdbcMessage[" + _messageTable + "]";
820:            }
821:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.