001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ejb3.test.strictpool;
023:
024: import javax.annotation.Resource;
025: import javax.ejb.ActivationConfigProperty;
026: import javax.ejb.EJBException;
027: import javax.ejb.MessageDriven;
028: import javax.ejb.MessageDrivenContext;
029: import javax.jms.JMSException;
030: import javax.jms.Message;
031: import javax.jms.MessageListener;
032: import javax.jms.Queue;
033: import javax.jms.QueueConnection;
034: import javax.jms.QueueConnectionFactory;
035: import javax.jms.QueueSender;
036: import javax.jms.QueueSession;
037: import javax.jms.TextMessage;
038: import javax.naming.InitialContext;
039:
040: import org.jboss.annotation.ejb.PoolClass;
041: import org.jboss.annotation.ejb.PoolClass;
042:
043: /**
044: * @version <tt>$Revision: 60233 $</tt>
045: * @author <a href="mailto:bdecoste@jboss.com">William DeCoste</a>
046: */
047: @MessageDriven(activationConfig={@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),@ActivationConfigProperty(propertyName="destination",propertyValue="queue/overrideQueueA"),@ActivationConfigProperty(propertyName="maxMessages",propertyValue="10"),@ActivationConfigProperty(propertyName="minSession",propertyValue="10"),@ActivationConfigProperty(propertyName="maxSession",propertyValue="10")})
048: @PoolClass(value=org.jboss.ejb3.test.strictpool.BogusPool.class,maxSize=0,timeout=0)
049: public class OverrideStrictlyPooledMDB implements MessageListener {
050: /** The class wide max count of instances allows */
051: public static final int maxActiveCount = 2;
052: /** The class wide count of instances active in business code */
053: private static int activeCount;
054:
055: private MessageDrivenContext ctx = null;
056: private QueueConnection queConn;
057: private QueueSession session;
058: private QueueSender sender;
059:
060: private static synchronized int incActiveCount() {
061: return activeCount++;
062: }
063:
064: private static synchronized int decActiveCount() {
065: return activeCount--;
066: }
067:
068: @Resource
069: public void setMessageDrivenContext(MessageDrivenContext ctx)
070: throws EJBException {
071: System.out.println("setMessageDrivenContext()");
072: this .ctx = ctx;
073: try {
074: InitialContext iniCtx = new InitialContext();
075: QueueConnectionFactory factory = (QueueConnectionFactory) iniCtx
076: .lookup("java:/ConnectionFactory");
077: queConn = factory.createQueueConnection();
078: session = queConn.createQueueSession(false,
079: QueueSession.AUTO_ACKNOWLEDGE);
080: Queue queue = (Queue) iniCtx.lookup("queue/overrideQueueB");
081: sender = session.createSender(queue);
082: } catch (Exception e) {
083: System.out.println("Setup failure");
084: e.printStackTrace();
085: throw new EJBException("Setup failure", e);
086: }
087: }
088:
089: public void ejbCreate() {
090: }
091:
092: public void ejbRemove() {
093: try {
094: if (sender != null)
095: sender.close();
096: if (session != null)
097: session.close();
098: if (queConn != null)
099: queConn.close();
100: } catch (Exception e) {
101: System.out.println("Failed to close JMS resources");
102: e.printStackTrace();
103: }
104: }
105:
106: public void onMessage(Message message) {
107: int count = incActiveCount();
108: System.out.println("Begin onMessage, activeCount=" + count
109: + ", ctx=" + ctx);
110: try {
111: Message reply = null;
112: if (count > maxActiveCount) {
113: String msg = "IllegalState, activeCount > maxActiveCount, "
114: + count + " > " + maxActiveCount;
115: // Send an exception
116: Exception e = new IllegalStateException(msg);
117: reply = session.createObjectMessage(e);
118: } else {
119: TextMessage tm = (TextMessage) message;
120: // Send an ack
121: reply = session.createTextMessage("Recevied msg="
122: + tm.getText());
123: }
124: Thread.currentThread().sleep(1000);
125: sender.send(reply);
126: } catch (JMSException e) {
127: System.out.println("Failed to send error message");
128: e.printStackTrace();
129: } catch (InterruptedException e) {
130: } finally {
131: count = decActiveCount();
132: System.out.println("End onMessage, activeCount=" + count
133: + ", ctx=" + ctx);
134: }
135: }
136: }
|