001: /*
002: * $Id: RetrieveMessageReceiver.java 11343 2008-03-13 10:58:26Z tcarlson $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.email;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.MuleServer;
015: import org.mule.api.MuleException;
016: import org.mule.api.MuleMessage;
017: import org.mule.api.endpoint.InboundEndpoint;
018: import org.mule.api.lifecycle.CreateException;
019: import org.mule.api.lifecycle.Startable;
020: import org.mule.api.lifecycle.Stoppable;
021: import org.mule.api.routing.RoutingException;
022: import org.mule.api.service.Service;
023: import org.mule.api.transport.Connector;
024: import org.mule.api.transport.ReceiveException;
025: import org.mule.transport.AbstractPollingMessageReceiver;
026: import org.mule.transport.email.i18n.EmailMessages;
027: import org.mule.util.FileUtils;
028: import org.mule.util.StringUtils;
029: import org.mule.util.UUID;
030:
031: import java.io.File;
032: import java.io.FileOutputStream;
033: import java.io.IOException;
034:
035: import javax.mail.Address;
036: import javax.mail.Flags;
037: import javax.mail.Folder;
038: import javax.mail.Message;
039: import javax.mail.MessagingException;
040: import javax.mail.Store;
041: import javax.mail.event.MessageCountEvent;
042: import javax.mail.event.MessageCountListener;
043: import javax.mail.internet.InternetAddress;
044: import javax.mail.internet.MimeMessage;
045:
046: /**
047: * Poll a mailbox for messages, remove the messages and route them as events into Mule.
048: * <p/>
049: * This contains a reference to a mail folder (and also the endpoint and connector, via superclasses)
050: */
051:
052: public class RetrieveMessageReceiver extends
053: AbstractPollingMessageReceiver implements MessageCountListener,
054: Startable, Stoppable {
055: private Folder folder = null;
056: private boolean backupEnabled;
057: private String backupFolder = null;
058:
059: public RetrieveMessageReceiver(Connector connector,
060: Service service, InboundEndpoint endpoint,
061: long checkFrequency, boolean backupEnabled,
062: String backupFolder) throws CreateException {
063: super (connector, service, endpoint);
064: this .backupFolder = backupFolder;
065: this .backupEnabled = backupEnabled;
066: this .setFrequency(checkFrequency);
067: }
068:
069: private AbstractRetrieveMailConnector castConnector() {
070: return (AbstractRetrieveMailConnector) getConnector();
071: }
072:
073: protected void doConnect() throws Exception {
074: SessionDetails session = castConnector().getSessionDetails(
075: endpoint);
076:
077: Store store = session.newStore();
078: store.connect();
079: folder = store.getFolder(castConnector().getMailboxFolder());
080:
081: // set default value if empty/null
082: if (StringUtils.isEmpty(backupFolder)) {
083: this .backupFolder = MuleServer.getMuleContext()
084: .getConfiguration().getWorkingDirectory()
085: + "/mail/" + folder.getName();
086: }
087:
088: if (backupFolder != null
089: && !this .backupFolder.endsWith(File.separator)) {
090: this .backupFolder += File.separator;
091: }
092: }
093:
094: protected void doDisconnect() throws Exception {
095: // nothing to do here
096: }
097:
098: protected void doStop() {
099: if (folder != null) {
100: folder.removeMessageCountListener(this );
101: }
102: }
103:
104: protected void doStart() throws MuleException {
105: super .doStart();
106: folder.addMessageCountListener(this );
107: }
108:
109: public void messagesAdded(MessageCountEvent event) {
110: Message messages[] = event.getMessages();
111: if (messages != null) {
112: MuleMessage message = null;
113: for (int i = 0; i < messages.length; i++) {
114: try {
115: if (!messages[i].getFlags().contains(
116: Flags.Flag.DELETED)) {
117: MimeMessage mimeMessage = new MimeMessage(
118: (MimeMessage) messages[i]);
119: storeMessage(mimeMessage);
120: message = new DefaultMuleMessage(
121: castConnector().getMessageAdapter(
122: mimeMessage));
123:
124: if (castConnector().isDeleteReadMessages()) {
125: // Mark as deleted
126: messages[i].setFlag(Flags.Flag.DELETED,
127: true);
128: } else {
129: messages[i].setFlag(Flags.Flag.SEEN, true);
130: }
131: routeMessage(message, endpoint.isSynchronous());
132: }
133: } catch (MuleException e) {
134: handleException(e);
135: } catch (Exception e) {
136: Exception forwarded;
137:
138: if (message != null) {
139: forwarded = new RoutingException(EmailMessages
140: .routingError(), message, endpoint, e);
141: } else {
142: forwarded = new ReceiveException(endpoint, -1,
143: e);
144: }
145:
146: handleException(forwarded);
147: }
148: }
149: }
150: }
151:
152: // @Override
153: protected MuleMessage handleUnacceptedFilter(MuleMessage message) {
154: super .handleUnacceptedFilter(message);
155: if (message.getPayload() instanceof Message) {
156: Message msg = (Message) message.getPayload();
157: try {
158: msg.setFlag(Flags.Flag.DELETED, endpoint
159: .isDeleteUnacceptedMessages());
160: } catch (MessagingException e) {
161: logger.error("failed to set message deleted: "
162: + e.getMessage(), e);
163: }
164: }
165: return message;
166: }
167:
168: public void messagesRemoved(MessageCountEvent event) {
169: if (logger.isDebugEnabled()) {
170: Message messages[] = event.getMessages();
171: for (int i = 0; i < messages.length; i++) {
172: try {
173: logger.debug("Message removed: "
174: + messages[i].getSubject());
175: } catch (MessagingException ignore) {
176: logger.debug("ignoring exception: "
177: + ignore.getMessage());
178: }
179: }
180: }
181: }
182:
183: /** @return the current Mail folder */
184: public Folder getFolder() {
185: return folder;
186: }
187:
188: /** @param folder */
189: public synchronized void setFolder(Folder folder) {
190: if (folder == null) {
191: throw new IllegalArgumentException(
192: "Mail folder cannot be null");
193: }
194: this .folder = folder;
195: synchronized (this .folder) {
196: if (!this .folder.isOpen()) {
197: try {
198: this .folder.open(Folder.READ_WRITE);
199: } catch (MessagingException e) {
200: logger.warn("Failed to open folder: "
201: + folder.getFullName(), e);
202: }
203: }
204: }
205: }
206:
207: /**
208: * Helper method for testing which stores a copy of the message locally as the
209: * POP3 <p/> message will be deleted from the server
210: *
211: * @param msg the message to store
212: * @throws IOException If a failure happens writing the message
213: * @throws MessagingException If a failure happens reading the message
214: */
215: protected void storeMessage(Message msg) throws IOException,
216: MessagingException {
217: if (backupEnabled) {
218: String filename = msg.getFileName();
219: if (filename == null) {
220: Address[] from = msg.getFrom();
221: if (from != null && from.length > 0) {
222: filename = from[0] instanceof InternetAddress ? ((InternetAddress) from[0])
223: .getAddress()
224: : from[0].toString();
225: } else {
226: filename = "(no from address)";
227: }
228: filename += "[" + UUID.getUUID() + "]";
229: }
230: filename = FileUtils.prepareWinFilename(filename);
231: filename = backupFolder + filename + ".msg";
232: if (logger.isDebugEnabled()) {
233: logger.debug("Writing message to: " + filename);
234: }
235: File f = FileUtils.createFile(filename);
236: FileOutputStream fos = new FileOutputStream(f);
237: msg.writeTo(fos);
238: }
239: }
240:
241: public synchronized void poll() {
242: try {
243: try {
244: if (!folder.isOpen()) {
245: folder.open(Folder.READ_WRITE);
246: }
247: } catch (Exception e) {
248: if (logger.isDebugEnabled()) {
249: logger.debug("ignoring exception: "
250: + e.getMessage());
251: }
252: }
253:
254: int count = folder.getMessageCount();
255: if (count > 0) {
256: Message[] messages = folder.getMessages();
257: MessageCountEvent event = new MessageCountEvent(folder,
258: MessageCountEvent.ADDED, true, messages);
259: messagesAdded(event);
260: } else if (count == -1) {
261: throw new MessagingException("Cannot monitor folder: "
262: + folder.getFullName() + " as folder is closed");
263: }
264: } catch (MessagingException e) {
265: handleException(e);
266: } finally {
267: try {
268: folder.close(true); // close and expunge deleted messages
269: } catch (Exception e) {
270: logger.error("Failed to close pop3 inbox: "
271: + e.getMessage());
272: }
273: }
274: }
275:
276: protected void doDispose() {
277: if (null != folder) {
278: folder.removeMessageCountListener(this );
279: if (folder.isOpen()) {
280: try {
281:
282: folder.close(true);
283: } catch (Exception e) {
284: logger.debug("ignoring exception: "
285: + e.getMessage(), e);
286: }
287: }
288: }
289: }
290:
291: }
|