001: package com.coldcore.coloradoftp.connection.impl;
002:
003: import com.coldcore.coloradoftp.command.Command;
004: import com.coldcore.coloradoftp.command.CommandFactory;
005: import com.coldcore.coloradoftp.command.CommandProcessor;
006: import com.coldcore.coloradoftp.command.Reply;
007: import com.coldcore.coloradoftp.connection.*;
008: import com.coldcore.coloradoftp.factory.ObjectFactory;
009: import com.coldcore.coloradoftp.factory.ObjectName;
010: import com.coldcore.coloradoftp.session.Session;
011: import com.coldcore.coloradoftp.session.SessionAttributeName;
012: import org.apache.log4j.Logger;
013:
014: import java.io.ByteArrayOutputStream;
015: import java.nio.ByteBuffer;
016: import java.nio.channels.SocketChannel;
017: import java.nio.channels.Channel;
018:
019: /**
020: * @see com.coldcore.coloradoftp.connection.Connection
021: *
022: * Control connections usualy do not eat much network traffic (that is a job of
023: * data connections), so there is no need to control network overhead for this type
024: * of connection. Control connection always have lower priority than data connections
025: * and their execution does not take place on every round of lyfe cycle thread.
026: */
027: public class GenericControlConnection extends GenericConnection
028: implements ControlConnection {
029:
030: private static Logger log = Logger
031: .getLogger(GenericControlConnection.class);
032: protected ByteArrayOutputStream warray;
033: protected ByteArrayOutputStream rarray;
034: protected boolean rarrayComplete;
035: protected StringBuffer incomingBuffer;
036: protected StringBuffer outgoingBuffer;
037: protected boolean interruptState;
038: protected CommandProcessor commandProcessor;
039: protected CommandFactory commandFactory;
040: protected Session session;
041: protected DataConnection dataConnection;
042: protected DataConnectionInitiator dataConnectionInitiator;
043: protected boolean utf8;
044:
045: public static final String CHARSET_UTF8 = "UTF-8";
046: public static final String CHARSET_ASCII = "US-ASCII";
047:
048: public static final char UTF8_MAGIC_NUMBER = (char) 65279;
049:
050: public GenericControlConnection(int bufferSize) {
051: super ();
052:
053: utf8 = true;
054:
055: incomingBuffer = new StringBuffer();
056: outgoingBuffer = new StringBuffer();
057:
058: rbuffer = ByteBuffer.allocate(bufferSize);
059: rbuffer.flip();
060:
061: wbuffer = ByteBuffer.allocate(bufferSize);
062: wbuffer.flip();
063:
064: warray = new ByteArrayOutputStream();
065: rarray = new ByteArrayOutputStream();
066: }
067:
068: public synchronized void initialize(SocketChannel channel) {
069: super .initialize(channel);
070:
071: //ObjectFactory cannot be used in a constructor, so we create the rest of the objects here
072: commandProcessor = (CommandProcessor) ObjectFactory
073: .getObject(ObjectName.COMMAND_PROCESSOR);
074: commandFactory = (CommandFactory) ObjectFactory
075: .getObject(ObjectName.COMMAND_FACTORY);
076: session = (Session) ObjectFactory.getObject(ObjectName.SESSION);
077: dataConnectionInitiator = (DataConnectionInitiator) ObjectFactory
078: .getObject(ObjectName.DATA_CONNECTION_INITIATOR);
079: dataConnectionInitiator.setControlConnection(this );
080:
081: startReaderThread();
082: startWriterThread();
083: }
084:
085: /** Try to flush the content of the read array to the incoming buffer.
086: * @return TRUE is flushed, FALSE otherwise
087: */
088: protected boolean flushReadArray() throws Exception {
089: if (!rarrayComplete)
090: return false;
091:
092: //Decode and feed collected bytes to the incoming buffer (also remove the UTF-8 magic number if present)
093: String s = new String(rarray.toByteArray(), utf8 ? CHARSET_UTF8
094: : CHARSET_ASCII);
095: char[] carr = s.toCharArray();
096: synchronized (incomingBuffer) {
097: for (char c : carr)
098: if (c != UTF8_MAGIC_NUMBER)
099: incomingBuffer.append(c);
100: }
101:
102: //Reset the read array
103: rarray.reset();
104: rarrayComplete = false;
105:
106: return true;
107: }
108:
109: /** Decode and write the next part of user's input to the incoming buffer.
110: * Used by the "read" routine to receive UFT-8 bytes.
111: * @param arr Byte array containing the next part of user's input
112: * @param stopIndex User's input stops at this index in the byte array
113: */
114: protected void pushIncomingBuffer(byte[] arr, int stopIndex)
115: throws Exception {
116: /* We will feed user input to the read array one byte at a time till we hit any 1-byte character.
117: * Only then the bytes in the read array may be safely decoded to UTF-8 (where one char may be
118: * represented by 2-3 bytes) and written to the incoming buffer. For performance we will not flush
119: * the read array every time we encounter 1-byte character, but we will do so before any 2-3 bytes
120: * UTF-8 character and in the end if possible (rarrayComplete). If the read array cannot be decoded
121: * then the bytes in it will wait till the next "read" routine.
122: */
123: for (int z = 0; z < stopIndex; z++) {
124: byte b = arr[z];
125:
126: //x00-x7F is a 1-byte character (UTF-8/ASCII), otherwise the character takes 2-3 bytes
127: if (b >= 0 && b <= 127) {
128: //Add the byte to the read array and proceed (read array may now be decoded)
129: rarray.write(b);
130: rarrayComplete = true;
131: } else {
132: //Try to flush the read array then add the byte to it and proceed
133: flushReadArray();
134: rarray.write(b);
135: }
136: }
137:
138: //Try to flush the read array one more time in the end
139: flushReadArray();
140: }
141:
142: protected void read() throws Exception {
143: /* We must not read anything if:
144: * 1. There is some data in outgoing buffer waiting to be send to the user
145: * 2. User did not receive a welcome message yet and it is not yet in the outgoing buffer
146: * 3. Connection is poisoned
147: */
148: if (getOutgoingBufferSize() > 0 || bytesWrote == 0 || poisoned) {
149: Thread.sleep(sleep);
150: return;
151: }
152:
153: //Read data from socket and append it to the incoming buffer.
154: rbuffer.clear();
155: int i = sc.read(rbuffer); //Thread blocks here...
156:
157: //Client disconnected?
158: if (i == -1)
159: throw new BrokenPipeException();
160:
161: bytesRead += i;
162: log.debug("Read from socket " + i + " bytes (total "
163: + bytesRead + ")");
164:
165: //This will add user input to the incoming buffer decoded with proper charset
166: byte[] barr = rbuffer.array();
167: pushIncomingBuffer(barr, i);
168:
169: //Execute commands waiting in the buffer
170: executeCommands();
171: }
172:
173: /** Encode and return the next part of server's response from the the outgoing buffer.
174: * Used by the "write" routine to send UFT-8 bytes.
175: * @param maxBytes Byte array size limit
176: * @return Encoded byte array not longer than the limit or NULL if there is nothing to write
177: */
178: protected byte[] popOutgoingBuffer(int maxBytes) throws Exception {
179: byte[] barr;
180: if (warray.size() > 0) {
181:
182: //Reminder from the last output is still pending
183: barr = warray.toByteArray();
184: warray.reset();
185:
186: } else {
187:
188: //Get a string from the outgoing buffer to write into socket
189: String str;
190: int end = maxBytes;
191: synchronized (outgoingBuffer) {
192: //Correct sub-string length (if current length is longer than available data size)
193: if (end > outgoingBuffer.length())
194: end = outgoingBuffer.length();
195: if (end == 0)
196: return null; //Nothing to write
197: str = outgoingBuffer.substring(0, end);
198: //Remove this string from the outgoing buffer
199: outgoingBuffer.delete(0, end);
200: }
201:
202: //Convert to byte array, the length of the byte array may be greater than the string length (UTF-8 encoding)
203: barr = str.getBytes(utf8 ? CHARSET_UTF8 : CHARSET_ASCII);
204:
205: }
206:
207: //Will the new array fit into the buffer?
208: if (barr.length > maxBytes) {
209: warray.write(barr, maxBytes, barr.length - maxBytes); //This will not fit into the socket buffer, save it for the next "write"
210: byte[] trg = new byte[maxBytes];
211: System.arraycopy(barr, 0, trg, 0, maxBytes);
212: return trg;
213: } else {
214: return barr;
215: }
216: }
217:
218: protected void write() throws Exception {
219:
220: //Read more data from the outgoing buffer into the buffer only if the buffer is empty
221: if (!wbuffer.hasRemaining()) {
222:
223: //This will get server response from the outgoing buffer encoded with proper charset
224: int cap = wbuffer.capacity();
225: byte[] barr = popOutgoingBuffer(cap);
226:
227: //Nothing to write?
228: if (barr == null) {
229: Thread.sleep(sleep);
230: return;
231: }
232:
233: //Write out to the buffer
234: wbuffer.clear();
235: wbuffer.put(barr);
236: wbuffer.flip();
237: }
238:
239: //Forward the data to the user
240: int i = sc.write(wbuffer); //Thread blocks here...
241:
242: //Client disconnected?
243: if (i == -1)
244: throw new BrokenPipeException();
245:
246: bytesWrote += i;
247: log.debug("Wrote into socket " + i + " bytes (total "
248: + bytesWrote + ")");
249: }
250:
251: public void service() throws Exception {
252: /* If connection has been poisoned then we can destroy it only when it writes all data out.
253: * We cannot kill it while it has an active data connection.
254: * We cannot kill if it did not write a welcome message yet.
255: */
256: if (poisoned) {
257: boolean kill = true;
258:
259: if (dataConnection != null && !dataConnection.isDestroyed())
260: kill = false;
261:
262: if (getOutgoingBufferSize() > 0)
263: kill = false;
264:
265: if (bytesWrote == 0)
266: kill = false;
267:
268: if (kill)
269: throw new PoisonedException();
270: }
271: }
272:
273: /** Execute commands waiting in the incoming buffer */
274: protected void executeCommands() throws Exception {
275: while (true) {
276: Command command = getNextCommand();
277: if (command == null)
278: break;
279: commandProcessor.execute(command);
280: }
281: }
282:
283: /** Reads next user command from the incoming buffer
284: * @return Command or NULL if it's not ready yet
285: */
286: protected Command getNextCommand() throws Exception {
287: //Extract the next command from buffer
288: String input;
289: synchronized (incomingBuffer) {
290: int i = incomingBuffer.indexOf("\r\n");
291: if (i == -1)
292: return null; //Nothing to extraxt yet (the line is not finished)
293: input = incomingBuffer.substring(0, i);
294: incomingBuffer.delete(0, i + 2); //Also delete \r\n in the end of the command
295: if (input.trim().length() == 0)
296: return null; //This is an empty string, skip it
297: log.debug("Extracted user input: " + input);
298: }
299:
300: Command command = commandFactory.create(input);
301: command.setConnection(this );
302:
303: //If INTERRUPT state is set then ignore all but special FTP commands (same for the poisoned).
304: if (interruptState && !command.processInInterruptState()) {
305: log
306: .debug("Execution of the command is not allowed while the connection is in INTERRUPT state (dropping command)");
307: return null;
308: }
309: if (poisoned && !command.processInInterruptState()) {
310: log
311: .debug("Execution of the command is not allowed while the connection is poisoned (dropping command)");
312: return null;
313: }
314:
315: return command;
316: }
317:
318: public synchronized void reply(Reply reply) {
319: //Prepare reply and write it out
320: String prepared = reply.prepare();
321: synchronized (outgoingBuffer) {
322: outgoingBuffer.append(prepared);
323: }
324: log.debug("Prepared reply: " + prepared.trim());
325:
326: /* Change "interrupt" state: if code starts with "1" then set it, otherwise unset.
327: * FTP spec: all codes that start with 1 demand client to wait for another reply.
328: */
329: if (reply.getCode().startsWith("1")) {
330: interruptState = true;
331: log.debug("Reply has triggered INTERRUPT state");
332: } else if (interruptState) {
333: //Check if reply's command can clear INTERRUPT state, otherwise leave the state as it is
334: Command command = reply.getCommand();
335: if (command == null || command.canClearInterruptState()) {
336: interruptState = false;
337: log.debug("Reply has cleared INTERRUPT state");
338: }
339: }
340: }
341:
342: public Session getSession() {
343: return session;
344: }
345:
346: public DataConnection getDataConnection() {
347: return dataConnection;
348: }
349:
350: public void setDataConnection(DataConnection dataConnection) {
351: this .dataConnection = dataConnection;
352: }
353:
354: public synchronized void destroy() {
355: //Abort data connection initiator if active
356: if (dataConnectionInitiator.isActive())
357: dataConnectionInitiator.abort();
358:
359: //Destory data connection if exists
360: if (dataConnection != null)
361: dataConnection.destroy();
362:
363: //Test if there is data channel left in the session
364: closeSessionDataChannel();
365:
366: super .destroy();
367: }
368:
369: /** Close a data channel if exists in the session */
370: protected void closeSessionDataChannel() {
371: Channel odc = (Channel) session
372: .getAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
373: if (odc != null) {
374: log.debug("Attempting to close data channel in session");
375: session
376: .removeAttribute(SessionAttributeName.DATA_CONNECTION_CHANNEL);
377: try {
378: odc.close();
379: } catch (Throwable e) {
380: log.error("Error closing data channel (ignoring)", e);
381: }
382: }
383: }
384:
385: public DataConnectionInitiator getDataConnectionInitiator() {
386: return dataConnectionInitiator;
387: }
388:
389: public int getOutgoingBufferSize() {
390: synchronized (outgoingBuffer) {
391: return outgoingBuffer.length();
392: }
393: }
394:
395: public int getIncomingBufferSize() {
396: synchronized (incomingBuffer) {
397: return incomingBuffer.length();
398: }
399: }
400:
401: public boolean isUtf8() {
402: return utf8;
403: }
404:
405: public void setUtf8(boolean utf8) {
406: this.utf8 = utf8;
407: }
408: }
|