001: /*
002: * $Id: ActiveMQJmsConnector.java 10803 2008-02-14 13:31:25Z holger $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jms.activemq;
012:
013: import org.mule.transport.ConnectException;
014: import org.mule.transport.jms.JmsConnector;
015: import org.mule.transport.jms.xa.ConnectionInvocationHandler;
016: import org.mule.util.ClassUtils;
017:
018: import java.lang.reflect.Method;
019: import java.lang.reflect.Proxy;
020:
021: import javax.jms.Connection;
022: import javax.jms.ConnectionFactory;
023:
024: /**
025: * ActiveMQ 4.x-specific JMS connector.
026: */
027: public class ActiveMQJmsConnector extends JmsConnector {
028: public static final String ACTIVEMQ_CONNECTION_FACTORY_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
029: public static final String DEFAULT_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=false";
030:
031: private String brokerURL = DEFAULT_BROKER_URL;
032:
033: /**
034: * Constructs a new ActiveMQJmsConnector.
035: */
036: public ActiveMQJmsConnector() {
037: setEagerConsumer(false);
038: // TODO MULE-1409 better support for ActiveMQ 4.x temp destinations
039: }
040:
041: protected ConnectionFactory getDefaultConnectionFactory() {
042: try {
043: ConnectionFactory connectionFactory = (ConnectionFactory) ClassUtils
044: .instanciateClass(
045: ACTIVEMQ_CONNECTION_FACTORY_CLASS,
046: new Object[] { getBrokerURL() });
047: applyVendorSpecificConnectionFactoryProperties(connectionFactory);
048: return connectionFactory;
049: } catch (Exception e) {
050: handleException(e);
051: return null;
052: }
053: }
054:
055: protected void applyVendorSpecificConnectionFactoryProperties(
056: ConnectionFactory connectionFactory) {
057: try {
058: Method getRedeliveryPolicyMethod = connectionFactory
059: .getClass().getMethod("getRedeliveryPolicy",
060: new Class[] {});
061: Object redeliveryPolicy = getRedeliveryPolicyMethod.invoke(
062: connectionFactory, new Object[] {});
063: Method setMaximumRedeliveriesMethod = redeliveryPolicy
064: .getClass().getMethod("setMaximumRedeliveries",
065: new Class[] { Integer.TYPE });
066: setMaximumRedeliveriesMethod.invoke(redeliveryPolicy,
067: new Object[] { new Integer(getMaxRedelivery()) });
068: } catch (Exception e) {
069: logger
070: .error("Can not set MaxRedelivery parameter to RedeliveryPolicy "
071: + e);
072: }
073: }
074:
075: /**
076: * Will additionally try to cleanup the ActiveMq connection, otherwise there's a deadlock on shutdown.
077: */
078: protected void doDisconnect() throws ConnectException {
079: try {
080: Connection connection = getConnection();
081: if (connection == null) {
082: return;
083: }
084:
085: final Class clazz = connection.getClass();
086: Method cleanupMethod;
087: if (Proxy.isProxyClass(clazz)) {
088: ConnectionInvocationHandler handler = (ConnectionInvocationHandler) Proxy
089: .getInvocationHandler(connection);
090: // this is really an XA connection, bypass the java.lang.reflect.Proxy as it
091: // can't delegate to non-interfaced methods (like proprietary 'cleanup' one)
092: // TODO check if CGlib will manage to enhance the AMQ connection class,
093: // there are no final methods, but a number of private ones, though
094: connection = (Connection) handler.getTargetObject();
095: Class realConnectionClass = connection.getClass();
096: cleanupMethod = realConnectionClass.getMethod(
097: "cleanup", (Class[]) null);
098: } else {
099: cleanupMethod = clazz.getMethod("cleanup",
100: (Class[]) null);
101: }
102:
103: try {
104: if (cleanupMethod != null) {
105: cleanupMethod.invoke(connection, (Object[]) null);
106: }
107: } finally {
108: connection.close();
109: }
110: } catch (Exception e) {
111: throw new ConnectException(e, this );
112: } finally {
113: setConnection(null);
114: }
115: }
116:
117: public String getBrokerURL() {
118: return brokerURL;
119: }
120:
121: public void setBrokerURL(String brokerURL) {
122: this.brokerURL = brokerURL;
123: }
124: }
|