001: /*
002: * SSHTools - Java SSH2 API
003: *
004: * Copyright (C) 2002-2003 Lee David Painter and Contributors.
005: *
006: * Contributions made by:
007: *
008: * Brett Smith
009: * Richard Pernavas
010: * Erwin Bolwidt
011: *
012: * This program is free software; you can redistribute it and/or
013: * modify it under the terms of the GNU General Public License
014: * as published by the Free Software Foundation; either version 2
015: * of the License, or (at your option) any later version.
016: *
017: * This program is distributed in the hope that it will be useful,
018: * but WITHOUT ANY WARRANTY; without even the implied warranty of
019: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
020: * GNU General Public License for more details.
021: *
022: * You should have received a copy of the GNU General Public License
023: * along with this program; if not, write to the Free Software
024: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
025: */
026: package com.sshtools.j2ssh.connection;
027:
028: import com.sshtools.j2ssh.io.IOStreamConnector;
029: import com.sshtools.j2ssh.transport.MessageNotAvailableException;
030: import com.sshtools.j2ssh.transport.MessageStoreEOFException;
031: import com.sshtools.j2ssh.transport.SshMessageStore;
032:
033: import org.apache.commons.logging.Log;
034: import org.apache.commons.logging.LogFactory;
035:
036: import java.io.IOException;
037: import java.io.InputStream;
038: import java.io.OutputStream;
039:
040: /**
041: *
042: *
043: * @author $author$
044: * @version $Revision: 1.16 $
045: */
046: public abstract class IOChannel extends Channel {
047: private static Log log = LogFactory.getLog(IOChannel.class);
048:
049: /** */
050: private SshMessageStore incoming = new SshMessageStore();
051:
052: /** */
053: protected ChannelInputStream in;
054:
055: /** */
056: protected ChannelOutputStream out;
057:
058: /** */
059: protected InputStream boundInputStream = null;
060:
061: /** */
062: protected OutputStream boundOutputStream = null;
063:
064: //protected IOChannel boundIOChannel = null;
065:
066: /** */
067: protected IOStreamConnector ios = null;
068:
069: /**
070: *
071: *
072: * @param connection
073: * @param localChannelId
074: * @param senderChannelId
075: * @param initialWindowSize
076: * @param maximumPacketSize
077: *
078: * @throws IOException
079: */
080: protected void init(ConnectionProtocol connection,
081: long localChannelId, long senderChannelId,
082: long initialWindowSize, long maximumPacketSize)
083: throws IOException {
084: this .in = new ChannelInputStream(incoming); //ChannelInputStream.createStandard(incoming);
085: this .out = new ChannelOutputStream(this );
086: super .init(connection, localChannelId, senderChannelId,
087: initialWindowSize, maximumPacketSize);
088: }
089:
090: /**
091: *
092: *
093: * @throws IOException
094: */
095: protected void open() throws IOException {
096: super .open();
097:
098: // If were bound send any outstanding messages sitting around
099: if (boundOutputStream != null) {
100: sendOutstandingMessages();
101: }
102:
103: // Start the bound inputstream
104: if ((boundInputStream != null) && (ios == null)) {
105: ios.setCloseInput(false);
106: ios.setCloseOutput(false);
107: ios.connect(boundInputStream, out);
108: }
109: }
110:
111: /**
112: *
113: *
114: * @return
115: */
116: public ChannelInputStream getInputStream() {
117: return in;
118: }
119:
120: /**
121: *
122: *
123: * @return
124: */
125: public ChannelOutputStream getOutputStream() {
126: return out;
127: }
128:
129: /**
130: *
131: *
132: * @param msg
133: *
134: * @throws IOException
135: */
136: protected void onChannelData(SshMsgChannelData msg)
137: throws IOException {
138: // Synchronize on the message store to ensure that another thread
139: // does not try to read its data. This will make sure that the incoming
140: // messages are not being flushed to an outputstream after a bind
141: synchronized (incoming) {
142: if (boundOutputStream != null) {
143: try {
144: boundOutputStream.write(msg.getChannelData());
145: } catch (IOException ex) {
146: log
147: .info("Could not route data to the bound OutputStream; Closing channel.");
148: log.info(ex.getMessage());
149: close();
150: }
151: } else {
152: incoming.addMessage(msg);
153: }
154: }
155: }
156:
157: /**
158: *
159: *
160: * @throws IOException
161: */
162: public void setLocalEOF() throws IOException {
163: super .setLocalEOF();
164:
165: if (!out.isClosed()) {
166: out.close();
167: }
168: }
169:
170: /**
171: *
172: *
173: * @throws IOException
174: */
175: protected void onChannelEOF() throws IOException {
176: if (!in.isClosed()) {
177: in.close();
178: }
179: }
180:
181: /**
182: *
183: *
184: * @throws IOException
185: */
186: protected void onChannelClose() throws IOException {
187: // Close the input/output streams
188: if (!in.isClosed()) {
189: in.close();
190: }
191:
192: if (!out.isClosed()) {
193: out.close();
194: }
195:
196: // Close the bound channel
197:
198: /* if(boundIOChannel!=null && !boundIOChannel.isClosed())
199: boundIOChannel.close();*/
200:
201: // Close the IOStream connector if were bound
202: if (ios != null) {
203: ios.close();
204: }
205: }
206:
207: /**
208: *
209: *
210: * @param msg
211: *
212: * @throws IOException
213: */
214: protected void onChannelExtData(SshMsgChannelExtendedData msg)
215: throws IOException {
216: // This class will not deal with extended data
217: // incoming.addMessage(msg);
218: }
219:
220: /*public void bindIOChannel(IOChannel boundIOChannel) throws IOException {
221: this.boundIOChannel = boundIOChannel;
222: // If the bound channel is open then bind the outputstreams
223: if (boundIOChannel.getState().getValue() == ChannelState.CHANNEL_OPEN) {
224: throw new IOException("You cannot bind to an open channel");
225: }
226: // Create an event listener so we can listen
227: boundIOChannel.addEventListener(new ChannelEventListener() {
228: public void onChannelOpen(Channel channel) {
229: try {
230: bindOutputStream(IOChannel.this.boundIOChannel.getOutputStream());
231: IOChannel.this.boundIOChannel.bindOutputStream(getOutputStream());
232: }
233: catch (IOException ex) {
234: log.info("Failed to bind the channel");
235: }
236: }
237: public void onChannelEOF(Channel channel) {
238: try {
239: //setLocalEOF();
240: close();
241: }
242: catch (IOException ex) {
243: log.info("Failed to set the channel to EOF");
244: }
245: }
246: public void onChannelClose(Channel channel) {
247: try {
248: if(!isClosed())
249: close();
250: }
251: catch (IOException ex) {
252: log.info("Failed to close the channel");
253: }
254: }
255: public void onDataReceived(Channel channel, byte[] data) {
256: }
257: public void onDataSent(Channel channel, byte[] data) {
258: }
259: });
260: }*/
261: public void bindOutputStream(OutputStream boundOutputStream)
262: throws IOException {
263: // Synchronize on the incoming message store to ensure that no other
264: // messages are added whilst we transfer to a bound state
265: synchronized (incoming) {
266: this .boundOutputStream = boundOutputStream;
267:
268: if (state.getValue() == ChannelState.CHANNEL_OPEN) {
269: sendOutstandingMessages();
270: }
271: }
272: }
273:
274: /**
275: *
276: *
277: * @param boundInputStream
278: *
279: * @throws IOException
280: */
281: public void bindInputStream(InputStream boundInputStream)
282: throws IOException {
283: this .boundInputStream = boundInputStream;
284: this .ios = new IOStreamConnector();
285:
286: if (state.getValue() == ChannelState.CHANNEL_OPEN) {
287: ios.setCloseInput(false);
288: ios.setCloseOutput(false);
289: ios.connect(boundInputStream, out);
290: }
291: }
292:
293: private void sendOutstandingMessages() throws IOException {
294: if ((boundInputStream != null) && (boundOutputStream != null)
295: && incoming.hasMessages()) {
296: while (true) {
297: try {
298: // Peek into the message store and look for the next message
299: SshMsgChannelData msg = (SshMsgChannelData) incoming
300: .peekMessage(SshMsgChannelData.SSH_MSG_CHANNEL_DATA);
301:
302: // Remove the message so we dont process again
303: incoming.removeMessage(msg);
304:
305: // Write the message out to the bound OutputStream
306: try {
307: boundOutputStream.write(msg.getChannelData());
308: } catch (IOException ex1) {
309: //log.info("Could not write outstanding messages to the bound OutputStream: " +ex1.getMessage());
310: close();
311: }
312: } catch (MessageStoreEOFException ex) {
313: break;
314: } catch (MessageNotAvailableException ex) {
315: break;
316: } catch (InterruptedException ex) {
317: throw new IOException("The thread was interrupted");
318: }
319: }
320: }
321: }
322: }
|