# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from __future__ import nested_scopes
import os, struct, sys
from twisted.conch import identity,error
from twisted.conch.ssh import keys,transport,factory,userauth,connection,common,session
from twisted.cred import authorizer
from twisted.internet import reactor,defer,app,protocol
from twisted.python import log
from twisted.trial import unittest
from Crypto.PublicKey import RSA,DSA
publicRSA_openssh = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJSkbh/C+BR3utDS555mV comment"
privateRSA_openssh = """-----BEGIN RSA PRIVATE KEY-----
MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
-----END RSA PRIVATE KEY-----"""
publicDSA_openssh = "ssh-dss AAAAB3NzaC1kc3MAAABBAIbwTOSsZ7Bl7U1KyMNqV13Tu7yRAtTr70PVI3QnfrPumf2UzCgpL1ljbKxSfAi05XvrE/1vfCFAsFYXRZLhQy0AAAAVAM965Akmo6eAi7K+k9qDR4TotFAXAAAAQADZlpTW964haQWS4vC063NGdldT6xpUGDcDRqbm90CoPEa2RmNOuOqi8lnbhYraEzypYH3K4Gzv/bxCBnKtHRUAAABAK+1osyWBS0+P90u/rAuko6chZ98thUSY2kLSHp6hLKyy2bjnT29h7haELE+XHfq2bM9fckDx2FLOSIJzy83VmQ== comment"
privateDSA_openssh = """-----BEGIN DSA PRIVATE KEY-----
MIH4AgEAAkEAhvBM5KxnsGXtTUrIw2pXXdO7vJEC1OvvQ9UjdCd+s+6Z/ZTMKCkv
WWNsrFJ8CLTle+sT/W98IUCwVhdFkuFDLQIVAM965Akmo6eAi7K+k9qDR4TotFAX
AkAA2ZaU1veuIWkFkuLwtOtzRnZXU+saVBg3A0am5vdAqDxGtkZjTrjqovJZ24WK
2hM8qWB9yuBs7/28QgZyrR0VAkAr7WizJYFLT4/3S7+sC6SjpyFn3y2FRJjaQtIe
nqEsrLLZuOdPb2HuFoQsT5cd+rZsz19yQPHYUs5IgnPLzdWZAhUAl1TqdmlAG/b4
nnVchGiO9sML8MM=
-----END DSA PRIVATE KEY-----"""
publicRSA_lsh = """{KDEwOnB1YmxpYy1rZXkoMTQ6cnNhLXBrY3MxLXNoYTEoMTpuNjU6AJidzg8akh9enh1JrIQyL8mrqfnJT3sBxhDkIFXqjlyN2OK2al2s5mRVNMrhzL7rX8hptPX597nHmfAS65yA85cpKDE6ZTQ6PTiAYykpKQ==}"""
privateRSA_lsh = """(11:private-key(9:rsa-pkcs1(1:n65:\x00\x98\x9d\xce\x0f\x1a\x92\x1f^\x9e\x1dI\xac\x842/\xc9\xab\xa9\xf9\xc9O{\x01\xc6\x10\xe4 U\xea\x8e\\\x8d\xd8\xe2\xb6j]\xac\xe6dU4\xca\xe1\xcc\xbe\xeb_\xc8i\xb4\xf5\xf9\xf7\xb9\xc7\x99\xf0\x12\xeb\x9c\x80\xf3\x97)(1:e4:=8\x80c)(1:d64:h>)i\xb7\xc3z_\x94\xd30\xbd\xdf\xf5\x9d\x8d\xd7\xb4\xb2*\xcb\xef\xae~yq\xb8\x8a\xda\xae\xdf\xa3h\x9a=6{c\xb9\xf4\xa5\xe9\xe0\xf9a\xf5\xe7$*\x83\r\x1e\xcb[\xc8\xda\n\xa1\x94+\x00\x96d\xfb)(1:p33:\x00\xfd\x92\xdf\xdb\xd6\xebU\x82\xc6\x86eq9Dv\x98B\xd6\xfd\xa7\xa8,\x99\x1e\xa3\x88>\xa4A\xb7;i)(1:q33:\x00\x9a\x13\xa3\t\xd1@u\x86\xe9\xdeZym\xa8\x9c\xba\xcb\x18\x8c\xfcwJ*\x08\x0c\xac\xee\x0bU[\xd6\xff)(1:a33:\x00\xc4\xe3w\xe4\xbc\xf1q\x16\x84%D*]\xd0\x8d\xa2\xaf\x99\xff\x11\xf5\x8f\x06\xd5\x8c\xa6FH\xfe\x8e\xea\x8b)(1:b32:qx\xbd\xa6\x88\x13p\x94W\xfd\xbff\x941\xc3\xac\xa8\xaf\xe6\xaavO+\x95\xa7\x06|\x91~\xc5\xc7\xb1)(1:c32:9z\xf1\x80\xbdLE\x8c?\x8f\xd3\xe8\x05\x12\xc2@\xedZ\xec/\xb9\x8c\xdd\x07\xccM\x88g\x05jG2)))"""
publicDSA_lsh = """{KDEwOnB1YmxpYy1rZXkoMzpkc2EoMTpwNjU6AOiMNL79iqUfSqaIHIySHKt4Jlc272yYTzAXmEg77NCgtyfDjuAcHHgwTphBA1l53i/4AAiaUBcU8qPY/Ug/MPcpKDE6cTIxOgDYKP8uLv/m6aUDAA7l5hjMq6Iy7ykoMTpnNjU6ANLKfX/CG7L9o7TQzwLa/X/hb1ZZ+++bySGQep5Ka2lCLm+gff3erqKdxwn5kjqEWq/tXtnSx3rl3TgiwO5R1GEpKDE6eTY1OgDZKD/rhxonz8sugmAcf/wIIhq4M4A+XFOzkEHj0XWHGpjycC8moBWwsIXRuRYCjbl5dA6wVv+xDrf9c6a6GMhhKSkp}"""
privateDSA_lsh = """(11:private-key(3:dsa(1:p65:\x00\xe8\x8c4\xbe\xfd\x8a\xa5\x1fJ\xa6\x88\x1c\x8c\x92\x1c\xabx&W6\xefl\x98O0\x17\x98H;\xec\xd0\xa0\xb7\'\xc3\x8e\xe0\x1c\x1cx0N\x98A\x03Yy\xde/\xf8\x00\x08\x9aP\x17\x14\xf2\xa3\xd8\xfdH?0\xf7)(1:q21:\x00\xd8(\xff..\xff\xe6\xe9\xa5\x03\x00\x0e\xe5\xe6\x18\xcc\xab\xa22\xef)(1:g65:\x00\xd2\xca}\x7f\xc2\x1b\xb2\xfd\xa3\xb4\xd0\xcf\x02\xda\xfd\x7f\xe1oVY\xfb\xef\x9b\xc9!\x90z\x9eJkiB.o\xa0}\xfd\xde\xae\xa2\x9d\xc7\t\xf9\x92:\x84Z\xaf\xed^\xd9\xd2\xc7z\xe5\xdd8"\xc0\xeeQ\xd4a)(1:y65:\x00\xd9(?\xeb\x87\x1a\'\xcf\xcb.\x82`\x1c\x7f\xfc\x08"\x1a\xb83\x80>\\S\xb3\x90A\xe3\xd1u\x87\x1a\x98\xf2p/&\xa0\x15\xb0\xb0\x85\xd1\xb9\x16\x02\x8d\xb9yt\x0e\xb0V\xff\xb1\x0e\xb7\xfds\xa6\xba\x18\xc8a)(1:x20:>\xbb\xe4D\xb9\xb8\xb5\xf8\xf2-}\xf7\x0f\x90`\x968\xd3\x98Q)))"""
class SSHKeysHandlingTestCase(unittest.TestCase):
"""
test the handling of reading/signing/verifying with RSA and DSA keys
assumed test keys are in test/
"""
def testDSA(self):
"""test DSA keys
"""
self._testKey(publicDSA_openssh, privateDSA_openssh, 'openssh')
self._testKey(publicDSA_lsh, privateDSA_lsh, 'lsh')
def testRSA(self):
"""test RSA keys
"""
self._testKey(publicRSA_openssh, privateRSA_openssh, 'openssh')
self._testKey(publicRSA_lsh, privateRSA_lsh, 'lsh')
def _testKey(self, pubData, privData, keyType):
privKey = keys.getPrivateKeyObject(data = privData)
pubStr = keys.getPublicKeyString(data = pubData)
pubKey = keys.getPublicKeyObject(pubStr)
self._testKeySignVerify(privKey, pubKey)
self._testKeyFromString(privKey, pubKey, privData, pubData)
self._testGenerateKey(privKey, pubKey, privData, pubData, keyType)
def _testKeySignVerify(self, priv, pub):
testData = 'this is the test data'
sig = keys.signData(priv, testData)
self.assert_(keys.verifySignature(priv, sig, testData),
'verifying with private %s failed' %
keys.objectType(priv))
self.assert_(keys.verifySignature(pub, sig, testData),
'verifying with public %s failed' %
keys.objectType(pub))
self.failIf(keys.verifySignature(priv, sig, 'other data'),
'verified bad data with %s' %
keys.objectType(priv))
self.failIf(keys.verifySignature(priv, 'bad sig', testData),
'verified badsign with %s' %
keys.objectType(priv))
def _testKeyFromString(self, privKey, pubKey, privData, pubData):
keyType = keys.objectType(privKey)
privFS = keys.getPrivateKeyObject(data = privData)
pubFS = keys.getPublicKeyObject(keys.getPublicKeyString(data=pubData))
for k in privFS.keydata:
if getattr(privFS, k) != getattr(privKey, k):
self.fail('getting %s private key from string failed' % keyType)
for k in pubFS.keydata:
if hasattr(pubFS, k):
if getattr(pubFS, k) != getattr(pubKey, k):
self.fail('getting %s public key from string failed' % keyType)
def _testGenerateKey(self, privKey, pubKey, privData, pubData, keyType):
self.assertEquals(keys.makePublicKeyString(pubKey, 'comment', keyType), pubData)
self.assertEquals(keys.makePublicKeyString(privKey, 'comment', keyType), pubData)
self.assertEquals(keys.makePrivateKeyString(privKey, kind=keyType), privData)
encData = keys.makePrivateKeyString(privKey, passphrase='test', kind=keyType)
self.assertEquals(
keys.getPrivateKeyObject(data = encData,
passphrase = 'test').__getstate__(),
privKey.__getstate__())
theTest = None
class ConchTestIdentity(identity.ConchIdentity):
def validatePublicKey(self, pubKey):
global theTest
theTest.assert_(pubKey==keys.getPublicKeyString('dsa_test.pub'), 'bad public key')
return defer.succeed(1)
def verifyPlainPassword(self, password):
global theTest
theTest.assert_(password == 'testpass', 'bad password')
return defer.succeed(1)
class ConchTestAuthorizer(authorizer.Authorizer):
def addIdentity(self, ident):
self.ident = ident
def getIdentityRequest(self, name):
global theTest
theTest.assert_(name == 'testuser')
return defer.succeed(self.ident)
class SSHTestBase:
allowedToError = 0
def connectionLost(self, reason):
global theTest
if not hasattr(self,'expectedLoseConnection'):
theTest.fail('unexpectedly lost connection %s\n%s' % (self, reason))
reactor.crash()
def receiveError(self, reasonCode, desc):
global theTest
reactor.crash()
self.expectedLoseConnection = 1
if not self.allowedToError:
theTest.fail('got disconnect for %s: reason %s, desc: %s' %
(self, reasonCode, desc))
def receiveUnimplemented(self, seqID):
global theTest
reactor.crash()
theTest.fail('got unimplemented: seqid %s' % seqID)
class SSHTestServer(SSHTestBase, transport.SSHServerTransport): pass
class SSHTestServerAuth(userauth.SSHUserAuthServer):
authCount = None # this will be set by each test
def areDone(self):
return len(self.authenticatedWith) == self.authCount
class SSHTestClientAuth(userauth.SSHUserAuthClient):
hasTriedNone = 0 # have we tried the 'none' auth yet?
canSucceedPublicKey = 0 # can we succed with this yet?
canSucceedPassword = 0
def ssh_USERAUTH_SUCCESS(self, packet):
if not self.canSucceedPassword and self.canSucceedPublicKey:
global theTest
reactor.crash()
theTest.fail('got USERAUTH_SUCESS before password and publickey')
userauth.SSHUserAuthClient.ssh_USERAUTH_SUCCESS(self, packet)
def getPassword(self):
self.canSucceedPassword = 1
return defer.succeed('testpass')
def getPrivateKey(self):
self.canSucceedPublicKey = 1
return defer.succeed(keys.getPrivateKeyObject('dsa_test'))
def getPublicKey(self):
return keys.getPublicKeyString('dsa_test.pub')
class SSHTestClient(SSHTestBase, transport.SSHClientTransport):
def verifyHostKey(self, key, fp):
global theTest
theTest.assertEquals(key, keys.getPublicKeyString(data = publicRSA_openssh))
theTest.assertEquals(fp,'3d:13:5f:cb:c9:79:8a:93:06:27:65:bc:3d:0b:8f:af')
return defer.succeed(1)
def connectionSecure(self):
self.requestService(SSHTestClientAuth('testuser',SSHTestClientConnection()))
class SSHTestClientFactory(protocol.ClientFactory):
noisy = 0
def buildProtocol(self, addr):
self.client = SSHTestClient()
return self.client
def clientConnectionFailed(self, connector, reason):
global theTest
theTest.fail('connection between client and server failed!')
reactor.crash()
class SSHTestServerConnection(connection.SSHConnection):
def getChannel(self, ct, ws, mp, d):
if ct != 'session':
global theTest
theTest.fail('should not get %s as a channel type' % ct)
reactor.crash()
return SSHTestServerSession(remoteWindow = ws,
remoteMaxPacket = mp,
conn = self)
class SSHTestServerSession(connection.SSHChannel):
def request_exec(self, data):
program = common.getNS(data)[0].split()
log.msg('execing %s' % (program,))
self.client = session.SSHSessionClient()
reactor.spawnProcess(session.SSHSessionProtocol(self, self.client), \
program[0], program, {}, '/tmp')
return 1
class SSHTestClientConnection(connection.SSHConnection):
name = 'ssh-connection'
results = 0
def serviceStarted(self):
self.openChannel(SSHTestTrueChannel(conn = self))
self.openChannel(SSHTestFalseChannel(conn = self))
c = SSHTestEchoChannel(conn = self)
self.openChannel(c)
class SSHTestTrueChannel(connection.SSHChannel):
name = 'session'
def openFailed(self, reason):
global theTest
theTest.fail('true open failed: %s' % reason)
reactor.crash()
def channelOpen(self, ignore):
d = self.conn.sendRequest(self, 'exec', common.NS('true'), 1)
d.addErrback(self._ebRequestFailed)
log.msg('opened true')
def _ebRequestFailed(self, reason):
global theTest
theTest.fail('true exec failed: %s' % reason)
reactor.crash()
def dataReceived(self, data):
global theTest
theTest.fail('got data when using true')
reactor.crash()
def request_exit_status(self, status):
status = struct.unpack('>L', status)[0]
if status != 0:
global theTest
theTest.fail('true exit status was not 0: %i' % status)
reactor.crash()
self.conn.results +=1
log.msg('finished true')
if self.conn.results == 3: # all tests run
self.conn.transport.expectedLoseConnection = 1
theTest.fac.proto.expectedLoseConnection = 1
self.loseConnection()
reactor.crash()
return 1
class SSHTestFalseChannel(connection.SSHChannel):
name = 'session'
def openFailed(self, reason):
global theTest
theTest.fail('false open failed: %s' % reason)
reactor.crash()
def channelOpen(self, ignore):
d = self.conn.sendRequest(self, 'exec', common.NS('false'), 1)
d.addErrback(self._ebRequestFailed)
log.msg('opened false')
def _ebRequestFailed(self, reason):
global theTest
theTest.fail('false exec failed: %s' % reason)
reactor.crash()
def dataReceived(self, data):
global theTest
theTest.fail('got data when using false')
reactor.crash()
def request_exit_status(self, status):
status = struct.unpack('>L', status)[0]
if status == 0:
global theTest
theTest.fail('false exit status was 0')
reactor.crash()
self.conn.results +=1
log.msg('finished false')
if self.conn.results == 3: # all tests run
self.conn.transport.expectedLoseConnection = 1
theTest.fac.proto.expectedLoseConnection = 1
self.loseConnection()
reactor.crash()
return 1
class SSHTestEchoChannel(connection.SSHChannel):
name = 'session'
buf = ''
def openFailed(self, reason):
global theTest
theTest.fail('echo open failed: %s' % reason)
reactor.crash()
def channelOpen(self, ignore):
d = self.conn.sendRequest(self, 'exec', common.NS('echo hello'), 1)
d.addErrback(self._ebRequestFailed)
log.msg('opened echo')
def _ebRequestFailed(self, reason):
global theTest
theTest.fail('echo exec failed: %s' % reason)
reactor.crash()
def dataReceived(self, data):
self.buf += data
def request_exit_status(self, status):
global theTest
status = struct.unpack('>L', status)[0]
if status != 0:
theTest.fail('echo exit status was not 0: %i' % status)
reactor.crash()
self.buf = self.buf.replace('\r\n', '\n')
if self.buf != 'hello\n':
theTest.fail('echo did not return hello: %s' % repr(self.buf))
reactor.crash()
self.conn.results +=1
log.msg('finished echo')
if self.conn.results == 3: # all tests run
self.conn.transport.expectedLoseConnection = 1
theTest.fac.proto.expectedLoseConnection = 1
self.loseConnection()
reactor.crash()
return 1
class SSHTestFactory(factory.SSHFactory):
noisy = 0
services = {
'ssh-userauth':SSHTestServerAuth,
'ssh-connection':SSHTestServerConnection
}
def buildProtocol(self, addr):
if hasattr(self, 'proto'):
global theTest
reactor.crash()
theTest.fail('connected twice to factory')
self.proto = SSHTestServer()
self.proto.supportedPublicKeys = self.privateKeys.keys()
self.proto.factory = self
return self.proto
def getPublicKeys(self):
return {
'ssh-rsa':keys.getPublicKeyString(data=publicRSA_openssh),
'ssh-dss':keys.getPublicKeyString(data=publicDSA_openssh)
}
def getPrivateKeys(self):
return {
'ssh-rsa':keys.getPrivateKeyObject(data=privateRSA_openssh),
'ssh-dss':keys.getPrivateKeyObject(data=privateDSA_openssh)
}
def getPrimes(self):
return {
2048:[(transport.DH_GENERATOR, transport.DH_PRIME)]
}
def getService(self, trans, name):
return factory.SSHFactory.getService(self, trans, name)
class SSHTestOpenSSHProcess(protocol.ProcessProtocol):
buf = ''
done = 0
def outReceived(self, data):
self.buf += data
theTest.fac.proto.expectedLoseConnection = 1
def processEnded(self, reason):
global theTest
self.done = 1
theTest.assertEquals(reason.value.exitCode, 0, 'exit code was not 0: %i' % reason.value.exitCode)
self.buf = self.buf.replace('\r\n', '\n')
theTest.assertEquals(self.buf, 'hello\n')
class SSHTransportTestCase(unittest.TestCase):
def setUp(self):
open('rsa_test','w').write(privateRSA_openssh)
open('rsa_test.pub','w').write(publicRSA_openssh)
open('dsa_test.pub','w').write(publicDSA_openssh)
open('dsa_test','w').write(privateDSA_openssh)
os.chmod('dsa_test', 33152)
os.chmod('rsa_test', 33152)
open('kh_test','w').write('localhost '+publicRSA_openssh)
def tearDown(self):
for f in ['rsa_test','rsa_test.pub','dsa_test','dsa_test.pub', 'kh_test']:
os.remove(f)
def testOurServerOurClient(self):
"""test the SSH server against the SSH client
"""
if os.name != 'posix': return
global theTest
theTest = self
auth = ConchTestAuthorizer()
ident = ConchTestIdentity('testuser', auth)
SSHTestServerAuth.authCount = 2
auth.addIdentity(ident)
fac = SSHTestFactory()
fac.authorizer = auth
theTest.fac = fac
host = reactor.listenTCP(0, fac).getHost()
port = host[2]
cfac = SSHTestClientFactory()
def _failTest():
fac.proto.transport.loseConnection()
cfac.client.transport.loseConnection()
reactor.iterate(0.1)
reactor.iterate(0.1)
reactor.iterate(0.1)
reactor.crash()
self.fail('test took too long')
call = reactor.callLater(10, _failTest)
reactor.connectTCP('localhost', port, cfac)
reactor.run()
try:
call.cancel()
except:
pass
def testOurServerOpenSSHClient(self):
"""test the SSH server against the OpenSSH client
"""
if os.name != 'posix': return
cmdline = 'ssh -v -l testuser -p %i -oUserKnownHostsFile=kh_test -oPasswordAuthentication=no -i dsa_test localhost echo hello'
global theTest
theTest = self
auth = ConchTestAuthorizer()
ident = ConchTestIdentity('testuser', auth)
SSHTestServerAuth.authCount = 1 # only public key
auth.addIdentity(ident)
fac = SSHTestFactory()
fac.authorizer = auth
theTest.fac = fac
host = reactor.listenTCP(0, fac).getHost()
port = host[2]
ssh_path = None
for p in ['/usr', '', '/usr/local']:
if os.path.exists(p+'/bin/ssh'):
ssh_path = p+'/bin/ssh'
break
if not ssh_path:
log.msg('skipping test, cannot find ssh')
return
cmds = (cmdline % port).split()
p = SSHTestOpenSSHProcess()
def _failTest():
try:
os.kill(p.transport.pid, 9)
except OSError:
pass
try:
fac.proto.transport.loseConnection()
except AttributeError:
pass
reactor.iterate(0.1)
reactor.iterate(0.1)
reactor.iterate(0.1)
reactor.crash()
p.done = 1
self.fail('test took too long')
call = reactor.callLater(10, _failTest)
reactor.spawnProcess(p, ssh_path, cmds)
reactor.run()
# wait for process to finish
while not p.done:
reactor.iterate()
try:
call.cancel()
except:
pass
|