001: /*
002: * Copyright 2002-2007 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.springframework.jms.listener.serversession;
018:
019: import javax.jms.Connection;
020: import javax.jms.ConnectionConsumer;
021: import javax.jms.Destination;
022: import javax.jms.JMSException;
023: import javax.jms.Message;
024: import javax.jms.MessageListener;
025: import javax.jms.ServerSession;
026: import javax.jms.ServerSessionPool;
027: import javax.jms.Session;
028: import javax.jms.Topic;
029:
030: import org.springframework.jms.listener.AbstractMessageListenerContainer;
031: import org.springframework.jms.support.JmsUtils;
032:
033: /**
034: * Message listener container that builds on the JMS ServerSessionPool SPI,
035: * creating JMS ServerSessions through a pluggable ServerSessionFactory.
036: *
037: * <p>This is the most sophisticated message listener container possible
038: * with standard JMS. It allows for dynamic management of JMS Sessions,
039: * potentially using a pool of Sessions that receive messages in parallel.
040: *
041: * <p>Note that the default ServerSessionFactory is a {@link SimpleServerSessionFactory},
042: * which will create a new ServerSession for each listener execution.
043: * Consider specifying a {@link CommonsPoolServerSessionFactory} to reuse JMS Sessions
044: * and/or to limit the number of concurrent ServerSession executions
045: *
046: * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
047: * on acknowledge modes and transaction options.
048: *
049: * <p>For a simpler message listener container, in particular when using
050: * a JMS provider without ServerSessionPool support, consider using
051: * {@link org.springframework.jms.listener.SimpleMessageListenerContainer}.
052: *
053: * <p>This class requires a JMS 1.1+ provider, because it builds on the
054: * domain-independent API. <b>Use the {@link ServerSessionMessageListenerContainer102}
055: * subclass for JMS 1.0.2 providers.</b>
056: *
057: * @author Juergen Hoeller
058: * @since 2.0
059: * @see org.springframework.jms.listener.SimpleMessageListenerContainer
060: * @see ServerSessionMessageListenerContainer102
061: */
062: public class ServerSessionMessageListenerContainer extends
063: AbstractMessageListenerContainer implements
064: ListenerSessionManager {
065:
066: private ServerSessionFactory serverSessionFactory = new SimpleServerSessionFactory();
067:
068: private int maxMessagesPerTask = 1;
069:
070: private ConnectionConsumer consumer;
071:
072: /**
073: * Set the Spring ServerSessionFactory to use.
074: * <p>Default is a plain SimpleServerSessionFactory.
075: * Consider using a CommonsPoolServerSessionFactory to reuse JMS Sessions
076: * and/or to limit the number of concurrent ServerSession executions.
077: * @see SimpleServerSessionFactory
078: * @see CommonsPoolServerSessionFactory
079: */
080: public void setServerSessionFactory(
081: ServerSessionFactory serverSessionFactory) {
082: this .serverSessionFactory = (serverSessionFactory != null ? serverSessionFactory
083: : new SimpleServerSessionFactory());
084: }
085:
086: /**
087: * Return the Spring ServerSessionFactory to use.
088: */
089: protected ServerSessionFactory getServerSessionFactory() {
090: return this .serverSessionFactory;
091: }
092:
093: /**
094: * Set the maximum number of messages to load into a JMS Session.
095: * Default is 1.
096: * <p>See the corresponding JMS <code>createConnectionConsumer</code>
097: * argument for details.
098: * @see javax.jms.Connection#createConnectionConsumer
099: */
100: public void setMaxMessagesPerTask(int maxMessagesPerTask) {
101: this .maxMessagesPerTask = maxMessagesPerTask;
102: }
103:
104: /**
105: * Return the maximum number of messages to load into a JMS Session.
106: */
107: protected int getMaxMessagesPerTask() {
108: return this .maxMessagesPerTask;
109: }
110:
111: //-------------------------------------------------------------------------
112: // Implementation of AbstractMessageListenerContainer's template methods
113: //-------------------------------------------------------------------------
114:
115: /**
116: * Always use a shared JMS Connection.
117: */
118: protected final boolean sharedConnectionEnabled() {
119: return true;
120: }
121:
122: /**
123: * Creates a JMS ServerSessionPool for the specified listener and registers
124: * it with a JMS ConnectionConsumer for the specified destination.
125: * @see #createServerSessionPool
126: * @see #createConsumer
127: */
128: protected void doInitialize() throws JMSException {
129: Connection con = getSharedConnection();
130: Destination destination = getDestination();
131: if (destination == null) {
132: Session session = createSession(con);
133: try {
134: destination = resolveDestinationName(session,
135: getDestinationName());
136: } finally {
137: JmsUtils.closeSession(session);
138: }
139: }
140: ServerSessionPool pool = createServerSessionPool();
141: this .consumer = createConsumer(con, destination, pool);
142: }
143:
144: /**
145: * Create a JMS ServerSessionPool for the specified message listener,
146: * via this container's ServerSessionFactory.
147: * <p>This message listener container implements the ListenerSessionManager
148: * interface, hence can be passed to the ServerSessionFactory itself.
149: * @see #setServerSessionFactory
150: * @see ServerSessionFactory#getServerSession(ListenerSessionManager)
151: */
152: protected ServerSessionPool createServerSessionPool()
153: throws JMSException {
154: return new ServerSessionPool() {
155: public ServerSession getServerSession() throws JMSException {
156: logger
157: .debug("JMS ConnectionConsumer requests ServerSession");
158: return getServerSessionFactory().getServerSession(
159: ServerSessionMessageListenerContainer.this );
160: }
161: };
162: }
163:
164: /**
165: * Return the JMS ConnectionConsumer used by this message listener container.
166: * Available after initialization.
167: */
168: protected final ConnectionConsumer getConsumer() {
169: return this .consumer;
170: }
171:
172: /**
173: * Close the JMS ServerSessionPool for the specified message listener,
174: * via this container's ServerSessionFactory, and subsequently also
175: * this container's JMS ConnectionConsumer.
176: * <p>This message listener container implements the ListenerSessionManager
177: * interface, hence can be passed to the ServerSessionFactory itself.
178: * @see #setServerSessionFactory
179: * @see ServerSessionFactory#getServerSession(ListenerSessionManager)
180: */
181: protected void doShutdown() throws JMSException {
182: logger.debug("Closing ServerSessionFactory");
183: getServerSessionFactory().close(this );
184: logger.debug("Closing JMS ConnectionConsumer");
185: this .consumer.close();
186: }
187:
188: //-------------------------------------------------------------------------
189: // Implementation of the ListenerSessionManager interface
190: //-------------------------------------------------------------------------
191:
192: /**
193: * Create a JMS Session with the specified listener registered.
194: * Listener execution is delegated to the <code>executeListener</code> method.
195: * <p>Default implementation simply calls <code>setMessageListener</code>
196: * on a newly created JMS Session, according to the JMS specification's
197: * ServerSessionPool section.
198: * @return the JMS Session
199: * @throws JMSException if thrown by JMS API methods
200: * @see #executeListener
201: */
202: public Session createListenerSession() throws JMSException {
203: final Session session = createSession(getSharedConnection());
204:
205: session.setMessageListener(new MessageListener() {
206: public void onMessage(Message message) {
207: executeListener(session, message);
208: }
209: });
210:
211: return session;
212: }
213:
214: /**
215: * Execute the given JMS Session, triggering invocation
216: * of its listener.
217: * <p>Default implementation simply calls <code>run()</code>
218: * on the JMS Session, according to the JMS specification's
219: * ServerSessionPool section.
220: * @param session the JMS Session to execute
221: */
222: public void executeListenerSession(Session session) {
223: session.run();
224: }
225:
226: //-------------------------------------------------------------------------
227: // JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
228: //-------------------------------------------------------------------------
229:
230: /**
231: * Create a JMS ConnectionConsumer for the given Connection.
232: * <p>This implementation uses JMS 1.1 API.
233: * @param con the JMS Connection to create a Session for
234: * @return the new JMS Session
235: * @throws JMSException if thrown by JMS API methods
236: */
237: protected ConnectionConsumer createConsumer(Connection con,
238: Destination destination, ServerSessionPool pool)
239: throws JMSException {
240:
241: if (isSubscriptionDurable() && destination instanceof Topic) {
242: return con
243: .createDurableConnectionConsumer(
244: (Topic) destination,
245: getDurableSubscriptionName(),
246: getMessageSelector(), pool,
247: getMaxMessagesPerTask());
248: } else {
249: return con
250: .createConnectionConsumer(destination,
251: getMessageSelector(), pool,
252: getMaxMessagesPerTask());
253: }
254: }
255:
256: }
|