001: /*
002: * $Id: UdpConnector.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.udp;
012:
013: import org.mule.api.MuleException;
014: import org.mule.api.endpoint.ImmutableEndpoint;
015: import org.mule.api.endpoint.InboundEndpoint;
016: import org.mule.api.lifecycle.InitialisationException;
017: import org.mule.api.service.Service;
018: import org.mule.transport.AbstractConnector;
019:
020: import java.net.DatagramSocket;
021:
022: import org.apache.commons.pool.impl.GenericKeyedObjectPool;
023:
024: /**
025: * <code>UdpConnector</code> can send and receive Mule events as Datagram packets.
026: */
027: public class UdpConnector extends AbstractConnector {
028:
029: public static final String UDP = "udp";
030: public static final int DEFAULT_SOCKET_TIMEOUT = INT_VALUE_NOT_SET;
031: public static final int DEFAULT_BUFFER_SIZE = 1024 * 16;
032: public static final String KEEP_SEND_SOCKET_OPEN_PROPERTY = "keepSendSocketOpen";
033:
034: protected int sendTimeout = DEFAULT_SOCKET_TIMEOUT;
035: protected int receiveTimeout = DEFAULT_SOCKET_TIMEOUT;
036: protected int sendBufferSize = DEFAULT_BUFFER_SIZE;
037: protected int receiveBufferSize = DEFAULT_BUFFER_SIZE;
038: protected boolean keepSendSocketOpen = true;
039: protected boolean broadcast;
040: protected GenericKeyedObjectPool dispatcherSocketsPool = new GenericKeyedObjectPool();
041:
042: protected void doInitialise() throws InitialisationException {
043: dispatcherSocketsPool.setFactory(new UdpSocketFactory());
044: dispatcherSocketsPool.setTestOnBorrow(true);
045: dispatcherSocketsPool.setTestOnReturn(true);
046: //There should only be one pooled instance per socket (key)
047: dispatcherSocketsPool.setMaxActive(1);
048: }
049:
050: protected void doDispose() {
051: try {
052: dispatcherSocketsPool.close();
053: } catch (Exception e) {
054: logger.warn("Failed to close dispatcher socket pool: "
055: + e.getMessage());
056: }
057: }
058:
059: protected void doConnect() throws Exception {
060: // template method
061: }
062:
063: protected void doDisconnect() throws Exception {
064: dispatcherSocketsPool.clear();
065: }
066:
067: protected void doStart() throws MuleException {
068: // template method
069: }
070:
071: protected void doStop() throws MuleException {
072: // template method
073: }
074:
075: public String getProtocol() {
076: return UDP;
077: }
078:
079: public int getSendTimeout() {
080: return this .sendTimeout;
081: }
082:
083: public void setSendTimeout(int timeout) {
084: if (timeout < 0) {
085: timeout = DEFAULT_SOCKET_TIMEOUT;
086: }
087: this .sendTimeout = timeout;
088: }
089:
090: public int getReceiveTimeout() {
091: return receiveTimeout;
092: }
093:
094: public void setReceiveTimeout(int timeout) {
095: if (timeout < 0) {
096: timeout = DEFAULT_SOCKET_TIMEOUT;
097: }
098: this .receiveTimeout = timeout;
099: }
100:
101: public int getSendBufferSize() {
102: return sendBufferSize;
103: }
104:
105: public void setSendBufferSize(int sendBufferSize) {
106: if (sendBufferSize < 1) {
107: sendBufferSize = DEFAULT_BUFFER_SIZE;
108: }
109: this .sendBufferSize = sendBufferSize;
110: }
111:
112: public int getReceiveBufferSize() {
113: return receiveBufferSize;
114: }
115:
116: public void setReceiveBufferSize(int receiveBufferSize) {
117: if (receiveBufferSize < 1) {
118: receiveBufferSize = DEFAULT_BUFFER_SIZE;
119: }
120: this .receiveBufferSize = receiveBufferSize;
121: }
122:
123: public boolean isBroadcast() {
124: return broadcast;
125: }
126:
127: public void setBroadcast(boolean broadcast) {
128: this .broadcast = broadcast;
129: }
130:
131: public boolean isKeepSendSocketOpen() {
132: return keepSendSocketOpen;
133: }
134:
135: public void setKeepSendSocketOpen(boolean keepSendSocketOpen) {
136: this .keepSendSocketOpen = keepSendSocketOpen;
137: }
138:
139: /**
140: * Lookup a socket in the list of dispatcher sockets but don't create a new
141: * socket
142: *
143: * @param endpoint
144: * @return
145: */
146: DatagramSocket getSocket(ImmutableEndpoint endpoint)
147: throws Exception {
148: return (DatagramSocket) dispatcherSocketsPool
149: .borrowObject(endpoint);
150: }
151:
152: void releaseSocket(DatagramSocket socket, ImmutableEndpoint endpoint)
153: throws Exception {
154: dispatcherSocketsPool.returnObject(endpoint, socket);
155: }
156:
157: protected Object getReceiverKey(Service service,
158: InboundEndpoint endpoint) {
159: return endpoint.getEndpointURI().getAddress() + "/"
160: + service.getName();
161: }
162: }
|