001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb3.test.strictpool;
023:
024: import javax.annotation.Resource;
025: import javax.ejb.ActivationConfigProperty;
026: import javax.ejb.EJBException;
027: import javax.ejb.MessageDriven;
028: import javax.ejb.MessageDrivenContext;
029: import javax.jms.JMSException;
030: import javax.jms.Message;
031: import javax.jms.MessageListener;
032: import javax.jms.Queue;
033: import javax.jms.QueueConnection;
034: import javax.jms.QueueConnectionFactory;
035: import javax.jms.QueueSender;
036: import javax.jms.QueueSession;
037: import javax.jms.TextMessage;
038: import javax.naming.InitialContext;
039:
040: import org.jboss.annotation.ejb.PoolClass;
041: import org.jboss.annotation.ejb.PoolClass;
042:
043: /**
044: * Adapted from the EJB 2.1 tests (org.jboss.test.cts.ejb.StrictlyPooledMDB)
045: * @author <a href="mailto:kabir.khan@jboss.org">Kabir Khan</a>
046: * @version $Revision: 57207 $
047: */
048: @MessageDriven(activationConfig={@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),@ActivationConfigProperty(propertyName="destination",propertyValue="queue/queueA"),@ActivationConfigProperty(propertyName="maxMessages",propertyValue="10"),@ActivationConfigProperty(propertyName="minSession",propertyValue="10"),@ActivationConfigProperty(propertyName="maxSession",propertyValue="10")})
049: @PoolClass(value=org.jboss.ejb3.StrictMaxPool.class,maxSize=StrictlyPooledMDB.maxActiveCount,timeout=10000)
050: public class StrictlyPooledMDB implements MessageListener {
051: /** The class wide max count of instances allows */
052: public static final int maxActiveCount = 2;
053: /** The class wide count of instances active in business code */
054: private static int activeCount;
055:
056: private MessageDrivenContext ctx = null;
057: private QueueConnection queConn;
058: private QueueSession session;
059: private QueueSender sender;
060:
061: private static synchronized int incActiveCount() {
062: return activeCount++;
063: }
064:
065: private static synchronized int decActiveCount() {
066: return activeCount--;
067: }
068:
069: @Resource
070: public void setMessageDrivenContext(MessageDrivenContext ctx)
071: throws EJBException {
072: System.out.println("setMessageDrivenContext()");
073: this .ctx = ctx;
074: try {
075: InitialContext iniCtx = new InitialContext();
076: QueueConnectionFactory factory = (QueueConnectionFactory) iniCtx
077: .lookup("java:/ConnectionFactory");
078: queConn = factory.createQueueConnection();
079: session = queConn.createQueueSession(false,
080: QueueSession.AUTO_ACKNOWLEDGE);
081: Queue queue = (Queue) iniCtx.lookup("queue/queueB");
082: sender = session.createSender(queue);
083: } catch (Exception e) {
084: System.out.println("Setup failure");
085: e.printStackTrace();
086: throw new EJBException("Setup failure", e);
087: }
088: }
089:
090: public void ejbCreate() {
091: }
092:
093: public void ejbRemove() {
094: try {
095: if (sender != null)
096: sender.close();
097: if (session != null)
098: session.close();
099: if (queConn != null)
100: queConn.close();
101: } catch (Exception e) {
102: System.out.println("Failed to close JMS resources");
103: e.printStackTrace();
104: }
105: }
106:
107: public void onMessage(Message message) {
108: int count = incActiveCount();
109: System.out.println("Begin onMessage, activeCount=" + count
110: + ", ctx=" + ctx);
111: try {
112: Message reply = null;
113: if (count > maxActiveCount) {
114: String msg = "IllegalState, activeCount > maxActiveCount, "
115: + count + " > " + maxActiveCount;
116: // Send an exception
117: Exception e = new IllegalStateException(msg);
118: reply = session.createObjectMessage(e);
119: } else {
120: TextMessage tm = (TextMessage) message;
121: // Send an ack
122: reply = session.createTextMessage("Recevied msg="
123: + tm.getText());
124: }
125: Thread.currentThread().sleep(1000);
126: sender.send(reply);
127: } catch (JMSException e) {
128: System.out.println("Failed to send error message");
129: e.printStackTrace();
130: } catch (InterruptedException e) {
131: } finally {
132: count = decActiveCount();
133: System.out.println("End onMessage, activeCount=" + count
134: + ", ctx=" + ctx);
135: }
136: }
137: }
|