001: package com.coldcore.coloradoftp.connection.impl;
002:
003: import com.coldcore.coloradoftp.command.Reply;
004: import com.coldcore.coloradoftp.connection.*;
005: import com.coldcore.coloradoftp.factory.ObjectFactory;
006: import com.coldcore.coloradoftp.factory.ObjectName;
007: import com.coldcore.coloradoftp.session.Session;
008: import com.coldcore.coloradoftp.session.SessionAttributeName;
009: import org.apache.log4j.Logger;
010:
011: import java.nio.ByteBuffer;
012: import java.nio.channels.ReadableByteChannel;
013: import java.nio.channels.WritableByteChannel;
014: import java.nio.channels.Channel;
015:
016: /**
017: * @see com.coldcore.coloradoftp.connection.DataConnection
018: */
019: public class GenericDataConnection extends GenericConnection implements
020: DataConnection {
021:
022: private static Logger log = Logger
023: .getLogger(GenericDataConnection.class);
024: protected ControlConnection controlConnection;
025: protected ReadableByteChannel rbc;
026: protected WritableByteChannel wbc;
027: protected DataConnectionMode mode;
028: protected String filename;
029: protected boolean userAborted;
030: protected boolean successful;
031: protected boolean skipReply;
032: protected DataConnectionCallback callback;
033:
034: public GenericDataConnection(int bufferSize) {
035: super ();
036:
037: //rbuffer = ByteBuffer.allocateDirect(bufferSize);
038: rbuffer = ByteBuffer.allocate(bufferSize);
039: rbuffer.flip();
040: }
041:
042: /** Read data from user */
043: protected void read() throws Exception {
044: /* We will read data from the user and write it into the channel until the user
045: * disconnects. There is no way to check if a complete file has been uploaded,
046: * so we assume that every transfer is a success.
047: */
048:
049: //Read data from user into the buffer if the buffer is empty
050: if (!rbuffer.hasRemaining()) {
051: rbuffer.clear();
052: int i = sc.read(rbuffer); //Thread blocks here...
053: rbuffer.flip();
054:
055: //Client disconnected?
056: if (i == -1) {
057: successful = true;
058: throw new TransferCompleteException();
059: }
060:
061: bytesRead += i;
062: log.debug("Read from socket " + i + " bytes (total "
063: + bytesRead + ")");
064: }
065:
066: //Forward the data into the channel
067: wbc.write(rbuffer);
068: }
069:
070: /** Write data to user */
071: protected void write() throws Exception {
072: /* We wiil read data from the channel and write it to the user until the
073: * channel is empty (successful transfer). If user disconnects earlier than
074: * all data is transferred then the transfer has failed.
075: */
076:
077: //Read the data from the channel into the buffer if the buffer is empty
078: if (!rbuffer.hasRemaining()) {
079: rbuffer.clear();
080: int i = rbc.read(rbuffer);
081: rbuffer.flip();
082:
083: //File done?
084: if (i == -1) {
085: successful = true;
086: throw new TransferCompleteException();
087: }
088: }
089:
090: //Forward the data to the user
091: int i = sc.write(rbuffer); //Thread blocks here...
092:
093: //Client disconnected?
094: if (i == -1)
095: throw new TransferAbortedException();
096:
097: bytesWrote += i;
098: log.debug("Wrote into socket " + i + " bytes (total "
099: + bytesWrote + ")");
100: }
101:
102: /** Activate the connection if not active yet */
103: protected void activate() {
104: /* The connection will start to function as soon as it gets MODE and CHANNEL from
105: * user session (we must get CHANNEL last as it starts read/write routines).
106: * Those attributes then have to be removed or the next data connection will use them as well.
107: * There is also a FILENAME attribute for file operations.
108: */
109:
110: if (rbc != null || wbc != null)
111: return;
112:
113: if (mode == null) {
114: Session session = controlConnection.getSession();
115: mode = (DataConnectionMode) session
116: .getAttribute(SessionAttributeName.DATA_CONNECTION_MODE);
117: if (mode != null) {
118: log.debug("Mode extracted from user session");
119: }
120: }
121:
122: //Mode first
123: if (mode == null)
124: return;
125:
126: if (filename == null) {
127: Session session = controlConnection.getSession();
128: filename = (String) session
129: .getAttribute(SessionAttributeName.DATA_CONNECTION_FILENAME);
130: if (filename != null) {
131: log.debug("Filename extracted from user session");
132: }
133: }
134:
135: //Filename second
136: if (mode != DataConnectionMode.LIST && filename == null)
137: return;
138:
139: //Channel third (also start an appropriate thread)
140: if (rbc == null && wbc == null) {
141: Session session = controlConnection.getSession();
142: if (mode == DataConnectionMode.LIST
143: || mode == DataConnectionMode.RETR) {
144: rbc = (ReadableByteChannel) session
145: .getAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
146: startWriterThread(); //To write data to user
147: } else {
148: wbc = (WritableByteChannel) session
149: .getAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
150: startReaderThread(); //To read data from user
151: }
152: if (rbc != null || wbc != null) {
153: log
154: .debug("Channel extracted from user session (data transfer begins)");
155: }
156: }
157: }
158:
159: public void service() throws Exception {
160: //User aborted the transfer
161: if (userAborted)
162: throw new TransferAbortedException();
163:
164: //Try to activate the data transfer
165: activate();
166: }
167:
168: /** Close data channel */
169: protected void closeDataChannel() {
170: Session session = controlConnection.getSession();
171: Channel odc = (Channel) session
172: .getAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
173: session
174: .removeAttribute(SessionAttributeName.DATA_CONNECTION_FILENAME);
175: try {
176: if (odc != null)
177: odc.close();
178: } catch (Throwable e) {
179: log.error("Error closing data channel (ignoring)", e);
180: }
181: }
182:
183: /** Send reply to user upon connection termination */
184: protected void reply() {
185: try {
186: //Transfer aborted by user - send "426" and then "226"
187: if (userAborted) {
188: Reply reply = (Reply) ObjectFactory
189: .getObject(ObjectName.REPLY);
190: reply.setCode("426");
191: reply.setText("Connection closed, transfer aborted.");
192: controlConnection.reply(reply);
193:
194: reply = (Reply) ObjectFactory
195: .getObject(ObjectName.REPLY);
196: reply.setCode("226");
197: reply.setText("Abort command successful.");
198: controlConnection.reply(reply);
199:
200: log.debug("User aborted data transfer");
201: return;
202: }
203:
204: //Transfer failed
205: if (!successful) {
206: Reply reply = (Reply) ObjectFactory
207: .getObject(ObjectName.REPLY);
208: reply.setCode("426");
209: reply.setText("Connection closed, transfer aborted.");
210: controlConnection.reply(reply);
211:
212: log.debug("Data transfer failed");
213: return;
214: }
215:
216: //Transfer OK (note that STOU has a different code)
217: Reply reply = (Reply) ObjectFactory
218: .getObject(ObjectName.REPLY);
219: if (mode == DataConnectionMode.STOU)
220: reply.setCode("250");
221: else
222: reply.setCode("226");
223:
224: if (mode == DataConnectionMode.LIST) {
225: reply.setText("Transfer completed.");
226: } else {
227: //Encode double-quated in the filename
228: String encf = filename.replaceAll("\"", "\"\"");
229: reply.setText("Transfer completed for \"" + encf
230: + "\".");
231: }
232: controlConnection.reply(reply);
233:
234: log.debug("Data transfer successful");
235:
236: } catch (Throwable e) {
237: log.error("Error sending completion reply (ignoring)", e);
238: }
239: }
240:
241: public synchronized void destroy() {
242: if (controlConnection != null) {
243: closeDataChannel();
244:
245: //Hook for post-upload/download logic via a callback
246: if (!skipReply && callback != null)
247: try {
248: if (successful)
249: callback.onTransferComplete(this );
250: else
251: callback.onTransferAbort(this );
252: } catch (Throwable e) {
253: log.error("Callback error (ignoring)", e);
254: }
255:
256: //When data transfer finishes, a reply must be send to a user
257: if (!skipReply)
258: reply();
259:
260: //Clear the attributes to prevent misuse by future instances
261: Session session = controlConnection.getSession();
262: session
263: .removeAttribute(SessionAttributeName.DATA_CONNECTION_MODE);
264: session
265: .removeAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
266:
267: //Clear control connection reference
268: controlConnection.setDataConnection(null);
269: } else {
270: log.warn("No control connection in the destroy method");
271: }
272:
273: super .destroy();
274: }
275:
276: public void destroyNoReply() {
277: skipReply = true;
278: destroy();
279: }
280:
281: public void abort() {
282: userAborted = true;
283: }
284:
285: public ControlConnection getControlConnection() {
286: return controlConnection;
287: }
288:
289: public void setControlConnection(ControlConnection controlConnection) {
290: this .controlConnection = controlConnection;
291: }
292:
293: public DataConnectionCallback getDataConnectionCallback() {
294: return callback;
295: }
296:
297: public void setDataConnectionCallback(
298: DataConnectionCallback callback) {
299: this.callback = callback;
300: }
301: }
|