001: package com.coldcore.coloradoftp.connection.impl;
002:
003: import com.coldcore.coloradoftp.connection.Connection;
004: import com.coldcore.coloradoftp.connection.ConnectionPool;
005: import com.coldcore.coloradoftp.connection.TerminatedException;
006: import com.coldcore.coloradoftp.core.Core;
007: import com.coldcore.coloradoftp.core.CoreStatus;
008: import com.coldcore.coloradoftp.factory.ObjectFactory;
009: import com.coldcore.coloradoftp.factory.ObjectName;
010: import org.apache.log4j.Logger;
011:
012: import java.util.HashSet;
013: import java.util.Iterator;
014: import java.util.Set;
015:
016: /**
017: * @see com.coldcore.coloradoftp.connection.ConnectionPool
018: *
019: * This class is thread safe as it takes care of all synchronizations.
020: */
021: public class GenericConnectionPool implements ConnectionPool, Runnable {
022:
023: private static Logger log = Logger
024: .getLogger(GenericConnectionPool.class);
025: protected Set<Connection> connections;
026: protected Core core;
027: protected Thread thr;
028: protected long sleep;
029: protected boolean running;
030:
031: public GenericConnectionPool() {
032: sleep = 1000L;
033: }
034:
035: /** Get thread sleep time
036: * @return Time in mills
037: */
038: public long getSleep() {
039: return sleep;
040: }
041:
042: /** Set thread sleep time
043: * @param sleep Time in mills
044: */
045: public void setSleep(long sleep) {
046: this .sleep = sleep;
047: }
048:
049: public void initialize() throws Exception {
050: if (connections == null)
051: connections = new HashSet<Connection>();
052: core = (Core) ObjectFactory.getObject(ObjectName.CORE);
053:
054: //Start this class
055: if (!running) {
056: running = true;
057: thr = new Thread(this );
058: thr.start();
059: }
060: }
061:
062: public void add(Connection connection) {
063: synchronized (connections) {
064: connections.add(connection);
065: }
066: }
067:
068: public void remove(Connection connection) {
069: synchronized (connections) {
070: connections.remove(connection);
071: }
072: }
073:
074: public int size() {
075: synchronized (connections) {
076: return connections.size();
077: }
078: }
079:
080: public void run() {
081: while (running) {
082:
083: synchronized (connections) {
084: Connection connection = null;
085: Iterator<Connection> it = connections.iterator();
086: while (it.hasNext())
087: try {
088: connection = it.next();
089:
090: //Remove if destroyed
091: if (connection.isDestroyed()) {
092: it.remove();
093: continue;
094: }
095:
096: //If a server is poisoned then spread the poison to all its connections
097: if (core.getStatus() == CoreStatus.POISONED
098: && !connection.isPoisoned()) {
099: connection.poison();
100: }
101:
102: //Service connection
103: connection.service();
104:
105: } catch (TerminatedException e) {
106: //Normal termination (exception has a message)
107: log.debug(e);
108: try {
109: connection.destroy();
110: } catch (Throwable ex) {
111: }
112:
113: } catch (Throwable e) {
114: //Failed connection
115: log.error("Connection failed", e);
116: try {
117: connection.destroy();
118: } catch (Throwable ex) {
119: }
120: }
121: }
122:
123: try {
124: Thread.sleep(sleep);
125: } catch (Throwable e) {
126: }
127:
128: }
129: log.debug("Connection pool thread finished");
130: }
131:
132: public void destroy() {
133: running = false;
134:
135: synchronized (connections) {
136: for (Connection connection : connections)
137: try {
138: connection.destroy();
139: } catch (Throwable e) {
140: }
141:
142: connections.clear();
143: }
144:
145: //Wait for this class to stop (just in case)
146: try {
147: thr.join(30000);
148: } catch (Throwable e) {
149: }
150: }
151:
152: public Set<Connection> list() {
153: synchronized (connections) {
154: return new HashSet<Connection>(connections);
155: }
156: }
157: }
|