001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.jms.serverless;
023:
024: import org.jboss.logging.Logger;
025: import java.io.Serializable;
026: import javax.jms.Session;
027: import javax.jms.BytesMessage;
028: import javax.jms.MapMessage;
029: import javax.jms.Message;
030: import javax.jms.ObjectMessage;
031: import javax.jms.StreamMessage;
032: import javax.jms.JMSException;
033: import javax.jms.TextMessage;
034: import javax.jms.MessageListener;
035: import javax.jms.MessageProducer;
036: import javax.jms.MessageConsumer;
037: import javax.jms.Destination;
038: import javax.jms.Queue;
039: import javax.jms.Topic;
040: import javax.jms.TopicSubscriber;
041: import javax.jms.QueueBrowser;
042: import javax.jms.TemporaryQueue;
043: import javax.jms.TemporaryTopic;
044: import java.util.List;
045: import java.util.ArrayList;
046: import java.util.Iterator;
047:
048: /**
049: *
050: * @author Ovidiu Feodorov <ovidiu@jboss.org>
051: * @version $Revision: 57195 $ $Date: 2006-09-26 08:08:17 -0400 (Tue, 26 Sep 2006) $
052: *
053: **/
054: class SessionImpl implements Session {
055:
056: private static final Logger log = Logger
057: .getLogger(SessionImpl.class);
058:
059: private SessionManager sessionManager;
060: private String id;
061: private List subscribers;
062: private List receivers;
063: private boolean transacted;
064: private int acknowledgeMode;
065: private int receiverCounter = 0;
066:
067: /**
068: * @param id - the session id. The SessionManager instance guarantees uniqueness during its
069: * lifetime.
070: **/
071: SessionImpl(SessionManager sessionManager, String id,
072: boolean transacted, int acknowledgeMode) {
073:
074: this .sessionManager = sessionManager;
075: this .id = id;
076: subscribers = new ArrayList();
077: receivers = new ArrayList();
078: this .transacted = transacted;
079: this .acknowledgeMode = acknowledgeMode;
080:
081: if (transacted) {
082: throw new NotImplementedException(
083: "Transacted sessions not supported");
084: }
085: }
086:
087: public String getID() {
088: return id;
089: }
090:
091: void send(Message m) throws JMSException {
092: sessionManager.getConnection().send(m);
093: }
094:
095: /**
096: * Delivery to topic subscribers.
097: **/
098: // TO_DO: acknowledgement, deal with failed deliveries
099: void deliver(Message m) {
100:
101: // TO_DO: single threaded access for sessions
102: // So far, the only thread that accesses dispatch() is the connection's puller thread and
103: // this will be the unique thread that accesses the Sessions. This may not be sufficient
104: // for high load, consider the possiblity to (dynamically) add new threads to handle
105: // delivery, possibly a thread per session.
106:
107: Destination destination = null;
108: try {
109: destination = m.getJMSDestination();
110: } catch (JMSException e) {
111: // TO_DO: cannot deliver, a failure handler should take over
112: log.error("Unhandled failure", e);
113: return;
114: }
115:
116: // TO_DO: properly handle the case when the destination is null
117:
118: for (Iterator i = subscribers.iterator(); i.hasNext();) {
119:
120: TopicSubscriberImpl sub = (TopicSubscriberImpl) i.next();
121: if (destination.equals(sub.getDestination())) {
122: MessageListener l = null;
123: try {
124: l = sub.getMessageListener();
125: } catch (JMSException e) {
126: // TO_DO: cannot deliver, a failure handler should take over
127: log.error("Unhandled failure", e);
128: continue;
129: }
130: if (l == null) {
131: continue;
132: }
133: l.onMessage(m);
134: }
135: }
136: }
137:
138: /**
139: * Delivery to queue receivers.
140: **/
141: // TO_DO: acknowledgement, deal with failed deliveries
142: void deliver(Message m, String receiverID) {
143:
144: // TO_DO: single threaded access for sessions
145: // So far, the only thread that accesses dispatch() is the connection's puller thread and
146: // this will be the unique thread that accesses the Sessions. This may not be sufficient
147: // for high load, consider the possiblity to (dynamically) add new threads to handle
148: // delivery, possibly a thread per session.
149:
150: QueueReceiverImpl receiver = null;
151: for (Iterator i = receivers.iterator(); i.hasNext();) {
152:
153: QueueReceiverImpl crtRec = (QueueReceiverImpl) i.next();
154: if (crtRec.getID().equals(receiverID)) {
155: receiver = crtRec;
156: break;
157: }
158: }
159:
160: if (receiver == null) {
161: log.error("No such receiver: " + receiverID
162: + ". Delivery failed!");
163: return;
164: }
165: MessageListener l = null;
166: try {
167: l = receiver.getMessageListener();
168: } catch (JMSException e) {
169: // TO_DO: cannot deliver, a failure handler should take over
170: log.error("Unhandled failure", e);
171: return;
172: }
173: if (l == null) {
174: log.warn("No message listener for receiver " + receiverID
175: + ". Delivery failed!");
176: } else {
177: l.onMessage(m);
178: }
179: }
180:
181: //
182: // Session INTERFACE IMPLEMEMENTATION
183: //
184:
185: public BytesMessage createBytesMessage() throws JMSException {
186: throw new NotImplementedException();
187: }
188:
189: public MapMessage createMapMessage() throws JMSException {
190: throw new NotImplementedException();
191: }
192:
193: public Message createMessage() throws JMSException {
194: throw new NotImplementedException();
195: }
196:
197: public ObjectMessage createObjectMessage() throws JMSException {
198: throw new NotImplementedException();
199: }
200:
201: public ObjectMessage createObjectMessage(Serializable object)
202: throws JMSException {
203: throw new NotImplementedException();
204: }
205:
206: public StreamMessage createStreamMessage() throws JMSException {
207: throw new NotImplementedException();
208: }
209:
210: public TextMessage createTextMessage() throws JMSException {
211: return new TextMessageImpl();
212: }
213:
214: public TextMessage createTextMessage(String text)
215: throws JMSException {
216: throw new NotImplementedException();
217: }
218:
219: public boolean getTransacted() throws JMSException {
220: return transacted;
221: }
222:
223: public int getAcknowledgeMode() throws JMSException {
224: return acknowledgeMode;
225: }
226:
227: public void commit() throws JMSException {
228: throw new NotImplementedException();
229: }
230:
231: public void rollback() throws JMSException {
232: throw new NotImplementedException();
233: }
234:
235: public void close() throws JMSException {
236: throw new NotImplementedException();
237: }
238:
239: public void recover() throws JMSException {
240: throw new NotImplementedException();
241: }
242:
243: public MessageListener getMessageListener() throws JMSException {
244: throw new NotImplementedException();
245: }
246:
247: public void setMessageListener(MessageListener listener)
248: throws JMSException {
249: throw new NotImplementedException();
250: }
251:
252: public void run() {
253: throw new NotImplementedException();
254: }
255:
256: public MessageProducer createProducer(Destination destination)
257: throws JMSException {
258:
259: if (destination instanceof Topic) {
260: return new TopicPublisherImpl(this , (Topic) destination);
261: } else if (destination instanceof Queue) {
262: return new QueueSenderImpl(this , (Queue) destination);
263: }
264: throw new JMSException("Destination not a Topic or Queue");
265: }
266:
267: public MessageConsumer createConsumer(Destination destination)
268: throws JMSException {
269:
270: if (destination instanceof Topic) {
271: TopicSubscriberImpl ts = new TopicSubscriberImpl(this ,
272: (Topic) destination);
273: subscribers.add(ts);
274: return ts;
275: } else if (destination instanceof Queue) {
276: QueueReceiverImpl qr = new QueueReceiverImpl(this ,
277: generateReceiverID(), (Queue) destination);
278: sessionManager.advertiseQueueReceiver(getID(), qr, true);
279: receivers.add(qr);
280: return qr;
281: }
282: throw new JMSException("Destination not a Topic or Queue");
283: }
284:
285: public MessageConsumer createConsumer(Destination destination,
286: String messageSelector) throws JMSException {
287: throw new NotImplementedException();
288: }
289:
290: public MessageConsumer createConsumer(Destination destination,
291: String messageSelector, boolean NoLocal)
292: throws JMSException {
293: throw new NotImplementedException();
294: }
295:
296: public Queue createQueue(String queueName) throws JMSException {
297: throw new NotImplementedException();
298: }
299:
300: public Topic createTopic(String topicName) throws JMSException {
301: throw new NotImplementedException();
302: }
303:
304: public TopicSubscriber createDurableSubscriber(Topic topic,
305: String name) throws JMSException {
306: throw new NotImplementedException();
307: }
308:
309: public TopicSubscriber createDurableSubscriber(Topic topic,
310: String name, String messageSelector, boolean noLocal)
311: throws JMSException {
312: throw new NotImplementedException();
313: }
314:
315: public QueueBrowser createBrowser(Queue queue) throws JMSException {
316: throw new NotImplementedException();
317: }
318:
319: public QueueBrowser createBrowser(Queue queue,
320: String messageSelector) throws JMSException {
321: throw new NotImplementedException();
322: }
323:
324: public TemporaryQueue createTemporaryQueue() throws JMSException {
325: throw new NotImplementedException();
326: }
327:
328: public TemporaryTopic createTemporaryTopic() throws JMSException {
329: throw new NotImplementedException();
330: }
331:
332: public void unsubscribe(String name) throws JMSException {
333: throw new NotImplementedException();
334: }
335:
336: //
337: // END Session INTERFACE IMPLEMEMENTATION
338: //
339:
340: /**
341: * The reverse of createConsumer().
342: **/
343: void removeConsumer(MessageConsumer consumer) throws JMSException {
344:
345: if (consumer instanceof QueueReceiverImpl) {
346: if (!receivers.contains(consumer)) {
347: throw new JMSException("No such QueueReceiver: "
348: + consumer);
349: }
350: sessionManager.advertiseQueueReceiver(getID(),
351: (QueueReceiverImpl) consumer, false);
352: receivers.remove(consumer);
353: } else if (consumer instanceof TopicSubscriberImpl) {
354: throw new NotImplementedException();
355: } else {
356: throw new JMSException(
357: "MessageConsumer not a TopicSubscriber or a QueueReceiver");
358: }
359: }
360:
361: /**
362: * Generate a queue receiver ID that is quaranteed to be unique for the life time of this
363: * Session instance.
364: **/
365: private synchronized String generateReceiverID() {
366: return Integer.toString(receiverCounter++);
367: }
368:
369: //
370: // LIST MANAGEMENT METHODS
371: //
372:
373: //private QueueReceiverImpl getReceiver(String receiverID)
374:
375: //
376: // END OF LIST MANAGEMENT METHODS
377: //
378:
379: }
|