001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.jspf.executor;
019:
020: import org.apache.james.jspf.core.DNSLookupContinuation;
021: import org.apache.james.jspf.core.DNSResponse;
022: import org.apache.james.jspf.core.Logger;
023: import org.apache.james.jspf.core.SPFChecker;
024: import org.apache.james.jspf.core.SPFCheckerExceptionCatcher;
025: import org.apache.james.jspf.core.SPFSession;
026: import org.apache.james.jspf.core.exceptions.SPFResultException;
027: import org.apache.james.jspf.core.exceptions.TimeoutException;
028:
029: import java.util.Collections;
030: import java.util.HashMap;
031: import java.util.LinkedList;
032: import java.util.List;
033: import java.util.Map;
034:
035: /**
036: * Async implementation of SPFExecutor
037: *
038: */
039: public class StagedMultipleSPFExecutor implements SPFExecutor, Runnable {
040:
041: private static final String ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION = "StagedMultipleSPFExecutor.continuation";
042:
043: private static class ResponseQueueImpl extends LinkedList implements
044: IResponseQueue {
045:
046: private static final long serialVersionUID = 5714025260393791651L;
047:
048: private int waitingThreads = 0;
049:
050: /**
051: * @see org.apache.james.jspf.executor.IResponseQueue#insertResponse(org.apache.james.jspf.executor.IResponse)
052: */
053: public synchronized void insertResponse(IResponse r) {
054: addLast(r);
055: notify();
056: }
057:
058: /**
059: * @see org.apache.james.jspf.executor.IResponseQueue#removeResponse()
060: */
061: public synchronized IResponse removeResponse() {
062: if ((size() - waitingThreads <= 0)) {
063: try {
064: waitingThreads++;
065: wait();
066: } catch (InterruptedException e) {
067: Thread.interrupted();
068: }
069: waitingThreads--;
070: }
071: return (IResponse) removeFirst();
072: }
073:
074: }
075:
076: // Use short as id because the id header is limited to 16 bit
077: // From RFC1035 4.1.1. Header section format :
078: //
079: // ID A 16 bit identifier assigned by the program that
080: // generates any kind of query. This identifier is copied
081: // the corresponding reply and can be used by the requester
082: // to match up replies to outstanding queries.
083: //
084: private static short id;
085:
086: private synchronized int nextId() {
087: return id++;
088: }
089:
090: private Logger log;
091: private DNSAsynchLookupService dnsProbe;
092: private Thread worker;
093: private Map sessions;
094: private Map results;
095: private ResponseQueueImpl responseQueue;
096:
097: public StagedMultipleSPFExecutor(Logger log,
098: DNSAsynchLookupService service) {
099: this .log = log;
100: this .dnsProbe = service;
101:
102: this .responseQueue = new ResponseQueueImpl();
103:
104: this .sessions = Collections.synchronizedMap(new HashMap());
105: this .results = Collections.synchronizedMap(new HashMap());
106:
107: this .worker = new Thread(this );
108: this .worker.setDaemon(true);
109: this .worker.setName("SPFExecutor");
110: this .worker.start();
111: }
112:
113: /**
114: * Execute the non-blocking part of the processing and returns.
115: * If the working queue is full (50 pending responses) this method will not return
116: * until the queue is again not full.
117: *
118: * @see org.apache.james.jspf.executor.SPFExecutor#execute(org.apache.james.jspf.core.SPFSession, org.apache.james.jspf.executor.FutureSPFResult)
119: */
120: public void execute(SPFSession session, FutureSPFResult result) {
121: execute(session, result, true);
122: }
123:
124: public void execute(SPFSession session, FutureSPFResult result,
125: boolean throttle) {
126: SPFChecker checker;
127: while ((checker = session.popChecker()) != null) {
128: // only execute checkers we added (better recursivity)
129: log.debug("Executing checker: " + checker);
130: try {
131: DNSLookupContinuation cont = checker.checkSPF(session);
132: // if the checker returns a continuation we return it
133: if (cont != null) {
134: invokeAsynchService(session, result, cont, throttle);
135: return;
136: }
137: } catch (Exception e) {
138: while (e != null) {
139: while (checker == null
140: || !(checker instanceof SPFCheckerExceptionCatcher)) {
141: checker = session.popChecker();
142: }
143: try {
144: ((SPFCheckerExceptionCatcher) checker)
145: .onException(e, session);
146: e = null;
147: } catch (SPFResultException ex) {
148: e = ex;
149: } finally {
150: checker = null;
151: }
152: }
153: }
154: }
155: result.setSPFResult(session);
156: }
157:
158: /**
159: * throttle should be true only when the caller thread is the client and not the worker thread.
160: * We could even remove the throttle parameter and check the currentThread.
161: * This way the worker is never "blocked" while outside callers will be blocked if our
162: * queue is too big (so this is not fully "asynchronous").
163: */
164: private synchronized void invokeAsynchService(SPFSession session,
165: FutureSPFResult result, DNSLookupContinuation cont,
166: boolean throttle) {
167: while (throttle && results.size() > 50) {
168: try {
169: this .wait(100);
170: } catch (InterruptedException e) {
171: }
172: }
173: int nextId = nextId();
174: sessions.put(new Integer(nextId), session);
175: results.put(new Integer(nextId), result);
176: session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION,
177: cont);
178: dnsProbe.getRecordsAsynch(cont.getRequest(), nextId,
179: responseQueue);
180: }
181:
182: public void run() {
183:
184: while (true) {
185:
186: IResponse resp = responseQueue.removeResponse();
187:
188: Integer respId = (Integer) resp.getId();
189: SPFSession session = (SPFSession) sessions.remove(respId);
190: FutureSPFResult result = (FutureSPFResult) results
191: .remove(respId);
192:
193: DNSLookupContinuation cont = (DNSLookupContinuation) session
194: .getAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION);
195:
196: DNSResponse response;
197: if (resp.getException() != null) {
198: response = new DNSResponse((TimeoutException) resp
199: .getException());
200: } else {
201: response = new DNSResponse((List) resp.getValue());
202: }
203:
204: try {
205: cont = cont.getListener().onDNSResponse(response,
206: session);
207:
208: if (cont != null) {
209: invokeAsynchService(session, result, cont, false);
210: } else {
211: execute(session, result, false);
212: }
213:
214: } catch (Exception e) {
215: SPFChecker checker = null;
216: while (e != null) {
217: while (checker == null
218: || !(checker instanceof SPFCheckerExceptionCatcher)) {
219: checker = session.popChecker();
220: }
221: try {
222: ((SPFCheckerExceptionCatcher) checker)
223: .onException(e, session);
224: e = null;
225: } catch (SPFResultException ex) {
226: e = ex;
227: } finally {
228: checker = null;
229: }
230: }
231: execute(session, result, false);
232: }
233: }
234: }
235:
236: }
|