001: package com.coldcore.coloradoftp.connection.impl;
002:
003: import com.coldcore.coloradoftp.command.Reply;
004: import com.coldcore.coloradoftp.connection.ConnectionPool;
005: import com.coldcore.coloradoftp.connection.ControlConnection;
006: import com.coldcore.coloradoftp.connection.DataConnection;
007: import com.coldcore.coloradoftp.connection.DataPortListener;
008: import com.coldcore.coloradoftp.factory.ObjectFactory;
009: import com.coldcore.coloradoftp.factory.ObjectName;
010: import org.apache.log4j.Logger;
011:
012: import java.io.IOException;
013: import java.net.InetSocketAddress;
014: import java.nio.channels.ServerSocketChannel;
015: import java.nio.channels.SocketChannel;
016: import java.util.HashMap;
017: import java.util.Map;
018:
019: /**
020: * @see com.coldcore.coloradoftp.connection.DataPortListener
021: */
022: public class GenericDataPortListener implements DataPortListener,
023: Runnable {
024:
025: private static Logger log = Logger
026: .getLogger(GenericDataPortListener.class);
027: protected int port;
028: protected boolean bound;
029: protected ServerSocketChannel ssc;
030: protected Map<String, ControlConnection> awaiting;
031: protected ConnectionPool dataConnectionPool;
032: protected Reply errorReply;
033: protected Thread thr;
034: protected long sleep;
035:
036: public GenericDataPortListener() {
037: port = -1;
038: sleep = 100L;
039: awaiting = new HashMap<String, ControlConnection>();
040: }
041:
042: protected Reply getErrorReply() {
043: if (errorReply == null) {
044: errorReply = (Reply) ObjectFactory
045: .getObject(ObjectName.REPLY);
046: errorReply.setCode("425");
047: errorReply.setText("Can't open data connection.");
048: }
049: return errorReply;
050: }
051:
052: /** Get thread sleep time
053: * @return Time in mills
054: */
055: public long getSleep() {
056: return sleep;
057: }
058:
059: /** Set thread sleep time
060: * @param sleep Time in mills
061: */
062: public void setSleep(long sleep) {
063: this .sleep = sleep;
064: }
065:
066: public void setPort(int port) {
067: this .port = port;
068: }
069:
070: public int getPort() {
071: return port;
072: }
073:
074: public synchronized void bind() throws IOException {
075: if (port < 1)
076: throw new IllegalArgumentException("Set correct port first");
077:
078: if (bound) {
079: log.warn("Listener on port " + port
080: + " was bound when bind routine was submitted");
081: throw new IllegalStateException(
082: "Unbind the listener on port " + port + " first");
083: }
084:
085: //Get required objects
086: dataConnectionPool = (ConnectionPool) ObjectFactory
087: .getObject(ObjectName.DATA_CONNECTION_POOL);
088:
089: //Bind to the port
090: ssc = ServerSocketChannel.open();
091: ssc.socket().bind(new InetSocketAddress(port));
092:
093: //Start this class
094: thr = new Thread(this );
095: thr.start();
096:
097: bound = true;
098: log.debug("Listener is bound to port " + port);
099: }
100:
101: public synchronized void unbind() throws IOException {
102: if (!bound) {
103: log
104: .warn("Listener on port "
105: + port
106: + " was not bound when unbind routine was submitted");
107: throw new IllegalStateException(
108: "Cannot unbind the listener on port " + port
109: + ", it is not bound");
110: }
111:
112: //Remove all awaiting connections
113: synchronized (awaiting) {
114: for (ControlConnection connection : awaiting.values())
115: removeConnection(connection);
116: }
117:
118: //Unbind from the port
119: if (ssc.isOpen())
120: ssc.close();
121:
122: bound = false;
123: log.debug("Listener on port " + port + " is unbound");
124: }
125:
126: public boolean isBound() {
127: return bound;
128: }
129:
130: public boolean addConnection(ControlConnection connection) {
131: if (!bound)
132: return false;
133:
134: //Clean up the map from unnecessary control connections
135: cleanup();
136:
137: //Add a new one
138: String ip = connection.getSocketChannel().socket()
139: .getInetAddress().getHostAddress();
140: synchronized (awaiting) {
141: ControlConnection con = awaiting.get(ip);
142: if (con != null && con != connection)
143: return false;
144: awaiting.put(ip, connection);
145: return true;
146: }
147: }
148:
149: public boolean removeConnection(ControlConnection connection) {
150: String ip = connection.getSocketChannel().socket()
151: .getInetAddress().getHostAddress();
152: synchronized (awaiting) {
153: ControlConnection c = awaiting.get(ip);
154: if (c == connection) {
155: awaiting.remove(ip);
156: c.reply(getErrorReply());
157: return true;
158: }
159: return false;
160: }
161: }
162:
163: public void run() {
164: while (bound) {
165:
166: ControlConnection controlConnection = null;
167: DataConnection dataConnection = null;
168: SocketChannel sc = null;
169: try {
170: sc = ssc.accept(); //Thread blocks here...
171: String ip = sc.socket().getInetAddress()
172: .getHostAddress();
173: log.debug("New incoming data connection (from " + ip
174: + " on port " + port + ")");
175:
176: //Create new connection instance
177: dataConnection = (DataConnection) ObjectFactory
178: .getObject(ObjectName.DATA_CONNECTION);
179: dataConnection.initialize(sc);
180:
181: //Locate a control connection waiting for this data connection
182: controlConnection = popControlConnection(dataConnection);
183: if (controlConnection == null) {
184: log
185: .warn("No control connection found for an incoming data connection (from "
186: + ip + " on port " + port + ")");
187: dataConnection.destroyNoReply();
188: } else {
189:
190: //If there is a data connection already then kill it
191: DataConnection existing = controlConnection
192: .getDataConnection();
193: if (existing != null && !existing.isDestroyed()) {
194: log
195: .warn("BUG: Replacing existing data connection with a new one!");
196: existing.destroyNoReply();
197: }
198:
199: //Configure the data connection and wire it with the control connection and add to the pool
200: controlConnection.setDataConnection(dataConnection);
201: dataConnection
202: .setControlConnection(controlConnection);
203: configure(dataConnection);
204: dataConnectionPool.add(dataConnection);
205: log.debug("New data connection is ready");
206: }
207:
208: Thread.sleep(sleep);
209:
210: } catch (Throwable e) {
211: log.warn("Failed to accept a connection (ignoring)", e);
212: try {
213: dataConnection.destroyNoReply();
214: } catch (Throwable ex) {
215: }
216: try {
217: sc.close();
218: } catch (Throwable ex) {
219: log.error("Cannot close the channel (ignoring)", e);
220: }
221:
222: //Send error reply
223: if (controlConnection != null)
224: controlConnection.reply(getErrorReply());
225: }
226:
227: }
228: log.debug("Data port listener thread finished");
229: }
230:
231: /** Cleas up the map from connections which should not be in it */
232: protected void cleanup() {
233: synchronized (awaiting) {
234: for (String ip : awaiting.keySet()) {
235: ControlConnection connection = awaiting.get(ip);
236: if (connection.isDestroyed())
237: awaiting.remove(ip);
238: }
239: }
240: }
241:
242: /** Locate a control connection which awaits for a data connection and remove it
243: * @param dataConnection Incoming data connection
244: * @return Control connection or NULL if a control connection cannot be located and the data connection should be dropped
245: */
246: protected ControlConnection popControlConnection(
247: DataConnection dataConnection) {
248: String dip = dataConnection.getSocketChannel().socket()
249: .getInetAddress().getHostAddress();
250: synchronized (awaiting) {
251: for (String ip : awaiting.keySet()) {
252: if (ip.equals(dip)) {
253: ControlConnection controlConnection = awaiting
254: .remove(ip);
255: return controlConnection.isDestroyed() ? null
256: : controlConnection;
257: }
258: }
259: }
260: return null;
261: }
262:
263: /** Configure connection before adding it to a pool
264: * @param connection Connection
265: */
266: public void configure(DataConnection connection) {
267: }
268: }
|