001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.transport.nhttp;
020:
021: import java.io.IOException;
022: import java.io.InterruptedIOException;
023: import java.io.OutputStream;
024: import java.net.InetSocketAddress;
025: import java.net.MalformedURLException;
026: import java.net.URL;
027: import java.util.Iterator;
028: import java.util.Map;
029:
030: import javax.net.ssl.SSLContext;
031:
032: import org.apache.axiom.om.OMOutputFormat;
033: import org.apache.axis2.AxisFault;
034: import org.apache.axis2.Constants;
035: import org.apache.axis2.addressing.AddressingConstants;
036: import org.apache.axis2.addressing.EndpointReference;
037: import org.apache.axis2.context.ConfigurationContext;
038: import org.apache.axis2.context.MessageContext;
039: import org.apache.axis2.description.TransportOutDescription;
040: import org.apache.axis2.engine.MessageReceiver;
041: import org.apache.axis2.handlers.AbstractHandler;
042: import org.apache.axis2.transport.MessageFormatter;
043: import org.apache.axis2.transport.OutTransportInfo;
044: import org.apache.axis2.transport.TransportSender;
045: import org.apache.axis2.transport.TransportUtils;
046: import org.apache.axis2.util.MessageContextBuilder;
047: import org.apache.axis2.util.Utils;
048: import org.apache.commons.logging.Log;
049: import org.apache.commons.logging.LogFactory;
050: import org.apache.http.HttpHost;
051: import org.apache.http.HttpResponse;
052: import org.apache.http.HttpStatus;
053: import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
054: import org.apache.http.nio.NHttpClientConnection;
055: import org.apache.http.nio.NHttpClientHandler;
056: import org.apache.http.nio.reactor.ConnectingIOReactor;
057: import org.apache.http.nio.reactor.IOEventDispatch;
058: import org.apache.http.nio.reactor.SessionRequest;
059: import org.apache.http.nio.reactor.SessionRequestCallback;
060: import org.apache.http.params.BasicHttpParams;
061: import org.apache.http.params.HttpConnectionParams;
062: import org.apache.http.params.HttpParams;
063: import org.apache.http.params.HttpProtocolParams;
064: import org.apache.http.protocol.HTTP;
065:
066: /**
067: * NIO transport sender for Axis2 based on HttpCore and NIO extensions
068: */
069: public class HttpCoreNIOSender extends AbstractHandler implements
070: TransportSender {
071:
072: private static final Log log = LogFactory
073: .getLog(HttpCoreNIOSender.class);
074:
075: /** The Axis2 configuration context */
076: private ConfigurationContext cfgCtx;
077: /** The IOReactor */
078: private ConnectingIOReactor ioReactor = null;
079: /** The client handler */
080: private NHttpClientHandler handler = null;
081: /** The session request callback that calls back to the message receiver with errors */
082: private final SessionRequestCallback sessionRequestCallback = getSessionRequestCallback();
083: /** The SSL Context to be used */
084: private SSLContext sslContext = null;
085:
086: /** The SSL session handler that manages hostname verification etc */
087: // private SSLIOSessionHandler sslIOSessionHandler = null;
088: /**
089: * Initialize the transport sender, and execute reactor in new seperate thread
090: * @param cfgCtx the Axis2 configuration context
091: * @param transportOut the description of the http/s transport from Axis2 configuration
092: * @throws AxisFault thrown on an error
093: */
094: public void init(ConfigurationContext cfgCtx,
095: TransportOutDescription transportOut) throws AxisFault {
096: this .cfgCtx = cfgCtx;
097:
098: // is this an SSL Sender?
099: sslContext = getSSLContext(transportOut);
100: //sslIOSessionHandler = getSSLIOSessionHandler(transportOut);
101:
102: // start the Sender in a new seperate thread
103: Thread t = new Thread(new Runnable() {
104: public void run() {
105: executeClientEngine();
106: }
107: }, "HttpCoreNIOSender");
108: t.start();
109: log.info((sslContext == null ? "HTTP" : "HTTPS")
110: + " Sender starting");
111: }
112:
113: /**
114: * Configure and start the IOReactor
115: */
116: private void executeClientEngine() {
117:
118: HttpParams params = getClientParameters();
119: try {
120: ioReactor = new DefaultConnectingIOReactor(
121: NHttpConfiguration.getInstance()
122: .getClientIOWorkers(), params);
123: } catch (IOException e) {
124: log.error("Error starting the IOReactor", e);
125: }
126:
127: handler = new ClientHandler(cfgCtx, params);
128: IOEventDispatch ioEventDispatch = getEventDispatch(handler,
129: sslContext, /*sslIOSessionHandler,*/params);
130:
131: try {
132: ioReactor.execute(ioEventDispatch);
133: } catch (InterruptedIOException ex) {
134: log.fatal("Reactor Interrupted");
135: } catch (IOException e) {
136: log.fatal("Encountered an I/O error: " + e.getMessage(), e);
137: }
138: log.info("Sender Shutdown");
139: }
140:
141: /**
142: * Return the IOEventDispatch implementation to be used. This is overridden by the
143: * SSL sender
144: * @param handler
145: * @param sslContext
146: * @param params
147: * @return
148: */
149: protected IOEventDispatch getEventDispatch(
150: NHttpClientHandler handler,
151: SSLContext sslContext,
152: /*SSLIOSessionHandler sslIOSessionHandler,*/HttpParams params) {
153: return new PlainClientIOEventDispatch(handler, params);
154: }
155:
/**
 * Returns the SSL context to use for outgoing connections. This implementation always
 * returns null, as it does not support outgoing SSL; init() treats a null context as
 * plain HTTP.
 *
 * @param transportOut the transport-out description (not used by this implementation)
 * @return null
 * @throws AxisFault declared for overriding implementations; never thrown here
 */
protected SSLContext getSSLContext(
        TransportOutDescription transportOut) throws AxisFault {
    return null;
}
166:
167: /**
168: * Create the SSL IO Session handler to be used by this listener
169: * @param transportOut
170: * @return always null
171: */
172: // protected SSLIOSessionHandler getSSLIOSessionHandler(TransportOutDescription transportOut)
173: // throws AxisFault {
174: // return null;
175: // }
176: /**
177: * get HTTP protocol parameters to which the sender must adhere to
178: * @return the applicable HTTP protocol parameters
179: */
180: private HttpParams getClientParameters() {
181: NHttpConfiguration cfg = NHttpConfiguration.getInstance();
182: HttpParams params = new BasicHttpParams();
183: params
184: .setIntParameter(
185: HttpConnectionParams.SO_TIMEOUT,
186: cfg.getProperty(
187: HttpConnectionParams.SO_TIMEOUT, 60000))
188: .setIntParameter(
189: HttpConnectionParams.CONNECTION_TIMEOUT,
190: cfg
191: .getProperty(
192: HttpConnectionParams.CONNECTION_TIMEOUT,
193: 10000))
194: .setIntParameter(
195: HttpConnectionParams.SOCKET_BUFFER_SIZE,
196: cfg
197: .getProperty(
198: HttpConnectionParams.SOCKET_BUFFER_SIZE,
199: 8 * 1024))
200: .setBooleanParameter(
201: HttpConnectionParams.STALE_CONNECTION_CHECK,
202: cfg
203: .getProperty(
204: HttpConnectionParams.STALE_CONNECTION_CHECK,
205: 0) == 1)
206: .setBooleanParameter(
207: HttpConnectionParams.TCP_NODELAY,
208: cfg.getProperty(
209: HttpConnectionParams.TCP_NODELAY, 1) == 1)
210: .setParameter(HttpProtocolParams.USER_AGENT,
211: "Axis2-HttpComponents-NIO");
212: return params;
213: }
214:
215: /**
216: * transport sender invocation from Axis2 core
217: * @param msgContext message to be sent
218: * @return the invocation response (always InvocationResponse.CONTINUE)
219: * @throws AxisFault on error
220: */
221: public InvocationResponse invoke(MessageContext msgContext)
222: throws AxisFault {
223:
224: // remove unwanted HTTP headers (if any from the current message)
225: removeUnwantedHeaders(msgContext);
226:
227: EndpointReference epr = Util.getDestinationEPR(msgContext);
228: if (epr != null) {
229: if (!AddressingConstants.Final.WSA_NONE_URI.equals(epr
230: .getAddress())) {
231: sendAsyncRequest(epr, msgContext);
232: } else {
233: handleException("Cannot send message to "
234: + AddressingConstants.Final.WSA_NONE_URI);
235: }
236: } else {
237: if (msgContext.getProperty(Constants.OUT_TRANSPORT_INFO) != null) {
238: if (msgContext
239: .getProperty(Constants.OUT_TRANSPORT_INFO) instanceof ServerWorker) {
240: sendAsyncResponse(msgContext);
241: } else {
242: sendUsingOutputStream(msgContext);
243: }
244: } else {
245: handleException("No valid destination EPR or OutputStream to send message");
246: }
247: }
248:
249: if (msgContext.getOperationContext() != null) {
250: msgContext.getOperationContext().setProperty(
251: Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
252: }
253:
254: return InvocationResponse.CONTINUE;
255: }
256:
257: /**
258: * Remove unwanted headers from the http response of outgoing request. These are headers which
259: * should be dictated by the transport and not the user. We remove these as these may get
260: * copied from the request messages
261: * @param msgContext the Axis2 Message context from which these headers should be removed
262: */
263: private void removeUnwantedHeaders(MessageContext msgContext) {
264: Map headers = (Map) msgContext
265: .getProperty(MessageContext.TRANSPORT_HEADERS);
266: if (headers != null && !headers.isEmpty()) {
267: headers.remove(HTTP.CONN_DIRECTIVE);
268: headers.remove(HTTP.TRANSFER_ENCODING);
269: headers.remove(HTTP.DATE_DIRECTIVE);
270: headers.remove(HTTP.SERVER_DIRECTIVE);
271: headers.remove(HTTP.CONTENT_TYPE);
272: headers.remove(HTTP.CONTENT_LEN);
273: headers.remove(HTTP.USER_AGENT);
274: }
275: }
276:
277: /**
278: * Send the request message asynchronously to the given EPR
279: * @param epr the destination EPR for the message
280: * @param msgContext the message being sent
281: * @throws AxisFault on error
282: */
283: private void sendAsyncRequest(EndpointReference epr,
284: MessageContext msgContext) throws AxisFault {
285: try {
286: URL url = new URL(epr.getAddress());
287: int port = url.getPort();
288: if (port == -1) {
289: // use default
290: if ("http".equals(url.getProtocol())) {
291: port = 80;
292: } else if ("https".equals(url.getProtocol())) {
293: port = 443;
294: }
295: }
296: HttpHost httpHost = new HttpHost(url.getHost(), port, url
297: .getProtocol());
298:
299: Axis2HttpRequest axis2Req = new Axis2HttpRequest(epr,
300: httpHost, msgContext);
301:
302: NHttpClientConnection conn = ConnectionPool.getConnection(
303: url.getHost(), port);
304:
305: if (conn == null) {
306: ioReactor.connect(new InetSocketAddress(url.getHost(),
307: port), null, axis2Req, sessionRequestCallback);
308: log.debug("A new connection established");
309: } else {
310: ((ClientHandler) handler).submitRequest(conn, axis2Req);
311: log.debug("An existing connection reused");
312: }
313:
314: axis2Req.streamMessageContents();
315:
316: } catch (MalformedURLException e) {
317: handleException("Malformed destination EPR : "
318: + epr.getAddress(), e);
319: } catch (IOException e) {
320: handleException(
321: "IO Error while submiting request message for sending",
322: e);
323: }
324: }
325:
326: /**
327: * Send the passed in response message, asynchronously
328: * @param msgContext the message context to be sent
329: * @throws AxisFault on error
330: */
331: private void sendAsyncResponse(MessageContext msgContext)
332: throws AxisFault {
333:
334: // remove unwanted HTTP headers (if any from the current message)
335: removeUnwantedHeaders(msgContext);
336:
337: ServerWorker worker = (ServerWorker) msgContext
338: .getProperty(Constants.OUT_TRANSPORT_INFO);
339: HttpResponse response = worker.getResponse();
340:
341: OMOutputFormat format = Util.getOMOutputFormat(msgContext);
342: MessageFormatter messageFormatter = TransportUtils
343: .getMessageFormatter(msgContext);
344: response.setHeader(HTTP.CONTENT_TYPE, messageFormatter
345: .getContentType(msgContext, format, msgContext
346: .getSoapAction()));
347:
348: // return http 500 when a SOAP fault is returned
349: if (msgContext.getEnvelope().getBody().hasFault()) {
350: response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
351: }
352:
353: // if this is a dummy message to handle http 202 case with non-blocking IO
354: // set the status code to 202 and the message body to an empty byte array (see below)
355: if (Utils.isExplicitlyTrue(msgContext,
356: NhttpConstants.SC_ACCEPTED)
357: && msgContext.getProperty(
358: //org.apache.sandesha2.Sandesha2Constants.MessageContextProperties.SEQUENCE_ID
359: "WSRMSequenceId") == null) {
360: response.setStatusCode(HttpStatus.SC_ACCEPTED);
361: }
362:
363: // set any transport headers
364: Map transportHeaders = (Map) msgContext
365: .getProperty(MessageContext.TRANSPORT_HEADERS);
366: if (transportHeaders != null
367: && !transportHeaders.values().isEmpty()) {
368: Iterator iter = transportHeaders.keySet().iterator();
369: while (iter.hasNext()) {
370: Object header = iter.next();
371: Object value = transportHeaders.get(header);
372: if (value != null && header instanceof String
373: && value instanceof String) {
374: response.setHeader((String) header, (String) value);
375: }
376: }
377: }
378: worker.getServiceHandler().commitResponse(worker.getConn(),
379: response);
380:
381: OutputStream out = worker.getOutputStream();
382: try {
383: if (Utils.isExplicitlyTrue(msgContext,
384: NhttpConstants.SC_ACCEPTED)
385: && msgContext.getProperty(
386: //Sandesha2Constants.MessageContextProperties.SEQUENCE_ID
387: "WSRMSequenceId") == null) {
388: // see comment above on the reasoning
389: out.write(new byte[0]);
390: } else {
391: messageFormatter.writeTo(msgContext, format, out, true);
392: }
393: out.close();
394: } catch (IOException e) {
395: handleException("IO Error sending response message", e);
396: }
397:
398: try {
399: worker.getIs().close();
400: } catch (IOException ignore) {
401: }
402: }
403:
404: private void sendUsingOutputStream(MessageContext msgContext)
405: throws AxisFault {
406: OMOutputFormat format = Util.getOMOutputFormat(msgContext);
407: MessageFormatter messageFormatter = TransportUtils
408: .getMessageFormatter(msgContext);
409: OutputStream out = (OutputStream) msgContext
410: .getProperty(MessageContext.TRANSPORT_OUT);
411:
412: if (msgContext.isServerSide()) {
413: OutTransportInfo transportInfo = (OutTransportInfo) msgContext
414: .getProperty(Constants.OUT_TRANSPORT_INFO);
415:
416: if (transportInfo != null) {
417: transportInfo.setContentType(messageFormatter
418: .getContentType(msgContext, format, msgContext
419: .getSoapAction()));
420: } else {
421: throw new AxisFault(Constants.OUT_TRANSPORT_INFO
422: + " has not been set");
423: }
424: }
425:
426: try {
427: messageFormatter.writeTo(msgContext, format, out, true);
428: out.close();
429: } catch (IOException e) {
430: handleException("IO Error sending response message", e);
431: }
432: }
433:
/**
 * Per-message cleanup hook. This sender does nothing here.
 *
 * @param msgContext the message context (unused)
 * @throws AxisFault never thrown by this implementation
 */
public void cleanup(MessageContext msgContext) throws AxisFault {
    // do nothing
}
437:
438: public void stop() {
439: try {
440: ioReactor.shutdown();
441: log.info("Sender shut down");
442: } catch (IOException e) {
443: log.warn("Error shutting down IOReactor", e);
444: }
445: }
446:
447: /**
448: * Return a SessionRequestCallback which gets notified of a connection failure
449: * or an error during a send operation. This method finds the corresponding
450: * Axis2 message context for the outgoing request, and find the message receiver
451: * and sends a fault message back to the message receiver that is marked as
452: * related to the outgoing request
453: * @return a Session request callback
454: */
455: private static SessionRequestCallback getSessionRequestCallback() {
456: return new SessionRequestCallback() {
457: public void completed(SessionRequest request) {
458: }
459:
460: public void failed(SessionRequest request) {
461: handleError(request);
462: }
463:
464: public void timeout(SessionRequest request) {
465: handleError(request);
466: }
467:
468: public void cancelled(SessionRequest sessionRequest) {
469:
470: }
471:
472: private void handleError(SessionRequest request) {
473: if (request.getAttachment() != null
474: && request.getAttachment() instanceof Axis2HttpRequest) {
475:
476: Axis2HttpRequest axis2Request = (Axis2HttpRequest) request
477: .getAttachment();
478: MessageContext mc = axis2Request.getMsgContext();
479: MessageReceiver mr = mc.getAxisOperation()
480: .getMessageReceiver();
481:
482: try {
483: // this fault is NOT caused by the endpoint while processing. so we have to
484: // inform that this is a sending error (e.g. endpoint failure) and handle it
485: // differently at the message receiver.
486:
487: Exception exception = request.getException();
488: MessageContext nioFaultMessageContext = MessageContextBuilder
489: .createFaultMessageContext(
490: /** this is not a mistake I do NOT want getMessage()*/
491: mc, new AxisFault(exception.toString(),
492: exception));
493: nioFaultMessageContext.setProperty(
494: NhttpConstants.SENDING_FAULT,
495: Boolean.TRUE);
496: mr.receive(nioFaultMessageContext);
497:
498: } catch (AxisFault af) {
499: log
500: .error(
501: "Unable to report back failure to the message receiver",
502: af);
503: }
504: }
505: }
506: };
507: }
508:
509: // -------------- utility methods -------------
510: private void handleException(String msg, Exception e)
511: throws AxisFault {
512: log.error(msg, e);
513: throw new AxisFault(msg, e);
514: }
515:
516: private void handleException(String msg) throws AxisFault {
517: log.error(msg);
518: throw new AxisFault(msg);
519: }
520: }
|