001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.tcm;
006:
007: import com.tc.async.api.Sink;
008: import com.tc.net.TCSocketAddress;
009: import com.tc.net.core.TCListener;
010: import com.tc.net.protocol.transport.ConnectionIDFactory;
011: import com.tc.net.protocol.transport.WireProtocolMessageSink;
012: import com.tc.util.TCTimeoutException;
013:
014: import java.io.IOException;
015: import java.net.InetAddress;
016: import java.util.Set;
017:
018: /**
019: * A handle to a specific server port listener
020: *
021: * @author teck
022: */
023: class NetworkListenerImpl implements NetworkListener {
024: private final ChannelManagerImpl channelManager;
025: private final TCMessageRouter tcmRouter;
026: private final CommunicationsManagerImpl commsMgr;
027: private final TCSocketAddress addr;
028: private TCListener lsnr;
029: private boolean started;
030: private final TCMessageFactory msgFactory;
031: private final boolean reuseAddr;
032: private final ConnectionIDFactory connectionIdFactory;
033: private final Sink httpSink;
034: private final WireProtocolMessageSink wireProtoMsgSnk;
035:
036: // this cstr is intentionally not public, only the Comms Manager should be
037: // creating them
038: NetworkListenerImpl(TCSocketAddress addr,
039: CommunicationsManagerImpl commsMgr,
040: ChannelManagerImpl channelManager,
041: TCMessageFactory msgFactory, TCMessageRouter router,
042: boolean reuseAddr, ConnectionIDFactory connectionIdFactory,
043: Sink httpSink, WireProtocolMessageSink wireProtoMsgSnk) {
044: this .commsMgr = commsMgr;
045: this .channelManager = channelManager;
046: this .addr = addr;
047: this .connectionIdFactory = connectionIdFactory;
048: this .httpSink = httpSink;
049: this .wireProtoMsgSnk = wireProtoMsgSnk;
050: this .started = false;
051: this .msgFactory = msgFactory;
052: this .reuseAddr = reuseAddr;
053: this .tcmRouter = router;
054: }
055:
056: /**
057: * Start this listener listening on the network. You probably don't want to start a listener until you have properly
058: * setup your protocol routes, since you might miss messages between the time the listener is <code>start()</code>
059: * 'ed and the time you add your routes.
060: *
061: * @throws IOException if an IO error occurs (this will most likely be a problem binding to the specified
062: * port/address)
063: */
064: public synchronized void start(Set initialConnectionIDs)
065: throws IOException {
066: this .lsnr = commsMgr.createCommsListener(this .addr,
067: this .channelManager, this .reuseAddr,
068: initialConnectionIDs, this .connectionIdFactory,
069: this .httpSink, this .wireProtoMsgSnk);
070: this .started = true;
071: commsMgr.registerListener(this );
072: }
073:
074: public synchronized void stop(long timeout)
075: throws TCTimeoutException {
076: if (!started) {
077: return;
078: }
079:
080: try {
081: if (lsnr != null) {
082: lsnr.stop(timeout);
083: }
084: } finally {
085: started = false;
086: commsMgr.unregisterListener(this );
087: }
088: }
089:
090: public void routeMessageType(TCMessageType messageType,
091: TCMessageSink sink) {
092: tcmRouter.routeMessageType(messageType, sink);
093: }
094:
095: /**
096: * Routes a TCMessage to a sink. The hydrate sink will do the hydrate() work
097: */
098: public void routeMessageType(TCMessageType messageType,
099: Sink destSink, Sink hydrateSink) {
100: routeMessageType(messageType, new TCMessageSinkToSedaSink(
101: destSink, hydrateSink));
102: }
103:
104: public ChannelManager getChannelManager() {
105: return channelManager;
106: }
107:
108: public void addClassMapping(TCMessageType type, Class msgClass) {
109: this .msgFactory.addClassMapping(type, msgClass);
110: }
111:
112: public synchronized InetAddress getBindAddress() {
113: if (!started) {
114: throw new IllegalArgumentException("Listener not running");
115: }
116: return lsnr.getBindAddress();
117: }
118:
119: public synchronized int getBindPort() {
120: if (!started) {
121: throw new IllegalArgumentException("Listener not running");
122: }
123: return lsnr.getBindPort();
124: }
125:
126: public String toString() {
127: try {
128: return getBindAddress().getHostAddress() + ":"
129: + getBindPort();
130: } catch (Exception e) {
131: return "Exception in toString(): " + e.getMessage();
132: }
133: }
134: }
|