summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRed_M <1468433+Red-M@users.noreply.github.com>2019-03-23 18:21:57 +1000
committerGitHub <noreply@github.com>2019-03-23 18:21:57 +1000
commit6efd533df1f114a7352babfcf3a3da4d3238b32c (patch)
treef2f2ec4d0feccee593c05b0e779e53ac7f299266
parent71784ffc4dbb85ce209fbc40cee872a2a734de31 (diff)
parent778b0f15f7b8207b1179623805d5281ae90da8d4 (diff)
downloadpexpect-6efd533df1f114a7352babfcf3a3da4d3238b32c.tar.gz
Merge pull request #562 from tgbugs/patch-1
pxssh.login require either username or ssh_config
-rw-r--r--pexpect/pxssh.py39
-rw-r--r--tests/test_pxssh.py76
2 files changed, 111 insertions, 4 deletions
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
index 1dfa29a..2e517af 100644
--- a/pexpect/pxssh.py
+++ b/pexpect/pxssh.py
@@ -253,7 +253,7 @@ class pxssh (spawn):
### TODO: This is getting messy and I'm pretty sure this isn't perfect.
### TODO: I need to draw a flow chart for this.
### TODO: Unit tests for SSH tunnels, remote SSH command exec, disabling original prompt sync
- def login (self, server, username, password='', terminal_type='ansi',
+ def login (self, server, username=None, password='', terminal_type='ansi',
original_prompt=r"[#$]", login_timeout=10, port=None,
auto_prompt_reset=True, ssh_key=None, quiet=True,
sync_multiplier=1, check_local_ip=True,
@@ -358,7 +358,42 @@ class pxssh (spawn):
if spawn_local_ssh==False:
tunnel = quote(str(tunnel))
ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel)
- cmd += " %s -l %s %s" % (ssh_options, username, server)
+
+ if username is not None:
+ ssh_options = ssh_options + ' -l ' + username
+ elif ssh_config is None:
+ raise TypeError('login() needs either a username or an ssh_config')
+ else: # make sure ssh_config has an entry for the server with a username
+ with open(ssh_config, 'rt') as f:
+ lines = [l.strip() for l in f.readlines()]
+
+ server_regex = r'^Host\s+%s\s*$' % server
+ user_regex = r'^User\s+\w+\s*$'
+ config_has_server = False
+ server_has_username = False
+ for line in lines:
+ if not config_has_server and re.match(server_regex, line, re.IGNORECASE):
+ config_has_server = True
+ elif config_has_server and 'hostname' in line.lower():
+ pass
+ elif config_has_server and 'host' in line.lower():
+ server_has_username = False # insurance
+ break # we have left the relevant section
+ elif config_has_server and re.match(user_regex, line, re.IGNORECASE):
+ server_has_username = True
+ break
+
+ if lines:
+ del line
+
+ del lines
+
+ if not config_has_server:
+ raise TypeError('login() ssh_config has no Host entry for %s' % server)
+ elif not server_has_username:
+ raise TypeError('login() ssh_config has no user entry for %s' % server)
+
+ cmd += " %s %s" % (ssh_options, server)
if self.debug_command_string:
return(cmd)
diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py
index ac53a9c..0d49b23 100644
--- a/tests/test_pxssh.py
+++ b/tests/test_pxssh.py
@@ -87,11 +87,82 @@ class PxsshTestCase(SSHTestBase):
def test_ssh_config_passing_string(self):
ssh = pxssh.pxssh(debug_command_string=True)
- (temp_file,config_path) = tempfile.mkstemp()
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
string = ssh.login('server', 'me', password='s3cret', spawn_local_ssh=False, ssh_config=config_path)
if not '-F '+config_path in string:
assert False, 'String generated from SSH config passing is incorrect.'
+ def test_username_or_ssh_config(self):
+ try:
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ string = ssh.login('server')
+ raise AssertionError('Should have failed due to missing username and missing ssh_config.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'HosT server\n'
+ b'UsEr me\n'
+ b'hOSt not-server\n')
+ temp_file.seek(0)
+ string = ssh.login('server', ssh_config=config_path)
+
+ def test_ssh_config_no_username_empty_config(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no Host.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_wrong_Host(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host not-server\n'
+ b'Host also-not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no matching Host.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_no_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host server\n'
+ b'Host not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no user.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_empty_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host server\n'
+ b'user \n'
+ b'Host not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to empty user.')
+ except TypeError:
+ pass
+
def test_ssh_key_string(self):
ssh = pxssh.pxssh(debug_command_string=True)
confirmation_strings = 0
@@ -105,7 +176,8 @@ class PxsshTestCase(SSHTestBase):
assert False, 'String generated from forcing the SSH agent sock is incorrect.'
confirmation_strings = 0
- (temp_file,ssh_key) = tempfile.mkstemp()
+ temp_file = tempfile.NamedTemporaryFile()
+ ssh_key = temp_file.name
confirmation_array = [' -i '+ssh_key]
string = ssh.login('server', 'me', password='s3cret', ssh_key=ssh_key)
for confirmation in confirmation_array: