001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.jms.serverless;
023:
024: import org.jboss.logging.Logger;
025: import org.jgroups.Address;
026: import java.util.Map;
027: import java.util.HashMap;
028: import java.util.List;
029: import java.util.ArrayList;
030: import org.jgroups.util.Util;
031: import java.util.Iterator;
032:
033: /**
034: *
035: * @author Ovidiu Feodorov <ovidiu@jboss.org>
036: * @version $Revision: 57195 $ $Date: 2006-09-26 08:08:17 -0400 (Tue, 26 Sep 2006) $
037: *
038: **/
039: class GroupState {
040:
041: private static final Logger log = Logger
042: .getLogger(GroupState.class);
043:
044: private Map queues;
045:
046: public synchronized byte[] toByteBuffer() throws Exception {
047: return Util.objectToByteBuffer(queues);
048: }
049:
050: public synchronized void fromByteBuffer(byte[] ba) throws Exception {
051: Object o = Util.objectFromByteBuffer(ba);
052: if (o == null) {
053: queues = null;
054: } else if (o instanceof Map) {
055: queues = (Map) o;
056: } else {
057: throw new IllegalStateException("Invalid group state");
058: }
059: }
060:
061: public synchronized void addQueueReceiver(String queueName,
062: Address addr, String sessionID, String queueReceiverID) {
063: if (queues == null) {
064: queues = new HashMap();
065: }
066: List l = (List) queues.get(queueName);
067: if (l == null) {
068: l = new ArrayList();
069: queues.put(queueName, l);
070: }
071: QueueReceiverAddress ra = new QueueReceiverAddress(addr,
072: sessionID, queueReceiverID);
073: if (l.contains(ra)) {
074: log.warn(ra + " already in the group state");
075: return;
076: }
077: l.add(ra);
078: log.debug("New GroupState: " + toString());
079: }
080:
081: /**
082: * If no such queue receiver is found, the method logs the event as a warning.
083: **/
084: public synchronized void removeQueueReceiver(String queueName,
085: Address addr, String sessionID, String queueReceiverID) {
086:
087: String noSuchReceiverMsg = "No such queue receiver: "
088: + queueName + "/" + addr + "/" + sessionID + "/"
089: + queueReceiverID;
090:
091: List l = null;
092:
093: if (queues == null
094: || ((l = (List) queues.get(queueName)) == null)
095: || l.isEmpty()) {
096: log.warn(noSuchReceiverMsg);
097: }
098: if (!l.remove(new QueueReceiverAddress(addr, sessionID,
099: queueReceiverID))) {
100: log.warn(noSuchReceiverMsg);
101: }
102: log.debug("New GroupState: " + toString());
103: }
104:
105: /**
106: * Could return null if there is no receiver for the queue
107: **/
108: public synchronized QueueReceiverAddress selectReceiver(
109: String queueName) {
110:
111: if (queues == null) {
112: return null;
113: }
114: List l = (List) queues.get(queueName);
115: if (l == null || l.size() == 0) {
116: return null;
117: }
118: QueueReceiverAddress selected = null;
119: int crtidx = 0;
120: for (Iterator i = l.iterator(); i.hasNext(); crtidx++) {
121: QueueReceiverAddress crt = (QueueReceiverAddress) i.next();
122: if (crt.isNextForDelivery()) {
123: selected = crt;
124: crt.setNextForDelivery(false);
125: ((QueueReceiverAddress) l.get((crtidx + 1) % l.size()))
126: .setNextForDelivery(true);
127: break;
128: }
129: }
130: if (selected == null) {
131: selected = (QueueReceiverAddress) l.get(0);
132: ((QueueReceiverAddress) l.get(1 % l.size()))
133: .setNextForDelivery(true);
134: ;
135:
136: }
137: return selected;
138: }
139:
140: public String toString() {
141: return queues == null ? "null" : queues.toString();
142: }
143:
144: }
|