summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Gillespie <tgbugs@gmail.com>2019-03-23 01:09:09 -0700
committerTom Gillespie <tgbugs@gmail.com>2019-03-23 01:09:09 -0700
commit778b0f15f7b8207b1179623805d5281ae90da8d4 (patch)
treef2f2ec4d0feccee593c05b0e779e53ac7f299266
parent43512da5ced3dc7bc97f06d28b1342e9fd856e19 (diff)
parent71784ffc4dbb85ce209fbc40cee872a2a734de31 (diff)
downloadpexpect-git-778b0f15f7b8207b1179623805d5281ae90da8d4.tar.gz
Merge branch 'master' of https://github.com/pexpect/pexpect into patch-1
-rw-r--r--pexpect/pxssh.py6
-rwxr-xr-xtests/fakessh/ssh46
-rw-r--r--tests/test_pxssh.py80
3 files changed, 128 insertions, 4 deletions
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
index fa62342..2e517af 100644
--- a/pexpect/pxssh.py
+++ b/pexpect/pxssh.py
@@ -259,7 +259,7 @@ class pxssh (spawn):
sync_multiplier=1, check_local_ip=True,
password_regex=r'(?i)(?:password:)|(?:passphrase for key)',
ssh_tunnels={}, spawn_local_ssh=True,
- sync_original_prompt=True, ssh_config=None):
+ sync_original_prompt=True, ssh_config=None, cmd='ssh'):
'''This logs the user into the given server.
It uses
@@ -303,6 +303,10 @@ class pxssh (spawn):
file to the client to handle itself. You may set any options you wish in here, however
doing so will require you to post extra information that you may not want to if you
run into issues.
+
+ Alter the ``cmd`` to change the ssh client used, or to prepend it with network
+ namespaces. For example ```cmd="ip netns exec vlan2 ssh"``` to execute the ssh in
+ network namespace named ```vlan```.
'''
session_regex_array = ["(?i)are you sure you want to continue connecting", original_prompt, password_regex, "(?i)permission denied", "(?i)terminal type", TIMEOUT]
diff --git a/tests/fakessh/ssh b/tests/fakessh/ssh
index d3259e4..4a5be1b 100755
--- a/tests/fakessh/ssh
+++ b/tests/fakessh/ssh
@@ -3,13 +3,52 @@ from __future__ import print_function
import getpass
import sys
+import getopt
PY3 = (sys.version_info[0] >= 3)
if not PY3:
input = raw_input
-server = sys.argv[-1]
-if server == 'noserver':
- print('No route to host')
+ssh_usage = "usage: ssh [-2qV] [-c cipher_spec] [-l login_name]\r\n" \
+ + " hostname"
+
+cipher_valid_list = ['aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'arcfour256', 'arcfour128', \
+ 'aes128-cbc','3des-cbc','blowfish-cbc','cast128-cbc','aes192-cbc', \
+ 'aes256-cbc','arcfour']
+
+try:
+ server = sys.argv[-1]
+ if server == 'noserver':
+ print('No route to host')
+ sys.exit(1)
+
+ elif len(sys.argv) < 2:
+ print(ssh_usage)
+ sys.exit(1)
+
+ cipher = ''
+ cipher_list = []
+ fullCmdArguments = sys.argv
+ argumentList = fullCmdArguments[1:]
+ unixOptions = "2qVc:l"
+ arguments, values = getopt.getopt(argumentList, unixOptions)
+ for currentArgument, currentValue in arguments:
+ if currentArgument in ("-2"):
+ pass
+ elif currentArgument in ("-V"):
+ print("Mock SSH client version 0.2")
+ sys.exit(1)
+ elif currentArgument in ("-c"):
+ cipher = currentValue
+ cipher_list = cipher.split(",")
+ for cipher_item in cipher_list:
+ if cipher_item not in cipher_valid_list:
+ print("Unknown cipher type '" + str(cipher_item) + "'")
+ sys.exit(1)
+
+
+except Exception as e:
+ print(ssh_usage)
+ print('error = ' + str(e))
sys.exit(1)
print("Mock SSH client for tests. Do not enter real security info.")
@@ -31,4 +70,5 @@ while True:
elif cmd == 'echo $?':
print(0)
elif cmd in ('exit', 'logout'):
+ print('Closed connection')
break
diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py
index a3deced..0d49b23 100644
--- a/tests/test_pxssh.py
+++ b/tests/test_pxssh.py
@@ -187,6 +187,86 @@ class PxsshTestCase(SSHTestBase):
if confirmation_strings!=len(confirmation_array):
assert False, 'String generated from adding an SSH key is incorrect.'
+ def test_custom_ssh_cmd_debug(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
+ + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
+ + 'aes256-cbc,arcfour'
+ confirmation_strings = 0
+ confirmation_array = [cipher_string, '-2']
+ string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+ for confirmation in confirmation_array:
+ if confirmation in string:
+ confirmation_strings+=1
+
+ if confirmation_strings!=len(confirmation_array):
+ assert False, 'String generated for custom ssh client command is incorrect.'
+
+ def test_custom_ssh_cmd_debug(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
+ + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
+ + 'aes256-cbc,arcfour'
+ confirmation_strings = 0
+ confirmation_array = [cipher_string, '-2']
+ string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+ for confirmation in confirmation_array:
+ if confirmation in string:
+ confirmation_strings+=1
+
+ if confirmation_strings!=len(confirmation_array):
+ assert False, 'String generated for custom ssh client command is incorrect.'
+
+ def test_failed_custom_ssh_cmd_debug(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ cipher_string = '-c invalid_cipher'
+ confirmation_strings = 0
+ confirmation_array = [cipher_string, '-2']
+ string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+ for confirmation in confirmation_array:
+ if confirmation in string:
+ confirmation_strings+=1
+
+ if confirmation_strings!=len(confirmation_array):
+ assert False, 'String generated for custom ssh client command is incorrect.'
+
+ def test_custom_ssh_cmd(self):
+ try:
+ ssh = pxssh.pxssh()
+ cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
+ + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
+ + 'aes256-cbc,arcfour'
+ result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+
+ ssh.PROMPT = r'Closed connection'
+ ssh.sendline('exit')
+ ssh.prompt(timeout=5)
+ string = str(ssh.before) + str(ssh.after)
+
+ if 'Closed connection' not in string:
+ assert False, 'should have logged into Mock SSH client and exited'
+ except pxssh.ExceptionPxssh as e:
+ assert False, 'should not have raised exception, pxssh.ExceptionPxssh'
+ else:
+ pass
+
+ def test_failed_custom_ssh_cmd(self):
+ try:
+ ssh = pxssh.pxssh()
+ cipher_string = '-c invalid_cipher'
+ result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+
+ ssh.PROMPT = r'Closed connection'
+ ssh.sendline('exit')
+ ssh.prompt(timeout=5)
+ string = str(ssh.before) + str(ssh.after)
+
+ if 'Closed connection' not in string:
+ assert False, 'should not have completed logging into Mock SSH client and exited'
+ except pxssh.ExceptionPxssh as e:
+ pass
+ else:
+ assert False, 'should have raised exception, pxssh.ExceptionPxssh'
if __name__ == '__main__':
unittest.main()