001: /*
002: * SSHTools - Java SSH2 API
003: *
004: * Copyright (C) 2002-2003 Lee David Painter and Contributors.
005: *
006: * Contributions made by:
007: *
008: * Brett Smith
009: * Richard Pernavas
010: * Erwin Bolwidt
011: *
012: * This program is free software; you can redistribute it and/or
013: * modify it under the terms of the GNU General Public License
014: * as published by the Free Software Foundation; either version 2
015: * of the License, or (at your option) any later version.
016: *
017: * This program is distributed in the hope that it will be useful,
018: * but WITHOUT ANY WARRANTY; without even the implied warranty of
019: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
020: * GNU General Public License for more details.
021: *
022: * You should have received a copy of the GNU General Public License
023: * along with this program; if not, write to the Free Software
024: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
025: */
026: package com.sshtools.j2ssh.connection;
027:
028: import com.sshtools.j2ssh.SshException;
029: import com.sshtools.j2ssh.transport.AsyncService;
030: import com.sshtools.j2ssh.transport.MessageStoreEOFException;
031: import com.sshtools.j2ssh.transport.ServiceState;
032: import com.sshtools.j2ssh.transport.SshMessage;
033: import com.sshtools.j2ssh.transport.TransportProtocolState;
034:
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037:
038: import java.io.IOException;
039:
040: import java.util.HashMap;
041: import java.util.concurrent.ConcurrentHashMap;
042: import java.util.HashSet;
043: import java.util.Iterator;
044: import java.util.Map;
045:
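/**
 * Implementation of the "ssh-connection" service. It opens and tracks
 * channels, sends channel and global request messages through the transport,
 * and dispatches incoming connection protocol messages to the channel they
 * are addressed to.
 *
 * @author $author$
 * @version $Revision: 1.68 $
 */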
052: public class ConnectionProtocol extends AsyncService {
053: private static Log log = LogFactory
054: .getLog(ConnectionProtocol.class);
055: private HashSet reusableChannels = new HashSet();
056: private Map activeChannels = new ConcurrentHashMap();
057: private Map allowedChannels = new HashMap();
058: private Map globalRequests = new HashMap();
059: private long nextChannelId = 0;
060:
/**
 * Creates a new ConnectionProtocol object, passing the service name
 * "ssh-connection" to the superclass.
 */
public ConnectionProtocol() {
    super("ssh-connection");
}
067:
068: /**
069: *
070: *
071: * @param channelName
072: * @param cf
073: *
074: * @throws IOException
075: */
076: public void addChannelFactory(String channelName, ChannelFactory cf)
077: throws IOException {
078: allowedChannels.put(channelName, cf);
079: }
080:
081: /**
082: *
083: *
084: * @param channelName
085: */
086: public void removeChannelFactory(String channelName) {
087: allowedChannels.remove(channelName);
088: }
089:
090: /**
091: *
092: *
093: * @param channelName
094: *
095: * @return
096: */
097: public boolean containsChannelFactory(String channelName) {
098: return allowedChannels.containsKey(channelName);
099: }
100:
101: /**
102: *
103: *
104: * @param requestName
105: * @param handler
106: */
107: public void allowGlobalRequest(String requestName,
108: GlobalRequestHandler handler) {
109: globalRequests.put(requestName, handler);
110: }
111:
112: /**
113: *
114: *
115: * @param channel
116: *
117: * @return
118: *
119: * @throws IOException
120: */
121: public synchronized boolean openChannel(Channel channel)
122: throws IOException {
123: return openChannel(channel, null);
124: }
125:
126: /**
127: *
128: *
129: * @return
130: */
131: public boolean isConnected() {
132: return ((transport.getState().getValue() == TransportProtocolState.CONNECTED) || (transport
133: .getState().getValue() == TransportProtocolState.PERFORMING_KEYEXCHANGE))
134: && (getState().getValue() == ServiceState.SERVICE_STARTED);
135: }
136:
137: private Long getChannelId() {
138: // synchronized (activeChannels) {
139: if (reusableChannels.size() <= 0) {
140: return new Long(nextChannelId++);
141: } else {
142: return (Long) reusableChannels.iterator().next();
143: }
144: //}
145: }
146:
147: /**
148: *
149: *
150: * @param channel
151: * @param eventListener
152: *
153: * @return
154: *
155: * @throws IOException
156: * @throws SshException
157: */
158: public synchronized boolean openChannel(Channel channel,
159: ChannelEventListener eventListener) throws IOException {
160: //synchronized (activeChannels) {
161: Long channelId = getChannelId();
162:
163: // Create the message
164: SshMsgChannelOpen msg = new SshMsgChannelOpen(channel
165: .getChannelType(), channelId.longValue(), channel
166: .getLocalWindow().getWindowSpace(), channel
167: .getLocalPacketSize(), channel.getChannelOpenData());
168:
169: // Send the message
170: transport.sendMessage(msg, this );
171:
172: // Wait for the next message to confirm the open channel (or not)
173: int[] messageIdFilter = new int[2];
174: messageIdFilter[0] = SshMsgChannelOpenConfirmation.SSH_MSG_CHANNEL_OPEN_CONFIRMATION;
175: messageIdFilter[1] = SshMsgChannelOpenFailure.SSH_MSG_CHANNEL_OPEN_FAILURE;
176:
177: try {
178: SshMessage result = messageStore
179: .getMessage(messageIdFilter);
180:
181: if (result.getMessageId() == SshMsgChannelOpenConfirmation.SSH_MSG_CHANNEL_OPEN_CONFIRMATION) {
182: SshMsgChannelOpenConfirmation conf = (SshMsgChannelOpenConfirmation) result;
183: activeChannels.put(channelId, channel);
184: log.debug("Initiating channel");
185: channel.init(this , channelId.longValue(), conf
186: .getSenderChannel(), conf
187: .getInitialWindowSize(), conf
188: .getMaximumPacketSize(), eventListener);
189: channel.open();
190: log.info("Channel "
191: + String.valueOf(channel.getLocalChannelId())
192: + " is open [" + channel.getName() + "]");
193:
194: return true;
195: } else {
196: // Make sure the channels state is closed
197: channel.getState()
198: .setValue(ChannelState.CHANNEL_CLOSED);
199:
200: return false;
201: }
202: } catch (MessageStoreEOFException mse) {
203: throw new IOException(mse.getMessage());
204: } catch (InterruptedException ex) {
205: throw new SshException(
206: "The thread was interrupted whilst waiting for a connection protocol message");
207: }
208: //}
209: }
210:
211: /**
212: *
213: */
214: protected synchronized void onStop() {
215: log.info("Closing all active channels");
216: // synchronized (activeChannels) {
217: log.info("thread has " + activeChannels.values().size()
218: + " active channels to stop");
219: try {
220: Channel channel;
221:
222: for (Iterator x = activeChannels.values().iterator(); x
223: .hasNext();) {
224: channel = (Channel) x.next();
225:
226: if (channel != null) {
227: if (log.isDebugEnabled()) {
228: log.debug("Closing "
229: + channel.getName()
230: + " id="
231: + String.valueOf(channel
232: .getLocalChannelId()));
233: }
234:
235: channel.close();
236: }
237: }
238: } catch (Throwable t) {
239: log.error(
240: "Unable to close all channels: " + t.getMessage(),
241: t);
242: }
243:
244: activeChannels.clear();
245: // }
246: }
247:
248: /**
249: *
250: *
251: * @param channel
252: * @param data
253: *
254: * @throws IOException
255: */
256: public synchronized void sendChannelData(Channel channel,
257: byte[] data) throws IOException {
258: synchronized (channel.getState()) {
259: if (log.isDebugEnabled()) {
260: log.debug("Sending " + String.valueOf(data.length)
261: + " bytes for channel id "
262: + String.valueOf(channel.getLocalChannelId()));
263: }
264:
265: int sent = 0;
266: int block;
267: int remaining;
268: long max;
269: byte[] buffer;
270: ChannelDataWindow window = channel.getRemoteWindow();
271:
272: while (sent < data.length) {
273: remaining = data.length - sent;
274: max = ((window.getWindowSpace() < channel
275: .getRemotePacketSize()) && (window
276: .getWindowSpace() > 0)) ? window
277: .getWindowSpace() : channel
278: .getRemotePacketSize();
279: block = (max < remaining) ? (int) max : remaining;
280: channel.remoteWindow.consumeWindowSpace(block);
281: buffer = new byte[block];
282: System.arraycopy(data, sent, buffer, 0, block);
283:
284: SshMsgChannelData msg = new SshMsgChannelData(channel
285: .getRemoteChannelId(), buffer);
286: transport.sendMessage(msg, this );
287:
288: /* if (type != null) {
289: channel.sendChannelExtData(type.intValue(), buffer);
290: } else {
291: channel.sendChannelData(buffer);
292: }*/
293: sent += block;
294: }
295: }
296: }
297:
298: /**
299: *
300: *
301: * @param channel
302: *
303: * @throws IOException
304: */
305: public void sendChannelEOF(Channel channel) throws IOException {
306: //synchronized (activeChannels) {
307: if (!activeChannels.containsValue(channel)) {
308: throw new IOException(
309: "Attempt to send EOF for a non existent channel "
310: + String.valueOf(channel
311: .getLocalChannelId()));
312: }
313:
314: log.info("Local computer has set channel "
315: + String.valueOf(channel.getLocalChannelId())
316: + " to EOF [" + channel.getName() + "]");
317:
318: SshMsgChannelEOF msg = new SshMsgChannelEOF(channel
319: .getRemoteChannelId());
320: transport.sendMessage(msg, this );
321: // }
322: }
323:
324: /**
325: *
326: *
327: * @param channel
328: * @param extendedType
329: * @param data
330: *
331: * @throws IOException
332: */
333: public synchronized void sendChannelExtData(Channel channel,
334: int extendedType, byte[] data) throws IOException {
335: channel.getRemoteWindow().consumeWindowSpace(data.length);
336:
337: int sent = 0;
338: int block;
339: int remaining;
340: long max;
341: byte[] buffer;
342: ChannelDataWindow window = channel.getRemoteWindow();
343:
344: while (sent < data.length) {
345: remaining = data.length - sent;
346: max = ((window.getWindowSpace() < channel
347: .getRemotePacketSize()) && (window.getWindowSpace() > 0)) ? window
348: .getWindowSpace()
349: : channel.getRemotePacketSize();
350: block = (max < remaining) ? (int) max : remaining;
351: channel.remoteWindow.consumeWindowSpace(block);
352: buffer = new byte[block];
353: System.arraycopy(data, sent, buffer, 0, block);
354:
355: SshMsgChannelExtendedData msg = new SshMsgChannelExtendedData(
356: channel.getRemoteChannelId(), extendedType, buffer);
357: transport.sendMessage(msg, this );
358:
359: /* if (type != null) {
360: channel.sendChannelExtData(type.intValue(), buffer);
361: } else {
362: channel.sendChannelData(buffer);
363: }*/
364: sent += block;
365: }
366: }
367:
368: /**
369: *
370: *
371: * @param channel
372: * @param requestType
373: * @param wantReply
374: * @param requestData
375: *
376: * @return
377: *
378: * @throws IOException
379: * @throws SshException
380: */
381: public synchronized boolean sendChannelRequest(Channel channel,
382: String requestType, boolean wantReply, byte[] requestData)
383: throws IOException {
384: boolean success = true;
385: log.info("Sending " + requestType + " request for the "
386: + channel.getChannelType() + " channel");
387:
388: SshMsgChannelRequest msg = new SshMsgChannelRequest(channel
389: .getRemoteChannelId(), requestType, wantReply,
390: requestData);
391: transport.sendMessage(msg, this );
392:
393: // If the user requests a reply then wait for the message and return result
394: if (wantReply) {
395: // Set up our message filter
396: int[] messageIdFilter = new int[2];
397:
398: messageIdFilter[0] = SshMsgChannelSuccess.SSH_MSG_CHANNEL_SUCCESS;
399: messageIdFilter[1] = SshMsgChannelFailure.SSH_MSG_CHANNEL_FAILURE;
400:
401: log.info("Waiting for channel request reply");
402:
403: try {
404: // Wait for either success or failure
405: SshMessage reply = messageStore
406: .getMessage(messageIdFilter);
407:
408: switch (reply.getMessageId()) {
409: case SshMsgChannelSuccess.SSH_MSG_CHANNEL_SUCCESS: {
410: log.info("Channel request succeeded");
411: success = true;
412:
413: break;
414: }
415:
416: case SshMsgChannelFailure.SSH_MSG_CHANNEL_FAILURE: {
417: log.info("Channel request failed");
418: success = false;
419:
420: break;
421: }
422: }
423: } catch (InterruptedException ex) {
424: throw new SshException(
425: "The thread was interrupted whilst waiting for a connection protocol message");
426: }
427: }
428:
429: return success;
430: }
431:
432: /**
433: *
434: *
435: * @param channel
436: *
437: * @throws IOException
438: */
439: public void sendChannelRequestFailure(Channel channel)
440: throws IOException {
441: SshMsgChannelFailure msg = new SshMsgChannelFailure(channel
442: .getRemoteChannelId());
443: transport.sendMessage(msg, this );
444: }
445:
446: /**
447: *
448: *
449: * @param channel
450: *
451: * @throws IOException
452: */
453: public void sendChannelRequestSuccess(Channel channel)
454: throws IOException {
455: SshMsgChannelSuccess msg = new SshMsgChannelSuccess(channel
456: .getRemoteChannelId());
457: transport.sendMessage(msg, this );
458: }
459:
460: /**
461: *
462: *
463: * @param channel
464: * @param bytesToAdd
465: *
466: * @throws IOException
467: */
468: public void sendChannelWindowAdjust(Channel channel, long bytesToAdd)
469: throws IOException {
470: log.debug("Increasing window size by "
471: + String.valueOf(bytesToAdd) + " bytes");
472:
473: SshMsgChannelWindowAdjust msg = new SshMsgChannelWindowAdjust(
474: channel.getRemoteChannelId(), bytesToAdd);
475: transport.sendMessage(msg, this );
476: }
477:
478: /**
479: *
480: *
481: * @param requestName
482: * @param wantReply
483: * @param requestData
484: *
485: * @return
486: *
487: * @throws IOException
488: * @throws SshException
489: */
490: public synchronized byte[] sendGlobalRequest(String requestName,
491: boolean wantReply, byte[] requestData) throws IOException {
492: boolean success = true;
493: SshMsgGlobalRequest msg = new SshMsgGlobalRequest(requestName,
494: true, requestData);
495: transport.sendMessage(msg, this );
496:
497: if (wantReply) {
498: // Set up our message filter
499: int[] messageIdFilter = new int[2];
500: messageIdFilter[0] = SshMsgRequestSuccess.SSH_MSG_REQUEST_SUCCESS;
501: messageIdFilter[1] = SshMsgRequestFailure.SSH_MSG_REQUEST_FAILURE;
502: log.debug("Waiting for global request reply");
503:
504: try {
505: // Wait for either success or failure
506: SshMessage reply = messageStore
507: .getMessage(messageIdFilter);
508:
509: switch (reply.getMessageId()) {
510: case SshMsgRequestSuccess.SSH_MSG_REQUEST_SUCCESS: {
511: log.debug("Global request succeeded");
512:
513: return ((SshMsgRequestSuccess) reply)
514: .getRequestData();
515: }
516:
517: case SshMsgRequestFailure.SSH_MSG_REQUEST_FAILURE: {
518: log.debug("Global request failed");
519: throw new SshException("The request failed");
520: }
521: }
522: } catch (InterruptedException ex) {
523: throw new SshException(
524: "The thread was interrupted whilst waiting for a connection protocol message");
525: }
526: }
527:
528: return null;
529: }
530:
531: /**
532: *
533: *
534: * @return
535: */
536: protected int[] getAsyncMessageFilter() {
537: int[] messageFilter = new int[10];
538: messageFilter[0] = SshMsgGlobalRequest.SSH_MSG_GLOBAL_REQUEST;
539: messageFilter[3] = SshMsgChannelOpen.SSH_MSG_CHANNEL_OPEN;
540: messageFilter[4] = SshMsgChannelClose.SSH_MSG_CHANNEL_CLOSE;
541: messageFilter[5] = SshMsgChannelEOF.SSH_MSG_CHANNEL_EOF;
542: messageFilter[6] = SshMsgChannelExtendedData.SSH_MSG_CHANNEL_EXTENDED_DATA;
543: messageFilter[7] = SshMsgChannelData.SSH_MSG_CHANNEL_DATA;
544: messageFilter[8] = SshMsgChannelRequest.SSH_MSG_CHANNEL_REQUEST;
545: messageFilter[9] = SshMsgChannelWindowAdjust.SSH_MSG_CHANNEL_WINDOW_ADJUST;
546:
547: return messageFilter;
548: }
549:
550: /**
551: *
552: *
553: * @param channel
554: *
555: * @throws IOException
556: */
557: protected void closeChannel(Channel channel) throws IOException {
558: SshMsgChannelClose msg = new SshMsgChannelClose(channel
559: .getRemoteChannelId());
560: log.info("Local computer has closed channel "
561: + String.valueOf(channel.getLocalChannelId()) + "["
562: + channel.getName() + "]");
563: transport.sendMessage(msg, this );
564: }
565:
566: /**
567: *
568: *
569: * @param requestName
570: * @param wantReply
571: * @param requestData
572: *
573: * @throws IOException
574: */
575: protected void onGlobalRequest(String requestName,
576: boolean wantReply, byte[] requestData) throws IOException {
577: log.debug("Processing " + requestName + " global request");
578:
579: if (!globalRequests.containsKey(requestName)) {
580: sendGlobalRequestFailure();
581: } else {
582: GlobalRequestHandler handler = (GlobalRequestHandler) globalRequests
583: .get(requestName);
584: GlobalRequestResponse response = handler
585: .processGlobalRequest(requestName, requestData);
586:
587: if (wantReply) {
588: if (response.hasSucceeded()) {
589: sendGlobalRequestSuccess(response.getResponseData());
590: } else {
591: sendGlobalRequestFailure();
592: }
593: }
594: }
595: }
596:
597: /**
598: *
599: *
600: * @param msg
601: *
602: * @throws IOException
603: */
604: protected void onMessageReceived(SshMessage msg) throws IOException {
605: // Route the message to the correct handling function
606: switch (msg.getMessageId()) {
607: case SshMsgGlobalRequest.SSH_MSG_GLOBAL_REQUEST: {
608: onMsgGlobalRequest((SshMsgGlobalRequest) msg);
609:
610: break;
611: }
612:
613: case SshMsgChannelOpen.SSH_MSG_CHANNEL_OPEN: {
614: onMsgChannelOpen((SshMsgChannelOpen) msg);
615:
616: break;
617: }
618:
619: case SshMsgChannelClose.SSH_MSG_CHANNEL_CLOSE: {
620: onMsgChannelClose((SshMsgChannelClose) msg);
621:
622: break;
623: }
624:
625: case SshMsgChannelEOF.SSH_MSG_CHANNEL_EOF: {
626: onMsgChannelEOF((SshMsgChannelEOF) msg);
627:
628: break;
629: }
630:
631: case SshMsgChannelData.SSH_MSG_CHANNEL_DATA: {
632: onMsgChannelData((SshMsgChannelData) msg);
633:
634: break;
635: }
636:
637: case SshMsgChannelExtendedData.SSH_MSG_CHANNEL_EXTENDED_DATA: {
638: onMsgChannelExtendedData((SshMsgChannelExtendedData) msg);
639:
640: break;
641: }
642:
643: case SshMsgChannelRequest.SSH_MSG_CHANNEL_REQUEST: {
644: onMsgChannelRequest((SshMsgChannelRequest) msg);
645:
646: break;
647: }
648:
649: case SshMsgChannelWindowAdjust.SSH_MSG_CHANNEL_WINDOW_ADJUST: {
650: onMsgChannelWindowAdjust((SshMsgChannelWindowAdjust) msg);
651:
652: break;
653: }
654:
655: default: {
656: // If we never registered it why are we getting it?
657: log.debug("Message not handled");
658: throw new IOException("Unregistered message received!");
659: }
660: }
661: }
662:
663: /**
664: *
665: */
666: protected void onServiceAccept() {
667: }
668:
669: /**
670: *
671: *
672: * @param startMode
673: *
674: * @throws IOException
675: */
676: protected void onServiceInit(int startMode) throws IOException {
677: log.info("Registering connection protocol messages");
678: messageStore
679: .registerMessage(
680: SshMsgChannelOpenConfirmation.SSH_MSG_CHANNEL_OPEN_CONFIRMATION,
681: SshMsgChannelOpenConfirmation.class);
682: messageStore.registerMessage(
683: SshMsgChannelOpenFailure.SSH_MSG_CHANNEL_OPEN_FAILURE,
684: SshMsgChannelOpenFailure.class);
685: messageStore.registerMessage(
686: SshMsgChannelOpen.SSH_MSG_CHANNEL_OPEN,
687: SshMsgChannelOpen.class);
688: messageStore.registerMessage(
689: SshMsgChannelClose.SSH_MSG_CHANNEL_CLOSE,
690: SshMsgChannelClose.class);
691: messageStore.registerMessage(
692: SshMsgChannelEOF.SSH_MSG_CHANNEL_EOF,
693: SshMsgChannelEOF.class);
694: messageStore.registerMessage(
695: SshMsgChannelData.SSH_MSG_CHANNEL_DATA,
696: SshMsgChannelData.class);
697: messageStore
698: .registerMessage(
699: SshMsgChannelExtendedData.SSH_MSG_CHANNEL_EXTENDED_DATA,
700: SshMsgChannelExtendedData.class);
701: messageStore.registerMessage(
702: SshMsgChannelFailure.SSH_MSG_CHANNEL_FAILURE,
703: SshMsgChannelFailure.class);
704: messageStore.registerMessage(
705: SshMsgChannelRequest.SSH_MSG_CHANNEL_REQUEST,
706: SshMsgChannelRequest.class);
707: messageStore.registerMessage(
708: SshMsgChannelSuccess.SSH_MSG_CHANNEL_SUCCESS,
709: SshMsgChannelSuccess.class);
710: messageStore
711: .registerMessage(
712: SshMsgChannelWindowAdjust.SSH_MSG_CHANNEL_WINDOW_ADJUST,
713: SshMsgChannelWindowAdjust.class);
714: messageStore.registerMessage(
715: SshMsgGlobalRequest.SSH_MSG_GLOBAL_REQUEST,
716: SshMsgGlobalRequest.class);
717: messageStore.registerMessage(
718: SshMsgRequestFailure.SSH_MSG_REQUEST_FAILURE,
719: SshMsgRequestFailure.class);
720: messageStore.registerMessage(
721: SshMsgRequestSuccess.SSH_MSG_REQUEST_SUCCESS,
722: SshMsgRequestSuccess.class);
723: }
724:
725: /**
726: *
727: */
728: protected void onServiceRequest() {
729: }
730:
731: /**
732: *
733: *
734: * @param channel
735: *
736: * @throws IOException
737: */
738: protected void sendChannelFailure(Channel channel)
739: throws IOException {
740: SshMsgChannelFailure msg = new SshMsgChannelFailure(channel
741: .getRemoteChannelId());
742: transport.sendMessage(msg, this );
743: }
744:
745: /**
746: *
747: *
748: * @param channel
749: *
750: * @throws IOException
751: */
752: protected void sendChannelOpenConfirmation(Channel channel)
753: throws IOException {
754: SshMsgChannelOpenConfirmation msg = new SshMsgChannelOpenConfirmation(
755: channel.getRemoteChannelId(), channel
756: .getLocalChannelId(), channel.getLocalWindow()
757: .getWindowSpace(),
758: channel.getLocalPacketSize(), channel
759: .getChannelConfirmationData());
760: transport.sendMessage(msg, this );
761: }
762:
763: /**
764: *
765: *
766: * @param remoteChannelId
767: * @param reasonCode
768: * @param additionalInfo
769: * @param languageTag
770: *
771: * @throws IOException
772: */
773: protected void sendChannelOpenFailure(long remoteChannelId,
774: long reasonCode, String additionalInfo, String languageTag)
775: throws IOException {
776: SshMsgChannelOpenFailure msg = new SshMsgChannelOpenFailure(
777: remoteChannelId, reasonCode, additionalInfo,
778: languageTag);
779: transport.sendMessage(msg, this );
780: }
781:
782: /**
783: *
784: *
785: * @throws IOException
786: */
787: protected void sendGlobalRequestFailure() throws IOException {
788: SshMsgRequestFailure msg = new SshMsgRequestFailure();
789: transport.sendMessage(msg, this );
790: }
791:
792: /**
793: *
794: *
795: * @param requestData
796: *
797: * @throws IOException
798: */
799: protected void sendGlobalRequestSuccess(byte[] requestData)
800: throws IOException {
801: SshMsgRequestSuccess msg = new SshMsgRequestSuccess(requestData);
802: transport.sendMessage(msg, this );
803: }
804:
805: private Channel getChannel(long channelId) throws IOException {
806: //synchronized (activeChannels) {
807: Long l = new Long(channelId);
808:
809: if (!activeChannels.containsKey(l)) {
810: throw new IOException("Non existent channel "
811: + l.toString() + " requested");
812: }
813: return (Channel) activeChannels.get(l);
814: //}
815: }
816:
817: private void onMsgChannelClose(SshMsgChannelClose msg)
818: throws IOException {
819: Channel channel = getChannel(msg.getRecipientChannel());
820:
821: // If we have not already closed it then inform the subclasses
822: if (channel == null) {
823: throw new IOException("Remote computer tried to close a "
824: + "non existent channel "
825: + String.valueOf(msg.getRecipientChannel()));
826: }
827:
828: log.info("Remote computer has closed channel "
829: + String.valueOf(channel.getLocalChannelId()) + "["
830: + channel.getName() + "]");
831:
832: // If the channel is not already closed then close it
833: if (channel.getState().getValue() != ChannelState.CHANNEL_CLOSED) {
834: channel.remoteClose();
835: }
836: }
837:
838: private void onMsgChannelData(SshMsgChannelData msg)
839: throws IOException {
840: if (log.isDebugEnabled()) {
841: log.debug("Received "
842: + String.valueOf(msg.getChannelData().length)
843: + " bytes of data for channel id "
844: + String.valueOf(msg.getRecipientChannel()));
845: }
846:
847: // Get the data's channel
848: Channel channel = getChannel(msg.getRecipientChannel());
849: channel.processChannelData(msg);
850: }
851:
852: private void onMsgChannelEOF(SshMsgChannelEOF msg)
853: throws IOException {
854: Channel channel = getChannel(msg.getRecipientChannel());
855:
856: try {
857: log.info("Remote computer has set channel "
858: + String.valueOf(msg.getRecipientChannel())
859: + " to EOF [" + channel.getName() + "]");
860: channel.setRemoteEOF();
861: } catch (IOException ioe) {
862: log
863: .info("Failed to close the ChannelInputStream after EOF event");
864: }
865: }
866:
867: private void onMsgChannelExtendedData(SshMsgChannelExtendedData msg)
868: throws IOException {
869: Channel channel = getChannel(msg.getRecipientChannel());
870:
871: if (channel == null) {
872: throw new IOException(
873: "Remote computer sent data for non existent channel");
874: }
875:
876: channel.getLocalWindow().consumeWindowSpace(
877: msg.getChannelData().length);
878: channel.processChannelData(msg);
879: }
880:
881: private void onMsgChannelOpen(SshMsgChannelOpen msg)
882: throws IOException {
883: //synchronized (activeChannels) {
884: log.info("Request for " + msg.getChannelType()
885: + " channel recieved");
886:
887: // Try to get the channel implementation from the allowed channels
888: ChannelFactory cf = (ChannelFactory) allowedChannels.get(msg
889: .getChannelType());
890:
891: if (cf == null) {
892: sendChannelOpenFailure(msg.getSenderChannelId(),
893: SshMsgChannelOpenFailure.SSH_OPEN_CONNECT_FAILED,
894: "The channel type is not supported", "");
895: log.info("Request for channel type " + msg.getChannelType()
896: + " refused");
897:
898: return;
899: }
900:
901: try {
902: log.info("Creating channel " + msg.getChannelType());
903:
904: Channel channel = cf.createChannel(msg.getChannelType(),
905: msg.getChannelData());
906:
907: // Initialize the channel
908: log.info("Initiating channel");
909:
910: Long channelId = getChannelId();
911: channel.init(this , channelId.longValue(), msg
912: .getSenderChannelId(), msg.getInitialWindowSize(),
913: msg.getMaximumPacketSize());
914: activeChannels.put(channelId, channel);
915: log.info("Sending channel open confirmation");
916:
917: // Send the confirmation message
918: sendChannelOpenConfirmation(channel);
919:
920: // Open the channel for real
921: channel.open();
922: } catch (InvalidChannelException ice) {
923: sendChannelOpenFailure(msg.getSenderChannelId(),
924: SshMsgChannelOpenFailure.SSH_OPEN_CONNECT_FAILED,
925: ice.getMessage(), "");
926: }
927: //}
928: }
929:
930: private void onMsgChannelRequest(SshMsgChannelRequest msg)
931: throws IOException {
932: Channel channel = getChannel(msg.getRecipientChannel());
933:
934: if (channel == null) {
935: log.warn("Remote computer tried to make a request for "
936: + "a non existence channel!");
937: }
938:
939: channel.onChannelRequest(msg.getRequestType(), msg
940: .getWantReply(), msg.getChannelData());
941: }
942:
943: private void onMsgChannelWindowAdjust(SshMsgChannelWindowAdjust msg)
944: throws IOException {
945: Channel channel = getChannel(msg.getRecipientChannel());
946:
947: if (channel == null) {
948: throw new IOException("Remote computer tried to increase "
949: + "window space for non existent channel "
950: + String.valueOf(msg.getRecipientChannel()));
951: }
952:
953: channel.getRemoteWindow().increaseWindowSpace(
954: msg.getBytesToAdd());
955:
956: if (log.isDebugEnabled()) {
957: log.debug(String.valueOf(msg.getBytesToAdd())
958: + " bytes added to remote window");
959: log.debug("Remote window space is "
960: + String.valueOf(channel.getRemoteWindow()
961: .getWindowSpace()));
962: }
963: }
964:
965: private void onMsgGlobalRequest(SshMsgGlobalRequest msg)
966: throws IOException {
967: onGlobalRequest(msg.getRequestName(), msg.getWantReply(), msg
968: .getRequestData());
969: }
970:
971: /**
972: *
973: *
974: * @param channel
975: */
976: protected void freeChannel(Channel channel) {
977: //synchronized (activeChannels) {
978: log.info("Freeing channel "
979: + String.valueOf(channel.getLocalChannelId()) + " ["
980: + channel.getName() + "]");
981:
982: Long channelId = new Long(channel.getLocalChannelId());
983: activeChannels.remove(channelId);
984:
985: //reusableChannels.add(channelId);
986: //}
987: }
988: }
|