001: /*
002: * $Id: FileMessageReceiver.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.file;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.DefaultMuleException;
015: import org.mule.api.MuleException;
016: import org.mule.api.endpoint.InboundEndpoint;
017: import org.mule.api.lifecycle.CreateException;
018: import org.mule.api.routing.RoutingException;
019: import org.mule.api.service.Service;
020: import org.mule.api.transport.Connector;
021: import org.mule.api.transport.MessageAdapter;
022: import org.mule.transport.AbstractPollingMessageReceiver;
023: import org.mule.transport.ConnectException;
024: import org.mule.transport.DefaultMessageAdapter;
025: import org.mule.transport.file.i18n.FileMessages;
026: import org.mule.util.FileUtils;
027:
028: import java.io.File;
029: import java.io.FileFilter;
030: import java.io.FileNotFoundException;
031: import java.io.FilenameFilter;
032: import java.io.IOException;
033: import java.io.RandomAccessFile;
034: import java.nio.channels.FileChannel;
035: import java.nio.channels.FileLock;
036: import java.util.Comparator;
037:
038: import edu.emory.mathcs.backport.java.util.Arrays;
039:
040: import org.apache.commons.collections.comparators.ReverseComparator;
041:
042: /**
043: * <code>FileMessageReceiver</code> is a polling listener that reads files from a
044: * directory.
045: */
046:
047: public class FileMessageReceiver extends AbstractPollingMessageReceiver {
048: public static final String COMPARATOR_CLASS_NAME_PROPERTY = "comparator";
049: public static final String COMPARATOR_REVERSE_ORDER_PROPERTY = "reverseOrder";
050:
051: private String readDir = null;
052: private String moveDir = null;
053: private File readDirectory = null;
054: private File moveDirectory = null;
055: private String moveToPattern = null;
056: private FilenameFilter filenameFilter = null;
057: private FileFilter fileFilter = null;
058:
059: public FileMessageReceiver(Connector connector, Service service,
060: InboundEndpoint endpoint, String readDir, String moveDir,
061: String moveToPattern, long frequency)
062: throws CreateException {
063: super (connector, service, endpoint);
064: this .setFrequency(frequency);
065:
066: this .readDir = readDir;
067: this .moveDir = moveDir;
068: this .moveToPattern = moveToPattern;
069:
070: if (endpoint.getFilter() instanceof FilenameFilter) {
071: filenameFilter = (FilenameFilter) endpoint.getFilter();
072: } else if (endpoint.getFilter() instanceof FileFilter) {
073: fileFilter = (FileFilter) endpoint.getFilter();
074: } else if (endpoint.getFilter() != null) {
075: throw new CreateException(FileMessages
076: .invalidFileFilter(endpoint.getEndpointURI()), this );
077: }
078: }
079:
080: protected void doConnect() throws Exception {
081: if (readDir != null) {
082: readDirectory = FileUtils.openDirectory(readDir);
083: if (!(readDirectory.canRead())) {
084: throw new ConnectException(FileMessages
085: .fileDoesNotExist(readDirectory
086: .getAbsolutePath()), this );
087: } else {
088: logger.debug("Listening on endpointUri: "
089: + readDirectory.getAbsolutePath());
090: }
091: }
092:
093: if (moveDir != null) {
094: moveDirectory = FileUtils.openDirectory((moveDir));
095: if (!(moveDirectory.canRead()) || !moveDirectory.canWrite()) {
096: throw new ConnectException(FileMessages
097: .moveToDirectoryNotWritable(), this );
098: }
099: }
100: }
101:
102: protected void doDisconnect() throws Exception {
103: // template method
104: }
105:
106: protected void doDispose() {
107: // nothing to do
108: }
109:
110: public void poll() {
111: try {
112: File[] files = this .listFiles();
113: if (logger.isDebugEnabled()) {
114: logger.debug("Files: " + files);
115: }
116: Comparator comparator = getComparator();
117: if (comparator != null) {
118: Arrays.sort(files, comparator);
119: }
120: for (int i = 0; i < files.length; i++) {
121: // don't process directories
122: if (files[i].isFile()) {
123: this .processFile(files[i]);
124: }
125: }
126: } catch (Exception e) {
127: this .handleException(e);
128: }
129: }
130:
131: public synchronized void processFile(final File sourceFile)
132: throws MuleException {
133: //TODO RM*: This can be put in a Filter. Also we can add an AndFileFilter/OrFileFilter to allow users to
134: //combine file filters (since we can only pass a single filter to File.listFiles, we would need to wrap
135: //the current And/Or filters to extend {@link FilenameFilter}
136: boolean checkFileAge = ((FileConnector) connector)
137: .getCheckFileAge();
138: if (checkFileAge) {
139: long fileAge = ((FileConnector) connector).getFileAge();
140: long lastMod = sourceFile.lastModified();
141: long now = System.currentTimeMillis();
142: long this FileAge = now - lastMod;
143: if (this FileAge < fileAge) {
144: if (logger.isDebugEnabled()) {
145: logger
146: .debug("The file has not aged enough yet, will return nothing for: "
147: + sourceFile);
148: }
149: return;
150: }
151: }
152:
153: // don't process a file that is locked by another process (probably still being written)
154: if (!attemptFileLock(sourceFile)) {
155: return;
156: }
157:
158: FileConnector fc = ((FileConnector) connector);
159: String sourceFileOriginalName = sourceFile.getName();
160:
161: // Perform some quick checks to make sure file can be processed
162: if (!(sourceFile.canRead() && sourceFile.exists() && sourceFile
163: .isFile())) {
164: throw new DefaultMuleException(FileMessages
165: .fileDoesNotExist(sourceFileOriginalName));
166: }
167:
168: // This isn't nice but is needed as MessageAdaptor is required to resolve
169: // destination file name, and StreamingReceiverFileInputStream is
170: // required to create MessageAdaptor
171: DefaultMessageAdapter fileParserMsgAdaptor = new DefaultMessageAdapter(
172: null);
173: fileParserMsgAdaptor.setProperty(
174: FileConnector.PROPERTY_ORIGINAL_FILENAME,
175: sourceFileOriginalName);
176:
177: // set up destination file
178: File destinationFile = null;
179: if (moveDir != null) {
180: String destinationFileName = sourceFileOriginalName;
181: if (moveToPattern != null) {
182: destinationFileName = ((FileConnector) connector)
183: .getFilenameParser().getFilename(
184: fileParserMsgAdaptor, moveToPattern);
185: }
186: // don't use new File() directly, see MULE-1112
187: destinationFile = FileUtils.newFile(moveDir,
188: destinationFileName);
189: }
190:
191: MessageAdapter msgAdapter = null;
192: try {
193: if (fc.isStreaming()) {
194: msgAdapter = connector
195: .getMessageAdapter(new ReceiverFileInputStream(
196: sourceFile, fc.isAutoDelete(),
197: destinationFile));
198: } else {
199: msgAdapter = connector.getMessageAdapter(sourceFile);
200: }
201: } catch (FileNotFoundException e) {
202: // we can ignore since we did manage to acquire a lock, but just in case
203: logger.error("File being read disappeared!", e);
204: return;
205: }
206: msgAdapter.setProperty(
207: FileConnector.PROPERTY_ORIGINAL_FILENAME,
208: sourceFileOriginalName);
209:
210: if (!fc.isStreaming()) {
211: moveAndDelete(sourceFile, destinationFile,
212: sourceFileOriginalName, msgAdapter);
213: } else {
214: // If we are streaming no need to move/delete now, that will be done when
215: // stream is closed
216: this .routeMessage(new DefaultMuleMessage(msgAdapter),
217: endpoint.isSynchronous());
218: }
219: }
220:
221: private void moveAndDelete(final File sourceFile,
222: File destinationFile, String sourceFileOriginalName,
223: MessageAdapter msgAdapter) {
224:
225: boolean fileWasMoved = false;
226:
227: try {
228: // If we are moving the file to a read directory, move it there now and
229: // hand over a reference to the
230: // File in its moved location
231: if (destinationFile != null) {
232: // move sourceFile to new destination
233: fileWasMoved = FileUtils.moveFile(sourceFile,
234: destinationFile);
235:
236: // move didn't work - bail out (will attempt rollback)
237: if (!fileWasMoved) {
238: throw new DefaultMuleException(FileMessages
239: .failedToMoveFile(sourceFile
240: .getAbsolutePath(), destinationFile
241: .getAbsolutePath()));
242: }
243:
244: // create new MessageAdapter for destinationFile
245: msgAdapter = connector
246: .getMessageAdapter(destinationFile);
247:
248: msgAdapter.setProperty(FileConnector.PROPERTY_FILENAME,
249: destinationFile.getName());
250: msgAdapter.setProperty(
251: FileConnector.PROPERTY_ORIGINAL_FILENAME,
252: sourceFileOriginalName);
253: }
254:
255: // finally deliver the file message
256: this .routeMessage(new DefaultMuleMessage(msgAdapter),
257: endpoint.isSynchronous());
258:
259: // at this point msgAdapter either points to the old sourceFile
260: // or the new destinationFile.
261: if (((FileConnector) connector).isAutoDelete()) {
262: // no moveTo directory
263: if (destinationFile == null) {
264: // delete source
265: if (!sourceFile.delete()) {
266: throw new DefaultMuleException(FileMessages
267: .failedToDeleteFile(sourceFile
268: .getAbsolutePath()));
269: }
270: } else {
271: // nothing to do here since moveFile() should have deleted
272: // the source file for us
273: }
274: }
275: } catch (Exception e) {
276: boolean fileWasRolledBack = false;
277:
278: // only attempt rollback if file move was successful
279: if (fileWasMoved) {
280: fileWasRolledBack = rollbackFileMove(destinationFile,
281: sourceFile.getAbsolutePath());
282: }
283:
284: // wrap exception & handle it
285: Exception ex = new RoutingException(FileMessages
286: .exceptionWhileProcessing(sourceFile.getName(),
287: (fileWasRolledBack ? "successful"
288: : "unsuccessful")),
289: new DefaultMuleMessage(msgAdapter), endpoint, e);
290: this .handleException(ex);
291: }
292: }
293:
294: /**
295: * Try to acquire a lock on a file and release it immediately. Usually used as a
296: * quick check to see if another process is still holding onto the file, e.g. a
297: * large file (more than 100MB) is still being written to.
298: *
299: * @param sourceFile file to check
300: * @return <code>true</code> if the file can be locked
301: */
302: protected boolean attemptFileLock(final File sourceFile) {
303: // check if the file can be processed, be sure that it's not still being
304: // written
305: // if the file can't be locked don't process it yet, since creating
306: // a new FileInputStream() will throw an exception
307: FileLock lock = null;
308: FileChannel channel = null;
309: boolean fileCanBeLocked = false;
310: try {
311: channel = new RandomAccessFile(sourceFile, "rw")
312: .getChannel();
313:
314: // Try acquiring the lock without blocking. This method returns
315: // null or throws an exception if the file is already locked.
316: lock = channel.tryLock();
317: } catch (FileNotFoundException fnfe) {
318: logger.warn("Unable to open "
319: + sourceFile.getAbsolutePath(), fnfe);
320: } catch (IOException e) {
321: // Unable to create a lock. This exception should only be thrown when
322: // the file is already locked. No sense in repeating the message over
323: // and over.
324: } finally {
325: if (lock != null) {
326: // if lock is null the file is locked by another process
327: fileCanBeLocked = true;
328: try {
329: // Release the lock
330: lock.release();
331: } catch (IOException e) {
332: // ignore
333: }
334: }
335:
336: if (channel != null) {
337: try {
338: // Close the file
339: channel.close();
340: } catch (IOException e) {
341: // ignore
342: }
343: }
344: }
345:
346: return fileCanBeLocked;
347: }
348:
349: /**
350: * Get a list of files to be processed.
351: *
352: * @return an array of files to be processed.
353: * @throws org.mule.api.MuleException which will wrap any other exceptions or
354: * errors.
355: */
356: File[] listFiles() throws MuleException {
357: try {
358: File[] todoFiles = null;
359: if (fileFilter != null) {
360: todoFiles = readDirectory.listFiles(fileFilter);
361: } else {
362: todoFiles = readDirectory.listFiles(filenameFilter);
363: }
364: // logger.trace("Reading directory " + readDirectory.getAbsolutePath() +
365: // " -> " + TODOFiles.length + " file(s)");
366: return (todoFiles == null ? new File[0] : todoFiles);
367: } catch (Exception e) {
368: throw new DefaultMuleException(FileMessages
369: .errorWhileListingFiles(), e);
370: }
371: }
372:
373: /** Exception tolerant roll back method */
374: protected boolean rollbackFileMove(File sourceFile,
375: String destinationFilePath) {
376: boolean result = false;
377: try {
378: result = FileUtils.moveFile(sourceFile, FileUtils
379: .newFile(destinationFilePath));
380: } catch (Throwable t) {
381: logger.debug("rollback of file move failed: "
382: + t.getMessage());
383: }
384: return result;
385: }
386:
387: protected Comparator getComparator() throws Exception {
388:
389: Object o = getEndpoint().getProperty(
390: COMPARATOR_CLASS_NAME_PROPERTY);
391: Object reverseProperty = this .getEndpoint().getProperty(
392: COMPARATOR_REVERSE_ORDER_PROPERTY);
393: boolean reverse = false;
394: if (o != null) {
395: if (reverseProperty != null) {
396: reverse = Boolean.valueOf((String) reverseProperty)
397: .booleanValue();
398: }
399: Class clazz = Class.forName(o.toString());
400: o = clazz.newInstance();
401: return reverse ? new ReverseComparator((Comparator) o)
402: : (Comparator) o;
403: }
404: return null;
405: }
406: }
|