Source Code Cross Referenced for JMSUtils.java in  » ESB » synapse » org » apache » synapse » transport » jms » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » synapse » org.apache.synapse.transport.jms 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright 2004,2005 The Apache Software Foundation.
003:         *
004:         * Licensed under the Apache License, Version 2.0 (the "License");
005:         * you may not use this file except in compliance with the License.
006:         * You may obtain a copy of the License at
007:         *
008:         *      http://www.apache.org/licenses/LICENSE-2.0
009:         *
010:         * Unless required by applicable law or agreed to in writing, software
011:         * distributed under the License is distributed on an "AS IS" BASIS,
012:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013:         * See the License for the specific language governing permissions and
014:         * limitations under the License.
015:         */
016:        package org.apache.synapse.transport.jms;
017:
018:        import org.apache.axiom.om.OMOutputFormat;
019:        import org.apache.axiom.om.OMText;
020:        import org.apache.axiom.om.OMElement;
021:        import org.apache.axiom.om.util.StAXUtils;
022:        import org.apache.axiom.om.impl.builder.StAXBuilder;
023:        import org.apache.axiom.om.impl.builder.StAXOMBuilder;
024:        import org.apache.axiom.om.impl.llom.OMTextImpl;
025:        import org.apache.axiom.soap.SOAP11Constants;
026:        import org.apache.axiom.soap.SOAP12Constants;
027:        import org.apache.axiom.soap.SOAPEnvelope;
028:        import org.apache.axiom.soap.SOAPFactory;
029:        import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
030:        import org.apache.axiom.attachments.ByteArrayDataSource;
031:        import org.apache.axis2.AxisFault;
032:        import org.apache.axis2.Constants;
033:        import org.apache.axis2.builder.BuilderUtil;
034:        import org.apache.axis2.context.MessageContext;
035:        import org.apache.axis2.description.AxisService;
036:        import org.apache.axis2.description.Parameter;
037:        import org.apache.axis2.description.AxisOperation;
038:        import org.apache.axis2.description.ParameterIncludeImpl;
039:        import org.apache.axis2.transport.http.HTTPTransportUtils;
040:        import org.apache.synapse.transport.base.BaseUtils;
041:        import org.apache.synapse.transport.base.BaseConstants;
042:        import org.apache.commons.logging.Log;
043:        import org.apache.commons.logging.LogFactory;
044:
045:        import javax.jms.*;
046:        import javax.jms.Queue;
047:        import javax.xml.stream.XMLStreamException;
048:        import javax.xml.namespace.QName;
049:        import javax.activation.DataHandler;
050:        import javax.naming.Context;
051:        import java.io.*;
052:        import java.util.*;
053:        import java.nio.ByteBuffer;
054:
055:        /**
056:         * Miscallaneous methods used for the JMS transport
057:         */
058:        public class JMSUtils extends BaseUtils {
059:
060:            private static final Log log = LogFactory.getLog(JMSUtils.class);
061:
062:            private static BaseUtils _instance = new JMSUtils();
063:
064:            public static BaseUtils getInstace() {
065:                return _instance;
066:            }
067:
068:            /**
069:             * Create a JMS Queue using the given connection with the JNDI destination name, and return the
070:             * JMS Destination name of the created queue
071:             *
072:             * @param con the JMS Connection to be used
073:             * @param destinationJNDIName the JNDI name of the Queue to be created
074:             * @return the JMS Destination name of the created Queue
075:             * @throws JMSException on error
076:             */
077:            public static String createJMSQueue(Connection con,
078:                    String destinationJNDIName) throws JMSException {
079:                try {
080:                    QueueSession session = ((QueueConnection) con)
081:                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
082:                    Queue queue = session.createQueue(destinationJNDIName);
083:                    log.info("JMS Queue with JNDI name : "
084:                            + destinationJNDIName + " created");
085:                    return queue.getQueueName();
086:
087:                } finally {
088:                    try {
089:                        con.close();
090:                    } catch (JMSException ignore) {
091:                    }
092:                }
093:            }
094:
095:            /**
096:             * Create a JMS Topic using the given connection with the JNDI destination name, and return the
097:             * JMS Destination name of the created queue
098:             *
099:             * @param con the JMS Connection to be used
100:             * @param destinationJNDIName the JNDI name of the Topic to be created
101:             * @return the JMS Destination name of the created Topic
102:             * @throws JMSException on error
103:             */
104:            public static String createJMSTopic(Connection con,
105:                    String destinationJNDIName) throws JMSException {
106:                try {
107:                    TopicSession session = ((TopicConnection) con)
108:                            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
109:                    Topic topic = session.createTopic(destinationJNDIName);
110:                    log.info("JMS Topic with JNDI name : "
111:                            + destinationJNDIName + " created");
112:                    return topic.getTopicName();
113:
114:                } finally {
115:                    try {
116:                        con.close();
117:                    } catch (JMSException ignore) {
118:                    }
119:                }
120:            }
121:
122:            /**
123:             * Should this service be enabled over the JMS transport?
124:             *
125:             * @param service the Axis service
126:             * @return true if JMS should be enabled
127:             */
128:            public static boolean isJMSService(AxisService service) {
129:                if (service.isEnableAllTransports()) {
130:                    return true;
131:
132:                } else {
133:                    List transports = service.getExposedTransports();
134:                    for (int i = 0; i < transports.size(); i++) {
135:                        if (JMSListener.TRANSPORT_NAME
136:                                .equals(transports.get(i))) {
137:                            return true;
138:                        }
139:                    }
140:                }
141:                return false;
142:            }
143:
144:            /**
145:             * Get the JMS destination used by this service
146:             *
147:             * @param service the Axis Service
148:             * @return the name of the JMS destination
149:             */
150:            public static String getJNDIDestinationNameForService(
151:                    AxisService service) {
152:                Parameter destParam = service
153:                        .getParameter(JMSConstants.DEST_PARAM);
154:                if (destParam != null) {
155:                    return (String) destParam.getValue();
156:                } else {
157:                    return service.getName();
158:                }
159:            }
160:
161:            /**
162:             * Get the JMS destination type of this service
163:             *
164:             * @param service the Axis Service
165:             * @return the name of the JMS destination
166:             */
167:            public static String getDestinationTypeForService(
168:                    AxisService service) {
169:                Parameter destTypeParam = service
170:                        .getParameter(JMSConstants.DEST_PARAM_TYPE);
171:                if (destTypeParam != null) {
172:                    String paramValue = (String) destTypeParam.getValue();
173:                    if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue)
174:                            || JMSConstants.DESTINATION_TYPE_TOPIC
175:                                    .equals(paramValue)) {
176:                        return paramValue;
177:                    } else {
178:                        handleException("Invalid destinaton type value "
179:                                + paramValue);
180:                        return null;
181:                    }
182:                } else {
183:                    log.debug("JMS destination type not given. default queue");
184:                    return JMSConstants.DESTINATION_TYPE_QUEUE;
185:                }
186:            }
187:
188:            /**
189:             * Extract connection factory properties from a given URL
190:             *
191:             * @param url a JMS URL of the form jms:/<destination>?[<key>=<value>&]*
192:             * @return a Hashtable of extracted properties
193:             */
194:            public static Hashtable getProperties(String url) {
195:                Hashtable h = new Hashtable();
196:                int propPos = url.indexOf("?");
197:                if (propPos != -1) {
198:                    StringTokenizer st = new StringTokenizer(url
199:                            .substring(propPos + 1), "&");
200:                    while (st.hasMoreTokens()) {
201:                        String token = st.nextToken();
202:                        int sep = token.indexOf("=");
203:                        if (sep != -1) {
204:                            h.put(token.substring(0, sep), token
205:                                    .substring(sep + 1));
206:                        } else {
207:                            continue; // ignore, what else can we do?
208:                        }
209:                    }
210:                }
211:                return h;
212:            }
213:
214:            /**
215:             * Get the EPR for the given JMS connection factory and destination
216:             * the form of the URL is
217:             * jms:/<destination>?[<key>=<value>&]*
218:             *
219:             * @param cf          the Axis2 JMS connection factory
220:             * @param destination the JNDI name of the destination
221:             * @return the EPR as a String
222:             */
223:            static String getEPR(JMSConnectionFactory cf, String destination) {
224:                StringBuffer sb = new StringBuffer();
225:                sb.append(JMSConstants.JMS_PREFIX).append(destination);
226:                sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM)
227:                        .append("=").append(cf.getConnFactoryJNDIName());
228:                Iterator props = cf.getJndiProperties().keySet().iterator();
229:                while (props.hasNext()) {
230:                    String key = (String) props.next();
231:                    String value = (String) cf.getJndiProperties().get(key);
232:                    sb.append("&").append(key).append("=").append(value);
233:                }
234:                return sb.toString();
235:            }
236:
237:            /**
238:             * Get a String property from the JMS message
239:             *
240:             * @param message  JMS message
241:             * @param property property name
242:             * @return property value
243:             */
244:            public String getProperty(Object message, String property) {
245:                try {
246:                    return ((Message) message).getStringProperty(property);
247:                } catch (JMSException e) {
248:                    return null;
249:                }
250:            }
251:
252:            /**
253:             * Return the destination name from the given URL
254:             *
255:             * @param url the URL
256:             * @return the destination name
257:             */
258:            public static String getDestination(String url) {
259:                String tempUrl = url
260:                        .substring(JMSConstants.JMS_PREFIX.length());
261:                int propPos = tempUrl.indexOf("?");
262:
263:                if (propPos == -1) {
264:                    return tempUrl;
265:                } else {
266:                    return tempUrl.substring(0, propPos);
267:                }
268:            }
269:
270:            /**
271:             * Set JNDI properties and any other connection factory parameters to the connection factory
272:             * passed in, looing at the parameter in axis2.xml
273:             * @param param the axis parameter that holds the connection factory settings
274:             * @param jmsConFactory the JMS connection factory to which the parameters should be applied
275:             */
276:            public static void setConnectionFactoryParameters(Parameter param,
277:                    JMSConnectionFactory jmsConFactory) {
278:
279:                ParameterIncludeImpl pi = new ParameterIncludeImpl();
280:                try {
281:                    pi.deserializeParameters((OMElement) param.getValue());
282:                } catch (AxisFault axisFault) {
283:                    log.error(
284:                            "Error reading parameters for JMS connection factory"
285:                                    + jmsConFactory.getName(), axisFault);
286:                }
287:
288:                Iterator params = pi.getParameters().iterator();
289:                while (params.hasNext()) {
290:
291:                    Parameter p = (Parameter) params.next();
292:
293:                    if (JMSConstants.CONFAC_TYPE.equals(p.getName())) {
294:                        String connectionFactoryType = (String) p.getValue();
295:                        jmsConFactory
296:                                .setConnectionFactoryType(connectionFactoryType);
297:
298:                    } else if (JMSConstants.RECONNECT_TIMEOUT.equals(p
299:                            .getName())) {
300:                        String strTimeout = (String) p.getValue();
301:                        int reconnectTimeoutSeconds = Integer
302:                                .parseInt(strTimeout);
303:                        long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000;
304:                        jmsConFactory
305:                                .setReconnectTimeout(reconnectTimeoutMillis);
306:
307:                    } else if (Context.INITIAL_CONTEXT_FACTORY.equals(p
308:                            .getName())) {
309:                        jmsConFactory.addJNDIContextProperty(
310:                                Context.INITIAL_CONTEXT_FACTORY, (String) p
311:                                        .getValue());
312:                    } else if (Context.PROVIDER_URL.equals(p.getName())) {
313:                        jmsConFactory.addJNDIContextProperty(
314:                                Context.PROVIDER_URL, (String) p.getValue());
315:                    } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) {
316:                        jmsConFactory.addJNDIContextProperty(
317:                                Context.SECURITY_PRINCIPAL, (String) p
318:                                        .getValue());
319:                    } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) {
320:                        jmsConFactory.addJNDIContextProperty(
321:                                Context.SECURITY_CREDENTIALS, (String) p
322:                                        .getValue());
323:                    } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p
324:                            .getName())) {
325:                        jmsConFactory.setConnFactoryJNDIName((String) p
326:                                .getValue());
327:                        jmsConFactory.addJNDIContextProperty(
328:                                JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p
329:                                        .getValue());
330:                    }
331:                }
332:            }
333:
334:            /**
335:             * Get an InputStream to the JMS message payload
336:             *
337:             * @param message the JMS message
338:             * @return an InputStream to the payload
339:             */
340:            public InputStream getInputStream(Object message) {
341:
342:                try {
343:                    if (message instanceof  BytesMessage) {
344:                        byte[] buffer = new byte[1024];
345:                        ByteArrayOutputStream out = new ByteArrayOutputStream();
346:
347:                        BytesMessage byteMsg = (BytesMessage) message;
348:                        for (int bytesRead = byteMsg.readBytes(buffer); bytesRead != -1; bytesRead = byteMsg
349:                                .readBytes(buffer)) {
350:                            out.write(buffer, 0, bytesRead);
351:                        }
352:                        return new ByteArrayInputStream(out.toByteArray());
353:
354:                    } else if (message instanceof  TextMessage) {
355:                        TextMessage txtMsg = (TextMessage) message;
356:                        String contentType = getProperty(txtMsg,
357:                                BaseConstants.CONTENT_TYPE);
358:
359:                        if (contentType != null) {
360:                            return new ByteArrayInputStream(
361:                                    txtMsg
362:                                            .getText()
363:                                            .getBytes(
364:                                                    BuilderUtil
365:                                                            .getCharSetEncoding(contentType)));
366:                        } else {
367:                            return new ByteArrayInputStream(txtMsg.getText()
368:                                    .getBytes());
369:                        }
370:
371:                    } else {
372:                        handleException("Unsupported JMS message type : "
373:                                + message.getClass().getName());
374:                    }
375:
376:                } catch (JMSException e) {
377:                    handleException("JMS Exception reading message payload", e);
378:                } catch (UnsupportedEncodingException e) {
379:                    handleException(
380:                            "Encoding exception getting InputStream into message",
381:                            e);
382:                }
383:                return null;
384:            }
385:
386:            /**
387:             * Set the JMS ReplyTo for the message
388:             *
389:             * @param replyDestination the JMS Destination where the reply is expected
390:             * @param session the session to use to create a temp Queue if a response is expected
391:             * but a Destination has not been specified
392:             * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo
393:             * @return the JMS ReplyTo Destination for the message
394:             */
395:            public static Destination setReplyDestination(
396:                    Destination replyDestination, Session session,
397:                    Message message) {
398:                if (replyDestination == null) {
399:                    try {
400:                        // create temporary queue to receive the reply
401:                        replyDestination = createTemporaryDestination(session);
402:                    } catch (JMSException e) {
403:                        handleException("Error creating temporary queue for response");
404:                    }
405:                }
406:
407:                try {
408:                    message.setJMSReplyTo(replyDestination);
409:                } catch (JMSException e) {
410:                    log.warn("Error setting JMS ReplyTo destination to : "
411:                            + replyDestination, e);
412:                }
413:
414:                if (log.isDebugEnabled()) {
415:                    try {
416:                        log
417:                                .debug("Expecting a response to JMS Destination : "
418:                                        + (replyDestination instanceof  Queue ? ((Queue) replyDestination)
419:                                                .getQueueName()
420:                                                : ((Topic) replyDestination)
421:                                                        .getTopicName()));
422:                    } catch (JMSException ignore) {
423:                    }
424:                }
425:                return replyDestination;
426:            }
427:
428:            /**
429:             * When trying to send a message to a destination, if it does not exist, try to create it
430:             *
431:             * @param destination the JMS destination to send messages
432:             * @param targetAddress the target JMS EPR to find the Destination to be created if required
433:             * @param session the JMS session to use
434:             * @return the JMS Destination where messages could be posted
435:             * @throws AxisFault if the target Destination does not exist and cannot be created
436:             */
437:            public static Destination createDestinationIfRequired(
438:                    Destination destination, String destinationType,
439:                    String targetAddress, Session session) throws AxisFault {
440:                if (destination == null) {
441:                    if (targetAddress != null) {
442:                        String name = JMSUtils.getDestination(targetAddress);
443:                        if (log.isDebugEnabled()) {
444:                            log.debug("Creating JMS Destination : " + name);
445:                        }
446:
447:                        try {
448:                            destination = createDestination(session, name,
449:                                    destinationType);
450:                        } catch (JMSException e) {
451:                            handleException(
452:                                    "Error creating destination Queue : "
453:                                            + name, e);
454:                        }
455:                    } else {
456:                        handleException("Cannot send reply to null JMS Destination");
457:                    }
458:                }
459:                return destination;
460:            }
461:
462:            /**
463:             * Send the given message to the Destination using the given session
464:             * @param session the session to use to send
465:             * @param destination the Destination
466:             * @param message the JMS Message
467:             * @throws AxisFault on error
468:             */
469:            public static void sendMessageToJMSDestination(Session session,
470:                    Destination destination, String destinationType,
471:                    Message message) throws AxisFault {
472:
473:                MessageProducer producer = null;
474:                try {
475:                    if (log.isDebugEnabled()) {
476:                        log.debug("Sending message to destination : "
477:                                + destination);
478:                    }
479:
480:                    if (JMSConstants.DESTINATION_TYPE_TOPIC
481:                            .equals(destinationType)) {
482:                        producer = ((TopicSession) session)
483:                                .createPublisher((Topic) destination);
484:                        ((TopicPublisher) producer).publish(message);
485:                    } else {
486:                        producer = ((QueueSession) session)
487:                                .createSender((Queue) destination);
488:                        ((QueueSender) producer).send(message);
489:                    }
490:
491:                    if (log.isDebugEnabled()) {
492:                        log.debug("Sent message to destination : "
493:                                + destination + "\nMessage ID : "
494:                                + message.getJMSMessageID()
495:                                + "\nCorrelation ID : "
496:                                + message.getJMSCorrelationID()
497:                                + "\nReplyTo ID : " + message.getJMSReplyTo());
498:                    }
499:
500:                } catch (JMSException e) {
501:                    handleException(
502:                            "Error creating a producer or sending to : "
503:                                    + destination, e);
504:                } finally {
505:                    if (producer != null) {
506:                        try {
507:                            producer.close();
508:                        } catch (JMSException ignore) {
509:                        }
510:                    }
511:                }
512:            }
513:
514:            /**
515:             * Set transport headers from the axis message context, into the JMS message
516:             *
517:             * @param msgContext the axis message context
518:             * @param message the JMS Message
519:             * @throws JMSException on exception
520:             */
521:            public static void setTransportHeaders(MessageContext msgContext,
522:                    Message message) throws JMSException {
523:
524:                Map headerMap = (Map) msgContext
525:                        .getProperty(MessageContext.TRANSPORT_HEADERS);
526:
527:                if (headerMap == null) {
528:                    return;
529:                }
530:
531:                Iterator iter = headerMap.keySet().iterator();
532:                while (iter.hasNext()) {
533:
534:                    String name = (String) iter.next();
535:
536:                    if (JMSConstants.JMS_COORELATION_ID.equals(name)) {
537:                        message.setJMSCorrelationID((String) headerMap
538:                                .get(JMSConstants.JMS_COORELATION_ID));
539:                    } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) {
540:                        Object o = headerMap
541:                                .get(JMSConstants.JMS_DELIVERY_MODE);
542:                        if (o instanceof  Integer) {
543:                            message
544:                                    .setJMSDeliveryMode(((Integer) o)
545:                                            .intValue());
546:                        } else if (o instanceof  String) {
547:                            try {
548:                                message.setJMSDeliveryMode(Integer
549:                                        .parseInt((String) o));
550:                            } catch (NumberFormatException nfe) {
551:                                log.warn(
552:                                        "Invalid delivery mode ignored : " + o,
553:                                        nfe);
554:                            }
555:                        } else {
556:                            log.warn("Invalid delivery mode ignored : " + o);
557:                        }
558:                    } else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
559:                        message.setJMSExpiration(Long
560:                                .parseLong((String) headerMap
561:                                        .get(JMSConstants.JMS_EXPIRATION)));
562:                    } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
563:                        message.setJMSMessageID((String) headerMap
564:                                .get(JMSConstants.JMS_MESSAGE_ID));
565:                    } else if (JMSConstants.JMS_PRIORITY.equals(name)) {
566:                        message.setJMSPriority(Integer
567:                                .parseInt((String) headerMap
568:                                        .get(JMSConstants.JMS_PRIORITY)));
569:                    } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
570:                        message.setJMSTimestamp(Long
571:                                .parseLong((String) headerMap
572:                                        .get(JMSConstants.JMS_TIMESTAMP)));
573:                    } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
574:                        message.setJMSType((String) headerMap
575:                                .get(JMSConstants.JMS_MESSAGE_TYPE));
576:                    } else {
577:                        Object value = headerMap.get(name);
578:                        if (value instanceof  String) {
579:                            message.setStringProperty(name, (String) value);
580:                        } else if (value instanceof  Boolean) {
581:                            message.setBooleanProperty(name, ((Boolean) value)
582:                                    .booleanValue());
583:                        } else if (value instanceof  Integer) {
584:                            message.setIntProperty(name, ((Integer) value)
585:                                    .intValue());
586:                        } else if (value instanceof  Long) {
587:                            message.setLongProperty(name, ((Long) value)
588:                                    .longValue());
589:                        } else if (value instanceof  Double) {
590:                            message.setDoubleProperty(name, ((Double) value)
591:                                    .doubleValue());
592:                        } else if (value instanceof  Float) {
593:                            message.setFloatProperty(name, ((Float) value)
594:                                    .floatValue());
595:                        }
596:                    }
597:                }
598:            }
599:
600:            /**
601:             * Read the transport headers from the JMS Message and set them to the axis2 message context
602:             *
603:             * @param message the JMS Message received
604:             * @param responseMsgCtx the axis message context
605:             * @throws AxisFault on error
606:             */
607:            public static void loadTransportHeaders(Message message,
608:                    MessageContext responseMsgCtx) throws AxisFault {
609:                responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS,
610:                        getTransportHeaders(message));
611:            }
612:
613:            /**
614:             * Extract transport level headers for JMS from the given message into a Map
615:             *
616:             * @param message the JMS message
617:             * @return a Map of the transport headers
618:             */
619:            public static Map getTransportHeaders(Message message) {
620:                // create a Map to hold transport headers
621:                Map map = new HashMap();
622:
623:                // correlation ID
624:                try {
625:                    if (message.getJMSCorrelationID() != null) {
626:                        map.put(JMSConstants.JMS_COORELATION_ID, message
627:                                .getJMSCorrelationID());
628:                    }
629:                } catch (JMSException ignore) {
630:                }
631:
632:                // set the delivery mode as persistent or not
633:                try {
634:                    map.put(JMSConstants.JMS_DELIVERY_MODE, Integer
635:                            .toString(message.getJMSDeliveryMode()));
636:                } catch (JMSException ignore) {
637:                }
638:
639:                // destination name
640:                try {
641:                    if (message.getJMSDestination() != null) {
642:                        Destination dest = message.getJMSDestination();
643:                        map.put(JMSConstants.JMS_DESTINATION,
644:                                dest instanceof  Queue ? ((Queue) dest)
645:                                        .getQueueName() : ((Topic) dest)
646:                                        .getTopicName());
647:                    }
648:                } catch (JMSException ignore) {
649:                }
650:
651:                // expiration
652:                try {
653:                    map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message
654:                            .getJMSExpiration()));
655:                } catch (JMSException ignore) {
656:                }
657:
658:                // if a JMS message ID is found
659:                try {
660:                    if (message.getJMSMessageID() != null) {
661:                        map.put(JMSConstants.JMS_MESSAGE_ID, message
662:                                .getJMSMessageID());
663:                    }
664:                } catch (JMSException ignore) {
665:                }
666:
667:                // priority
668:                try {
669:                    map.put(JMSConstants.JMS_PRIORITY, Long.toString(message
670:                            .getJMSPriority()));
671:                } catch (JMSException ignore) {
672:                }
673:
674:                // redelivered
675:                try {
676:                    map.put(JMSConstants.JMS_REDELIVERED, Boolean
677:                            .toString(message.getJMSRedelivered()));
678:                } catch (JMSException ignore) {
679:                }
680:
681:                // replyto destination name
682:                try {
683:                    if (message.getJMSReplyTo() != null) {
684:                        Destination dest = message.getJMSReplyTo();
685:                        map.put(JMSConstants.JMS_REPLY_TO,
686:                                dest instanceof  Queue ? ((Queue) dest)
687:                                        .getQueueName() : ((Topic) dest)
688:                                        .getTopicName());
689:                    }
690:                } catch (JMSException ignore) {
691:                }
692:
693:                // priority
694:                try {
695:                    map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message
696:                            .getJMSTimestamp()));
697:                } catch (JMSException ignore) {
698:                }
699:
700:                // message type
701:                try {
702:                    if (message.getJMSType() != null) {
703:                        map.put(JMSConstants.JMS_TYPE, message.getJMSType());
704:                    }
705:                } catch (JMSException ignore) {
706:                }
707:
708:                // any other transport properties / headers
709:                Enumeration e = null;
710:                try {
711:                    e = message.getPropertyNames();
712:                } catch (JMSException ignore) {
713:                }
714:
715:                if (e != null) {
716:                    while (e.hasMoreElements()) {
717:                        String headerName = (String) e.nextElement();
718:                        try {
719:                            map.put(headerName, message
720:                                    .getStringProperty(headerName));
721:                            continue;
722:                        } catch (JMSException ignore) {
723:                        }
724:                        try {
725:                            map.put(headerName, Boolean.valueOf(message
726:                                    .getBooleanProperty(headerName)));
727:                            continue;
728:                        } catch (JMSException ignore) {
729:                        }
730:                        try {
731:                            map.put(headerName, new Integer(message
732:                                    .getIntProperty(headerName)));
733:                            continue;
734:                        } catch (JMSException ignore) {
735:                        }
736:                        try {
737:                            map.put(headerName, new Long(message
738:                                    .getLongProperty(headerName)));
739:                            continue;
740:                        } catch (JMSException ignore) {
741:                        }
742:                        try {
743:                            map.put(headerName, new Double(message
744:                                    .getDoubleProperty(headerName)));
745:                            continue;
746:                        } catch (JMSException ignore) {
747:                        }
748:                        try {
749:                            map.put(headerName, new Float(message
750:                                    .getFloatProperty(headerName)));
751:                            continue;
752:                        } catch (JMSException ignore) {
753:                        }
754:                    }
755:                }
756:
757:                return map;
758:            }
759:
760:            public String getMessageTextPayload(Object message) {
761:                if (message instanceof  TextMessage) {
762:                    try {
763:                        return ((TextMessage) message).getText();
764:                    } catch (JMSException e) {
765:                        handleException(
766:                                "Error reading JMS text message payload", e);
767:                    }
768:                }
769:                return null;
770:            }
771:
772:            public byte[] getMessageBinaryPayload(Object message) {
773:
774:                if (message instanceof  BytesMessage) {
775:                    BytesMessage bytesMessage = (BytesMessage) message;
776:
777:                    try {
778:                        bytesMessage.reset();
779:
780:                        byte[] buffer = new byte[1024];
781:                        ByteArrayOutputStream out = new ByteArrayOutputStream();
782:
783:                        for (int bytesRead = bytesMessage.readBytes(buffer); bytesRead != -1; bytesRead = bytesMessage
784:                                .readBytes(buffer)) {
785:                            out.write(buffer, 0, bytesRead);
786:                        }
787:                        return out.toByteArray();
788:
789:                    } catch (JMSException e) {
790:                        handleException(
791:                                "Error reading JMS binary message payload", e);
792:                    }
793:                }
794:                return null;
795:            }
796:
797:            // ----------- JMS 1.0.2b compatibility methods -------------
798:            public static Connection createConnection(
799:                    ConnectionFactory conFactory, String user, String pass,
800:                    String destinationType) throws JMSException {
801:
802:                if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
803:                    if (user != null && pass != null) {
804:                        return ((QueueConnectionFactory) conFactory)
805:                                .createQueueConnection(user, pass);
806:                    } else {
807:                        return ((QueueConnectionFactory) conFactory)
808:                                .createQueueConnection();
809:                    }
810:
811:                } else if (JMSConstants.DESTINATION_TYPE_TOPIC
812:                        .equals(destinationType)) {
813:                    if (user != null && pass != null) {
814:                        return ((TopicConnectionFactory) conFactory)
815:                                .createTopicConnection(user, pass);
816:                    } else {
817:                        return ((TopicConnectionFactory) conFactory)
818:                                .createTopicConnection();
819:                    }
820:                } else {
821:                    handleException("Unable to determine type of JMS Connection Factory - i.e Queue/Topic");
822:                }
823:                return null;
824:            }
825:
826:            public static Session createSession(Connection con,
827:                    boolean transacted, int acknowledgeMode,
828:                    String destinationType) throws JMSException {
829:
830:                if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
831:                    return ((QueueConnection) con).createQueueSession(
832:                            transacted, acknowledgeMode);
833:                } else if (JMSConstants.DESTINATION_TYPE_TOPIC
834:                        .equals(destinationType)) {
835:                    return ((TopicConnection) con).createTopicSession(
836:                            transacted, acknowledgeMode);
837:                } else {
838:                    log
839:                            .debug("JMS destination type not given or invalid. default queue. was "
840:                                    + destinationType);
841:                    return ((QueueConnection) con).createQueueSession(
842:                            transacted, acknowledgeMode);
843:                }
844:            }
845:
846:            public static Destination createDestination(Session session,
847:                    String destName, String destinationType)
848:                    throws JMSException {
849:
850:                if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
851:                    return ((QueueSession) session).createQueue(destName);
852:                } else if (JMSConstants.DESTINATION_TYPE_TOPIC
853:                        .equals(destinationType)) {
854:                    return ((TopicSession) session).createTopic(destName);
855:                } else {
856:                    log
857:                            .debug("JMS destination type not given or invalid. default queue. was "
858:                                    + destinationType);
859:                    return ((QueueSession) session).createQueue(destName);
860:                }
861:            }
862:
863:            public static void createDestination(ConnectionFactory conFactory,
864:                    String destinationJNDIName, String destinationType)
865:                    throws JMSException {
866:
867:                if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
868:                    JMSUtils.createJMSQueue(
869:                            ((QueueConnectionFactory) conFactory)
870:                                    .createQueueConnection(),
871:                            destinationJNDIName);
872:                } else if (JMSConstants.DESTINATION_TYPE_TOPIC
873:                        .equals(destinationType)) {
874:                    JMSUtils.createJMSTopic(
875:                            ((TopicConnectionFactory) conFactory)
876:                                    .createTopicConnection(),
877:                            destinationJNDIName);
878:                }
879:            }
880:
881:            public static MessageConsumer createConsumer(Session session,
882:                    Destination dest) throws JMSException {
883:
884:                if (dest instanceof  Queue) {
885:                    return ((QueueSession) session)
886:                            .createReceiver((Queue) dest);
887:                } else {
888:                    return ((TopicSession) session)
889:                            .createSubscriber((Topic) dest);
890:                }
891:            }
892:
893:            public static Destination createTemporaryDestination(Session session)
894:                    throws JMSException {
895:
896:                if (session instanceof  QueueSession) {
897:                    return ((QueueSession) session).createTemporaryQueue();
898:                } else {
899:                    return ((TopicSession) session).createTemporaryTopic();
900:                }
901:            }
902:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.