001: /*
002: * Copyright 2001-2006 C:1 Financial Services GmbH
003: *
004: * This software is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License Version 2.1, as published by the Free Software Foundation.
007: *
008: * This software is distributed in the hope that it will be useful,
009: * but WITHOUT ANY WARRANTY; without even the implied warranty of
010: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
011: * Lesser General Public License for more details.
012: *
013: * You should have received a copy of the GNU Lesser General Public
014: * License along with this library; if not, write to the Free Software
015: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
016: */
017:
018: package de.finix.contelligent.event;
019:
020: import java.util.Collections;
021: import java.util.HashSet;
022: import java.util.Iterator;
023: import java.util.LinkedList;
024: import java.util.List;
025: import java.util.Set;
026:
027: import javax.transaction.Status;
028:
029: import de.finix.contelligent.core.TxSynchronization;
030: import de.finix.contelligent.logging.LoggingService;
031:
032: /**
033: * This singleton handles system event distribution and event listener
034: * registration.
035: */
036: final public class EventQueue implements TxSynchronization {
037: final static org.apache.log4j.Logger log = LoggingService
038: .getLogger(EventQueue.class);
039:
040: private static final EventQueue instance = new EventQueue();
041:
042: private final ThreadLocal eventList = new ThreadLocal();
043:
044: private HashSet listeners = new HashSet();
045:
046: private HashSet txListeners = new HashSet();
047:
048: /**
049: * Answer the global event queue
050: *
051: * @return
052: */
053: public static final EventQueue getInstance() {
054: return instance;
055: }
056:
057: // EventQueue is a singleton, so prevent instantiation
058: private EventQueue() {
059: }
060:
061: public void addListener(EventQueueListener listener) {
062: if (listener instanceof EventQueueTXListener) {
063: synchronized (txListeners) {
064: HashSet newListeners = (HashSet) txListeners.clone();
065: newListeners.add(listener);
066: txListeners = newListeners;
067: }
068: } else {
069: synchronized (listeners) {
070: HashSet newListeners = (HashSet) listeners.clone();
071: newListeners.add(listener);
072: listeners = newListeners;
073: }
074: }
075: }
076:
077: public void removeListener(EventQueueListener listener) {
078: if (listener instanceof EventQueueTXListener) {
079: synchronized (txListeners) {
080: HashSet newListeners = (HashSet) txListeners.clone();
081: newListeners.remove(listener);
082: txListeners = newListeners;
083: }
084: } else {
085: synchronized (listeners) {
086: HashSet newListeners = (HashSet) listeners.clone();
087: newListeners.remove(listener);
088: listeners = newListeners;
089: }
090: }
091: }
092:
093: /**
094: * Gets called prior to the start of the transaction completion process. If
095: * this method throws an exception or returns false the transaction will be
096: * rolled back.
097: *
098: * @return a boolean value indicating success or failure
099: * @see TxSynchronization#beforeCompletion
100: */
101: public boolean beforeCompletion() throws Exception {
102: List list = (List) eventList.get();
103: if ((list != null && list.size() > 0)
104: && (txListeners != null && txListeners.size() > 0)) {
105: notifyTXListenersBeforeCompletion(txListeners, Collections
106: .unmodifiableList(list));
107: }
108: return true;
109: }
110:
111: public void afterCompletion(int status) {
112: List list = (List) eventList.get();
113: if (list != null && list.size() > 0) {
114: List events = Collections.unmodifiableList(list);
115: try {
116: if (txListeners != null && txListeners.size() > 0) {
117: notifyTXListenersAfterCompletion(txListeners,
118: events, status);
119: }
120:
121: if ((status == Status.STATUS_COMMITTED)
122: && (listeners != null && listeners.size() > 0)) {
123: notifyListeners(listeners, events);
124: }
125: } finally {
126: clean();
127: }
128: }
129: }
130:
131: public void clean() {
132: eventList.set(null);
133: }
134:
135: public void addEvent(ContelligentEvent event) {
136: List list = (List) eventList.get();
137: if (list == null) {
138: list = new LinkedList();
139: eventList.set(list);
140: }
141: list.add(event);
142: if (log.isDebugEnabled())
143: log.debug("added event '" + event + "'.");
144: }
145:
146: public void addEvents(List events) {
147: List list = (List) eventList.get();
148: if (list == null) {
149: list = new LinkedList(events);
150: eventList.set(list);
151: } else {
152: list.addAll(events);
153: }
154: if (log.isDebugEnabled())
155: log.debug("added events '" + events + "'.");
156: }
157:
158: private void notifyListeners(Set listeners, List eventList) {
159: final boolean debugEnabled = log.isDebugEnabled();
160:
161: if (debugEnabled)
162: log.debug("notifyListeners() - current listeners="
163: + listeners);
164:
165: Iterator iterator = listeners.iterator();
166: while (iterator.hasNext()) {
167: EventQueueListener listener = (EventQueueListener) iterator
168: .next();
169: if (debugEnabled) {
170: log.debug("Notifying listener " + listener
171: + " about events " + eventList);
172: }
173: try {
174: listener.onEvents(eventList);
175: } catch (Exception e) {
176: log.error("Event listener [" + listener
177: + "] failed when receiving events.", e);
178: }
179: }
180: // long end = System.currentTimeMillis();
181: // log.info("notifying "+listenersClone.size()+" listeners about
182: // "+eventList.size()+
183: // " events took "+(end-start)+" ms. ("+start+"->"+end+")");
184: }
185:
186: private void notifyTXListenersBeforeCompletion(Set listeners,
187: List eventList) throws Exception {
188: final boolean debugEnabled = log.isDebugEnabled();
189:
190: if (debugEnabled)
191: log.debug("notifyListeners() - current listeners="
192: + listeners);
193:
194: Iterator iterator = listeners.iterator();
195: while (iterator.hasNext()) {
196: EventQueueTXListener listener = (EventQueueTXListener) iterator
197: .next();
198: if (debugEnabled) {
199: log.debug("Notifying listener " + listener
200: + " about events " + eventList);
201: }
202: listener.onEventsBeforeCompletion(eventList);
203: }
204: // long end = System.currentTimeMillis();
205: // log.info("notifying "+listenersClone.size()+" listeners about
206: // "+eventList.size()+
207: // " events took "+(end-start)+" ms. ("+start+"->"+end+")");
208: }
209:
210: private void notifyTXListenersAfterCompletion(Set listeners,
211: List eventList, int status) {
212: // long start = System.currentTimeMillis();
213: final boolean debugEnabled = log.isDebugEnabled();
214:
215: if (debugEnabled)
216: log.debug("notifyListeners() - current listeners="
217: + listeners);
218:
219: Iterator iterator = listeners.iterator();
220: while (iterator.hasNext()) {
221: EventQueueTXListener listener = (EventQueueTXListener) iterator
222: .next();
223: if (debugEnabled) {
224: log.debug("Notifying listener " + listener
225: + " about events " + eventList);
226: }
227: listener.onEventsAfterCompletion(status, eventList);
228: }
229: // long end = System.currentTimeMillis();
230: // log.info("notifying "+listenersClone.size()+" listeners about
231: // "+eventList.size()+
232: // " events took "+(end-start)+" ms. ("+start+"->"+end+")");
233: }
234:
235: }
|