001: package com.coldcore.coloradoftp.connection.impl;
002:
003: import com.coldcore.coloradoftp.command.Command;
004: import com.coldcore.coloradoftp.command.CommandProcessor;
005: import com.coldcore.coloradoftp.connection.ConnectionPool;
006: import com.coldcore.coloradoftp.connection.ControlConnection;
007: import com.coldcore.coloradoftp.connection.ControlConnector;
008: import com.coldcore.coloradoftp.core.Core;
009: import com.coldcore.coloradoftp.core.CoreStatus;
010: import com.coldcore.coloradoftp.factory.ObjectFactory;
011: import com.coldcore.coloradoftp.factory.ObjectName;
012: import org.apache.log4j.Logger;
013:
014: import java.io.IOException;
015: import java.net.InetSocketAddress;
016: import java.nio.channels.ServerSocketChannel;
017: import java.nio.channels.SocketChannel;
018:
019: /**
020: * @see com.coldcore.coloradoftp.connection.ControlConnector
021: */
022: public class GenericControlConnector implements ControlConnector,
023: Runnable {
024:
025: private static Logger log = Logger
026: .getLogger(GenericControlConnector.class);
027: protected ServerSocketChannel ssc;
028: protected int port;
029: protected boolean bound;
030: protected Core core;
031: protected CommandProcessor commandProcessor;
032: protected ConnectionPool controlConnectionPool;
033: protected Thread thr;
034: protected long sleep;
035:
036: public GenericControlConnector() {
037: port = 21;
038: sleep = 100L;
039: }
040:
041: /** Configure connection before adding it to a pool
042: * @param connection Connection
043: */
044: public void configure(ControlConnection connection) {
045: if (core.getStatus() == CoreStatus.POISONED) {
046: //Server is shutting down, reply and poison the connection
047: Command command = (Command) ObjectFactory
048: .getObject(ObjectName.COMMAND_POISONED);
049: command.setConnection(connection);
050: commandProcessor.execute(command);
051: connection.poison();
052: } else {
053: //Server is ready, reply so
054: Command command = (Command) ObjectFactory
055: .getObject(ObjectName.COMMAND_WELCOME);
056: command.setConnection(connection);
057: commandProcessor.execute(command);
058: }
059: }
060:
061: public synchronized void bind() throws IOException {
062: if (bound) {
063: log.warn("Connector on port " + port
064: + " was bound when bind routine was submitted");
065: throw new IllegalStateException(
066: "Unbind the connector on port " + port + " first");
067: }
068:
069: //Get required objects
070: core = (Core) ObjectFactory.getObject(ObjectName.CORE);
071: commandProcessor = (CommandProcessor) ObjectFactory
072: .getObject(ObjectName.COMMAND_PROCESSOR);
073: controlConnectionPool = (ConnectionPool) ObjectFactory
074: .getObject(ObjectName.CONTROL_CONNECTION_POOL);
075:
076: //Bind to the port
077: ssc = ServerSocketChannel.open();
078: ssc.socket().bind(new InetSocketAddress(port));
079:
080: //Start this class
081: thr = new Thread(this );
082: thr.start();
083:
084: bound = true;
085: log.info("Connector is bound to port " + port);
086: }
087:
088: public synchronized void unbind() throws IOException {
089: if (!bound) {
090: log
091: .warn("Connector on port "
092: + port
093: + " was not bound when unbind routine was submitted");
094: throw new IllegalStateException(
095: "Cannot unbind the connector on port " + port
096: + ", it is not bound");
097: }
098:
099: //Unbind from the port
100: if (ssc.isOpen())
101: ssc.close();
102:
103: //Wait for this class to stop (just in case)
104: try {
105: thr.join(30000);
106: } catch (Throwable e) {
107: }
108:
109: bound = false;
110: log.info("Connector on port " + port + " is unbound");
111: }
112:
113: public boolean isBound() {
114: return bound;
115: }
116:
117: public void run() {
118: while (bound) {
119:
120: ControlConnection connection = null;
121: SocketChannel sc = null;
122: try {
123: sc = ssc.accept(); //Thread blocks here...
124: String ip = sc.socket().getInetAddress()
125: .getHostAddress();
126: log.debug("New control connection accepted (IP " + ip
127: + ")");
128:
129: //Create new connection instance and initialize it
130: connection = (ControlConnection) ObjectFactory
131: .getObject(ObjectName.CONTROL_CONNECTION);
132: connection.initialize(sc);
133:
134: //Configure the control connection and add to pool
135: configure(connection);
136: controlConnectionPool.add(connection);
137: log.debug("New control connection is ready");
138:
139: Thread.sleep(sleep);
140:
141: } catch (Throwable e) {
142: log.warn("Failed to accept a connection (ignoring)", e);
143: try {
144: connection.destroy();
145: } catch (Throwable ex) {
146: }
147: try {
148: sc.close();
149: } catch (Throwable ex) {
150: }
151: }
152:
153: }
154: log.debug("Control connector thread finished");
155: }
156:
157: public void setPort(int port) {
158: if (port < 1)
159: throw new IllegalArgumentException("Invalid port");
160: this .port = port;
161: }
162:
163: public int getPort() {
164: return port;
165: }
166:
167: /** Get thread sleep time
168: * @return Time in mills
169: */
170: public long getSleep() {
171: return sleep;
172: }
173:
174: /** Set thread sleep time
175: * @param sleep Time in mills
176: */
177: public void setSleep(long sleep) {
178: this.sleep = sleep;
179: }
180: }
|