001: package com.coldcore.coloradoftp.connection.impl;
002:
003: import com.coldcore.coloradoftp.command.Reply;
004: import com.coldcore.coloradoftp.connection.ConnectionPool;
005: import com.coldcore.coloradoftp.connection.ControlConnection;
006: import com.coldcore.coloradoftp.connection.DataConnection;
007: import com.coldcore.coloradoftp.connection.DataConnectionInitiator;
008: import com.coldcore.coloradoftp.factory.ObjectFactory;
009: import com.coldcore.coloradoftp.factory.ObjectName;
010: import com.coldcore.coloradoftp.session.Session;
011: import com.coldcore.coloradoftp.session.SessionAttributeName;
012: import org.apache.log4j.Logger;
013:
014: import java.net.InetSocketAddress;
015: import java.nio.channels.SocketChannel;
016:
017: /**
018: * @see com.coldcore.coloradoftp.connection.DataConnectionInitiator
019: */
020: public class GenericDataConnectionInitiator implements
021: DataConnectionInitiator, Runnable {
022:
023: private static Logger log = Logger
024: .getLogger(GenericDataConnectionInitiator.class);
025: protected String ip;
026: protected int port;
027: protected boolean active;
028: protected ControlConnection controlConnection;
029: protected ConnectionPool dataConnectionPool;
030: protected SocketChannel sc;
031: protected Reply errorReply;
032: protected Thread thr;
033: protected long sleep;
034: protected boolean aborted;
035:
036: public GenericDataConnectionInitiator() {
037: sleep = 100L;
038: }
039:
040: protected Reply getErrorReply() {
041: if (errorReply == null) {
042: errorReply = (Reply) ObjectFactory
043: .getObject(ObjectName.REPLY);
044: errorReply.setCode("425");
045: errorReply.setText("Can't open data connection.");
046: }
047: return errorReply;
048: }
049:
050: /** Get thread sleep time
051: * @return Time in mills
052: */
053: public long getSleep() {
054: return sleep;
055: }
056:
057: /** Set thread sleep time
058: * @param sleep Time in mills
059: */
060: public void setSleep(long sleep) {
061: this .sleep = sleep;
062: }
063:
064: /** Test if user got a "150" reply
065: * @return TRUE if user got the reply, FALSE if not yet
066: */
067: protected boolean isReply150() {
068: Session session = controlConnection.getSession();
069: Long bytesWrote = (Long) session
070: .getAttribute(SessionAttributeName.BYTE_MARKER_150_REPLY);
071: if (bytesWrote == null
072: || controlConnection.getBytesWrote() == bytesWrote
073: || controlConnection.getOutgoingBufferSize() != 0)
074: return false;
075: log.debug("User got a 150 reply");
076: return true;
077: }
078:
079: public String getIp() {
080: return ip;
081: }
082:
083: public void setIp(String ip) {
084: this .ip = ip;
085: }
086:
087: public int getPort() {
088: return port;
089: }
090:
091: public void setPort(int port) {
092: this .port = port;
093: }
094:
095: public void run() {
096: while (active) {
097:
098: DataConnection dataConnection = null;
099: try {
100:
101: /* We cannot open the socket yet. We must wait until user receives the positive "150" reply.
102: * The reply might not be in the buffer of the control connection just yet.
103: */
104: if (!isReply150()) {
105: Thread.sleep(sleep);
106: continue;
107: }
108:
109: //Get required objects
110: dataConnectionPool = (ConnectionPool) ObjectFactory
111: .getObject(ObjectName.DATA_CONNECTION_POOL);
112:
113: //Configure socket and connect
114: sc = SocketChannel.open();
115: sc.connect(new InetSocketAddress(ip, port)); //Thread blocks here...
116: if (!sc.finishConnect())
117: throw new RuntimeException("Failed finishConnect");
118: String ip = sc.socket().getInetAddress()
119: .getHostAddress();
120: log.debug("New data connection established (IP " + ip
121: + ")");
122:
123: //Create new connection instance
124: dataConnection = (DataConnection) ObjectFactory
125: .getObject(ObjectName.DATA_CONNECTION);
126: dataConnection.initialize(sc);
127:
128: //If there is a data connection already then kill it
129: DataConnection existing = controlConnection
130: .getDataConnection();
131: if (existing != null && !existing.isDestroyed()) {
132: log
133: .warn("BUG: Replacing existing data connection with a new one!");
134: existing.destroyNoReply();
135: }
136:
137: //Configure the data connection and wire it with the control connection and add to pool
138: controlConnection.setDataConnection(dataConnection);
139: dataConnection.setControlConnection(controlConnection);
140: configure(dataConnection);
141: dataConnectionPool.add(dataConnection);
142: log.debug("New data connection is ready");
143:
144: active = false;
145:
146: } catch (Throwable e) {
147:
148: //If aborted then do not post an error message
149: if (!aborted) {
150: log.warn("Failed to establish a connection with "
151: + ip + ":" + port + " (ignoring)", e);
152: try {
153: dataConnection.destroyNoReply();
154: } catch (Throwable ex) {
155: }
156: try {
157: sc.close();
158: } catch (Throwable ex) {
159: log.error(
160: "Cannot close the channel (ignoring)",
161: e);
162: }
163:
164: controlConnection.reply(getErrorReply());
165: }
166:
167: active = false;
168: }
169:
170: }
171: log.debug("Data connection initiator thread finished");
172: }
173:
174: public boolean isActive() {
175: return active;
176: }
177:
178: public synchronized void activate() {
179: if (active) {
180: log
181: .warn("Data connection initiator was active when activate routine was called");
182: return;
183: }
184:
185: active = true;
186: aborted = false;
187:
188: //Start this class
189: thr = new Thread(this );
190: thr.start();
191: }
192:
193: public synchronized void abort() {
194: aborted = true;
195: if (!active)
196: return;
197:
198: //Close the channel
199: try {
200: if (sc != null && sc.isOpen())
201: sc.close();
202: } catch (Throwable e) {
203: log.error("Cannot close channel (ignoring)", e);
204: }
205:
206: controlConnection.reply(getErrorReply());
207:
208: //Clear the attribute to prevent misuse by future instances
209: Session session = controlConnection.getSession();
210: session
211: .removeAttribute(SessionAttributeName.BYTE_MARKER_150_REPLY);
212:
213: active = false;
214: }
215:
216: public ControlConnection getControlConnection() {
217: return controlConnection;
218: }
219:
220: public void setControlConnection(ControlConnection controlConnection) {
221: this .controlConnection = controlConnection;
222: }
223:
224: /** Configure connection before adding it to a pool
225: * @param connection Connection
226: */
227: public void configure(DataConnection connection) {
228: }
229: }
|