001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019:
020: package org.apache.synapse.transport.mail;
021:
022: import org.apache.axis2.AxisFault;
023: import org.apache.axis2.Constants;
024: import org.apache.axis2.addressing.EndpointReference;
025: import org.apache.axis2.context.ConfigurationContext;
026: import org.apache.axis2.description.AxisOperation;
027: import org.apache.axis2.description.AxisService;
028: import org.apache.axis2.description.Parameter;
029: import org.apache.axis2.description.TransportInDescription;
030: import org.apache.synapse.transport.base.AbstractPollingTransportListener;
031: import org.apache.synapse.transport.base.BaseConstants;
032: import org.apache.synapse.transport.base.BaseUtils;
033:
034: import javax.mail.*;
035: import javax.mail.internet.AddressException;
036: import javax.mail.internet.InternetAddress;
037: import javax.mail.internet.MimeMessage;
038: import javax.xml.namespace.QName;
039: import java.io.IOException;
040: import java.io.InputStream;
041: import java.util.*;
042:
043: /**
044: * This mail transport lister implementation uses the base transport framework and is a polling
045: * transport. i.e. a service can register itself with custom a custom mail configuration (i.e.
046: * pop3 or imap) and specify its polling duration, and what action to be taken after processing
047: * messages. The transport always deletes processed mails from the folder they were fetched from
048: * and can be configured to be optionally moved to a different folder, if the server supports it
049: * (e.g. with imap). When checking for new mail, the transport ignores messages already flaged as
050: * SEEN and DELETED
051: */
052: public class MailTransportListener extends
053: AbstractPollingTransportListener {
054:
055: public static final String DELETE = "DELETE";
056: public static final String MOVE = "MOVE";
057:
058: /** Keep the list of directories/files and poll durations */
059: private final List pollTable = new ArrayList();
060:
061: /**
062: * Initializes the Mail transport
063: *
064: * @param cfgCtx the Axsi2 configuration context
065: * @param trpInDesc the POP3 transport in description from the axis2.xml
066: * @throws AxisFault on error
067: */
068: public void init(ConfigurationContext cfgCtx,
069: TransportInDescription trpInDesc) throws AxisFault {
070: setTransportName(MailConstants.TRANSPORT_NAME);
071: super .init(cfgCtx, trpInDesc);
072: }
073:
074: /**
075: * On a poller tick, iterate over the list of mail accounts and check if
076: * it is time to scan the contents for new files
077: */
078: public void onPoll() {
079: Iterator iter = pollTable.iterator();
080: while (iter.hasNext()) {
081: PollTableEntry entry = (PollTableEntry) iter.next();
082: long startTime = System.currentTimeMillis();
083:
084: if (startTime > entry.getNextPollTime()) {
085: checkMail(entry, entry.getEmailAddress());
086: }
087: }
088: }
089:
090: /**
091: * Check mail for a particular service that has registered with the mail transport
092: *
093: * @param entry the poll table entry that stores service specific informaiton
094: * @param emailAddress the email address checked
095: */
096: private void checkMail(final PollTableEntry entry,
097: InternetAddress emailAddress) {
098:
099: if (log.isDebugEnabled()) {
100: log.debug("Checking mail for account : " + emailAddress);
101: }
102:
103: boolean connected = false;
104: int retryCount = 0;
105: int maxRetryCount = entry.getMaxRetryCount();
106: long reconnectionTimeout = entry.getReconnectTimeout();
107: Store store = null;
108:
109: while (!connected) {
110: try {
111: retryCount++;
112: if (log.isDebugEnabled()) {
113: log
114: .debug("Attempting to connect to POP3/IMAP server for : "
115: + entry.getEmailAddress()
116: + " using " + entry.getProperties());
117: }
118:
119: Session session = Session.getInstance(entry
120: .getProperties(), null);
121: session.setDebug(log.isTraceEnabled());
122: store = session.getStore(entry.getProtocol());
123:
124: if (entry.getUserName() != null
125: && entry.getPassword() != null) {
126: store.connect(entry.getUserName(), entry
127: .getPassword());
128: } else {
129: handleException(
130: "Unable to locate username and password for mail login",
131: null);
132: }
133:
134: // were we able to connect?
135: connected = store.isConnected();
136:
137: } catch (Exception e) {
138: log.error(
139: "Error connecting to mail server for address : "
140: + emailAddress, e);
141: if (maxRetryCount <= retryCount) {
142: processFailure(
143: "Error connecting to mail server for address : "
144: + emailAddress + " :: "
145: + e.getMessage(), e, entry);
146: return;
147: }
148: }
149:
150: if (!connected) {
151: try {
152: log.warn("Connection to mail server for account : "
153: + entry.getEmailAddress()
154: + " failed. Retrying in : "
155: + reconnectionTimeout / 1000 + " seconds");
156: Thread.sleep(reconnectionTimeout);
157: } catch (InterruptedException ignore) {
158: }
159: }
160: }
161:
162: if (connected) {
163: Folder folder = null;
164: try {
165:
166: if (entry.getFolder() != null) {
167: folder = store.getFolder(entry.getFolder());
168: } else {
169: folder = store
170: .getFolder(MailConstants.DEFAULT_FOLDER);
171: }
172: if (folder == null) {
173: folder = store.getDefaultFolder();
174: }
175:
176: if (folder == null) {
177: processFailure("Unable to access mail folder",
178: null, entry);
179:
180: } else {
181: if (log.isDebugEnabled()) {
182: log
183: .debug("Connecting to folder : "
184: + folder.getName()
185: + " of email account : "
186: + emailAddress);
187: }
188:
189: folder.open(Folder.READ_WRITE);
190: int total = folder.getMessageCount();
191: Message[] messages = folder.getMessages();
192:
193: if (log.isDebugEnabled()) {
194: log.debug(messages.length
195: + " messgaes in folder : " + folder);
196: }
197:
198: for (int i = 0; i < total; i++) {
199:
200: if (messages[i].isSet(Flags.Flag.SEEN)) {
201: if (log.isDebugEnabled()) {
202: log.debug("Skipping message # : " + i
203: + " : "
204: + messages[i].getSubject()
205: + " - already marked SEEN");
206: }
207: } else if (messages[i]
208: .isSet(Flags.Flag.DELETED)) {
209: if (log.isDebugEnabled()) {
210: log.debug("Skipping message # : " + i
211: + " : "
212: + messages[i].getSubject()
213: + " - already marked DELETED");
214: }
215:
216: } else {
217: entry.setLastPollState(PollTableEntry.NONE);
218: try {
219: processMail(messages[i], entry);
220: entry
221: .setLastPollState(PollTableEntry.SUCCSESSFUL);
222: } catch (Exception e) {
223: entry
224: .setLastPollState(PollTableEntry.FAILED);
225: }
226:
227: moveOrDeleteAfterProcessing(entry, store,
228: folder, messages[i]);
229: }
230: }
231: }
232:
233: } catch (MessagingException me) {
234: processFailure("Error checking mail for account : "
235: + emailAddress + " :: " + me.getMessage(), me,
236: entry);
237:
238: } finally {
239:
240: try {
241: folder.close(true /** expunge messages flagged as DELETED*/
242: );
243: } catch (MessagingException e) {
244: processFailure(
245: "Error closing mail folder : " + folder
246: + " for account : " + emailAddress,
247: e, entry);
248: }
249:
250: if (store != null) {
251: try {
252: store.close();
253: } catch (MessagingException e) {
254: log.warn(
255: "Error closing mail store for account : "
256: + emailAddress + " :: "
257: + e.getMessage(), e);
258: }
259: }
260: }
261: }
262: }
263:
264: /**
265: * Process a mail message through Axis2
266: *
267: * @param message the email message
268: * @param entry the poll table entry
269: * @throws MessagingException on error
270: * @throws IOException on error
271: */
272: private void processMail(Message message, PollTableEntry entry)
273: throws MessagingException, IOException {
274:
275: // populate transport headers using the mail headers
276: Map trpHeaders = new HashMap();
277: try {
278: Enumeration e = message.getAllHeaders();
279: while (e.hasMoreElements()) {
280: Header h = (Header) e.nextElement();
281: if (entry.retainHeader(h.getName())) {
282: trpHeaders.put(h.getName(), h.getValue());
283: }
284: }
285: } catch (MessagingException ignore) {
286: }
287:
288: // figure out content type of primary request. If the content type is specified, use it
289: String contentType = entry.getContentType();
290: if (!BaseUtils.isValid(contentType)) {
291:
292: Object content = message.getContent();
293: if (content instanceof Multipart) {
294: contentType = message.getContentType();
295: } else if (content instanceof String) {
296: contentType = message.getContentType();
297: } else if (content instanceof InputStream) {
298: contentType = MailConstants.APPLICATION_BINARY;
299: }
300: }
301:
302: // if the content type was not found, we have an error
303: if (contentType == null) {
304: processFailure(
305: "Unable to determine Content-type for message : "
306: + message.getMessageNumber() + " :: "
307: + message.getSubject(), null, entry);
308: return;
309: } else if (log.isDebugEnabled()) {
310: log.debug("Processing message as Content-Type : "
311: + contentType);
312: }
313:
314: org.apache.axis2.context.MessageContext msgContext = createMessageContext();
315: // set to bypass dispatching if we know the service - we already should!
316: AxisService service = cfgCtx.getAxisConfiguration().getService(
317: entry.getServiceName());
318: msgContext.setAxisService(service);
319:
320: // find the operation for the message, or default to one
321: Parameter operationParam = service
322: .getParameter(BaseConstants.OPERATION_PARAM);
323: QName operationQName = (operationParam != null ? BaseUtils
324: .getQNameFromString(operationParam.getValue())
325: : BaseConstants.DEFAULT_OPERATION);
326:
327: AxisOperation operation = service.getOperation(operationQName);
328: if (operation != null) {
329: msgContext.setAxisOperation(operation);
330: msgContext.setSoapAction("urn:"
331: + operation.getName().getLocalPart());
332: }
333:
334: InternetAddress[] fromAddress = (InternetAddress[]) message
335: .getReplyTo();
336: if (fromAddress == null) {
337: fromAddress = (InternetAddress[]) message.getFrom();
338: }
339:
340: MailOutTransportInfo outInfo = new MailOutTransportInfo(
341: fromAddress[0]);
342:
343: // determine reply address
344: if (message.getReplyTo() != null) {
345: outInfo.setTargetAddresses((InternetAddress[]) message
346: .getReplyTo());
347: } else if (message.getFrom() != null) {
348: outInfo.setTargetAddresses((InternetAddress[]) message
349: .getFrom());
350: } else {
351: // does the service specify a default reply address ?
352: Parameter param = service
353: .getParameter(MailConstants.TRANSPORT_MAIL_REPLY_ADDRESS);
354: if (param != null && param.getValue() != null) {
355: outInfo.setTargetAddresses(InternetAddress
356: .parse((String) param.getValue()));
357: }
358: }
359:
360: // save CC addresses
361: if (message.getRecipients(Message.RecipientType.CC) != null) {
362: outInfo.setCcAddresses((InternetAddress[]) message
363: .getRecipients(Message.RecipientType.CC));
364: }
365:
366: // determine and subject for the reply message
367: if (message.getSubject() != null) {
368: outInfo.setSubject("Re: " + message.getSubject());
369: }
370:
371: // save original message ID if one exists, so that replies can be correlated
372: if (message.getHeader(MailConstants.MAIL_HEADER_X_MESSAGE_ID) != null) {
373: outInfo
374: .setRequestMessageID(message
375: .getHeader(MailConstants.MAIL_HEADER_X_MESSAGE_ID)[0]);
376: } else if (message instanceof MimeMessage
377: && ((MimeMessage) message).getMessageID() != null) {
378: outInfo.setRequestMessageID(((MimeMessage) message)
379: .getMessageID());
380: }
381:
382: // save out transport information
383: msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, outInfo);
384:
385: // set message context From
386: if (outInfo.getFromAddress() != null) {
387: msgContext.setFrom(new EndpointReference(
388: MailConstants.TRANSPORT_PREFIX
389: + outInfo.getFromAddress().getAddress()));
390: }
391:
392: // save original mail message id message context MessageID
393: msgContext.setMessageID(outInfo.getRequestMessageID());
394:
395: // set the message payload to the message context
396: MailUtils.getInstace().setSOAPEnvelope(message, msgContext,
397: contentType);
398:
399: String soapAction = (String) trpHeaders
400: .get(BaseConstants.SOAPACTION);
401: if (soapAction == null
402: && message.getSubject() != null
403: && message.getSubject().startsWith(
404: BaseConstants.SOAPACTION)) {
405: soapAction = message.getSubject().substring(
406: BaseConstants.SOAPACTION.length());
407: if (soapAction.startsWith(":")) {
408: soapAction = soapAction.substring(1).trim();
409: }
410: }
411:
412: handleIncomingMessage(msgContext, trpHeaders, soapAction,
413: contentType);
414:
415: if (log.isDebugEnabled()) {
416: log.debug("Processed message : "
417: + message.getMessageNumber() + " :: "
418: + message.getSubject());
419: }
420: }
421:
422: /**
423: * Take specified action to either move or delete the processed email
424: *
425: * @param entry the PollTableEntry for the email that has been processed
426: * @param store the mail store
427: * @param folder mail folder
428: * @param message the email message to be moved or deleted
429: */
430: private void moveOrDeleteAfterProcessing(
431: final PollTableEntry entry, Store store, Folder folder,
432: Message message) {
433:
434: String moveToFolder = null;
435: try {
436: switch (entry.getLastPollState()) {
437: case PollTableEntry.SUCCSESSFUL:
438: if (entry.getActionAfterProcess() == PollTableEntry.MOVE) {
439: moveToFolder = entry.getMoveAfterProcess();
440: }
441: break;
442:
443: case PollTableEntry.FAILED:
444: if (entry.getActionAfterProcess() == PollTableEntry.MOVE) {
445: moveToFolder = entry.getMoveAfterFailure();
446: }
447: break;
448: case PollTableEntry.NONE:
449: return;
450: }
451:
452: if (moveToFolder != null) {
453: if (log.isDebugEnabled()) {
454: log.debug("Moving processed email to folder :"
455: + moveToFolder);
456: }
457: Folder dFolder = store.getFolder(moveToFolder);
458: if (!dFolder.exists()) {
459: dFolder.create(Folder.HOLDS_MESSAGES);
460: }
461: folder.copyMessages(new Message[] { message }, dFolder);
462: }
463:
464: if (log.isDebugEnabled()) {
465: log.debug("Deleting email :"
466: + message.getMessageNumber());
467: }
468:
469: message.setFlag(Flags.Flag.DELETED, true);
470:
471: } catch (MessagingException e) {
472: log.error(
473: "Error deleting or resolving folder to move after processing : "
474: + moveToFolder, e);
475: }
476: }
477:
478: /**
479: * method to log a failure to the log file and to update the last poll status and time
480: *
481: * @param msg text for the log message
482: * @param e optiona exception encountered or null
483: * @param entry the PollTableEntry
484: */
485: private void processFailure(String msg, Exception e,
486: PollTableEntry entry) {
487: if (e == null) {
488: log.error(msg);
489: } else {
490: log.error(msg, e);
491: }
492: long now = System.currentTimeMillis();
493: entry.setLastPollState(PollTableEntry.FAILED);
494: entry.setLastPollTime(now);
495: entry.setNextPollTime(now + entry.getPollInterval());
496: }
497:
498: /**
499: * Get the EPR for the given service over the Mail transport
500: *
501: * @param serviceName service name
502: * @param ip ignored
503: * @return the EPR for the service
504: * @throws AxisFault not used
505: */
506: public EndpointReference[] getEPRsForService(String serviceName,
507: String ip) throws AxisFault {
508: Iterator iter = pollTable.iterator();
509: while (iter.hasNext()) {
510: PollTableEntry entry = (PollTableEntry) iter.next();
511: if (entry.getServiceName().equals(serviceName)) {
512: return new EndpointReference[] { new EndpointReference(
513: MailConstants.TRANSPORT_PREFIX
514: + entry.getEmailAddress()) };
515: }
516: }
517: return null;
518: }
519:
520: protected void startListeningForService(AxisService service) {
521:
522: Parameter param = service
523: .getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
524: long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
525: if (param != null && param.getValue() instanceof String) {
526: try {
527: pollInterval = Integer.parseInt(param.getValue()
528: .toString());
529: } catch (NumberFormatException e) {
530: log.error("Invalid poll interval : " + param.getValue()
531: + " for service : " + service.getName()
532: + " default to : "
533: + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000)
534: + "sec", e);
535: }
536: }
537:
538: PollTableEntry entry = new PollTableEntry();
539: try {
540: entry.setEmailAddress(BaseUtils.getRequiredServiceParam(
541: service, MailConstants.TRANSPORT_MAIL_ADDRESS));
542:
543: List<Parameter> params = service.getParameters();
544: for (Parameter p : params) {
545: if (p.getName().startsWith("mail.")) {
546: entry.addProperty(p.getName(), (String) p
547: .getValue());
548: }
549:
550: if (MailConstants.MAIL_POP3_USERNAME
551: .equals(p.getName())
552: || MailConstants.MAIL_IMAP_USERNAME.equals(p
553: .getName())) {
554: entry.setUserName((String) p.getValue());
555: }
556: if (MailConstants.MAIL_POP3_PASSWORD
557: .equals(p.getName())
558: || MailConstants.MAIL_IMAP_PASSWORD.equals(p
559: .getName())) {
560: entry.setPassword((String) p.getValue());
561: }
562: if (MailConstants.TRANSPORT_MAIL_PROTOCOL.equals(p
563: .getName())) {
564: entry.setProtocol((String) p.getValue());
565: }
566: }
567:
568: entry
569: .setContentType(BaseUtils.getOptionalServiceParam(
570: service,
571: MailConstants.TRANSPORT_MAIL_CONTENT_TYPE));
572: entry
573: .setReplyAddress(BaseUtils.getOptionalServiceParam(
574: service,
575: MailConstants.TRANSPORT_MAIL_REPLY_ADDRESS));
576:
577: entry.addPreserveHeaders(BaseUtils.getOptionalServiceParam(
578: service,
579: MailConstants.TRANSPORT_MAIL_PRESERVE_HEADERS));
580: entry.addRemoveHeaders(BaseUtils.getOptionalServiceParam(
581: service,
582: MailConstants.TRANSPORT_MAIL_REMOVE_HEADERS));
583:
584: String option = BaseUtils.getOptionalServiceParam(service,
585: MailConstants.TRANSPORT_MAIL_ACTION_AFTER_PROCESS);
586: entry
587: .setActionAfterProcess(MOVE.equals(option) ? PollTableEntry.MOVE
588: : PollTableEntry.DELETE);
589: option = BaseUtils.getOptionalServiceParam(service,
590: MailConstants.TRANSPORT_MAIL_ACTION_AFTER_FAILURE);
591: entry
592: .setActionAfterFailure(MOVE.equals(option) ? PollTableEntry.MOVE
593: : PollTableEntry.DELETE);
594:
595: String moveFolderAfterProcess = BaseUtils
596: .getOptionalServiceParam(
597: service,
598: MailConstants.TRANSPORT_MAIL_MOVE_AFTER_PROCESS);
599: entry.setMoveAfterProcess(moveFolderAfterProcess);
600: String modeFolderAfterFailure = BaseUtils
601: .getOptionalServiceParam(
602: service,
603: MailConstants.TRANSPORT_MAIL_MOVE_AFTER_FAILURE);
604: entry.setMoveAfterFailure(modeFolderAfterFailure);
605:
606: String strMaxRetryCount = BaseUtils
607: .getOptionalServiceParam(service,
608: MailConstants.MAX_RETRY_COUNT);
609: if (strMaxRetryCount != null)
610: entry.setMaxRetryCount(Integer
611: .parseInt(strMaxRetryCount));
612:
613: String strReconnectTimeout = BaseUtils
614: .getOptionalServiceParam(service,
615: MailConstants.RECONNECT_TIMEOUT);
616: if (strReconnectTimeout != null)
617: entry.setReconnectTimeout(Integer
618: .parseInt(strReconnectTimeout) * 1000);
619:
620: entry.setServiceName(service.getName());
621: schedulePoll(service, pollInterval);
622: pollTable.add(entry);
623:
624: } catch (AxisFault axisFault) {
625: String msg = "Error configuring the Mail transport for Service : "
626: + service.getName()
627: + " :: "
628: + axisFault.getMessage();
629: log.warn(msg);
630: } catch (AddressException e) {
631: String msg = "Error configuring the Mail transport for Service : "
632: + " Invalid email address specified by '"
633: + MailConstants.TRANSPORT_MAIL_ADDRESS
634: + "'parameter for service : "
635: + service.getName()
636: + " :: " + e.getMessage();
637: log.warn(msg);
638: }
639: }
640:
641: protected void stopListeningForService(AxisService service) {
642: Iterator iter = pollTable.iterator();
643: while (iter.hasNext()) {
644: PollTableEntry entry = (PollTableEntry) iter.next();
645: if (service.getName().equals(entry.getServiceName())) {
646: cancelPoll(service);
647: pollTable.remove(entry);
648: }
649: }
650: }
651: }
|