summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Gillespie <tgbugs@gmail.com>2019-03-20 20:21:19 -0700
committerTom Gillespie <tgbugs@gmail.com>2019-03-23 01:07:12 -0700
commit43512da5ced3dc7bc97f06d28b1342e9fd856e19 (patch)
tree4075c26c6e096aa616e4299fcf52b764dbb924cb
parent03168dd3cb57cbafebccef0d46e00c6bbd4bb0b9 (diff)
downloadpexpect-git-43512da5ced3dc7bc97f06d28b1342e9fd856e19.tar.gz
pxssh.login require either username or ssh_config
If pxssh.login is called with an ssh_config then do not require username since it can be supplied via the config and some programs do not want to manage the username at all since it would require parsing the ssh config. This commit removes the independent username parameter when formatting cmd and instead appends the flag and the username to ssh_options if a username is provided. If neither username nor ssh_config is provided raise a TypeError in line with the error raised natively if username was not provided as a required positional argument. Tests ensure that ssh_config lists a username for the server in question so the user running the program does not leak. Note that ssh_config keywords are case-insensitive. This commit by itself fails tests because it has been modified to simplifiy the merge commit.
-rw-r--r--pexpect/pxssh.py39
-rw-r--r--tests/test_pxssh.py76
2 files changed, 111 insertions, 4 deletions
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
index 49d20cc..fa62342 100644
--- a/pexpect/pxssh.py
+++ b/pexpect/pxssh.py
@@ -253,7 +253,7 @@ class pxssh (spawn):
### TODO: This is getting messy and I'm pretty sure this isn't perfect.
### TODO: I need to draw a flow chart for this.
### TODO: Unit tests for SSH tunnels, remote SSH command exec, disabling original prompt sync
- def login (self, server, username, password='', terminal_type='ansi',
+ def login (self, server, username=None, password='', terminal_type='ansi',
original_prompt=r"[#$]", login_timeout=10, port=None,
auto_prompt_reset=True, ssh_key=None, quiet=True,
sync_multiplier=1, check_local_ip=True,
@@ -354,7 +354,42 @@ class pxssh (spawn):
if spawn_local_ssh==False:
tunnel = quote(str(tunnel))
ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel)
- cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
+
+ if username is not None:
+ ssh_options = ssh_options + ' -l ' + username
+ elif ssh_config is None:
+ raise TypeError('login() needs either a username or an ssh_config')
+ else: # make sure ssh_config has an entry for the server with a username
+ with open(ssh_config, 'rt') as f:
+ lines = [l.strip() for l in f.readlines()]
+
+ server_regex = r'^Host\s+%s\s*$' % server
+ user_regex = r'^User\s+\w+\s*$'
+ config_has_server = False
+ server_has_username = False
+ for line in lines:
+ if not config_has_server and re.match(server_regex, line, re.IGNORECASE):
+ config_has_server = True
+ elif config_has_server and 'hostname' in line.lower():
+ pass
+ elif config_has_server and 'host' in line.lower():
+ server_has_username = False # insurance
+ break # we have left the relevant section
+ elif config_has_server and re.match(user_regex, line, re.IGNORECASE):
+ server_has_username = True
+ break
+
+ if lines:
+ del line
+
+ del lines
+
+ if not config_has_server:
+ raise TypeError('login() ssh_config has no Host entry for %s' % server)
+ elif not server_has_username:
+ raise TypeError('login() ssh_config has no user entry for %s' % server)
+
+ cmd += " %s %s" % (ssh_options, server)
if self.debug_command_string:
return(cmd)
diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py
index 5f82302..a3deced 100644
--- a/tests/test_pxssh.py
+++ b/tests/test_pxssh.py
@@ -87,11 +87,82 @@ class PxsshTestCase(SSHTestBase):
def test_ssh_config_passing_string(self):
ssh = pxssh.pxssh(debug_command_string=True)
- (temp_file,config_path) = tempfile.mkstemp()
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
string = ssh.login('server', 'me', password='s3cret', spawn_local_ssh=False, ssh_config=config_path)
if not '-F '+config_path in string:
assert False, 'String generated from SSH config passing is incorrect.'
+ def test_username_or_ssh_config(self):
+ try:
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ string = ssh.login('server')
+ raise AssertionError('Should have failed due to missing username and missing ssh_config.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'HosT server\n'
+ b'UsEr me\n'
+ b'hOSt not-server\n')
+ temp_file.seek(0)
+ string = ssh.login('server', ssh_config=config_path)
+
+ def test_ssh_config_no_username_empty_config(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no Host.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_wrong_Host(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host not-server\n'
+ b'Host also-not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no matching Host.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_no_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host server\n'
+ b'Host not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no user.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_empty_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host server\n'
+ b'user \n'
+ b'Host not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to empty user.')
+ except TypeError:
+ pass
+
def test_ssh_key_string(self):
ssh = pxssh.pxssh(debug_command_string=True)
confirmation_strings = 0
@@ -105,7 +176,8 @@ class PxsshTestCase(SSHTestBase):
assert False, 'String generated from forcing the SSH agent sock is incorrect.'
confirmation_strings = 0
- (temp_file,ssh_key) = tempfile.mkstemp()
+ temp_file = tempfile.NamedTemporaryFile()
+ ssh_key = temp_file.name
confirmation_array = [' -i '+ssh_key]
string = ssh.login('server', 'me', password='s3cret', ssh_key=ssh_key)
for confirmation in confirmation_array: