001: /*
002: * $Id: UdpMessageDispatcher.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.udp;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.MuleEvent;
015: import org.mule.api.MuleMessage;
016: import org.mule.api.endpoint.ImmutableEndpoint;
017: import org.mule.api.endpoint.OutboundEndpoint;
018: import org.mule.transport.AbstractMessageDispatcher;
019:
020: import java.io.IOException;
021: import java.net.DatagramPacket;
022: import java.net.DatagramSocket;
023: import java.net.InetAddress;
024:
025: /**
026: * <code>UdpMessageDispatcher</code> is responsible for dispatching MuleEvents as
027: * UDP packets on the network
028: */
029:
030: public class UdpMessageDispatcher extends AbstractMessageDispatcher {
031: protected final UdpConnector connector;
032:
033: public UdpMessageDispatcher(OutboundEndpoint endpoint) {
034: super (endpoint);
035: this .connector = (UdpConnector) endpoint.getConnector();
036: }
037:
038: protected void doConnect() throws Exception {
039: // Test the connection
040: DatagramSocket socket = connector.getSocket(endpoint);
041: connector.releaseSocket(socket, endpoint);
042: }
043:
044: protected void doDisconnect() throws Exception {
045: // nothing to do
046: }
047:
048: protected synchronized void doDispatch(MuleEvent event)
049: throws Exception {
050: ImmutableEndpoint ep = event.getEndpoint();
051:
052: DatagramSocket socket = connector.getSocket(ep);
053: try {
054: byte[] payload = event.transformMessageToBytes();
055:
056: int port = ep.getEndpointURI().getPort();
057: InetAddress inetAddress = null;
058: //TODO, check how expensive this operation is
059: if ("null".equalsIgnoreCase(ep.getEndpointURI().getHost())) {
060: inetAddress = InetAddress.getLocalHost();
061: } else {
062: inetAddress = InetAddress.getByName(ep.getEndpointURI()
063: .getHost());
064: }
065:
066: write(socket, payload, port, inetAddress);
067: } finally {
068: connector.releaseSocket(socket, ep);
069: }
070: }
071:
072: protected void write(DatagramSocket socket, byte[] data, int port,
073: InetAddress inetAddress) throws IOException {
074: DatagramPacket packet = new DatagramPacket(data, data.length);
075: if (port >= 0) {
076: packet.setPort(port);
077: }
078: packet.setAddress(inetAddress);
079: socket.send(packet);
080: }
081:
082: protected synchronized MuleMessage doSend(MuleEvent event)
083: throws Exception {
084: doDispatch(event);
085: // If we're doing sync receive try and read return info from socket
086: if (event.getEndpoint().isRemoteSync()) {
087: DatagramSocket socket = connector.getSocket(event
088: .getEndpoint());
089: DatagramPacket result = receive(socket, event.getTimeout());
090: if (result == null) {
091: return null;
092: }
093: return new DefaultMuleMessage(connector
094: .getMessageAdapter(result), event.getMessage());
095: } else {
096: return event.getMessage();
097: }
098: }
099:
100: private DatagramPacket receive(DatagramSocket socket, int timeout)
101: throws IOException {
102: int origTimeout = socket.getSoTimeout();
103: try {
104: DatagramPacket packet = new DatagramPacket(
105: new byte[connector.getReceiveBufferSize()],
106: connector.getReceiveBufferSize());
107:
108: if (timeout > 0 && timeout != socket.getSoTimeout()) {
109: socket.setSoTimeout(timeout);
110: }
111: socket.receive(packet);
112: return packet;
113: } finally {
114: if (socket.getSoTimeout() != origTimeout) {
115: socket.setSoTimeout(origTimeout);
116: }
117: }
118: }
119:
120: protected void doDispose() {
121: // template method
122: }
123: }
|