summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2019-12-03 14:35:04 -0500
committerJeff Forcier <jeff@bitprophet.org>2019-12-03 14:35:04 -0500
commit9e6fc80397582838de8124344efcde6b17472b73 (patch)
tree50fd8121c695ad5f322ebb377b9815711098e531 /tests
parent25de1a7a02a2189614787718739efbde86d5bb8e (diff)
parent84fa355a253d30d6c39adaea8bb095ced0c3b751 (diff)
downloadparamiko-9e6fc80397582838de8124344efcde6b17472b73.tar.gz
Merge branch 'master' into 1343-int
Diffstat (limited to 'tests')
-rw-r--r--tests/configs/basic4
-rw-r--r--tests/configs/canon8
-rw-r--r--tests/configs/canon-always5
-rw-r--r--tests/configs/canon-ipv46
-rw-r--r--tests/configs/canon-local6
-rw-r--r--tests/configs/canon-local-always6
-rw-r--r--tests/configs/deep-canon11
-rw-r--r--tests/configs/deep-canon-maxdots12
-rw-r--r--tests/configs/empty-canon6
-rw-r--r--tests/configs/fallback-no6
-rw-r--r--tests/configs/fallback-yes6
-rw-r--r--tests/configs/hostname-exec-tokenized2
-rw-r--r--tests/configs/hostname-tokenized1
-rw-r--r--tests/configs/invalid1
-rw-r--r--tests/configs/match-all2
-rw-r--r--tests/configs/match-all-after-canonical5
-rw-r--r--tests/configs/match-all-and-more2
-rw-r--r--tests/configs/match-all-and-more-before2
-rw-r--r--tests/configs/match-all-before-canonical5
-rw-r--r--tests/configs/match-canonical-no7
-rw-r--r--tests/configs/match-canonical-yes5
-rw-r--r--tests/configs/match-complex17
-rw-r--r--tests/configs/match-exec16
-rw-r--r--tests/configs/match-exec-canonical10
-rw-r--r--tests/configs/match-exec-negation5
-rw-r--r--tests/configs/match-exec-no-arg2
-rw-r--r--tests/configs/match-host2
-rw-r--r--tests/configs/match-host-canonicalized8
-rw-r--r--tests/configs/match-host-from-match5
-rw-r--r--tests/configs/match-host-glob2
-rw-r--r--tests/configs/match-host-glob-list8
-rw-r--r--tests/configs/match-host-name4
-rw-r--r--tests/configs/match-host-negated2
-rw-r--r--tests/configs/match-host-no-arg2
-rw-r--r--tests/configs/match-localuser14
-rw-r--r--tests/configs/match-localuser-no-arg2
-rw-r--r--tests/configs/match-orighost16
-rw-r--r--tests/configs/match-orighost-canonical5
-rw-r--r--tests/configs/match-orighost-no-arg2
-rw-r--r--tests/configs/match-user14
-rw-r--r--tests/configs/match-user-explicit4
-rw-r--r--tests/configs/match-user-no-arg2
-rw-r--r--tests/configs/multi-canon-domains5
-rw-r--r--tests/configs/no-canon5
-rw-r--r--tests/configs/robey17
-rw-r--r--tests/configs/zero-maxdots9
-rw-r--r--tests/stub_sftp.py19
-rw-r--r--tests/test_auth.py4
-rw-r--r--tests/test_buffered_pipe.py9
-rw-r--r--tests/test_channelfile.py60
-rw-r--r--tests/test_client.py114
-rw-r--r--tests/test_config.py996
-rw-r--r--tests/test_ed25519-funky-padding.key7
-rw-r--r--tests/test_ed25519-funky-padding_password.key8
-rw-r--r--tests/test_file.py26
-rw-r--r--tests/test_gssapi.py89
-rw-r--r--tests/test_hostkeys.py12
-rw-r--r--tests/test_kex.py216
-rw-r--r--tests/test_kex_gss.py27
-rw-r--r--tests/test_message.py16
-rw-r--r--tests/test_packetizer.py10
-rw-r--r--tests/test_pkey.py86
-rw-r--r--tests/test_proxy.py144
-rw-r--r--tests/test_sftp.py117
-rw-r--r--tests/test_sftp_big.py25
-rw-r--r--tests/test_ssh_exception.py42
-rw-r--r--tests/test_ssh_gss.py20
-rw-r--r--tests/test_transport.py178
-rw-r--r--tests/test_util.py522
-rw-r--r--tests/util.py114
70 files changed, 2296 insertions, 851 deletions
diff --git a/tests/configs/basic b/tests/configs/basic
new file mode 100644
index 00000000..93fe3beb
--- /dev/null
+++ b/tests/configs/basic
@@ -0,0 +1,4 @@
+CanonicalDomains paramiko.org
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/canon b/tests/configs/canon
new file mode 100644
index 00000000..7b979408
--- /dev/null
+++ b/tests/configs/canon
@@ -0,0 +1,8 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+
+IdentityFile base.key
+
+Host www.paramiko.org
+ User rando
+ IdentityFile canonicalized.key
diff --git a/tests/configs/canon-always b/tests/configs/canon-always
new file mode 100644
index 00000000..f3f56b70
--- /dev/null
+++ b/tests/configs/canon-always
@@ -0,0 +1,5 @@
+CanonicalDomains paramiko.org
+CanonicalizeHostname always
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/canon-ipv4 b/tests/configs/canon-ipv4
new file mode 100644
index 00000000..92c3875f
--- /dev/null
+++ b/tests/configs/canon-ipv4
@@ -0,0 +1,6 @@
+CanonicalDomains paramiko.org
+CanonicalizeHostname yes
+AddressFamily inet
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/canon-local b/tests/configs/canon-local
new file mode 100644
index 00000000..dde9f77b
--- /dev/null
+++ b/tests/configs/canon-local
@@ -0,0 +1,6 @@
+Host www.paramiko.org
+ User rando
+
+Host www
+ CanonicalDomains paramiko.org
+ CanonicalizeHostname yes
diff --git a/tests/configs/canon-local-always b/tests/configs/canon-local-always
new file mode 100644
index 00000000..0ad0535a
--- /dev/null
+++ b/tests/configs/canon-local-always
@@ -0,0 +1,6 @@
+Host www.paramiko.org
+ User rando
+
+Host www
+ CanonicalDomains paramiko.org
+ CanonicalizeHostname always
diff --git a/tests/configs/deep-canon b/tests/configs/deep-canon
new file mode 100644
index 00000000..483823d5
--- /dev/null
+++ b/tests/configs/deep-canon
@@ -0,0 +1,11 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+
+Host www.paramiko.org
+ User rando
+
+Host sub.www.paramiko.org
+ User deep
+
+Host subber.sub.www.paramiko.org
+ User deeper
diff --git a/tests/configs/deep-canon-maxdots b/tests/configs/deep-canon-maxdots
new file mode 100644
index 00000000..7785f660
--- /dev/null
+++ b/tests/configs/deep-canon-maxdots
@@ -0,0 +1,12 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+CanonicalizeMaxDots 2
+
+Host www.paramiko.org
+ User rando
+
+Host sub.www.paramiko.org
+ User deep
+
+Host subber.sub.www.paramiko.org
+ User deeper
diff --git a/tests/configs/empty-canon b/tests/configs/empty-canon
new file mode 100644
index 00000000..19743ad6
--- /dev/null
+++ b/tests/configs/empty-canon
@@ -0,0 +1,6 @@
+CanonicalizeHostname yes
+CanonicalDomains
+AddressFamily inet
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/fallback-no b/tests/configs/fallback-no
new file mode 100644
index 00000000..ec8d13ee
--- /dev/null
+++ b/tests/configs/fallback-no
@@ -0,0 +1,6 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+CanonicalizeFallbackLocal no
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/fallback-yes b/tests/configs/fallback-yes
new file mode 100644
index 00000000..bc4f4eee
--- /dev/null
+++ b/tests/configs/fallback-yes
@@ -0,0 +1,6 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+CanonicalizeFallbackLocal yes
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/hostname-exec-tokenized b/tests/configs/hostname-exec-tokenized
new file mode 100644
index 00000000..1cae2c03
--- /dev/null
+++ b/tests/configs/hostname-exec-tokenized
@@ -0,0 +1,2 @@
+Match exec "ping %h"
+ HostName pingable.%h
diff --git a/tests/configs/hostname-tokenized b/tests/configs/hostname-tokenized
new file mode 100644
index 00000000..1905c0cc
--- /dev/null
+++ b/tests/configs/hostname-tokenized
@@ -0,0 +1 @@
+HostName prefix.%h
diff --git a/tests/configs/invalid b/tests/configs/invalid
new file mode 100644
index 00000000..81332fe8
--- /dev/null
+++ b/tests/configs/invalid
@@ -0,0 +1 @@
+lolwut
diff --git a/tests/configs/match-all b/tests/configs/match-all
new file mode 100644
index 00000000..7673e0a0
--- /dev/null
+++ b/tests/configs/match-all
@@ -0,0 +1,2 @@
+Match all
+ User awesome
diff --git a/tests/configs/match-all-after-canonical b/tests/configs/match-all-after-canonical
new file mode 100644
index 00000000..531112cb
--- /dev/null
+++ b/tests/configs/match-all-after-canonical
@@ -0,0 +1,5 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+
+Match canonical all
+ User awesome
diff --git a/tests/configs/match-all-and-more b/tests/configs/match-all-and-more
new file mode 100644
index 00000000..bb50696e
--- /dev/null
+++ b/tests/configs/match-all-and-more
@@ -0,0 +1,2 @@
+Match all exec "lol nope"
+ HostName whatever
diff --git a/tests/configs/match-all-and-more-before b/tests/configs/match-all-and-more-before
new file mode 100644
index 00000000..4d5b2e34
--- /dev/null
+++ b/tests/configs/match-all-and-more-before
@@ -0,0 +1,2 @@
+Match exec "lol nope" all
+ HostName whatever
diff --git a/tests/configs/match-all-before-canonical b/tests/configs/match-all-before-canonical
new file mode 100644
index 00000000..35e3b0e2
--- /dev/null
+++ b/tests/configs/match-all-before-canonical
@@ -0,0 +1,5 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+
+Match all canonical
+ User oops
diff --git a/tests/configs/match-canonical-no b/tests/configs/match-canonical-no
new file mode 100644
index 00000000..e528dc64
--- /dev/null
+++ b/tests/configs/match-canonical-no
@@ -0,0 +1,7 @@
+CanonicalizeHostname no
+
+Match canonical all
+ User awesome
+
+Match !canonical host specific
+ User overload
diff --git a/tests/configs/match-canonical-yes b/tests/configs/match-canonical-yes
new file mode 100644
index 00000000..d6c20928
--- /dev/null
+++ b/tests/configs/match-canonical-yes
@@ -0,0 +1,5 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+
+Match !canonical host www*
+ User hidden
diff --git a/tests/configs/match-complex b/tests/configs/match-complex
new file mode 100644
index 00000000..63634039
--- /dev/null
+++ b/tests/configs/match-complex
@@ -0,0 +1,17 @@
+HostName bogus
+
+Match originalhost target host bogus
+ User rand
+
+Match originalhost remote localuser rando
+ User calrissian
+
+# Just to set user for subsequent match
+Match originalhost www
+ User calrissian
+
+Match !canonical originalhost www host bogus localuser rando user calrissian
+ Port 7777
+
+Match !canonical !originalhost www host bogus localuser rando !user calrissian
+ Port 1234
diff --git a/tests/configs/match-exec b/tests/configs/match-exec
new file mode 100644
index 00000000..763346ea
--- /dev/null
+++ b/tests/configs/match-exec
@@ -0,0 +1,16 @@
+Match exec "quoted"
+ User benjamin
+
+Match exec unquoted
+ User rando
+
+Match exec "quoted spaced"
+ User neil
+
+# Just to prepopulate values for tokenizing subsequent exec
+Host target
+ User intermediate
+ HostName configured
+
+Match exec "%d %h %L %l %n %p %r %u"
+ Port 1337
diff --git a/tests/configs/match-exec-canonical b/tests/configs/match-exec-canonical
new file mode 100644
index 00000000..794ee9d5
--- /dev/null
+++ b/tests/configs/match-exec-canonical
@@ -0,0 +1,10 @@
+CanonicalDomains paramiko.org
+CanonicalizeHostname always
+
+# This will match in the first, uncanonicalized pass
+Match !canonical exec uncanonicalized
+ User defenseless
+
+# And this will match the second time
+Match canonical exec canonicalized
+ Port 8007
diff --git a/tests/configs/match-exec-negation b/tests/configs/match-exec-negation
new file mode 100644
index 00000000..937c910e
--- /dev/null
+++ b/tests/configs/match-exec-negation
@@ -0,0 +1,5 @@
+Match !exec "this succeeds"
+ User nope
+
+Match !exec "this fails"
+ User yup
diff --git a/tests/configs/match-exec-no-arg b/tests/configs/match-exec-no-arg
new file mode 100644
index 00000000..20c16d16
--- /dev/null
+++ b/tests/configs/match-exec-no-arg
@@ -0,0 +1,2 @@
+Match exec
+ User uh-oh
diff --git a/tests/configs/match-host b/tests/configs/match-host
new file mode 100644
index 00000000..86cbff5d
--- /dev/null
+++ b/tests/configs/match-host
@@ -0,0 +1,2 @@
+Match host target
+ User rand
diff --git a/tests/configs/match-host-canonicalized b/tests/configs/match-host-canonicalized
new file mode 100644
index 00000000..52dadeae
--- /dev/null
+++ b/tests/configs/match-host-canonicalized
@@ -0,0 +1,8 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+
+Match host www.paramiko.org
+ User rand
+
+Match canonical host docs.paramiko.org
+ User eric
diff --git a/tests/configs/match-host-from-match b/tests/configs/match-host-from-match
new file mode 100644
index 00000000..172ee116
--- /dev/null
+++ b/tests/configs/match-host-from-match
@@ -0,0 +1,5 @@
+Match host original-host
+ HostName substituted-host
+
+Match host substituted-host
+ User inner
diff --git a/tests/configs/match-host-glob b/tests/configs/match-host-glob
new file mode 100644
index 00000000..3d53cf48
--- /dev/null
+++ b/tests/configs/match-host-glob
@@ -0,0 +1,2 @@
+Match host *ever
+ User matrim
diff --git a/tests/configs/match-host-glob-list b/tests/configs/match-host-glob-list
new file mode 100644
index 00000000..3617d136
--- /dev/null
+++ b/tests/configs/match-host-glob-list
@@ -0,0 +1,8 @@
+Match host *ever
+ User matrim
+
+Match host somehost,someotherhost
+ User thom
+
+Match host goo*,!goof
+ User perrin
diff --git a/tests/configs/match-host-name b/tests/configs/match-host-name
new file mode 100644
index 00000000..783d939e
--- /dev/null
+++ b/tests/configs/match-host-name
@@ -0,0 +1,4 @@
+HostName default-host
+
+Match host default-host
+ User silly
diff --git a/tests/configs/match-host-negated b/tests/configs/match-host-negated
new file mode 100644
index 00000000..7c5d3f3e
--- /dev/null
+++ b/tests/configs/match-host-negated
@@ -0,0 +1,2 @@
+Match !host www
+ User jeff
diff --git a/tests/configs/match-host-no-arg b/tests/configs/match-host-no-arg
new file mode 100644
index 00000000..191cebb5
--- /dev/null
+++ b/tests/configs/match-host-no-arg
@@ -0,0 +1,2 @@
+Match host
+ User oops
diff --git a/tests/configs/match-localuser b/tests/configs/match-localuser
new file mode 100644
index 00000000..fe4a276c
--- /dev/null
+++ b/tests/configs/match-localuser
@@ -0,0 +1,14 @@
+Match localuser gandalf
+ HostName gondor
+
+Match localuser b*
+ HostName shire
+
+Match localuser aragorn,frodo
+ HostName moria
+
+Match localuser gimli,!legolas
+ Port 7373
+
+Match !localuser sauron
+ HostName mordor
diff --git a/tests/configs/match-localuser-no-arg b/tests/configs/match-localuser-no-arg
new file mode 100644
index 00000000..6623553a
--- /dev/null
+++ b/tests/configs/match-localuser-no-arg
@@ -0,0 +1,2 @@
+Match localuser
+ User oops
diff --git a/tests/configs/match-orighost b/tests/configs/match-orighost
new file mode 100644
index 00000000..10541993
--- /dev/null
+++ b/tests/configs/match-orighost
@@ -0,0 +1,16 @@
+HostName bogus
+
+Match originalhost target
+ User tuon
+
+Match originalhost what*
+ User matrim
+
+Match originalhost comma,sep*
+ User chameleon
+
+Match originalhost yep,!nope
+ User skipped
+
+Match !originalhost www !originalhost nope
+ User thom
diff --git a/tests/configs/match-orighost-canonical b/tests/configs/match-orighost-canonical
new file mode 100644
index 00000000..737345e8
--- /dev/null
+++ b/tests/configs/match-orighost-canonical
@@ -0,0 +1,5 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+
+Match originalhost www
+ User tuon
diff --git a/tests/configs/match-orighost-no-arg b/tests/configs/match-orighost-no-arg
new file mode 100644
index 00000000..427382ba
--- /dev/null
+++ b/tests/configs/match-orighost-no-arg
@@ -0,0 +1,2 @@
+Match originalhost
+ User oops
diff --git a/tests/configs/match-user b/tests/configs/match-user
new file mode 100644
index 00000000..14d6ac12
--- /dev/null
+++ b/tests/configs/match-user
@@ -0,0 +1,14 @@
+Match user gandalf
+ HostName gondor
+
+Match user b*
+ HostName shire
+
+Match user aragorn,frodo
+ HostName moria
+
+Match user gimli,!legolas
+ Port 7373
+
+Match !user sauron
+ HostName mordor
diff --git a/tests/configs/match-user-explicit b/tests/configs/match-user-explicit
new file mode 100644
index 00000000..9a2b1d82
--- /dev/null
+++ b/tests/configs/match-user-explicit
@@ -0,0 +1,4 @@
+User explicit
+
+Match user explicit
+ HostName dumb
diff --git a/tests/configs/match-user-no-arg b/tests/configs/match-user-no-arg
new file mode 100644
index 00000000..65a11ab4
--- /dev/null
+++ b/tests/configs/match-user-no-arg
@@ -0,0 +1,2 @@
+Match user
+ User oops
diff --git a/tests/configs/multi-canon-domains b/tests/configs/multi-canon-domains
new file mode 100644
index 00000000..5674b442
--- /dev/null
+++ b/tests/configs/multi-canon-domains
@@ -0,0 +1,5 @@
+CanonicalizeHostname yes
+CanonicalDomains not-a-real-tld paramiko.org
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/no-canon b/tests/configs/no-canon
new file mode 100644
index 00000000..033f8c53
--- /dev/null
+++ b/tests/configs/no-canon
@@ -0,0 +1,5 @@
+CanonicalizeHostname no
+CanonicalDomains paramiko.org
+
+Host www.paramiko.org
+ User rando
diff --git a/tests/configs/robey b/tests/configs/robey
new file mode 100644
index 00000000..b2026224
--- /dev/null
+++ b/tests/configs/robey
@@ -0,0 +1,17 @@
+# A timeless classic?
+# NOTE: some lines in here have 'extra' whitespace (incl trailing, and mixed
+# tabs/spaces!) on purpose.
+
+Host *
+ User robey
+ IdentityFile =~/.ssh/id_rsa
+
+# comment
+Host *.example.com
+ User bjork
+Port=3333
+Host *
+ Crazy something dumb
+Host spoo.example.com
+Crazy something else
+
diff --git a/tests/configs/zero-maxdots b/tests/configs/zero-maxdots
new file mode 100644
index 00000000..dc00054c
--- /dev/null
+++ b/tests/configs/zero-maxdots
@@ -0,0 +1,9 @@
+CanonicalizeHostname yes
+CanonicalDomains paramiko.org
+CanonicalizeMaxDots 0
+
+Host www.paramiko.org
+ User rando
+
+Host sub.www.paramiko.org
+ User deep
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 100076d6..1528a0b8 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -21,18 +21,17 @@ A stub SFTP server for loopback SFTP testing.
"""
import os
-import sys
from paramiko import (
- ServerInterface,
- SFTPServerInterface,
- SFTPServer,
+ AUTH_SUCCESSFUL,
+ OPEN_SUCCEEDED,
SFTPAttributes,
SFTPHandle,
- SFTP_OK,
+ SFTPServer,
+ SFTPServerInterface,
SFTP_FAILURE,
- AUTH_SUCCESSFUL,
- OPEN_SUCCEEDED,
+ SFTP_OK,
+ ServerInterface,
)
from paramiko.common import o666
@@ -65,7 +64,8 @@ class StubSFTPHandle(SFTPHandle):
class StubSFTPServer(SFTPServerInterface):
# assume current folder is a fine root
- # (the tests always create and eventually delete a subfolder, so there shouldn't be any mess)
+ # (the tests always create and eventually delete a subfolder, so there
+ # shouldn't be any mess)
ROOT = os.getcwd()
def _realpath(self, path):
@@ -206,7 +206,8 @@ class StubSFTPServer(SFTPServerInterface):
# compute relative to path
abspath = os.path.join(os.path.dirname(path), target_path)
if abspath[: len(self.ROOT)] != self.ROOT:
- # this symlink isn't going to work anyway -- just break it immediately
+ # this symlink isn't going to work anyway -- just break it
+ # immediately
target_path = "<error>"
try:
os.symlink(target_path, path)
diff --git a/tests/test_auth.py b/tests/test_auth.py
index d98a00c4..01fbac5b 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -250,7 +250,7 @@ class AuthTest(unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
try:
- remain = self.tc.auth_password("bad-server", "hello")
+ self.tc.auth_password("bad-server", "hello")
except:
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
@@ -265,7 +265,7 @@ class AuthTest(unittest.TestCase):
self.start_server()
self.tc.connect()
try:
- remain = self.tc.auth_password("unresponsive-server", "hello")
+ self.tc.auth_password("unresponsive-server", "hello")
except:
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index 28d6e4a2..61c99cc0 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -26,7 +26,6 @@ import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
-from paramiko.py3compat import b
def delay_thread(p):
@@ -42,7 +41,7 @@ def close_thread(p):
class BufferedPipeTest(unittest.TestCase):
- def test_1_buffered_pipe(self):
+ def test_buffered_pipe(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
p.feed("hello.")
@@ -59,7 +58,7 @@ class BufferedPipeTest(unittest.TestCase):
self.assertTrue(not p.read_ready())
self.assertEqual(b"", p.read(1))
- def test_2_delay(self):
+ def test_delay(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
threading.Thread(target=delay_thread, args=(p,)).start()
@@ -72,13 +71,13 @@ class BufferedPipeTest(unittest.TestCase):
self.assertEqual(b"b", p.read(1, 1.0))
self.assertEqual(b"", p.read(1))
- def test_3_close_while_reading(self):
+ def test_close_while_reading(self):
p = BufferedPipe()
threading.Thread(target=close_thread, args=(p,)).start()
data = p.read(1, 1.0)
self.assertEqual(b"", data)
- def test_4_or_pipe(self):
+ def test_or_pipe(self):
p = pipe.make_pipe()
p1, p2 = pipe.make_or_pipe(p)
self.assertFalse(p._set)
diff --git a/tests/test_channelfile.py b/tests/test_channelfile.py
new file mode 100644
index 00000000..4448fdfb
--- /dev/null
+++ b/tests/test_channelfile.py
@@ -0,0 +1,60 @@
+from mock import patch, MagicMock
+
+from paramiko import Channel, ChannelFile, ChannelStderrFile, ChannelStdinFile
+
+
+class ChannelFileBase(object):
+ @patch("paramiko.channel.ChannelFile._set_mode")
+ def test_defaults_to_unbuffered_reading(self, setmode):
+ self.klass(Channel(None))
+ setmode.assert_called_once_with("r", -1)
+
+ @patch("paramiko.channel.ChannelFile._set_mode")
+ def test_can_override_mode_and_bufsize(self, setmode):
+ self.klass(Channel(None), mode="w", bufsize=25)
+ setmode.assert_called_once_with("w", 25)
+
+ def test_read_recvs_from_channel(self):
+ chan = MagicMock()
+ cf = self.klass(chan)
+ cf.read(100)
+ chan.recv.assert_called_once_with(100)
+
+ def test_write_calls_channel_sendall(self):
+ chan = MagicMock()
+ cf = self.klass(chan, mode="w")
+ cf.write("ohai")
+ chan.sendall.assert_called_once_with(b"ohai")
+
+
+class TestChannelFile(ChannelFileBase):
+ klass = ChannelFile
+
+
+class TestChannelStderrFile(object):
+ def test_read_calls_channel_recv_stderr(self):
+ chan = MagicMock()
+ cf = ChannelStderrFile(chan)
+ cf.read(100)
+ chan.recv_stderr.assert_called_once_with(100)
+
+ def test_write_calls_channel_sendall(self):
+ chan = MagicMock()
+ cf = ChannelStderrFile(chan, mode="w")
+ cf.write("ohai")
+ chan.sendall_stderr.assert_called_once_with(b"ohai")
+
+
+class TestChannelStdinFile(ChannelFileBase):
+ klass = ChannelStdinFile
+
+ def test_close_calls_channel_shutdown_write(self):
+ chan = MagicMock()
+ cf = ChannelStdinFile(chan, mode="wb")
+ cf.flush = MagicMock()
+ cf.close()
+ # Sanity check that we still call BufferedFile.close()
+ cf.flush.assert_called_once_with()
+ assert cf._closed is True
+ # Actual point of test
+ chan.shutdown_write.assert_called_once_with()
diff --git a/tests/test_client.py b/tests/test_client.py
index 80b28adf..60ad310c 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -34,10 +34,11 @@ import weakref
from tempfile import mkstemp
from pytest_relaxed import raises
+from mock import patch, Mock
import paramiko
+from paramiko import SSHClient
from paramiko.pkey import PublicBlob
-from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
from .util import _support, slow
@@ -48,9 +49,9 @@ requires_gss_auth = unittest.skipUnless(
)
FINGERPRINTS = {
- "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c",
- "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5",
- "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60",
+ "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa
+ "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa
+ "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa
"ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
}
@@ -192,7 +193,7 @@ class ClientTest(unittest.TestCase):
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -212,6 +213,12 @@ class ClientTest(unittest.TestCase):
stdin, stdout, stderr = self.tc.exec_command("yes")
schan = self.ts.accept(1.0)
+ # Nobody else tests the API of exec_command so let's do it here for
+ # now. :weary:
+ assert isinstance(stdin, paramiko.ChannelStdinFile)
+ assert isinstance(stdout, paramiko.ChannelFile)
+ assert isinstance(stderr, paramiko.ChannelStderrFile)
+
schan.send("Hello there.\n")
schan.send_stderr("This is on stderr.\n")
schan.close()
@@ -228,13 +235,13 @@ class ClientTest(unittest.TestCase):
class SSHClientTest(ClientTest):
- def test_1_client(self):
+ def test_client(self):
"""
verify that the SSHClient stuff works too.
"""
self._test_connection(password="pygmalion")
- def test_2_client_dsa(self):
+ def test_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
@@ -246,7 +253,7 @@ class SSHClientTest(ClientTest):
"""
self._test_connection(key_filename=_support("test_rsa.key"))
- def test_2_5_client_ecdsa(self):
+ def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
@@ -255,7 +262,7 @@ class SSHClientTest(ClientTest):
def test_client_ed25519(self):
self._test_connection(key_filename=_support("test_ed25519.key"))
- def test_3_multiple_key_files(self):
+ def test_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
@@ -335,7 +342,7 @@ class SSHClientTest(ClientTest):
# code path (!) so we're punting too, sob.
pass
- def test_4_auto_add_policy(self):
+ def test_auto_add_policy(self):
"""
verify that SSHClient's AutoAddPolicy works.
"""
@@ -344,7 +351,7 @@ class SSHClientTest(ClientTest):
key_file = _support("test_ecdsa_256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(password="pygmalion", **self.connect_kwargs)
@@ -358,7 +365,7 @@ class SSHClientTest(ClientTest):
new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
self.assertEqual(public_host_key, new_host_key)
- def test_5_save_host_keys(self):
+ def test_save_host_keys(self):
"""
verify that SSHClient correctly saves a known_hosts file.
"""
@@ -371,16 +378,14 @@ class SSHClientTest(ClientTest):
fd, localname = mkstemp()
os.close(fd)
- client = paramiko.SSHClient()
- self.assertEquals(0, len(client.get_host_keys()))
+ client = SSHClient()
+ assert len(client.get_host_keys()) == 0
host_id = "[%s]:%d" % (self.addr, self.port)
client.get_host_keys().add(host_id, "ssh-rsa", public_host_key)
- self.assertEquals(1, len(client.get_host_keys()))
- self.assertEquals(
- public_host_key, client.get_host_keys()[host_id]["ssh-rsa"]
- )
+ assert len(client.get_host_keys()) == 1
+ assert public_host_key == client.get_host_keys()[host_id]["ssh-rsa"]
client.save_host_keys(localname)
@@ -389,7 +394,7 @@ class SSHClientTest(ClientTest):
os.unlink(localname)
- def test_6_cleanup(self):
+ def test_cleanup(self):
"""
verify that when an SSHClient is collected, its transport (and the
transport's packetizer) is closed.
@@ -400,17 +405,17 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.assertEqual(0, len(self.tc.get_host_keys()))
+ assert len(self.tc.get_host_keys()) == 0
self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
self.event.wait(1.0)
- self.assertTrue(self.event.is_set())
- self.assertTrue(self.ts.is_active())
+ assert self.event.is_set()
+ assert self.ts.is_active()
p = weakref.ref(self.tc._transport.packetizer)
- self.assertTrue(p() is not None)
+ assert p() is not None
self.tc.close()
del self.tc
@@ -420,7 +425,7 @@ class SSHClientTest(ClientTest):
gc.collect()
gc.collect()
- self.assertTrue(p() is None)
+ assert p() is None
def test_client_can_be_used_as_context_manager(self):
"""
@@ -428,10 +433,10 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
- with paramiko.SSHClient() as tc:
+ with SSHClient() as tc:
self.tc = tc
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.assertEquals(0, len(self.tc.get_host_keys()))
+ assert len(self.tc.get_host_keys()) == 0
self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
self.event.wait(1.0)
@@ -442,7 +447,7 @@ class SSHClientTest(ClientTest):
self.assertTrue(self.tc._transport is None)
- def test_7_banner_timeout(self):
+ def test_banner_timeout(self):
"""
verify that the SSHClient has a configurable banner timeout.
"""
@@ -453,7 +458,7 @@ class SSHClientTest(ClientTest):
)
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -461,7 +466,7 @@ class SSHClientTest(ClientTest):
kwargs = dict(self.connect_kwargs, banner_timeout=0.5)
self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs)
- def test_8_auth_trickledown(self):
+ def test_auth_trickledown(self):
"""
Failed key auth doesn't prevent subsequent pw auth from succeeding
"""
@@ -482,7 +487,7 @@ class SSHClientTest(ClientTest):
self._test_connection(**kwargs)
@slow
- def test_9_auth_timeout(self):
+ def test_auth_timeout(self):
"""
verify that the SSHClient has a configurable auth timeout
"""
@@ -495,28 +500,28 @@ class SSHClientTest(ClientTest):
)
@requires_gss_auth
- def test_10_auth_trickledown_gsskex(self):
+ def test_auth_trickledown_gsskex(self):
"""
- Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-keyex doesn't prevent subsequent key from succeeding
"""
kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
@requires_gss_auth
- def test_11_auth_trickledown_gssauth(self):
+ def test_auth_trickledown_gssauth(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
- def test_12_reject_policy(self):
+ def test_reject_policy(self):
"""
verify that SSHClient's RejectPolicy works.
"""
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -527,15 +532,16 @@ class SSHClientTest(ClientTest):
)
@requires_gss_auth
- def test_13_reject_policy_gsskex(self):
+ def test_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
"""
- # Test for a bug present in paramiko versions released before 2017-08-01
+ # Test for a bug present in paramiko versions released before
+ # 2017-08-01
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -550,7 +556,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
@@ -566,7 +572,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
host_key = ktype.from_private_key_file(_support(kfile))
known_hosts = self.tc.get_host_keys()
@@ -595,7 +601,7 @@ class SSHClientTest(ClientTest):
def _setup_for_env(self):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(
@@ -639,7 +645,7 @@ class SSHClientTest(ClientTest):
"""
# AN ACTUAL UNIT TEST?! GOOD LORD
# (But then we have to test a private API...meh.)
- client = paramiko.SSHClient()
+ client = SSHClient()
# Default
assert isinstance(client._policy, paramiko.RejectPolicy)
# Hand in an instance (classic behavior)
@@ -649,6 +655,22 @@ class SSHClientTest(ClientTest):
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_defaults_to_None(self, Transport):
+ SSHClient().connect("host", sock=Mock(), password="no")
+ assert Transport.call_args[1]["disabled_algorithms"] is None
+
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_passed_directly_if_given(self, Transport):
+ SSHClient().connect(
+ "host",
+ sock=Mock(),
+ password="no",
+ disabled_algorithms={"keys": ["ssh-dss"]},
+ )
+ call_arg = Transport.call_args[1]["disabled_algorithms"]
+ assert call_arg == {"keys": ["ssh-dss"]}
+
class PasswordPassphraseTests(ClientTest):
# TODO: most of these could reasonably be set up to use mocks/assertions
@@ -684,9 +706,9 @@ class PasswordPassphraseTests(ClientTest):
)
@raises(AuthenticationException) # TODO: more granular
- def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa
self
- ): # noqa
+ ):
# Sanity: if we're given both fields, the password field is NOT used as
# a passphrase.
self._test_connection(
diff --git a/tests/test_config.py b/tests/test_config.py
index cbd3f623..5e9aa059 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -1,74 +1,992 @@
# This file is part of Paramiko and subject to the license in /LICENSE in this
# repository
-import pytest
+from os.path import expanduser
+from socket import gaierror
-from paramiko import config
-from paramiko.util import parse_ssh_config
-from paramiko.py3compat import StringIO
+from paramiko.py3compat import string_types
+from invoke import Result
+from mock import patch
+from pytest import raises, mark, fixture
-def test_SSHConfigDict_construct_empty():
- assert not config.SSHConfigDict()
+from paramiko import (
+ SSHConfig,
+ SSHConfigDict,
+ CouldNotCanonicalize,
+ ConfigParseError,
+)
+from .util import _config
-def test_SSHConfigDict_construct_from_list():
- assert config.SSHConfigDict([(1, 2)])[1] == 2
+@fixture
+def socket():
+ """
+ Patch all of socket.* in our config module to prevent eg real DNS lookups.
+
+ Also forces getaddrinfo (used in our addressfamily lookup stuff) to always
+ fail by default to mimic usual lack of AddressFamily related crap.
-def test_SSHConfigDict_construct_from_dict():
- assert config.SSHConfigDict({1: 2})[1] == 2
+ Callers who want to mock DNS lookups can then safely assume gethostbyname()
+ will be in use.
+ """
+ with patch("paramiko.config.socket") as mocket:
+ # Reinstate gaierror as an actual exception and not a sub-mock.
+ # (Presumably this would work with any exception, but why not use the
+ # real one?)
+ mocket.gaierror = gaierror
+ # Patch out getaddrinfo, used to detect family-specific IP lookup -
+ # only useful for a few specific tests.
+ mocket.getaddrinfo.side_effect = mocket.gaierror
+ # Patch out getfqdn to return some real string for when it gets called;
+ # some code (eg tokenization) gets mad w/ MagicMocks
+ mocket.getfqdn.return_value = "some.fake.fqdn"
+ yield mocket
-@pytest.mark.parametrize("true_ish", ("yes", "YES", "Yes", True))
-def test_SSHConfigDict_as_bool_true_ish(true_ish):
- assert config.SSHConfigDict({"key": true_ish}).as_bool("key") is True
+def load_config(name):
+ return SSHConfig.from_path(_config(name))
-@pytest.mark.parametrize("false_ish", ("no", "NO", "No", False))
-def test_SSHConfigDict_as_bool(false_ish):
- assert config.SSHConfigDict({"key": false_ish}).as_bool("key") is False
+class TestSSHConfig(object):
+ def setup(self):
+ self.config = load_config("robey")
+ def test_init(self):
+ # No args!
+ with raises(TypeError):
+ SSHConfig("uh oh!")
+ # No args.
+ assert not SSHConfig()._config
-@pytest.mark.parametrize("int_val", ("42", 42))
-def test_SSHConfigDict_as_int(int_val):
- assert config.SSHConfigDict({"key": int_val}).as_int("key") == 42
+ def test_from_text(self):
+ config = SSHConfig.from_text("User foo")
+ assert config.lookup("foo.example.com")["user"] == "foo"
+ def test_from_file(self):
+ with open(_config("robey")) as flo:
+ config = SSHConfig.from_file(flo)
+ assert config.lookup("whatever")["user"] == "robey"
-@pytest.mark.parametrize("non_int", ("not an int", None, object()))
-def test_SSHConfigDict_as_int_failures(non_int):
- conf = config.SSHConfigDict({"key": non_int})
+ def test_from_path(self):
+ # NOTE: DO NOT replace with use of load_config() :D
+ config = SSHConfig.from_path(_config("robey"))
+ assert config.lookup("meh.example.com")["port"] == "3333"
- try:
- int(non_int)
- except Exception as e:
- exception_type = type(e)
+ def test_parse_config(self):
+ expected = [
+ {"host": ["*"], "config": {}},
+ {
+ "host": ["*"],
+ "config": {"identityfile": ["~/.ssh/id_rsa"], "user": "robey"},
+ },
+ {
+ "host": ["*.example.com"],
+ "config": {"user": "bjork", "port": "3333"},
+ },
+ {"host": ["*"], "config": {"crazy": "something dumb"}},
+ {
+ "host": ["spoo.example.com"],
+ "config": {"crazy": "something else"},
+ },
+ ]
+ assert self.config._config == expected
- with pytest.raises(exception_type):
- conf.as_int("key")
+ @mark.parametrize(
+ "host,values",
+ (
+ (
+ "irc.danger.com",
+ {
+ "crazy": "something dumb",
+ "hostname": "irc.danger.com",
+ "user": "robey",
+ },
+ ),
+ (
+ "irc.example.com",
+ {
+ "crazy": "something dumb",
+ "hostname": "irc.example.com",
+ "user": "robey",
+ "port": "3333",
+ },
+ ),
+ (
+ "spoo.example.com",
+ {
+ "crazy": "something dumb",
+ "hostname": "spoo.example.com",
+ "user": "robey",
+ "port": "3333",
+ },
+ ),
+ ),
+ )
+ def test_host_config(self, host, values):
+ expected = dict(
+ values, hostname=host, identityfile=[expanduser("~/.ssh/id_rsa")]
+ )
+ assert self.config.lookup(host) == expected
+ def test_fabric_issue_33(self):
+ config = SSHConfig.from_text(
+ """
+Host www13.*
+ Port 22
-def test_SSHConfig_host_dicts_are_SSHConfigDict_instances():
- test_config_file = """
Host *.example.com
Port 2222
Host *
Port 3333
- """
- f = StringIO(test_config_file)
- config = parse_ssh_config(f)
- assert config.lookup("foo.example.com").as_int("port") == 2222
+"""
+ )
+ host = "www13.example.com"
+ expected = {"hostname": host, "port": "22"}
+ assert config.lookup(host) == expected
+
+ def test_proxycommand_config_equals_parsing(self):
+ """
+ ProxyCommand should not split on equals signs within the value.
+ """
+ config = SSHConfig.from_text(
+ """
+Host space-delimited
+ ProxyCommand foo bar=biz baz
+
+Host equals-delimited
+ ProxyCommand=foo bar=biz baz
+"""
+ )
+ for host in ("space-delimited", "equals-delimited"):
+ value = config.lookup(host)["proxycommand"]
+ assert value == "foo bar=biz baz"
+
+ def test_proxycommand_interpolation(self):
+ """
+ ProxyCommand should perform interpolation on the value
+ """
+ config = SSHConfig.from_text(
+ """
+Host specific
+ Port 37
+ ProxyCommand host %h port %p lol
+
+Host portonly
+ Port 155
+
+Host *
+ Port 25
+ ProxyCommand host %h port %p
+"""
+ )
+ for host, val in (
+ ("foo.com", "host foo.com port 25"),
+ ("specific", "host specific port 37 lol"),
+ ("portonly", "host portonly port 155"),
+ ):
+ assert config.lookup(host)["proxycommand"] == val
+
+ def test_proxycommand_tilde_expansion(self):
+ """
+ Tilde (~) should be expanded inside ProxyCommand
+ """
+ config = SSHConfig.from_text(
+ """
+Host test
+ ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p
+"""
+ )
+ expected = "ssh -F {}/.ssh/test_config bastion nc test 22".format(
+ expanduser("~")
+ )
+ got = config.lookup("test")["proxycommand"]
+ assert got == expected
+
+ @patch("paramiko.config.getpass")
+ def test_controlpath_token_expansion(self, getpass):
+ getpass.getuser.return_value = "gandalf"
+ config = SSHConfig.from_text(
+ """
+Host explicit_user
+ User root
+ ControlPath user %u remoteuser %r
+
+Host explicit_host
+ HostName ohai
+ ControlPath remoteuser %r host %h orighost %n
+ """
+ )
+ result = config.lookup("explicit_user")["controlpath"]
+ # Remote user is User val, local user is User val
+ assert result == "user gandalf remoteuser root"
+ result = config.lookup("explicit_host")["controlpath"]
+ # Remote user falls back to local user; host and orighost may differ
+ assert result == "remoteuser gandalf host ohai orighost explicit_host"
+
+ def test_negation(self):
+ config = SSHConfig.from_text(
+ """
+Host www13.* !*.example.com
+ Port 22
+
+Host *.example.com !www13.*
+ Port 2222
+
+Host www13.*
+ Port 8080
+
+Host *
+ Port 3333
+"""
+ )
+ host = "www13.example.com"
+ expected = {"hostname": host, "port": "8080"}
+ assert config.lookup(host) == expected
+
+ def test_proxycommand(self):
+ config = SSHConfig.from_text(
+ """
+Host proxy-with-equal-divisor-and-space
+ProxyCommand = foo=bar
+
+Host proxy-with-equal-divisor-and-no-space
+ProxyCommand=foo=bar
+
+Host proxy-without-equal-divisor
+ProxyCommand foo=bar:%h-%p
+"""
+ )
+ for host, values in {
+ "proxy-with-equal-divisor-and-space": {
+ "hostname": "proxy-with-equal-divisor-and-space",
+ "proxycommand": "foo=bar",
+ },
+ "proxy-with-equal-divisor-and-no-space": {
+ "hostname": "proxy-with-equal-divisor-and-no-space",
+ "proxycommand": "foo=bar",
+ },
+ "proxy-without-equal-divisor": {
+ "hostname": "proxy-without-equal-divisor",
+ "proxycommand": "foo=bar:proxy-without-equal-divisor-22",
+ },
+ }.items():
+
+ assert config.lookup(host) == values
+
+ def test_identityfile(self):
+ config = SSHConfig.from_text(
+ """
+
+IdentityFile id_dsa0
+
+Host *
+IdentityFile id_dsa1
+
+Host dsa2
+IdentityFile id_dsa2
+
+Host dsa2*
+IdentityFile id_dsa22
+"""
+ )
+ for host, values in {
+ "foo": {"hostname": "foo", "identityfile": ["id_dsa0", "id_dsa1"]},
+ "dsa2": {
+ "hostname": "dsa2",
+ "identityfile": ["id_dsa0", "id_dsa1", "id_dsa2", "id_dsa22"],
+ },
+ "dsa22": {
+ "hostname": "dsa22",
+ "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"],
+ },
+ }.items():
+
+ assert config.lookup(host) == values
+
+ def test_config_addressfamily_and_lazy_fqdn(self):
+ """
+ Ensure the code path honoring non-'all' AddressFamily doesn't asplode
+ """
+ config = SSHConfig.from_text(
+ """
+AddressFamily inet
+IdentityFile something_%l_using_fqdn
+"""
+ )
+ assert config.lookup(
+ "meh"
+ ) # will die during lookup() if bug regresses
+
+ def test_config_dos_crlf_succeeds(self):
+ config = SSHConfig.from_text(
+ """
+Host abcqwerty\r\nHostName 127.0.0.1\r\n
+"""
+ )
+ assert config.lookup("abcqwerty")["hostname"] == "127.0.0.1"
+
+ def test_get_hostnames(self):
+ expected = {"*", "*.example.com", "spoo.example.com"}
+ assert self.config.get_hostnames() == expected
+
+ def test_quoted_host_names(self):
+ config = SSHConfig.from_text(
+ """
+Host "param pam" param "pam"
+ Port 1111
+
+Host "param2"
+ Port 2222
+
+Host param3 parara
+ Port 3333
+Host param4 "p a r" "p" "par" para
+ Port 4444
+"""
+ )
+ res = {
+ "param pam": {"hostname": "param pam", "port": "1111"},
+ "param": {"hostname": "param", "port": "1111"},
+ "pam": {"hostname": "pam", "port": "1111"},
+ "param2": {"hostname": "param2", "port": "2222"},
+ "param3": {"hostname": "param3", "port": "3333"},
+ "parara": {"hostname": "parara", "port": "3333"},
+ "param4": {"hostname": "param4", "port": "4444"},
+ "p a r": {"hostname": "p a r", "port": "4444"},
+ "p": {"hostname": "p", "port": "4444"},
+ "par": {"hostname": "par", "port": "4444"},
+ "para": {"hostname": "para", "port": "4444"},
+ }
+ for host, values in res.items():
+ assert config.lookup(host) == values
-def test_SSHConfig_wildcard_host_dicts_are_SSHConfigDict_instances():
- test_config_file = """\
+ def test_quoted_params_in_config(self):
+ config = SSHConfig.from_text(
+ """
+Host "param pam" param "pam"
+ IdentityFile id_rsa
+
+Host "param2"
+ IdentityFile "test rsa key"
+
+Host param3 parara
+ IdentityFile id_rsa
+ IdentityFile "test rsa key"
+"""
+ )
+ res = {
+ "param pam": {"hostname": "param pam", "identityfile": ["id_rsa"]},
+ "param": {"hostname": "param", "identityfile": ["id_rsa"]},
+ "pam": {"hostname": "pam", "identityfile": ["id_rsa"]},
+ "param2": {"hostname": "param2", "identityfile": ["test rsa key"]},
+ "param3": {
+ "hostname": "param3",
+ "identityfile": ["id_rsa", "test rsa key"],
+ },
+ "parara": {
+ "hostname": "parara",
+ "identityfile": ["id_rsa", "test rsa key"],
+ },
+ }
+ for host, values in res.items():
+ assert config.lookup(host) == values
+
+ def test_quoted_host_in_config(self):
+ conf = SSHConfig()
+ correct_data = {
+ "param": ["param"],
+ '"param"': ["param"],
+ "param pam": ["param", "pam"],
+ '"param" "pam"': ["param", "pam"],
+ '"param" pam': ["param", "pam"],
+ 'param "pam"': ["param", "pam"],
+ 'param "pam" p': ["param", "pam", "p"],
+ '"param" pam "p"': ["param", "pam", "p"],
+ '"pa ram"': ["pa ram"],
+ '"pa ram" pam': ["pa ram", "pam"],
+ 'param "p a m"': ["param", "p a m"],
+ }
+ incorrect_data = ['param"', '"param', 'param "pam', 'param "pam" "p a']
+ for host, values in correct_data.items():
+ assert conf._get_hosts(host) == values
+ for host in incorrect_data:
+ with raises(ConfigParseError):
+ conf._get_hosts(host)
+
+ def test_invalid_line_format_excepts(self):
+ with raises(ConfigParseError):
+ load_config("invalid")
+
+ def test_proxycommand_none_issue_418(self):
+ config = SSHConfig.from_text(
+ """
+Host proxycommand-standard-none
+ ProxyCommand None
+
+Host proxycommand-with-equals-none
+ ProxyCommand=None
+"""
+ )
+ for host, values in {
+ "proxycommand-standard-none": {
+ "hostname": "proxycommand-standard-none"
+ },
+ "proxycommand-with-equals-none": {
+ "hostname": "proxycommand-with-equals-none"
+ },
+ }.items():
+
+ assert config.lookup(host) == values
+
+ def test_proxycommand_none_masking(self):
+ # Re: https://github.com/paramiko/paramiko/issues/670
+ config = SSHConfig.from_text(
+ """
+Host specific-host
+ ProxyCommand none
+
+Host other-host
+ ProxyCommand other-proxy
+
+Host *
+ ProxyCommand default-proxy
+"""
+ )
+ # When bug is present, the full stripping-out of specific-host's
+ # ProxyCommand means it actually appears to pick up the default
+ # ProxyCommand value instead, due to cascading. It should (for
+ # backwards compatibility reasons in 1.x/2.x) appear completely blank,
+ # as if the host had no ProxyCommand whatsoever.
+ # Threw another unrelated host in there just for sanity reasons.
+ assert "proxycommand" not in config.lookup("specific-host")
+ assert config.lookup("other-host")["proxycommand"] == "other-proxy"
+ cmd = config.lookup("some-random-host")["proxycommand"]
+ assert cmd == "default-proxy"
+
+ def test_hostname_tokenization(self):
+ result = load_config("hostname-tokenized").lookup("whatever")
+ assert result["hostname"] == "prefix.whatever"
+
+
+class TestSSHConfigDict(object):
+ def test_SSHConfigDict_construct_empty(self):
+ assert not SSHConfigDict()
+
+ def test_SSHConfigDict_construct_from_list(self):
+ assert SSHConfigDict([(1, 2)])[1] == 2
+
+ def test_SSHConfigDict_construct_from_dict(self):
+ assert SSHConfigDict({1: 2})[1] == 2
+
+ @mark.parametrize("true_ish", ("yes", "YES", "Yes", True))
+ def test_SSHConfigDict_as_bool_true_ish(self, true_ish):
+ assert SSHConfigDict({"key": true_ish}).as_bool("key") is True
+
+ @mark.parametrize("false_ish", ("no", "NO", "No", False))
+ def test_SSHConfigDict_as_bool(self, false_ish):
+ assert SSHConfigDict({"key": false_ish}).as_bool("key") is False
+
+ @mark.parametrize("int_val", ("42", 42))
+ def test_SSHConfigDict_as_int(self, int_val):
+ assert SSHConfigDict({"key": int_val}).as_int("key") == 42
+
+ @mark.parametrize("non_int", ("not an int", None, object()))
+ def test_SSHConfigDict_as_int_failures(self, non_int):
+ conf = SSHConfigDict({"key": non_int})
+
+ try:
+ int(non_int)
+ except Exception as e:
+ exception_type = type(e)
+
+ with raises(exception_type):
+ conf.as_int("key")
+
+ def test_SSHConfig_host_dicts_are_SSHConfigDict_instances(self):
+ config = SSHConfig.from_text(
+ """
+Host *.example.com
+ Port 2222
+
+Host *
+ Port 3333
+"""
+ )
+ assert config.lookup("foo.example.com").as_int("port") == 2222
+
+ def test_SSHConfig_wildcard_host_dicts_are_SSHConfigDict_instances(self):
+ config = SSHConfig.from_text(
+ """
Host *.example.com
Port 2222
Host *
Port 3333
+"""
+ )
+ assert config.lookup("anything-else").as_int("port") == 3333
+
+
+class TestHostnameCanonicalization(object):
+ # NOTE: this class uses on-disk configs, and ones with real (at time of
+ # writing) DNS names, so that one can easily test OpenSSH's behavior using
+ # "ssh -F path/to/file.config -G <target>".
+
+ def test_off_by_default(self, socket):
+ result = load_config("basic").lookup("www")
+ assert result["hostname"] == "www"
+ assert "user" not in result
+ assert not socket.gethostbyname.called
+
+ def test_explicit_no_same_as_default(self, socket):
+ result = load_config("no-canon").lookup("www")
+ assert result["hostname"] == "www"
+ assert "user" not in result
+ assert not socket.gethostbyname.called
+
+ @mark.parametrize(
+ "config_name",
+ ("canon", "canon-always", "canon-local", "canon-local-always"),
+ )
+ def test_canonicalization_base_cases(self, socket, config_name):
+ result = load_config(config_name).lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "rando"
+ socket.gethostbyname.assert_called_once_with("www.paramiko.org")
+
+ def test_uses_getaddrinfo_when_AddressFamily_given(self, socket):
+ # Undo default 'always fails' mock
+ socket.getaddrinfo.side_effect = None
+ socket.getaddrinfo.return_value = [True] # just need 1st value truthy
+ result = load_config("canon-ipv4").lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "rando"
+ assert not socket.gethostbyname.called
+ gai_args = socket.getaddrinfo.call_args[0]
+ assert gai_args[0] == "www.paramiko.org"
+ assert gai_args[2] is socket.AF_INET # Mocked, but, still useful
+
+ @mark.skip
+ def test_empty_CanonicalDomains_canonicalizes_despite_noop(self, socket):
+ # Confirmed this is how OpenSSH behaves as well. Bit silly, but.
+ # TODO: this requires modifying SETTINGS_REGEX, which is a mite scary
+ # (honestly I'd prefer to move to a real parser lib anyhow) and since
+ # this is a very dumb corner case, it's marked skip for now.
+ result = load_config("empty-canon").lookup("www")
+ assert result["hostname"] == "www" # no paramiko.org
+ assert "user" not in result # did not discover canonicalized block
+
+ def test_CanonicalDomains_may_be_set_to_space_separated_list(self, socket):
+ # Test config has a bogus domain, followed by paramiko.org
+ socket.gethostbyname.side_effect = [socket.gaierror, True]
+ result = load_config("multi-canon-domains").lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "rando"
+ assert [x[0][0] for x in socket.gethostbyname.call_args_list] == [
+ "www.not-a-real-tld",
+ "www.paramiko.org",
+ ]
+
+ def test_canonicalization_applies_to_single_dot_by_default(self, socket):
+ result = load_config("deep-canon").lookup("sub.www")
+ assert result["hostname"] == "sub.www.paramiko.org"
+ assert result["user"] == "deep"
+
+ def test_canonicalization_not_applied_to_two_dots_by_default(self, socket):
+ result = load_config("deep-canon").lookup("subber.sub.www")
+ assert result["hostname"] == "subber.sub.www"
+ assert "user" not in result
+
+ def test_hostname_depth_controllable_with_max_dots_directive(self, socket):
+ # This config sets MaxDots of 2, so now canonicalization occurs
+ result = load_config("deep-canon-maxdots").lookup("subber.sub.www")
+ assert result["hostname"] == "subber.sub.www.paramiko.org"
+ assert result["user"] == "deeper"
+
+ def test_max_dots_may_be_zero(self, socket):
+ result = load_config("zero-maxdots").lookup("sub.www")
+ assert result["hostname"] == "sub.www"
+ assert "user" not in result
+
+ def test_fallback_yes_does_not_canonicalize_or_error(self, socket):
+ socket.gethostbyname.side_effect = socket.gaierror
+ result = load_config("fallback-yes").lookup("www")
+ assert result["hostname"] == "www"
+ assert "user" not in result
+
+ def test_fallback_no_causes_errors_for_unresolvable_names(self, socket):
+ socket.gethostbyname.side_effect = socket.gaierror
+ with raises(CouldNotCanonicalize) as info:
+ load_config("fallback-no").lookup("doesnotexist")
+ assert str(info.value) == "doesnotexist"
+
+ def test_identityfile_continues_being_appended_to(self, socket):
+ result = load_config("canon").lookup("www")
+ assert result["identityfile"] == ["base.key", "canonicalized.key"]
+
+
+@mark.skip
+class TestCanonicalizationOfCNAMEs(object):
+ def test_permitted_cnames_may_be_one_to_one_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com:*.bar.com
+ pass
+
+ def test_permitted_cnames_may_be_one_to_many_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com:*.bar.com,*.biz.com
+ pass
+
+ def test_permitted_cnames_may_be_many_to_one_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com:*.biz.com
+ pass
+
+ def test_permitted_cnames_may_be_many_to_many_mapping(self):
+ # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com:*.biz.com,*.baz.com
+ pass
+
+ def test_permitted_cnames_may_be_multiple_mappings(self):
+ # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com *.biz.com:*.baz.com
+ pass
+
+ def test_permitted_cnames_may_be_multiple_complex_mappings(self):
+ # Same as prev but with multiple patterns on both ends in both args
+ pass
+
+
+class TestMatchAll(object):
+ def test_always_matches(self):
+ result = load_config("match-all").lookup("general")
+ assert result["user"] == "awesome"
+
+ def test_may_not_mix_with_non_canonical_keywords(self):
+ for config in ("match-all-and-more", "match-all-and-more-before"):
+ with raises(ConfigParseError):
+ load_config(config).lookup("whatever")
+
+ def test_may_come_after_canonical(self, socket):
+ result = load_config("match-all-after-canonical").lookup("www")
+ assert result["user"] == "awesome"
+
+ def test_may_not_come_before_canonical(self, socket):
+ with raises(ConfigParseError):
+ load_config("match-all-before-canonical")
+
+ def test_after_canonical_not_loaded_when_non_canonicalized(self, socket):
+ result = load_config("match-canonical-no").lookup("a-host")
+ assert "user" not in result
+
+
+def _expect(success_on):
+ """
+ Returns a side_effect-friendly Invoke success result for given command(s).
+
+ Ensures that any other commands fail; this is useful for testing 'Match
+ exec' because it means all other such clauses under test act like no-ops.
+
+ :param success_on:
+ Single string or list of strings, noting commands that should appear to
+ succeed.
"""
- f = StringIO(test_config_file)
- config = parse_ssh_config(f)
- assert config.lookup("anything-else").as_int("port") == 3333
+ if isinstance(success_on, string_types):
+ success_on = [success_on]
+
+ def inner(command, *args, **kwargs):
+ # Sanity checking - we always expect that invoke.run is called with
+ # these.
+ assert kwargs.get("hide", None) == "stdout"
+ assert kwargs.get("warn", None) is True
+ # Fake exit
+ exit = 0 if command in success_on else 1
+ return Result(exited=exit)
+
+ return inner
+
+
+class TestMatchExec(object):
+ @patch("paramiko.config.invoke", new=None)
+ @patch("paramiko.config.invoke_import_error", new=ImportError("meh"))
+ def test_raises_invoke_ImportErrors_at_runtime(self):
+ # Not an ideal test, but I don't know of a non-bad way to fake out
+ # module-time ImportErrors. So we mock the symptoms. Meh!
+ with raises(ImportError) as info:
+ load_config("match-exec").lookup("oh-noes")
+ assert str(info.value) == "meh"
+
+ @patch("paramiko.config.invoke.run")
+ @mark.parametrize(
+ "cmd,user",
+ [
+ ("unquoted", "rando"),
+ ("quoted", "benjamin"),
+ ("quoted spaced", "neil"),
+ ],
+ )
+ def test_accepts_single_possibly_quoted_argument(self, run, cmd, user):
+ run.side_effect = _expect(cmd)
+ result = load_config("match-exec").lookup("whatever")
+ assert result["user"] == user
+
+ @patch("paramiko.config.invoke.run")
+ def test_does_not_match_nonzero_exit_codes(self, run):
+ # Nothing will succeed -> no User ever gets loaded
+ run.return_value = Result(exited=1)
+ result = load_config("match-exec").lookup("whatever")
+ assert "user" not in result
+
+ @patch("paramiko.config.getpass")
+ @patch("paramiko.config.invoke.run")
+ def test_tokenizes_argument(self, run, getpass, socket):
+ socket.gethostname.return_value = "local.fqdn"
+ getpass.getuser.return_value = "gandalf"
+ # Actual exec value is "%d %h %L %l %n %p %r %u"
+ parts = (
+ expanduser("~"),
+ "configured",
+ "local",
+ "some.fake.fqdn",
+ "target",
+ "22",
+ "intermediate",
+ "gandalf",
+ )
+ run.side_effect = _expect(" ".join(parts))
+ result = load_config("match-exec").lookup("target")
+ assert result["port"] == "1337"
+
+ @patch("paramiko.config.invoke.run")
+ def test_works_with_canonical(self, run, socket):
+ # Ensure both stanzas' exec components appear to match
+ run.side_effect = _expect(["uncanonicalized", "canonicalized"])
+ result = load_config("match-exec-canonical").lookup("who-cares")
+ # Prove both config values got loaded up, across the two passes
+ assert result["user"] == "defenseless"
+ assert result["port"] == "8007"
+
+ @patch("paramiko.config.invoke.run")
+ def test_may_be_negated(self, run):
+ run.side_effect = _expect("this succeeds")
+ result = load_config("match-exec-negation").lookup("so-confusing")
+ # If negation did not work, the first of the two Match exec directives
+ # would have set User to 'nope' (and/or the second would have NOT set
+ # User to 'yup')
+ assert result["user"] == "yup"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-exec-no-arg")
+
+ @patch("paramiko.config.invoke.run")
+ def test_works_with_tokenized_hostname(self, run):
+ run.side_effect = _expect("ping target")
+ result = load_config("hostname-exec-tokenized").lookup("target")
+ assert result["hostname"] == "pingable.target"
+
+
+class TestMatchHost(object):
+ def test_matches_target_name_when_no_hostname(self):
+ result = load_config("match-host").lookup("target")
+ assert result["user"] == "rand"
+
+ def test_matches_hostname_from_global_setting(self):
+ # Also works for ones set in regular Host stanzas
+ result = load_config("match-host-name").lookup("anything")
+ assert result["user"] == "silly"
+
+ def test_matches_hostname_from_earlier_match(self):
+ # Corner case: one Match matches original host, sets HostName,
+ # subsequent Match matches the latter.
+ result = load_config("match-host-from-match").lookup("original-host")
+ assert result["user"] == "inner"
+
+ def test_may_be_globbed(self):
+ result = load_config("match-host-glob-list").lookup("whatever")
+ assert result["user"] == "matrim"
+
+ def test_may_be_comma_separated_list(self):
+ for target in ("somehost", "someotherhost"):
+ result = load_config("match-host-glob-list").lookup(target)
+ assert result["user"] == "thom"
+
+ def test_comma_separated_list_may_have_internal_negation(self):
+ conf = load_config("match-host-glob-list")
+ assert conf.lookup("good")["user"] == "perrin"
+ assert "user" not in conf.lookup("goof")
+
+ def test_matches_canonicalized_name(self, socket):
+ # Without 'canonical' explicitly declared, mind.
+ result = load_config("match-host-canonicalized").lookup("www")
+ assert result["user"] == "rand"
+
+ def test_works_with_canonical_keyword(self, socket):
+ # NOTE: distinct from 'happens to be canonicalized' above
+ result = load_config("match-host-canonicalized").lookup("docs")
+ assert result["user"] == "eric"
+
+ def test_may_be_negated(self):
+ conf = load_config("match-host-negated")
+ assert conf.lookup("docs")["user"] == "jeff"
+ assert "user" not in conf.lookup("www")
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-host-no-arg")
+
+
+class TestMatchOriginalHost(object):
+ def test_matches_target_host_not_hostname(self):
+ result = load_config("match-orighost").lookup("target")
+ assert result["hostname"] == "bogus"
+ assert result["user"] == "tuon"
+
+ def test_matches_target_host_not_canonicalized_name(self, socket):
+ result = load_config("match-orighost-canonical").lookup("www")
+ assert result["hostname"] == "www.paramiko.org"
+ assert result["user"] == "tuon"
+
+ def test_may_be_globbed(self):
+ result = load_config("match-orighost").lookup("whatever")
+ assert result["user"] == "matrim"
+
+ def test_may_be_comma_separated_list(self):
+ for target in ("comma", "separated"):
+ result = load_config("match-orighost").lookup(target)
+ assert result["user"] == "chameleon"
+
+ def test_comma_separated_list_may_have_internal_negation(self):
+ result = load_config("match-orighost").lookup("nope")
+ assert "user" not in result
+
+ def test_may_be_negated(self):
+ result = load_config("match-orighost").lookup("docs")
+ assert result["user"] == "thom"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-orighost-no-arg")
+
+
+class TestMatchUser(object):
+ def test_matches_configured_username(self):
+ result = load_config("match-user-explicit").lookup("anything")
+ assert result["hostname"] == "dumb"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_matches_local_username_by_default(self, getuser):
+ getuser.return_value = "gandalf"
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "gondor"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_globbed(self, getuser):
+ for user in ("bilbo", "bombadil"):
+ getuser.return_value = user
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "shire"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_comma_separated_list(self, getuser):
+ for user in ("aragorn", "frodo"):
+ getuser.return_value = user
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "moria"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_comma_separated_list_may_have_internal_negation(self, getuser):
+ getuser.return_value = "legolas"
+ result = load_config("match-user").lookup("anything")
+ assert "port" not in result
+ getuser.return_value = "gimli"
+ result = load_config("match-user").lookup("anything")
+ assert result["port"] == "7373"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_negated(self, getuser):
+ getuser.return_value = "saruman"
+ result = load_config("match-user").lookup("anything")
+ assert result["hostname"] == "mordor"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-user-no-arg")
+
+
+# NOTE: highly derivative of previous suite due to the former's use of
+# localuser fallback. Doesn't seem worth conflating/refactoring right now.
+class TestMatchLocalUser(object):
+ @patch("paramiko.config.getpass.getuser")
+ def test_matches_local_username(self, getuser):
+ getuser.return_value = "gandalf"
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "gondor"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_globbed(self, getuser):
+ for user in ("bilbo", "bombadil"):
+ getuser.return_value = user
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "shire"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_comma_separated_list(self, getuser):
+ for user in ("aragorn", "frodo"):
+ getuser.return_value = user
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "moria"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_comma_separated_list_may_have_internal_negation(self, getuser):
+ getuser.return_value = "legolas"
+ result = load_config("match-localuser").lookup("anything")
+ assert "port" not in result
+ getuser.return_value = "gimli"
+ result = load_config("match-localuser").lookup("anything")
+ assert result["port"] == "7373"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_may_be_negated(self, getuser):
+ getuser.return_value = "saruman"
+ result = load_config("match-localuser").lookup("anything")
+ assert result["hostname"] == "mordor"
+
+ def test_requires_an_argument(self):
+ with raises(ConfigParseError):
+ load_config("match-localuser-no-arg")
+
+
+class TestComplexMatching(object):
+ # NOTE: this is still a cherry-pick of a few levels of complexity, there's
+ # no point testing literally all possible combinations.
+
+ def test_originalhost_host(self):
+ result = load_config("match-complex").lookup("target")
+ assert result["hostname"] == "bogus"
+ assert result["user"] == "rand"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_originalhost_localuser(self, getuser):
+ getuser.return_value = "rando"
+ result = load_config("match-complex").lookup("remote")
+ assert result["user"] == "calrissian"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_everything_but_all(self, getuser):
+ getuser.return_value = "rando"
+ result = load_config("match-complex").lookup("www")
+ assert result["port"] == "7777"
+
+ @patch("paramiko.config.getpass.getuser")
+ def test_everything_but_all_with_some_negated(self, getuser):
+ getuser.return_value = "rando"
+ result = load_config("match-complex").lookup("docs")
+ assert result["port"] == "1234"
+
+ def test_negated_canonical(self, socket):
+ # !canonical in a config that is not canonicalized - does match
+ result = load_config("match-canonical-no").lookup("specific")
+ assert result["user"] == "overload"
+ # !canonical in a config that is canonicalized - does NOT match
+ result = load_config("match-canonical-yes").lookup("www")
+ assert result["user"] == "hidden"
diff --git a/tests/test_ed25519-funky-padding.key b/tests/test_ed25519-funky-padding.key
new file mode 100644
index 00000000..f178ca45
--- /dev/null
+++ b/tests/test_ed25519-funky-padding.key
@@ -0,0 +1,7 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
+QyNTUxOQAAACAHzPvYoDSkMVX52/CbA2M2aSBS7R0wt/9b2n5n+osNygAAAJAHZ1meB2dZ
+ngAAAAtzc2gtZWQyNTUxOQAAACAHzPvYoDSkMVX52/CbA2M2aSBS7R0wt/9b2n5n+osNyg
+AAAEAIyamvYUpzCovQuUtLhz+fwE4qYQo+rTuUVIX4fmTzMAfM+9igNKQxVfnb8JsDYzZp
+IFLtHTC3/1vafmf6iw3KAAAADW15IGNvbW1lbnQgaXM=
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_ed25519-funky-padding_password.key b/tests/test_ed25519-funky-padding_password.key
new file mode 100644
index 00000000..1b135d69
--- /dev/null
+++ b/tests/test_ed25519-funky-padding_password.key
@@ -0,0 +1,8 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABDo3dGRlE
+xKndv32nDnz2mHAAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIDcAVH8yDxoiqj0O
+rX3YTRMsnvJr+XdKJW16YQpxx8UvAAAAoI78IY+u8lYOzxAEO2N8qEVQH8b/m27yQhcSbK
+q1RvvuHmql3NoQvjYQe9/om4oqE+uesNRnoQGNplBHCeroD3ZcksXhLGDhwTh577NR+NQ+
+GNYAK5Ex7Va3Xgao5HUYtBQXlXbtzY1Q+71hcOlRVNnLUDvwShdCa9o6ETIOGcZl04fbzv
+Z3vC1C68G3+JMNFenAGYU+iQq0XENtpT6xAIU=
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_file.py b/tests/test_file.py
index fba14b1b..2a3da74b 100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -52,7 +52,7 @@ class LoopbackFile(BufferedFile):
class BufferedFileTest(unittest.TestCase):
- def test_1_simple(self):
+ def test_simple(self):
f = LoopbackFile("r")
try:
f.write(b"hi")
@@ -69,7 +69,7 @@ class BufferedFileTest(unittest.TestCase):
pass
f.close()
- def test_2_readline(self):
+ def test_readline(self):
f = LoopbackFile("r+U")
f.write(
b"First line.\nSecond line.\r\nThird line.\n"
@@ -96,7 +96,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertTrue(crlf in f.newlines)
self.assertTrue(cr_byte not in f.newlines)
- def test_3_lf(self):
+ def test_lf(self):
"""
try to trick the linefeed detector.
"""
@@ -108,7 +108,7 @@ class BufferedFileTest(unittest.TestCase):
f.close()
self.assertEqual(f.newlines, crlf)
- def test_4_write(self):
+ def test_write(self):
"""
verify that write buffering is on.
"""
@@ -120,7 +120,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertEqual(f.readline(), "Incomplete line...\n")
f.close()
- def test_5_flush(self):
+ def test_flush(self):
"""
verify that flush will force a write.
"""
@@ -134,7 +134,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertEqual(f.read(3), b"")
f.close()
- def test_6_buffering(self):
+ def test_buffering_flushes(self):
"""
verify that flushing happens automatically on buffer crossing.
"""
@@ -147,7 +147,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertEqual(f.read(20), b"Too small. Enough.")
f.close()
- def test_7_read_all(self):
+ def test_read_all(self):
"""
verify that read(-1) returns everything left in the file.
"""
@@ -162,30 +162,30 @@ class BufferedFileTest(unittest.TestCase):
)
f.close()
- def test_8_buffering(self):
+ def test_buffering_writes(self):
"""
verify that buffered objects can be written
"""
if sys.version_info[0] == 2:
f = LoopbackFile("r+", 16)
- f.write(buffer(b"Too small."))
+ f.write(buffer(b"Too small.")) # noqa
f.close()
- def test_9_readable(self):
+ def test_readable(self):
f = LoopbackFile("r")
self.assertTrue(f.readable())
self.assertFalse(f.writable())
self.assertFalse(f.seekable())
f.close()
- def test_A_writable(self):
+ def test_writable(self):
f = LoopbackFile("w")
self.assertTrue(f.writable())
self.assertFalse(f.readable())
self.assertFalse(f.seekable())
f.close()
- def test_B_readinto(self):
+ def test_readinto(self):
data = bytearray(5)
f = LoopbackFile("r+")
f._write(b"hello")
@@ -215,7 +215,7 @@ class BufferedFileTest(unittest.TestCase):
offsets = range(0, len(data), 8)
with LoopbackFile("rb+") as f:
for offset in offsets:
- f.write(buffer(data, offset, 8))
+ f.write(buffer(data, offset, 8)) # noqa
self.assertEqual(f.read(), data)
@needs_builtin("memoryview")
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 3e8c39e8..30ffb56d 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -22,22 +22,23 @@
Test the used APIs for GSS-API / SSPI authentication
"""
-import unittest
import socket
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
@needs_gssapi
-class GSSAPITest(unittest.TestCase):
- def setup():
+class GSSAPITest(KerberosTestCase):
+ def setUp(self):
+ super(GSSAPITest, self).setUp()
# TODO: these vars should all come from os.environ or whatever the
# approved pytest method is for runtime-configuring test data.
self.krb5_mech = "1.2.840.113554.1.2.2"
- self.targ_name = "hostname"
+ self.targ_name = self.realm.hostname
self.server_mode = False
+ update_env(self, self.realm.env)
- def test_1_pyasn1(self):
+ def test_pyasn1(self):
"""
Test the used methods of pyasn1.
"""
@@ -48,13 +49,20 @@ class GSSAPITest(unittest.TestCase):
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
- def test_2_gssapi_sspi(self):
+ def _gssapi_sspi_test(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
- _API = "MIT"
try:
import gssapi
+
+ if (
+ hasattr(gssapi, "__title__")
+ and gssapi.__title__ == "python-gssapi"
+ ):
+ _API = "PYTHON-GSSAPI-OLD"
+ else:
+ _API = "PYTHON-GSSAPI-NEW"
except ImportError:
import sspicon
import sspi
@@ -65,7 +73,7 @@ class GSSAPITest(unittest.TestCase):
gss_ctxt_status = False
mic_msg = b"G'day Mate!"
- if _API == "MIT":
+ if _API == "PYTHON-GSSAPI-OLD":
if self.server_mode:
gss_flags = (
gssapi.C_PROT_READY_FLAG,
@@ -113,6 +121,56 @@ class GSSAPITest(unittest.TestCase):
# Check MIC
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
+ elif _API == "PYTHON-GSSAPI-NEW":
+ if self.server_mode:
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
+ else:
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
+ # Initialize a GSS-API context.
+ krb5_oid = gssapi.MechType.kerberos
+ target_name = gssapi.Name(
+ "host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service,
+ )
+ gss_ctxt = gssapi.SecurityContext(
+ name=target_name,
+ flags=gss_flags,
+ mech=krb5_oid,
+ usage="initiate",
+ )
+ if self.server_mode:
+ c_token = gss_ctxt.step(c_token)
+ gss_ctxt_status = gss_ctxt.complete
+ self.assertEquals(False, gss_ctxt_status)
+ # Accept a GSS-API context.
+ gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
+ s_token = gss_srv_ctxt.step(c_token)
+ gss_ctxt_status = gss_srv_ctxt.complete
+ self.assertNotEquals(None, s_token)
+ self.assertEquals(True, gss_ctxt_status)
+ # Establish the client context
+ c_token = gss_ctxt.step(s_token)
+ self.assertEquals(None, c_token)
+ else:
+ while not gss_ctxt.complete:
+ c_token = gss_ctxt.step(c_token)
+ self.assertNotEquals(None, c_token)
+ # Build MIC
+ mic_token = gss_ctxt.get_signature(mic_msg)
+
+ if self.server_mode:
+ # Check MIC
+ status = gss_srv_ctxt.verify_signature(mic_msg, mic_token)
+ self.assertEquals(0, status)
else:
gss_flags = (
sspicon.ISC_REQ_INTEGRITY
@@ -145,3 +203,16 @@ class GSSAPITest(unittest.TestCase):
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertNotEquals(0, error)
+
+ def test_2_gssapi_sspi_client(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self._gssapi_sspi_test()
+
+ def test_3_gssapi_sspi_server(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self.server_mode = True
+ self._gssapi_sspi_test()
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 295153dd..da47362c 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -62,7 +62,7 @@ class HostKeysTest(unittest.TestCase):
def tearDown(self):
os.unlink("hostfile.temp")
- def test_1_load(self):
+ def test_load(self):
hostdict = paramiko.HostKeys("hostfile.temp")
self.assertEqual(2, len(hostdict))
self.assertEqual(1, len(list(hostdict.values())[0]))
@@ -72,7 +72,7 @@ class HostKeysTest(unittest.TestCase):
).upper()
self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp)
- def test_2_add(self):
+ def test_add(self):
hostdict = paramiko.HostKeys("hostfile.temp")
hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c="
key = paramiko.RSAKey(data=decodebytes(keyblob))
@@ -83,7 +83,7 @@ class HostKeysTest(unittest.TestCase):
self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp)
self.assertTrue(hostdict.check("foo.example.com", key))
- def test_3_dict(self):
+ def test_dict(self):
hostdict = paramiko.HostKeys("hostfile.temp")
self.assertTrue("secure.example.com" in hostdict)
self.assertTrue("not.example.com" not in hostdict)
@@ -98,7 +98,7 @@ class HostKeysTest(unittest.TestCase):
i += 1
self.assertEqual(2, i)
- def test_4_dict_set(self):
+ def test_dict_set(self):
hostdict = paramiko.HostKeys("hostfile.temp")
key = paramiko.RSAKey(data=decodebytes(keyblob))
key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss))
@@ -122,10 +122,10 @@ class HostKeysTest(unittest.TestCase):
def test_delitem(self):
hostdict = paramiko.HostKeys("hostfile.temp")
target = "happy.example.com"
- entry = hostdict[target] # will KeyError if not present
+ hostdict[target] # will KeyError if not present
del hostdict[target]
try:
- entry = hostdict[target]
+ hostdict[target]
except KeyError:
pass # Good
else:
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 62512beb..0244ae84 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -24,15 +24,26 @@ from binascii import hexlify, unhexlify
import os
import unittest
+from mock import Mock, patch
+import pytest
+
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
+try:
+ from cryptography.hazmat.primitives.asymmetric import x25519
+except ImportError:
+ x25519 = None
+
import paramiko.util
from paramiko.kex_group1 import KexGroup1
+from paramiko.kex_group14 import KexGroup14SHA256
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
from paramiko.kex_ecdh_nist import KexNistp256
+from paramiko.kex_group16 import KexGroup16SHA512
+from paramiko.kex_curve25519 import KexCurve25519
def dummy_urandom(n):
@@ -40,22 +51,22 @@ def dummy_urandom(n):
def dummy_generate_key_pair(obj):
- private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037
- public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989"
- public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(
+ private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 # noqa
+ public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" # noqa
+ public_key_numbers_obj = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), unhexlify(public_key_numbers)
- )
+ ).public_numbers()
obj.P = ec.EllipticCurvePrivateNumbers(
private_value=private_key_value, public_numbers=public_key_numbers_obj
).private_key(default_backend())
if obj.transport.server_mode:
- obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(
+ obj.Q_S = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), unhexlify(public_key_numbers)
- ).public_key(default_backend())
+ )
return
- obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(
+ obj.Q_C = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), unhexlify(public_key_numbers)
- ).public_key(default_backend())
+ )
class FakeKey(object):
@@ -70,7 +81,7 @@ class FakeKey(object):
class FakeModulusPack(object):
- P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF
+ P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF # noqa
G = 2
def get_modulus(self, min, ask, max):
@@ -111,7 +122,7 @@ class FakeTransport(object):
class KexTest(unittest.TestCase):
- K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504
+ K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504 # noqa
def setUp(self):
self._original_urandom = os.urandom
@@ -119,16 +130,32 @@ class KexTest(unittest.TestCase):
self._original_generate_key_pair = KexNistp256._generate_key_pair
KexNistp256._generate_key_pair = dummy_generate_key_pair
+ if KexCurve25519.is_available():
+ static_x25519_key = x25519.X25519PrivateKey.from_private_bytes(
+ unhexlify(
+ b"2184abc7eb3e656d2349d2470ee695b570c227340c2b2863b6c9ff427af1f040" # noqa
+ )
+ )
+ mock_x25519 = Mock()
+ mock_x25519.generate.return_value = static_x25519_key
+ patcher = patch(
+ "paramiko.kex_curve25519.X25519PrivateKey", mock_x25519
+ )
+ patcher.start()
+ self.x25519_patcher = patcher
+
def tearDown(self):
os.urandom = self._original_urandom
KexNistp256._generate_key_pair = self._original_generate_key_pair
+ if hasattr(self, "x25519_patcher"):
+ self.x25519_patcher.stop()
- def test_1_group1_client(self):
+ def test_group1_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGroup1(transport)
kex.start_kex()
- x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect
@@ -147,7 +174,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_2_group1_server(self):
+ def test_group1_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGroup1(transport)
@@ -161,13 +188,13 @@ class KexTest(unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
H = b"B16BF34DD10945EDE84E9C1EF24A14BFDC843389"
- x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(self.K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_3_gex_client(self):
+ def test_gex_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGex(transport)
@@ -183,7 +210,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -201,7 +228,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_4_gex_old_client(self):
+ def test_gex_old_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGex(transport)
@@ -217,7 +244,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -235,7 +262,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_5_gex_server(self):
+ def test_gex_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
@@ -254,7 +281,7 @@ class KexTest(unittest.TestCase):
msg.add_int(4096)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -264,15 +291,15 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"CE754197C21BF3452863B4F44D0B3951F12516EF"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_6_gex_server_with_old_client(self):
+ def test_gex_server_with_old_client(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
@@ -289,7 +316,7 @@ class KexTest(unittest.TestCase):
msg.add_int(2048)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -299,15 +326,15 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_7_gex_sha256_client(self):
+ def test_gex_sha256_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGexSHA256(transport)
@@ -323,7 +350,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -341,7 +368,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_8_gex_sha256_old_client(self):
+ def test_gex_sha256_old_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGexSHA256(transport)
@@ -357,7 +384,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -375,7 +402,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_9_gex_sha256_server(self):
+ def test_gex_sha256_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGexSHA256(transport)
@@ -394,7 +421,7 @@ class KexTest(unittest.TestCase):
msg.add_int(4096)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -404,15 +431,15 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_10_gex_sha256_server_with_old_client(self):
+ def test_gex_sha256_server_with_old_client(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGexSHA256(transport)
@@ -429,7 +456,7 @@ class KexTest(unittest.TestCase):
msg.add_int(2048)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -439,16 +466,16 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_11_kex_nistp256_client(self):
- K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ def test_kex_nistp256_client(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa
transport = FakeTransport()
transport.server_mode = False
kex = KexNistp256(transport)
@@ -461,7 +488,7 @@ class KexTest(unittest.TestCase):
msg = Message()
msg.add_string("fake-host-key")
Q_S = unhexlify(
- "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210"
+ "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa
)
msg.add_string(Q_S)
msg.add_string("fake-sig")
@@ -473,8 +500,8 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_12_kex_nistp256_server(self):
- K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ def test_kex_nistp256_server(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa
transport = FakeTransport()
transport.server_mode = True
kex = KexNistp256(transport)
@@ -486,7 +513,7 @@ class KexTest(unittest.TestCase):
# fake init
msg = Message()
Q_C = unhexlify(
- "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210"
+ "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa
)
H = b"2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA"
msg.add_string(Q_C)
@@ -495,3 +522,104 @@ class KexTest(unittest.TestCase):
self.assertEqual(K, transport._K)
self.assertTrue(transport._activated)
self.assertEqual(H, hexlify(transport._H).upper())
+
+ def test_kex_group14_sha256_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGroup14SHA256(transport)
+ kex.start_kex()
+ x = b"1E00000101009850B3A8DE3ECCD3F19644139137C93D9C11BC28ED8BE850908EE294E1D43B88B9295311EFAEF5B736A1B652EBE184CCF36CFB0681C1ED66430088FA448B83619F928E7B9592ED6160EC11D639D51C303603F930F743C646B1B67DA38A1D44598DCE6C3F3019422B898044141420E9A10C29B9C58668F7F20A40F154B2C4768FCF7A9AA7179FB6366A7167EE26DD58963E8B880A0572F641DE0A73DC74C930F7C3A0C9388553F3F8403E40CF8B95FEDB1D366596FCF3FDDEB21A0005ADA650EF1733628D807BE5ACB83925462765D9076570056E39994FB328E3108FE406275758D6BF5F32790EF15D8416BF5548164859E785DB45E7787BB0E727ADE08641ED" # noqa
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual(
+ (paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect
+ )
+
+ # fake "reply"
+ msg = Message()
+ msg.add_string("fake-host-key")
+ msg.add_mpint(69)
+ msg.add_string("fake-sig")
+ msg.rewind()
+ kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
+ K = 21526936926159575624241589599003964979640840086252478029709904308461709651400109485351462666820496096345766733042945918306284902585618061272525323382142547359684512114160415969631877620660064043178086464811345023251493620331559440565662862858765724251890489795332144543057725932216208403143759943169004775947331771556537814494448612329251887435553890674764339328444948425882382475260315505741818518926349729970262019325118040559191290279100613049085709127598666890434114956464502529053036826173452792849566280474995114751780998069614898221773345705289637708545219204637224261997310181473787577166103031529148842107599 # noqa
+ H = b"D007C23686BE8A7737F828DC9E899F8EB5AF423F495F138437BE2529C1B8455F"
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_kex_group16_sha512_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGroup16SHA512(transport)
+ kex.start_kex()
+ x = b"1E0000020100859FF55A23E0F66463561DD8BFC4764C69C05F85665B06EC9E29EF5003A53A8FA890B6A6EB624DEB55A4FB279DE7010A53580A126817E3D235B05A1081662B1500961D0625F0AAD287F1B597CBA9DB9550D9CC26355C4C59F92E613B5C21AC191F152C09A5DB46DCBA5EA58E3CA6A8B0EB7183E27FAC10106022E8521FA91240FB389060F1E1E4A355049D29DCC82921CE6588791743E4B1DEEE0166F7CC5180C3C75F3773342DF95C8C10AAA5D12975257027936B99B3DED6E6E98CF27EADEAEAE04E7F0A28071F578646B985FCE28A59CEB36287CB65759BE0544D4C4018CDF03C9078FE9CA79ECA611CB6966899E6FD29BE0781491C659FE2380E0D99D50D9CFAAB94E61BE311779719C4C43C6D223AD3799C3915A9E55076A21152DBBF911D6594296D6ECDC1B6FA71997CD29DF987B80FCA7F36BB7F19863C72BBBF839746AFBF9A5B407D468C976AA3E36FA118D3EAAD2E08BF6AE219F81F2CE2BE946337F06CC09BBFABE938A4087E413921CBEC1965ED905999B83396ECA226110CDF6EFB80F815F6489AF87561DA3857F13A7705921306D94176231FBB336B17C3724BC17A28BECB910093AB040873D5D760E8C182B88ECCE3E38DDA68CE35BD152DF7550BD908791FCCEDD1FFDF5ED2A57FFAE79599E487A7726D8A3D950B1729A08FBB60EE462A6BBE8BF0F5F0E1358129A37840FE5B3EEB8BF26E99FA222EAE83" # noqa
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual(
+ (paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect
+ )
+
+ # fake "reply"
+ msg = Message()
+ msg.add_string("fake-host-key")
+ msg.add_mpint(69)
+ msg.add_string("fake-sig")
+ msg.rewind()
+ kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
+ K = 933242830095376162107925500057692534838883186615567574891154103836907630698358649443101764908667358576734565553213003142941996368306996312915844839972197961603283544950658467545799914435739152351344917376359963584614213874232577733869049670230112638724993540996854599166318001059065780674008011575015459772051180901213815080343343801745386220342919837913506966863570473712948197760657442974564354432738520446202131551650771882909329069340612274196233658123593466135642819578182367229641847749149740891990379052266213711500434128970973602206842980669193719602075489724202241641553472106310932258574377789863734311328542715212248147206865762697424822447603031087553480483833829498375309975229907460562402877655519980113688369262871485777790149373908739910846630414678346163764464587129010141922982925829457954376352735653834300282864445132624993186496129911208133529828461690634463092007726349795944930302881758403402084584307180896465875803621285362317770276493727205689466142632599776710824902573926951951209239626732358074877997756011804454926541386215567756538832824717436605031489511654178384081883801272314328403020205577714999460724519735573055540814037716770051316113795603990199374791348798218428912977728347485489266146775472 # noqa
+ H = b"F6E2BCC846B9B62591EFB86663D55D4769CA06B2EDABE469DF831639B2DDD5A271985011900A724CB2C87F19F347B3632A7C1536AF3D12EE463E6EA75281AF0C" # noqa
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
+ self.assertTrue(transport._activated)
+
+ @pytest.mark.skipif("not KexCurve25519.is_available()")
+ def test_kex_c25519_client(self):
+ K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexCurve25519(transport)
+ kex.start_kex()
+ self.assertEqual(
+ (paramiko.kex_curve25519._MSG_KEXECDH_REPLY,), transport._expect
+ )
+
+ # fake reply
+ msg = Message()
+ msg.add_string("fake-host-key")
+ Q_S = unhexlify(
+ "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49"
+ )
+ msg.add_string(Q_S)
+ msg.add_string("fake-sig")
+ msg.rewind()
+ kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_REPLY, msg)
+ H = b"05B6F6437C0CF38D1A6C5A6F6E2558DEB54E7FC62447EBFB1E5D7407326A5475"
+ self.assertEqual(K, kex.transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
+ self.assertTrue(transport._activated)
+
+ @pytest.mark.skipif("not KexCurve25519.is_available()")
+ def test_kex_c25519_server(self):
+ K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexCurve25519(transport)
+ kex.start_kex()
+ self.assertEqual(
+ (paramiko.kex_curve25519._MSG_KEXECDH_INIT,), transport._expect
+ )
+
+ # fake init
+ msg = Message()
+ Q_C = unhexlify(
+ "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49"
+ )
+ H = b"DF08FCFCF31560FEE639D9B6D56D760BC3455B5ADA148E4514181023E7A9B042"
+ msg.add_string(Q_C)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_INIT, msg)
+ self.assertEqual(K, transport._K)
+ self.assertTrue(transport._activated)
+ self.assertEqual(H, hexlify(transport._H).upper())
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index c71ff91c..6f5625dc 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@ import unittest
import paramiko
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
class NullServer(paramiko.ServerInterface):
@@ -53,27 +53,22 @@ class NullServer(paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
@needs_gssapi
-class GSSKexTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
+class GSSKexTest(KerberosTestCase):
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -87,7 +82,7 @@ class GSSKexTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks, gss_kex=True)
host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
- self.ts.set_gss_host(targ_name)
+ self.ts.set_gss_host(self.realm.hostname)
try:
self.ts.load_server_moduli()
except:
@@ -142,7 +137,7 @@ class GSSKexTest(unittest.TestCase):
stdout.close()
stderr.close()
- def test_1_gsskex_and_auth(self):
+ def test_gsskex_and_auth(self):
"""
Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
Diffie-Hellman Key Exchange and user authentication with the GSS-API
@@ -150,7 +145,9 @@ class GSSKexTest(unittest.TestCase):
"""
self._test_gsskex_and_auth(gss_host=None)
- def test_2_gsskex_and_auth_rekey(self):
+ # To be investigated, see https://github.com/paramiko/paramiko/issues/1312
+ @unittest.expectedFailure
+ def test_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.
"""
diff --git a/tests/test_message.py b/tests/test_message.py
index b843a705..57766d90 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -29,14 +29,14 @@ from paramiko.common import byte_chr, zero_byte
class MessageTest(unittest.TestCase):
__a = (
- b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8"
+ b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8" # noqa
+ b"x" * 1000
)
- __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65"
- __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7"
- __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62"
+ __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65" # noqa
+ __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7" # noqa
+ __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62" # noqa
- def test_1_encode(self):
+ def test_encode(self):
msg = Message()
msg.add_int(23)
msg.add_int(123789456)
@@ -62,7 +62,7 @@ class MessageTest(unittest.TestCase):
msg.add_mpint(-0x65e4d3c2b109)
self.assertEqual(msg.asbytes(), self.__c)
- def test_2_decode(self):
+ def test_decode(self):
msg = Message(self.__a)
self.assertEqual(msg.get_int(), 23)
self.assertEqual(msg.get_int(), 123789456)
@@ -84,7 +84,7 @@ class MessageTest(unittest.TestCase):
self.assertEqual(msg.get_mpint(), 0xf5e4d3c2b109)
self.assertEqual(msg.get_mpint(), -0x65e4d3c2b109)
- def test_3_add(self):
+ def test_add(self):
msg = Message()
msg.add(5)
msg.add(0x1122334455)
@@ -94,7 +94,7 @@ class MessageTest(unittest.TestCase):
msg.add(["a", "b"])
self.assertEqual(msg.asbytes(), self.__d)
- def test_4_misc(self):
+ def test_misc(self):
msg = Message(self.__d)
self.assertEqual(msg.get_adaptive_int(), 5)
self.assertEqual(msg.get_adaptive_int(), 0x1122334455)
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index 6920f08e..de80770e 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -38,7 +38,7 @@ x1f = byte_chr(0x1f)
class PacketizerTest(unittest.TestCase):
- def test_1_write(self):
+ def test_write(self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
@@ -64,11 +64,11 @@ class PacketizerTest(unittest.TestCase):
# 32 + 12 bytes of MAC = 44
self.assertEqual(44, len(data))
self.assertEqual(
- b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0",
+ b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0", # noqa
data[:16],
)
- def test_2_read(self):
+ def test_read(self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
@@ -82,7 +82,7 @@ class PacketizerTest(unittest.TestCase):
).decryptor()
p.set_inbound_cipher(decryptor, 16, sha1, 12, x1f * 20)
wsock.send(
- b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59"
+ b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59" # noqa
)
cmd, m = p.read_message()
self.assertEqual(100, cmd)
@@ -90,7 +90,7 @@ class PacketizerTest(unittest.TestCase):
self.assertEqual(1, m.get_int())
self.assertEqual(900, m.get_int())
- def test_3_closed(self):
+ def test_closed(self):
if sys.platform.startswith("win"): # no SIGALRM on windows
return
rsock = LoopSocket()
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 38c1fbef..a01acf6f 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -33,20 +33,20 @@ from .util import _support
# from openssh's ssh-keygen
-PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c="
-PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE="
-PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo="
-PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A=="
-PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA=="
-PUB_RSA_2K_OPENSSH = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDF+Dpr54DX0WdeTDpNAMdkCWEkl3OXtNgf58qlN1gX572OLBqLf0zT4bHstUEpU3piazph/rSWcUMuBoD46tZ6jiH7H9b9Pem2eYQWaELDDkM+v9BMbEy5rMbFRLol5OtEvPFqneyEAanPOgvd8t3yyhSev9QVusakzJ8j8LGgrA8huYZ+Srnw0shEWLG70KUKCh3rG0QIvA8nfhtUOisr2Gp+F0YxMGb5gwBlQYAYE5l6u1SjZ7hNjyNosjK+wRBFgFFBYVpkZKJgWoK9w4ijFyzMZTucnZMqKOKAjIJvHfKBf2/cEfYxSq1EndqTqjYsd9T7/s2vcn1OH5a0wkER"
-PUB_DSS_1K_OPENSSH = "ssh-dss AAAAB3NzaC1kc3MAAACBAL8XEx7F9xuwBNles+vWpNF+YcofrBhjX1r5QhpBe0eoYWLHRcroN6lxwCdGYRfgOoRjTncBiixQX/uUxAY96zDh3ir492s2BcJt4ihvNn/AY0I0OTuX/2IwGk9CGzafjaeZNVYxMa8lcVt0hSOTjkPQ7gVuk6bJzMInvie+VWKLAAAAFQDUgYdY+rhR0SkKbC09BS/SIHcB+wAAAIB44+4zpCNcd0CGvZlowH99zyPX8uxQtmTLQFuR2O8O0FgVVuCdDgD0D9W8CLOp32oatpM0jyyN89EdvSWzjHzZJ+L6H1FtZps7uhpDFWHdva1R25vyGecLMUuXjo5t/D7oCDih+HwHoSAxoi0QvsPd8/qqHQVznNJKtR6thUpXEwAAAIAG4DCBjbgTTgpBw0egRkJwBSz0oTt+1IcapNU2jA6N8urMSk9YXHEQHKN68BAF3YJ59q2Ujv3LOXmBqGd1T+kzwUszfMlgzq8MMu19Yfzse6AIK1Agn1Vj6F7YXLsXDN+T4KszX5+FJa7t/Zsp3nALWy6l0f4WKivEF5Y2QpEFcQ=="
+PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=" # noqa
+PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=" # noqa
+PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=" # noqa
+PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==" # noqa
+PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==" # noqa
+PUB_RSA_2K_OPENSSH = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDF+Dpr54DX0WdeTDpNAMdkCWEkl3OXtNgf58qlN1gX572OLBqLf0zT4bHstUEpU3piazph/rSWcUMuBoD46tZ6jiH7H9b9Pem2eYQWaELDDkM+v9BMbEy5rMbFRLol5OtEvPFqneyEAanPOgvd8t3yyhSev9QVusakzJ8j8LGgrA8huYZ+Srnw0shEWLG70KUKCh3rG0QIvA8nfhtUOisr2Gp+F0YxMGb5gwBlQYAYE5l6u1SjZ7hNjyNosjK+wRBFgFFBYVpkZKJgWoK9w4ijFyzMZTucnZMqKOKAjIJvHfKBf2/cEfYxSq1EndqTqjYsd9T7/s2vcn1OH5a0wkER" # noqa
+PUB_DSS_1K_OPENSSH = "ssh-dss AAAAB3NzaC1kc3MAAACBAL8XEx7F9xuwBNles+vWpNF+YcofrBhjX1r5QhpBe0eoYWLHRcroN6lxwCdGYRfgOoRjTncBiixQX/uUxAY96zDh3ir492s2BcJt4ihvNn/AY0I0OTuX/2IwGk9CGzafjaeZNVYxMa8lcVt0hSOTjkPQ7gVuk6bJzMInvie+VWKLAAAAFQDUgYdY+rhR0SkKbC09BS/SIHcB+wAAAIB44+4zpCNcd0CGvZlowH99zyPX8uxQtmTLQFuR2O8O0FgVVuCdDgD0D9W8CLOp32oatpM0jyyN89EdvSWzjHzZJ+L6H1FtZps7uhpDFWHdva1R25vyGecLMUuXjo5t/D7oCDih+HwHoSAxoi0QvsPd8/qqHQVznNJKtR6thUpXEwAAAIAG4DCBjbgTTgpBw0egRkJwBSz0oTt+1IcapNU2jA6N8urMSk9YXHEQHKN68BAF3YJ59q2Ujv3LOXmBqGd1T+kzwUszfMlgzq8MMu19Yfzse6AIK1Agn1Vj6F7YXLsXDN+T4KszX5+FJa7t/Zsp3nALWy6l0f4WKivEF5Y2QpEFcQ==" # noqa
FINGER_RSA = "1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5"
FINGER_DSS = "1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c"
FINGER_ECDSA_256 = "256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60"
FINGER_ECDSA_384 = "384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65"
FINGER_ECDSA_521 = "521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e"
-SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8"
+SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" # noqa
FINGER_RSA_2K_OPENSSH = "2048 68:d1:72:01:bf:c0:0c:66:97:78:df:ce:75:74:46:d6"
FINGER_DSS_1K_OPENSSH = "1024 cf:1d:eb:d7:61:d3:12:94:c6:c0:c6:54:35:35:b0:82"
@@ -112,8 +112,8 @@ L4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==
x1234 = b"\x01\x02\x03\x04"
-TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87"
-TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f"
+TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87" # noqa
+TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f" # noqa
class KeyTest(unittest.TestCase):
@@ -134,12 +134,12 @@ class KeyTest(unittest.TestCase):
self.assertEqual(fh.readline()[:-1], "Proc-Type: 4,ENCRYPTED")
self.assertEqual(fh.readline()[0:10], "DEK-Info: ")
- def test_1_generate_key_bytes(self):
+ def test_generate_key_bytes(self):
key = util.generate_key_bytes(md5, x1234, "happy birthday", 30)
- exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64"
+ exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64" # noqa
self.assertEqual(exp, key)
- def test_2_load_rsa(self):
+ def test_load_rsa(self):
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
self.assertEqual("ssh-rsa", key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(":", ""))
@@ -155,7 +155,7 @@ class KeyTest(unittest.TestCase):
key2 = RSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_3_load_rsa_password(self):
+ def test_load_rsa_password(self):
key = RSAKey.from_private_key_file(
_support("test_rsa_password.key"), "television"
)
@@ -166,7 +166,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_RSA.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
- def test_4_load_dss(self):
+ def test_load_dss(self):
key = DSSKey.from_private_key_file(_support("test_dss.key"))
self.assertEqual("ssh-dss", key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(":", ""))
@@ -182,7 +182,7 @@ class KeyTest(unittest.TestCase):
key2 = DSSKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_5_load_dss_password(self):
+ def test_load_dss_password(self):
key = DSSKey.from_private_key_file(
_support("test_dss_password.key"), "television"
)
@@ -193,7 +193,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_DSS.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
- def test_6_compare_rsa(self):
+ def test_compare_rsa(self):
# verify that the private & public keys compare equal
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
self.assertEqual(key, key)
@@ -202,7 +202,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_7_compare_dss(self):
+ def test_compare_dss(self):
# verify that the private & public keys compare equal
key = DSSKey.from_private_key_file(_support("test_dss.key"))
self.assertEqual(key, key)
@@ -211,7 +211,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_8_sign_rsa(self):
+ def test_sign_rsa(self):
# verify that the rsa private key can sign and verify
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -226,7 +226,7 @@ class KeyTest(unittest.TestCase):
pub = RSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_9_sign_dss(self):
+ def test_sign_dss(self):
# verify that the dss private key can sign and verify
key = DSSKey.from_private_key_file(_support("test_dss.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -241,19 +241,19 @@ class KeyTest(unittest.TestCase):
pub = DSSKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_A_generate_rsa(self):
+ def test_generate_rsa(self):
key = RSAKey.generate(1024)
msg = key.sign_ssh_data(b"jerri blank")
msg.rewind()
self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg))
- def test_B_generate_dss(self):
+ def test_generate_dss(self):
key = DSSKey.generate(1024)
msg = key.sign_ssh_data(b"jerri blank")
msg.rewind()
self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg))
- def test_C_generate_ecdsa(self):
+ def test_generate_ecdsa(self):
key = ECDSAKey.generate()
msg = key.sign_ssh_data(b"jerri blank")
msg.rewind()
@@ -282,7 +282,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key.get_bits(), 521)
self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521")
- def test_10_load_ecdsa_256(self):
+ def test_load_ecdsa_256(self):
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
self.assertEqual("ecdsa-sha2-nistp256", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", ""))
@@ -298,7 +298,7 @@ class KeyTest(unittest.TestCase):
key2 = ECDSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_11_load_ecdsa_password_256(self):
+ def test_load_ecdsa_password_256(self):
key = ECDSAKey.from_private_key_file(
_support("test_ecdsa_password_256.key"), b"television"
)
@@ -309,7 +309,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_ECDSA_256.split()[1], key.get_base64())
self.assertEqual(256, key.get_bits())
- def test_12_compare_ecdsa_256(self):
+ def test_compare_ecdsa_256(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
self.assertEqual(key, key)
@@ -318,7 +318,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_13_sign_ecdsa_256(self):
+ def test_sign_ecdsa_256(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -334,7 +334,7 @@ class KeyTest(unittest.TestCase):
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_14_load_ecdsa_384(self):
+ def test_load_ecdsa_384(self):
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key"))
self.assertEqual("ecdsa-sha2-nistp384", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(":", ""))
@@ -350,7 +350,7 @@ class KeyTest(unittest.TestCase):
key2 = ECDSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_15_load_ecdsa_password_384(self):
+ def test_load_ecdsa_password_384(self):
key = ECDSAKey.from_private_key_file(
_support("test_ecdsa_password_384.key"), b"television"
)
@@ -361,7 +361,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_ECDSA_384.split()[1], key.get_base64())
self.assertEqual(384, key.get_bits())
- def test_16_compare_ecdsa_384(self):
+ def test_compare_ecdsa_384(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key"))
self.assertEqual(key, key)
@@ -370,7 +370,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_17_sign_ecdsa_384(self):
+ def test_sign_ecdsa_384(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -386,7 +386,7 @@ class KeyTest(unittest.TestCase):
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_18_load_ecdsa_521(self):
+ def test_load_ecdsa_521(self):
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key"))
self.assertEqual("ecdsa-sha2-nistp521", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(":", ""))
@@ -405,7 +405,7 @@ class KeyTest(unittest.TestCase):
key2 = ECDSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_19_load_ecdsa_password_521(self):
+ def test_load_ecdsa_password_521(self):
key = ECDSAKey.from_private_key_file(
_support("test_ecdsa_password_521.key"), b"television"
)
@@ -416,7 +416,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_ECDSA_521.split()[1], key.get_base64())
self.assertEqual(521, key.get_bits())
- def test_20_compare_ecdsa_521(self):
+ def test_compare_ecdsa_521(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key"))
self.assertEqual(key, key)
@@ -425,7 +425,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_21_sign_ecdsa_521(self):
+ def test_sign_ecdsa_521(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -493,6 +493,18 @@ class KeyTest(unittest.TestCase):
)
self.assertNotEqual(key1.asbytes(), key2.asbytes())
+ def test_ed25519_funky_padding(self):
+ # Proves #1306 by just not exploding with 'Invalid key'.
+ Ed25519Key.from_private_key_file(
+ _support("test_ed25519-funky-padding.key")
+ )
+
+ def test_ed25519_funky_padding_with_passphrase(self):
+ # Proves #1306 by just not exploding with 'Invalid key'.
+ Ed25519Key.from_private_key_file(
+ _support("test_ed25519-funky-padding_password.key"), b"asdf"
+ )
+
def test_ed25519_compare(self):
# verify that the private & public keys compare equal
key = Ed25519Key.from_private_key_file(_support("test_ed25519.key"))
@@ -504,7 +516,7 @@ class KeyTest(unittest.TestCase):
def test_ed25519_nonbytes_password(self):
# https://github.com/paramiko/paramiko/issues/1039
- key = Ed25519Key.from_private_key_file(
+ Ed25519Key.from_private_key_file(
_support("test_ed25519_password.key"),
# NOTE: not a bytes. Amusingly, the test above for same key DOES
# explicitly cast to bytes...code smell!
@@ -553,7 +565,7 @@ class KeyTest(unittest.TestCase):
# Delve into blob contents, for test purposes
msg = Message(key.public_blob.key_blob)
self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com")
- nonce = msg.get_string()
+ msg.get_string()
e = msg.get_mpint()
n = msg.get_mpint()
self.assertEqual(e, key.public_numbers.e)
diff --git a/tests/test_proxy.py b/tests/test_proxy.py
new file mode 100644
index 00000000..2e0b0e51
--- /dev/null
+++ b/tests/test_proxy.py
@@ -0,0 +1,144 @@
+import signal
+import socket
+
+from mock import patch
+from pytest import raises
+
+from paramiko import ProxyCommand, ProxyCommandFailure
+
+
+class TestProxyCommand(object):
+ @patch("paramiko.proxy.subprocess")
+ def test_init_takes_command_string(self, subprocess):
+ ProxyCommand(command_line="do a thing")
+ subprocess.Popen.assert_called_once_with(
+ ["do", "a", "thing"],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ bufsize=0,
+ )
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ def test_send_writes_to_process_stdin_returning_length(self, Popen):
+ proxy = ProxyCommand("hi")
+ written = proxy.send(b"data")
+ Popen.return_value.stdin.write.assert_called_once_with(b"data")
+ assert written == len(b"data")
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ def test_send_raises_ProxyCommandFailure_on_error(self, Popen):
+ Popen.return_value.stdin.write.side_effect = IOError(0, "whoops")
+ with raises(ProxyCommandFailure) as info:
+ ProxyCommand("hi").send("data")
+ assert info.value.command == "hi"
+ assert info.value.error == "whoops"
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_reads_from_process_stdout_returning_bytes(
+ self, select, os_read, Popen
+ ):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ fileno = stdout.fileno.return_value
+ # Intentionally returning <5 at a time sometimes
+ os_read.side_effect = [b"was", b"te", b"of ti", b"me"]
+ proxy = ProxyCommand("hi")
+ data = proxy.recv(5)
+ assert data == b"waste"
+ assert [x[0] for x in os_read.call_args_list] == [
+ (fileno, 5),
+ (fileno, 2),
+ ]
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_returns_buffer_on_timeout_if_any_read(
+ self, select, os_read, Popen
+ ):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ fileno = stdout.fileno.return_value
+ os_read.side_effect = [b"was", socket.timeout]
+ proxy = ProxyCommand("hi")
+ data = proxy.recv(5)
+ assert data == b"was" # not b"waste"
+ assert os_read.call_args[0] == (fileno, 2)
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_raises_timeout_if_nothing_read(self, select, os_read, Popen):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ fileno = stdout.fileno.return_value
+ os_read.side_effect = socket.timeout
+ proxy = ProxyCommand("hi")
+ with raises(socket.timeout):
+ proxy.recv(5)
+ assert os_read.call_args[0] == (fileno, 5)
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_recv_raises_ProxyCommandFailure_on_non_timeout_error(
+ self, select, os_read, Popen
+ ):
+ select.return_value = [Popen.return_value.stdout], None, None
+ os_read.side_effect = IOError(0, "whoops")
+ with raises(ProxyCommandFailure) as info:
+ ProxyCommand("hi").recv(5)
+ assert info.value.command == "hi"
+ assert info.value.error == "whoops"
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.kill")
+ def test_close_kills_subprocess(self, os_kill, Popen):
+ proxy = ProxyCommand("hi")
+ proxy.close()
+ os_kill.assert_called_once_with(Popen.return_value.pid, signal.SIGTERM)
+
+ @patch("paramiko.proxy.subprocess.Popen")
+ def test_closed_exposes_whether_subprocess_has_exited(self, Popen):
+ proxy = ProxyCommand("hi")
+ Popen.return_value.returncode = None
+ assert proxy.closed is False
+ assert proxy._closed is False
+ Popen.return_value.returncode = 0
+ assert proxy.closed is True
+ assert proxy._closed is True
+
+ @patch("paramiko.proxy.time.time")
+ @patch("paramiko.proxy.subprocess.Popen")
+ @patch("paramiko.proxy.os.read")
+ @patch("paramiko.proxy.select")
+ def test_timeout_affects_whether_timeout_is_raised(
+ self, select, os_read, Popen, time
+ ):
+ stdout = Popen.return_value.stdout
+ select.return_value = [stdout], None, None
+ # Base case: None timeout means no timing out
+ os_read.return_value = b"meh"
+ proxy = ProxyCommand("yello")
+ assert proxy.timeout is None
+ # Implicit 'no raise' check
+ assert proxy.recv(3) == b"meh"
+ # Use settimeout to set timeout, and it is honored
+ time.side_effect = [0, 10] # elapsed > 7
+ proxy = ProxyCommand("ohnoz")
+ proxy.settimeout(7)
+ assert proxy.timeout == 7
+ with raises(socket.timeout):
+ proxy.recv(3)
+
+ @patch("paramiko.proxy.subprocess", new=None)
+ @patch("paramiko.proxy.subprocess_import_error", new=ImportError("meh"))
+ def test_raises_subprocess_ImportErrors_at_runtime(self):
+ # Not an ideal test, but I don't know of a non-bad way to fake out
+ # module-time ImportErrors. So we mock the symptoms. Meh!
+ with raises(ImportError) as info:
+ ProxyCommand("hi!!!")
+ assert str(info.value) == "meh"
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index fdb7c9bc..e4e18e5a 100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -26,23 +26,18 @@ do test file operations in (so no existing files will be harmed).
import os
import socket
import sys
-import threading
-import unittest
import warnings
from binascii import hexlify
from tempfile import mkstemp
import pytest
-import paramiko
-import paramiko.util
from paramiko.py3compat import PY2, b, u, StringIO
from paramiko.common import o777, o600, o666, o644
from paramiko.sftp_attr import SFTPAttributes
from .util import needs_builtin
-from .stub_sftp import StubServer, StubSFTPServer
-from .util import _support, slow
+from .util import slow
ARTICLE = """
@@ -74,14 +69,21 @@ decreased compared with chicken.
# Here is how unicode characters are encoded over 1 to 6 bytes in utf-8
-# U-00000000 - U-0000007F: 0xxxxxxx
-# U-00000080 - U-000007FF: 110xxxxx 10xxxxxx
-# U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx
-# U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
-# U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
-# U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-00000000 - U-0000007F:
+# 0xxxxxxx
+# U-00000080 - U-000007FF:
+# 110xxxxx 10xxxxxx
+# U-00000800 - U-0000FFFF:
+# 1110xxxx 10xxxxxx 10xxxxxx
+# U-00010000 - U-001FFFFF:
+# 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-00200000 - U-03FFFFFF:
+# 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-04000000 - U-7FFFFFFF:
+# 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
# Note that: hex(int('11000011',2)) == '0xc3'
-# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte"
+# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation
+# byte"
NON_UTF8_DATA = b"\xC3\xC3"
unicode_folder = u"\u00fcnic\u00f8de" if PY2 else "\u00fcnic\u00f8de"
@@ -90,7 +92,7 @@ utf8_folder = b"/\xc3\xbcnic\xc3\xb8\x64\x65"
@slow
class TestSFTP(object):
- def test_1_file(self, sftp):
+ def test_file(self, sftp):
"""
verify that we can create a file.
"""
@@ -101,7 +103,7 @@ class TestSFTP(object):
f.close()
sftp.remove(sftp.FOLDER + "/test")
- def test_2_close(self, sftp):
+ def test_close(self, sftp):
"""
Verify that SFTP session close() causes a socket error on next action.
"""
@@ -109,7 +111,7 @@ class TestSFTP(object):
with pytest.raises(socket.error, match="Socket is closed"):
sftp.open(sftp.FOLDER + "/test2", "w")
- def test_2_sftp_can_be_used_as_context_manager(self, sftp):
+ def test_sftp_can_be_used_as_context_manager(self, sftp):
"""
verify that the sftp session is closed when exiting the context manager
"""
@@ -118,7 +120,7 @@ class TestSFTP(object):
with pytest.raises(socket.error, match="Socket is closed"):
sftp.open(sftp.FOLDER + "/test2", "w")
- def test_3_write(self, sftp):
+ def test_write(self, sftp):
"""
verify that a file can be created and written, and the size is correct.
"""
@@ -129,7 +131,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/duck.txt")
- def test_3_sftp_file_can_be_used_as_context_manager(self, sftp):
+ def test_sftp_file_can_be_used_as_context_manager(self, sftp):
"""
verify that an opened file can be used as a context manager
"""
@@ -140,7 +142,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/duck.txt")
- def test_4_append(self, sftp):
+ def test_append(self, sftp):
"""
verify that a file can be opened for append, and tell() still works.
"""
@@ -158,7 +160,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/append.txt")
- def test_5_rename(self, sftp):
+ def test_rename(self, sftp):
"""
verify that renaming a file works.
"""
@@ -185,7 +187,7 @@ class TestSFTP(object):
except:
pass
- def test_5a_posix_rename(self, sftp):
+ def testa_posix_rename(self, sftp):
"""Test posix-rename@openssh.com protocol extension."""
try:
# first check that the normal rename works as specified
@@ -214,7 +216,7 @@ class TestSFTP(object):
except:
pass
- def test_6_folder(self, sftp):
+ def test_folder(self, sftp):
"""
create a temporary folder, verify that we can create a file in it, then
remove the folder and verify that we can't create a file in it anymore.
@@ -227,7 +229,7 @@ class TestSFTP(object):
with pytest.raises(IOError, match="No such file"):
sftp.open(sftp.FOLDER + "/subfolder/test")
- def test_7_listdir(self, sftp):
+ def test_listdir(self, sftp):
"""
verify that a folder can be created, a bunch of files can be placed in
it, and those files show up in sftp.listdir.
@@ -248,7 +250,7 @@ class TestSFTP(object):
sftp.remove(sftp.FOLDER + "/fish.txt")
sftp.remove(sftp.FOLDER + "/tertiary.py")
- def test_7_5_listdir_iter(self, sftp):
+ def test_listdir_iter(self, sftp):
"""
listdir_iter version of above test
"""
@@ -268,7 +270,7 @@ class TestSFTP(object):
sftp.remove(sftp.FOLDER + "/fish.txt")
sftp.remove(sftp.FOLDER + "/tertiary.py")
- def test_8_setstat(self, sftp):
+ def test_setstat(self, sftp):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
@@ -305,7 +307,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/special")
- def test_9_fsetstat(self, sftp):
+ def test_fsetstat(self, sftp):
"""
verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files.
@@ -345,12 +347,12 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/special")
- def test_A_readline_seek(self, sftp):
+ def test_readline_seek(self, sftp):
"""
- create a text file and write a bunch of text into it. then count the lines
- in the file, and seek around to retrieve particular lines. this should
- verify that read buffering and 'tell' work well together, and that read
- buffering is reset on 'seek'.
+ create a text file and write a bunch of text into it. then count the
+ lines in the file, and seek around to retrieve particular lines. this
+ should verify that read buffering and 'tell' work well together, and
+ that read buffering is reset on 'seek'.
"""
try:
with sftp.open(sftp.FOLDER + "/duck.txt", "w") as f:
@@ -370,17 +372,14 @@ class TestSFTP(object):
f.seek(pos_list[17], f.SEEK_SET)
assert f.readline()[:4] == "duck"
f.seek(pos_list[10], f.SEEK_SET)
- assert (
- f.readline()
- == "duck types were equally resistant to exogenous insulin compared with chicken.\n"
- )
+ expected = "duck types were equally resistant to exogenous insulin compared with chicken.\n" # noqa
+ assert f.readline() == expected
finally:
sftp.remove(sftp.FOLDER + "/duck.txt")
- def test_B_write_seek(self, sftp):
+ def test_write_seek(self, sftp):
"""
- create a text file, seek back and change part of it, and verify that the
- changes worked.
+ Create a text file, seek back, change it, and verify.
"""
try:
with sftp.open(sftp.FOLDER + "/testing.txt", "w") as f:
@@ -395,7 +394,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/testing.txt")
- def test_C_symlink(self, sftp):
+ def test_symlink(self, sftp):
"""
create a symlink and then check that lstat doesn't follow it.
"""
@@ -442,7 +441,7 @@ class TestSFTP(object):
except:
pass
- def test_D_flush_seek(self, sftp):
+ def test_flush_seek(self, sftp):
"""
verify that buffered writes are automatically flushed on seek.
"""
@@ -462,7 +461,7 @@ class TestSFTP(object):
except:
pass
- def test_E_realpath(self, sftp):
+ def test_realpath(self, sftp):
"""
test that realpath is returning something non-empty and not an
error.
@@ -473,7 +472,7 @@ class TestSFTP(object):
assert len(f) > 0
assert os.path.join(pwd, sftp.FOLDER) == f
- def test_F_mkdir(self, sftp):
+ def test_mkdir(self, sftp):
"""
verify that mkdir/rmdir work.
"""
@@ -484,7 +483,7 @@ class TestSFTP(object):
with pytest.raises(IOError, match="No such file"):
sftp.rmdir(sftp.FOLDER + "/subfolder")
- def test_G_chdir(self, sftp):
+ def test_chdir(self, sftp):
"""
verify that chdir/getcwd work.
"""
@@ -520,7 +519,7 @@ class TestSFTP(object):
except:
pass
- def test_H_get_put(self, sftp):
+ def test_get_put(self, sftp):
"""
verify that get/put work.
"""
@@ -555,7 +554,7 @@ class TestSFTP(object):
os.unlink(localname)
sftp.unlink(sftp.FOLDER + "/bunny.txt")
- def test_I_check(self, sftp):
+ def test_check(self, sftp):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
@@ -577,14 +576,12 @@ class TestSFTP(object):
== u(hexlify(sum)).upper()
)
sum = f.check("md5", 0, 0, 510)
- assert (
- u(hexlify(sum)).upper()
- == "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6"
- ) # noqa
+ expected = "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6" # noqa
+ assert u(hexlify(sum)).upper() == expected
finally:
sftp.unlink(sftp.FOLDER + "/kitty.txt")
- def test_J_x_flag(self, sftp):
+ def test_x_flag(self, sftp):
"""
verify that the 'x' flag works when opening a file.
"""
@@ -599,7 +596,7 @@ class TestSFTP(object):
finally:
sftp.unlink(sftp.FOLDER + "/unusual.txt")
- def test_K_utf8(self, sftp):
+ def test_utf8(self, sftp):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
@@ -615,7 +612,7 @@ class TestSFTP(object):
self.fail("exception " + str(e))
sftp.unlink(b(sftp.FOLDER) + utf8_folder)
- def test_L_utf8_chdir(self, sftp):
+ def test_utf8_chdir(self, sftp):
sftp.mkdir(sftp.FOLDER + "/" + unicode_folder)
try:
sftp.chdir(sftp.FOLDER + "/" + unicode_folder)
@@ -626,7 +623,7 @@ class TestSFTP(object):
sftp.chdir()
sftp.rmdir(sftp.FOLDER + "/" + unicode_folder)
- def test_M_bad_readv(self, sftp):
+ def test_bad_readv(self, sftp):
"""
verify that readv at the end of the file doesn't essplode.
"""
@@ -642,7 +639,7 @@ class TestSFTP(object):
finally:
sftp.unlink(sftp.FOLDER + "/zero")
- def test_N_put_without_confirm(self, sftp):
+ def test_put_without_confirm(self, sftp):
"""
verify that get/put work without confirmation.
"""
@@ -671,11 +668,11 @@ class TestSFTP(object):
os.unlink(localname)
sftp.unlink(sftp.FOLDER + "/bunny.txt")
- def test_O_getcwd(self, sftp):
+ def test_getcwd(self, sftp):
"""
verify that chdir/getcwd work.
"""
- assert sftp.getcwd() == None
+ assert sftp.getcwd() is None
root = sftp.normalize(".")
if root[-1] != "/":
root += "/"
@@ -690,7 +687,7 @@ class TestSFTP(object):
except:
pass
- def XXX_test_M_seek_append(self, sftp):
+ def test_seek_append(self, sftp):
"""
verify that seek does't affect writes during append.
@@ -728,7 +725,7 @@ class TestSFTP(object):
# appear but they're clearly emitted from subthreads that have no error
# handling. No point running it until that is fixed somehow.
@pytest.mark.skip("Doesn't prove anything right now")
- def test_N_file_with_percent(self, sftp):
+ def test_file_with_percent(self, sftp):
"""
verify that we can create a file with a '%' in the filename.
( it needs to be properly escaped by _log() )
@@ -740,7 +737,7 @@ class TestSFTP(object):
f.close()
sftp.remove(sftp.FOLDER + "/test%file")
- def test_O_non_utf8_data(self, sftp):
+ def test_non_utf8_data(self, sftp):
"""Test write() and read() of non utf8 data"""
try:
with sftp.open("%s/nonutf8data" % sftp.FOLDER, "w") as f:
@@ -770,7 +767,7 @@ class TestSFTP(object):
try:
with sftp.open("%s/write_buffer" % sftp.FOLDER, "wb") as f:
for offset in range(0, len(data), 8):
- f.write(buffer(data, offset, 8))
+ f.write(buffer(data, offset, 8)) # noqa
with sftp.open("%s/write_buffer" % sftp.FOLDER, "rb") as f:
assert f.read() == data
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 9df566e8..fc556faf 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -23,12 +23,10 @@ a real actual sftp server is contacted, and a new folder is created there to
do test file operations in (so no existing files will be harmed).
"""
-import os
import random
import struct
import sys
import time
-import unittest
from paramiko.common import o660
@@ -37,7 +35,7 @@ from .util import slow
@slow
class TestBigSFTP(object):
- def test_1_lots_of_files(self, sftp):
+ def test_lots_of_files(self, sftp):
"""
create a bunch of files over the same session.
"""
@@ -65,7 +63,7 @@ class TestBigSFTP(object):
except:
pass
- def test_2_big_file(self, sftp):
+ def test_big_file(self, sftp):
"""
write a 1MB file with no buffering.
"""
@@ -96,7 +94,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_3_big_file_pipelined(self, sftp):
+ def test_big_file_pipelined(self, sftp):
"""
write a 1MB file, with no linefeeds, using pipelining.
"""
@@ -122,7 +120,8 @@ class TestBigSFTP(object):
file_size = f.stat().st_size
f.prefetch(file_size)
- # read on odd boundaries to make sure the bytes aren't getting scrambled
+ # read on odd boundaries to make sure the bytes aren't getting
+ # scrambled
n = 0
k2blob = kblob + kblob
chunk = 629
@@ -140,7 +139,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_4_prefetch_seek(self, sftp):
+ def test_prefetch_seek(self, sftp):
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
@@ -180,7 +179,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_5_readv_seek(self, sftp):
+ def test_readv_seek(self, sftp):
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
@@ -220,7 +219,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_6_lots_of_prefetching(self, sftp):
+ def test_lots_of_prefetching(self, sftp):
"""
prefetch a 1MB file a bunch of times, discarding the file object
without using it, to verify that paramiko doesn't get confused.
@@ -255,7 +254,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_7_prefetch_readv(self, sftp):
+ def test_prefetch_readv(self, sftp):
"""
verify that prefetch and readv don't conflict with each other.
"""
@@ -296,7 +295,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_8_large_readv(self, sftp):
+ def test_large_readv(self, sftp):
"""
verify that a very large readv is broken up correctly and still
returned as a single blob.
@@ -325,7 +324,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_9_big_file_big_buffer(self, sftp):
+ def test_big_file_big_buffer(self, sftp):
"""
write a 1MB file, with no linefeeds, and a big buffer.
"""
@@ -342,7 +341,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_A_big_file_renegotiate(self, sftp):
+ def test_big_file_renegotiate(self, sftp):
"""
write a 1MB file, forcing key renegotiation in the middle.
"""
diff --git a/tests/test_ssh_exception.py b/tests/test_ssh_exception.py
index d9e0bd22..1628986a 100644
--- a/tests/test_ssh_exception.py
+++ b/tests/test_ssh_exception.py
@@ -1,7 +1,15 @@
import pickle
import unittest
-from paramiko.ssh_exception import NoValidConnectionsError
+from paramiko import RSAKey
+from paramiko.ssh_exception import (
+ NoValidConnectionsError,
+ BadAuthenticationType,
+ PartialAuthentication,
+ ChannelException,
+ BadHostKeyException,
+ ProxyCommandFailure,
+)
class NoValidConnectionsErrorTest(unittest.TestCase):
@@ -33,3 +41,35 @@ class NoValidConnectionsErrorTest(unittest.TestCase):
)
exp = "Unable to connect to port 22 on 10.0.0.42, 127.0.0.1 or ::1"
assert exp in str(exc)
+
+
+class ExceptionStringDisplayTest(unittest.TestCase):
+ def test_BadAuthenticationType(self):
+ exc = BadAuthenticationType(
+ "Bad authentication type", ["ok", "also-ok"]
+ )
+ expected = "Bad authentication type; allowed types: ['ok', 'also-ok']"
+ assert str(exc) == expected
+
+ def test_PartialAuthentication(self):
+ exc = PartialAuthentication(["ok", "also-ok"])
+ expected = "Partial authentication; allowed types: ['ok', 'also-ok']"
+ assert str(exc) == expected
+
+ def test_BadHostKeyException(self):
+ got_key = RSAKey.generate(2048)
+ wanted_key = RSAKey.generate(2048)
+ exc = BadHostKeyException("myhost", got_key, wanted_key)
+ expected = "Host key for server 'myhost' does not match: got '{}', expected '{}'" # noqa
+ assert str(exc) == expected.format(
+ got_key.get_base64(), wanted_key.get_base64()
+ )
+
+ def test_ProxyCommandFailure(self):
+ exc = ProxyCommandFailure("man squid", 7)
+ expected = 'ProxyCommand("man squid") returned nonzero exit status: 7'
+ assert str(exc) == expected
+
+ def test_ChannelException(self):
+ exc = ChannelException(17, "whatever")
+ assert str(exc) == "ChannelException(17, 'whatever')"
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index b6b50152..92801c20 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -25,11 +25,10 @@ Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
import socket
import threading
-import unittest
import paramiko
-from .util import _support, needs_gssapi
+from .util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -61,23 +60,24 @@ class NullServer(paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
@needs_gssapi
-class GSSAuthTest(unittest.TestCase):
+class GSSAuthTest(KerberosTestCase):
def setUp(self):
# TODO: username and targ_name should come from os.environ or whatever
# the approved pytest method is for runtime-configuring test data.
- self.username = "krb5_principal"
- self.hostname = socket.getfqdn("targ_name")
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind(("targ_name", 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -139,16 +139,16 @@ class GSSAuthTest(unittest.TestCase):
stdout.close()
stderr.close()
- def test_1_gss_auth(self):
+ def test_gss_auth(self):
"""
Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
(gssapi-with-mic) in client and server mode.
"""
self._test_connection(allow_agent=False, look_for_keys=False)
- def test_2_auth_trickledown(self):
+ def test_auth_trickledown(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
self.hostname = (
"this_host_does_not_exists_and_causes_a_GSSAPI-exception"
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 2b8ee3bc..e2174896 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -28,36 +28,32 @@ import socket
import time
import threading
import random
-from hashlib import sha1
import unittest
from mock import Mock
from paramiko import (
- Transport,
- SecurityOptions,
- ServerInterface,
- RSAKey,
- DSSKey,
- SSHException,
+ AuthHandler,
ChannelException,
+ DSSKey,
Packetizer,
- Channel,
- AuthHandler,
+ RSAKey,
+ SSHException,
+ SecurityOptions,
+ ServerInterface,
+ Transport,
)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import (
- MSG_KEXINIT,
- cMSG_CHANNEL_WINDOW_ADJUST,
- cMSG_UNIMPLEMENTED,
+ DEFAULT_MAX_PACKET_SIZE,
+ DEFAULT_WINDOW_SIZE,
+ MAX_WINDOW_SIZE,
MIN_PACKET_SIZE,
MIN_WINDOW_SIZE,
- MAX_WINDOW_SIZE,
- DEFAULT_WINDOW_SIZE,
- DEFAULT_MAX_PACKET_SIZE,
- MSG_NAMES,
- MSG_UNIMPLEMENTED,
+ MSG_KEXINIT,
MSG_USERAUTH_SUCCESS,
+ cMSG_CHANNEL_WINDOW_ADJUST,
+ cMSG_UNIMPLEMENTED,
)
from paramiko.py3compat import bytes, byte_chr
from paramiko.message import Message
@@ -185,7 +181,7 @@ class TransportTest(unittest.TestCase):
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
- def test_1_security_options(self):
+ def test_security_options(self):
o = self.tc.get_security_options()
self.assertEqual(type(o), SecurityOptions)
self.assertTrue(("aes256-cbc", "blowfish-cbc") != o.ciphers)
@@ -202,7 +198,7 @@ class TransportTest(unittest.TestCase):
except TypeError:
pass
- def test_1b_security_options_reset(self):
+ def testb_security_options_reset(self):
o = self.tc.get_security_options()
# should not throw any exceptions
o.ciphers = o.ciphers
@@ -211,17 +207,17 @@ class TransportTest(unittest.TestCase):
o.kex = o.kex
o.compression = o.compression
- def test_2_compute_key(self):
- self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
- self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3"
+ def test_compute_key(self):
+ self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 # noqa
+ self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" # noqa
self.tc.session_id = self.tc.H
key = self.tc._compute_key("C", 32)
self.assertEqual(
- b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995",
+ b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", # noqa
hexlify(key).upper(),
)
- def test_3_simple(self):
+ def test_simple(self):
"""
verify that we can establish an ssh link with ourselves across the
loopback sockets. this is hardly "simple" but it's simpler than the
@@ -249,7 +245,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(True, self.tc.is_authenticated())
self.assertEqual(True, self.ts.is_authenticated())
- def test_3a_long_banner(self):
+ def testa_long_banner(self):
"""
verify that a long banner doesn't mess up the handshake.
"""
@@ -268,7 +264,7 @@ class TransportTest(unittest.TestCase):
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
- def test_4_special(self):
+ def test_special(self):
"""
verify that the client can demand odd handshake settings, and can
renegotiate keys in mid-stream.
@@ -289,7 +285,7 @@ class TransportTest(unittest.TestCase):
self.ts.send_ignore(1024)
@slow
- def test_5_keepalive(self):
+ def test_keepalive(self):
"""
verify that the keepalive will be sent.
"""
@@ -299,7 +295,7 @@ class TransportTest(unittest.TestCase):
time.sleep(2)
self.assertEqual("keepalive@lag.net", self.server._global_request)
- def test_6_exec_command(self):
+ def test_exec_command(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -343,7 +339,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual("This is on stderr.\n", f.readline())
self.assertEqual("", f.readline())
- def test_6a_channel_can_be_used_as_context_manager(self):
+ def testa_channel_can_be_used_as_context_manager(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -359,7 +355,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual("Hello there.\n", f.readline())
self.assertEqual("", f.readline())
- def test_7_invoke_shell(self):
+ def test_invoke_shell(self):
"""
verify that invoke_shell() does something reasonable.
"""
@@ -373,18 +369,18 @@ class TransportTest(unittest.TestCase):
chan.close()
self.assertEqual("", f.readline())
- def test_8_channel_exception(self):
+ def test_channel_exception(self):
"""
verify that ChannelException is thrown for a bad open-channel request.
"""
self.setup_test_server()
try:
- chan = self.tc.open_channel("bogus")
+ self.tc.open_channel("bogus")
self.fail("expected exception")
except ChannelException as e:
self.assertTrue(e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
- def test_9_exit_status(self):
+ def test_exit_status(self):
"""
verify that get_exit_status() works.
"""
@@ -413,7 +409,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(23, chan.recv_exit_status())
chan.close()
- def test_A_select(self):
+ def test_select(self):
"""
verify that select() on a channel works.
"""
@@ -468,7 +464,7 @@ class TransportTest(unittest.TestCase):
# ...and now is closed.
self.assertEqual(True, p._closed)
- def test_B_renegotiate(self):
+ def test_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
"""
@@ -492,7 +488,7 @@ class TransportTest(unittest.TestCase):
schan.close()
- def test_C_compression(self):
+ def test_compression(self):
"""
verify that zlib compression is basically working.
"""
@@ -510,14 +506,15 @@ class TransportTest(unittest.TestCase):
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
block_size = self.tc._cipher_info[self.tc.local_cipher]["block-size"]
mac_size = self.tc._mac_info[self.tc.local_mac]["size"]
- # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
+ # tests show this is actually compressed to *52 bytes*! including
+ # packet overhead! nice!! :)
self.assertTrue(bytes2 - bytes < 1024)
self.assertEqual(16 + block_size + mac_size, bytes2 - bytes)
chan.close()
schan.close()
- def test_D_x11(self):
+ def test_x11(self):
"""
verify that an x11 port can be requested and opened.
"""
@@ -555,7 +552,7 @@ class TransportTest(unittest.TestCase):
chan.close()
schan.close()
- def test_E_reverse_port_forwarding(self):
+ def test_reverse_port_forwarding(self):
"""
verify that a client can ask the server to open a reverse port for
forwarding.
@@ -563,7 +560,7 @@ class TransportTest(unittest.TestCase):
self.setup_test_server()
chan = self.tc.open_session()
chan.exec_command("yes")
- schan = self.ts.accept(1.0)
+ self.ts.accept(1.0)
requested = []
@@ -594,7 +591,7 @@ class TransportTest(unittest.TestCase):
self.tc.cancel_port_forward("127.0.0.1", port)
self.assertTrue(self.server._listen is None)
- def test_F_port_forwarding(self):
+ def test_port_forwarding(self):
"""
verify that a client can forward new connections from a locally-
forwarded port.
@@ -602,7 +599,7 @@ class TransportTest(unittest.TestCase):
self.setup_test_server()
chan = self.tc.open_session()
chan.exec_command("yes")
- schan = self.ts.accept(1.0)
+ self.ts.accept(1.0)
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
@@ -626,7 +623,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(b"Hello!\n", cs.recv(7))
cs.close()
- def test_G_stderr_select(self):
+ def test_stderr_select(self):
"""
verify that select() on a channel works even if only stderr is
receiving data.
@@ -665,7 +662,7 @@ class TransportTest(unittest.TestCase):
schan.close()
chan.close()
- def test_H_send_ready(self):
+ def test_send_ready(self):
"""
verify that send_ready() indicates when a send would not block.
"""
@@ -689,9 +686,10 @@ class TransportTest(unittest.TestCase):
chan.close()
self.assertEqual(chan.send_ready(), True)
- def test_I_rekey_deadlock(self):
+ def test_rekey_deadlock(self):
"""
- Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent
+ Regression test for deadlock when in-transit messages are received
+ after MSG_KEXINIT is sent
Note: When this test fails, it may leak threads.
"""
@@ -714,12 +712,15 @@ class TransportTest(unittest.TestCase):
# MSG_KEXINIT to the remote host.
#
# On the remote host (using any SSH implementation):
- # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent.
- # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent.
+ # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST
+ # is sent.
+ # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is
+ # sent.
#
# In the main thread:
# 7. The user's program calls Channel.send().
- # 8. Channel.send acquires Channel.lock, then calls Transport._send_user_message().
+ # 8. Channel.send acquires Channel.lock, then calls
+ # Transport._send_user_message().
# 9. Transport._send_user_message waits for Transport.clear_to_send
# to be set (i.e., it waits for re-keying to complete).
# Channel.lock is still held.
@@ -854,7 +855,7 @@ class TransportTest(unittest.TestCase):
schan.close()
chan.close()
- def test_J_sanitze_packet_size(self):
+ def test_sanitze_packet_size(self):
"""
verify that we conform to the rfc of packet and window sizes.
"""
@@ -865,7 +866,7 @@ class TransportTest(unittest.TestCase):
]:
self.assertEqual(self.tc._sanitize_packet_size(val), correct)
- def test_K_sanitze_window_size(self):
+ def test_sanitze_window_size(self):
"""
verify that we conform to the rfc of packet and window sizes.
"""
@@ -877,7 +878,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(self.tc._sanitize_window_size(val), correct)
@slow
- def test_L_handshake_timeout(self):
+ def test_handshake_timeout(self):
"""
verify that we can get a hanshake timeout.
"""
@@ -914,7 +915,7 @@ class TransportTest(unittest.TestCase):
password="pygmalion",
)
- def test_M_select_after_close(self):
+ def test_select_after_close(self):
"""
verify that select works when a channel is already closed.
"""
@@ -969,11 +970,11 @@ class TransportTest(unittest.TestCase):
# send() accepts buffer instances
sent = 0
while sent < len(data):
- sent += chan.send(buffer(data, sent, 8))
+ sent += chan.send(buffer(data, sent, 8)) # noqa
self.assertEqual(sfile.read(len(data)), data)
# sendall() accepts a buffer instance
- chan.sendall(buffer(data))
+ chan.sendall(buffer(data)) # noqa
self.assertEqual(sfile.read(len(data)), data)
@needs_builtin("memoryview")
@@ -1101,3 +1102,70 @@ class TransportTest(unittest.TestCase):
assert not self.ts.auth_handler.authenticated
# Real fix's behavior
self._expect_unimplemented()
+
+
+class AlgorithmDisablingTests(unittest.TestCase):
+ def test_preferred_lists_default_to_private_attribute_contents(self):
+ t = Transport(sock=Mock())
+ assert t.preferred_ciphers == t._preferred_ciphers
+ assert t.preferred_macs == t._preferred_macs
+ assert t.preferred_keys == t._preferred_keys
+ assert t.preferred_kex == t._preferred_kex
+
+ def test_preferred_lists_filter_disabled_algorithms(self):
+ t = Transport(
+ sock=Mock(),
+ disabled_algorithms={
+ "ciphers": ["aes128-cbc"],
+ "macs": ["hmac-md5"],
+ "keys": ["ssh-dss"],
+ "kex": ["diffie-hellman-group14-sha256"],
+ },
+ )
+ assert "aes128-cbc" in t._preferred_ciphers
+ assert "aes128-cbc" not in t.preferred_ciphers
+ assert "hmac-md5" in t._preferred_macs
+ assert "hmac-md5" not in t.preferred_macs
+ assert "ssh-dss" in t._preferred_keys
+ assert "ssh-dss" not in t.preferred_keys
+ assert "diffie-hellman-group14-sha256" in t._preferred_kex
+ assert "diffie-hellman-group14-sha256" not in t.preferred_kex
+
+ def test_implementation_refers_to_public_algo_lists(self):
+ t = Transport(
+ sock=Mock(),
+ disabled_algorithms={
+ "ciphers": ["aes128-cbc"],
+ "macs": ["hmac-md5"],
+ "keys": ["ssh-dss"],
+ "kex": ["diffie-hellman-group14-sha256"],
+ "compression": ["zlib"],
+ },
+ )
+ # Enable compression cuz otherwise disabling one option for it makes no
+ # sense...
+ t.use_compression(True)
+ # Effectively a random spot check, but kex init touches most/all of the
+ # algorithm lists so it's a good spot.
+ t._send_message = Mock()
+ t._send_kex_init()
+ # Cribbed from Transport._parse_kex_init, which didn't feel worth
+ # refactoring given all the vars involved :(
+ m = t._send_message.call_args[0][0]
+ m.rewind()
+ m.get_byte() # the msg type
+ m.get_bytes(16) # cookie, discarded
+ kexen = m.get_list()
+ server_keys = m.get_list()
+ ciphers = m.get_list()
+ m.get_list()
+ macs = m.get_list()
+ m.get_list()
+ compressions = m.get_list()
+ # OK, now we can actually check that our disabled algos were not
+ # included (as this message includes the full lists)
+ assert "aes128-cbc" not in ciphers
+ assert "hmac-md5" not in macs
+ assert "ssh-dss" not in server_keys
+ assert "diffie-hellman-group14-sha256" not in kexen
+ assert "zlib" not in compressions
diff --git a/tests/test_util.py b/tests/test_util.py
index 705baa14..8ce260d1 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -26,32 +26,12 @@ import os
from hashlib import sha1
import unittest
+import paramiko
import paramiko.util
-from paramiko.util import lookup_ssh_host_config as host_config, safe_string
-from paramiko.py3compat import StringIO, byte_ord, b
+from paramiko.util import safe_string
+from paramiko.py3compat import byte_ord
-# Note some lines in this configuration have trailing spaces on purpose
-test_config_file = """\
-Host *
- User robey
- IdentityFile =~/.ssh/id_rsa
-
-# comment
-Host *.example.com
- \tUser bjork
-Port=3333
-Host *
-"""
-
-dont_strip_whitespace_please = "\t \t Crazy something dumb "
-
-test_config_file += dont_strip_whitespace_please
-test_config_file += """
-Host spoo.example.com
-Crazy something else
-"""
-
test_hosts_file = """\
secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\
9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\
@@ -62,155 +42,75 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
"""
-# for test 1:
-from paramiko import *
-
-
class UtilTest(unittest.TestCase):
- def test_import(self):
+ def test_imports(self):
"""
verify that all the classes can be imported from paramiko.
"""
- symbols = list(globals().keys())
- self.assertTrue("Transport" in symbols)
- self.assertTrue("SSHClient" in symbols)
- self.assertTrue("MissingHostKeyPolicy" in symbols)
- self.assertTrue("AutoAddPolicy" in symbols)
- self.assertTrue("RejectPolicy" in symbols)
- self.assertTrue("WarningPolicy" in symbols)
- self.assertTrue("SecurityOptions" in symbols)
- self.assertTrue("SubsystemHandler" in symbols)
- self.assertTrue("Channel" in symbols)
- self.assertTrue("RSAKey" in symbols)
- self.assertTrue("DSSKey" in symbols)
- self.assertTrue("Message" in symbols)
- self.assertTrue("SSHException" in symbols)
- self.assertTrue("AuthenticationException" in symbols)
- self.assertTrue("PasswordRequiredException" in symbols)
- self.assertTrue("BadAuthenticationType" in symbols)
- self.assertTrue("ChannelException" in symbols)
- self.assertTrue("SFTP" in symbols)
- self.assertTrue("SFTPFile" in symbols)
- self.assertTrue("SFTPHandle" in symbols)
- self.assertTrue("SFTPClient" in symbols)
- self.assertTrue("SFTPServer" in symbols)
- self.assertTrue("SFTPError" in symbols)
- self.assertTrue("SFTPAttributes" in symbols)
- self.assertTrue("SFTPServerInterface" in symbols)
- self.assertTrue("ServerInterface" in symbols)
- self.assertTrue("BufferedFile" in symbols)
- self.assertTrue("Agent" in symbols)
- self.assertTrue("AgentKey" in symbols)
- self.assertTrue("HostKeys" in symbols)
- self.assertTrue("SSHConfig" in symbols)
- self.assertTrue("util" in symbols)
-
- def test_parse_config(self):
- global test_config_file
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- config._config,
- [
- {"host": ["*"], "config": {}},
- {
- "host": ["*"],
- "config": {
- "identityfile": ["~/.ssh/id_rsa"],
- "user": "robey",
- },
- },
- {
- "host": ["*.example.com"],
- "config": {"user": "bjork", "port": "3333"},
- },
- {"host": ["*"], "config": {"crazy": "something dumb"}},
- {
- "host": ["spoo.example.com"],
- "config": {"crazy": "something else"},
- },
- ],
- )
-
- def test_host_config(self):
- global test_config_file
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
-
- for host, values in {
- "irc.danger.com": {
- "crazy": "something dumb",
- "hostname": "irc.danger.com",
- "user": "robey",
- },
- "irc.example.com": {
- "crazy": "something dumb",
- "hostname": "irc.example.com",
- "user": "robey",
- "port": "3333",
- },
- "spoo.example.com": {
- "crazy": "something dumb",
- "hostname": "spoo.example.com",
- "user": "robey",
- "port": "3333",
- },
- }.items():
- values = dict(
- values,
- hostname=host,
- identityfile=[os.path.expanduser("~/.ssh/id_rsa")],
- )
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
+ for name in (
+ "Agent",
+ "AgentKey",
+ "AuthenticationException",
+ "AutoAddPolicy",
+ "BadAuthenticationType",
+ "BufferedFile",
+ "Channel",
+ "ChannelException",
+ "ConfigParseError",
+ "CouldNotCanonicalize",
+ "DSSKey",
+ "HostKeys",
+ "Message",
+ "MissingHostKeyPolicy",
+ "PasswordRequiredException",
+ "RSAKey",
+ "RejectPolicy",
+ "SFTP",
+ "SFTPAttributes",
+ "SFTPClient",
+ "SFTPError",
+ "SFTPFile",
+ "SFTPHandle",
+ "SFTPServer",
+ "SFTPServerInterface",
+ "SSHClient",
+ "SSHConfig",
+ "SSHConfigDict",
+ "SSHException",
+ "SecurityOptions",
+ "ServerInterface",
+ "SubsystemHandler",
+ "Transport",
+ "WarningPolicy",
+ "util",
+ ):
+ assert name in paramiko.__all__
def test_generate_key_bytes(self):
x = paramiko.util.generate_key_bytes(
sha1, b"ABCDEFGH", "This is my secret passphrase.", 64
)
hex = "".join(["%02x" % byte_ord(c) for c in x])
- self.assertEqual(
- hex,
- "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b",
- )
+ hexpected = "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b" # noqa
+ assert hex == hexpected
def test_host_keys(self):
with open("hostfile.temp", "w") as f:
f.write(test_hosts_file)
try:
hostdict = paramiko.util.load_host_keys("hostfile.temp")
- self.assertEqual(2, len(hostdict))
- self.assertEqual(1, len(list(hostdict.values())[0]))
- self.assertEqual(1, len(list(hostdict.values())[1]))
+ assert 2 == len(hostdict)
+ assert 1 == len(list(hostdict.values())[0])
+ assert 1 == len(list(hostdict.values())[1])
fp = hexlify(
hostdict["secure.example.com"]["ssh-rsa"].get_fingerprint()
).upper()
- self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp)
+ assert b"E6684DB30E109B67B70FF1DC5C7F1363" == fp
finally:
os.unlink("hostfile.temp")
- def test_host_config_expose_issue_33(self):
- test_config_file = """
-Host www13.*
- Port 22
-
-Host *.example.com
- Port 2222
-
-Host *
- Port 3333
- """
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- host = "www13.example.com"
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- {"hostname": host, "port": "22"},
- )
-
def test_eintr_retry(self):
- self.assertEqual("foo", paramiko.util.retry_on_signal(lambda: "foo"))
+ assert "foo" == paramiko.util.retry_on_signal(lambda: "foo")
# Variables that are set by raises_intr
intr_errors_remaining = [3]
@@ -223,8 +123,8 @@ Host *
raise IOError(errno.EINTR, "file", "interrupted system call")
self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
- self.assertEqual(0, intr_errors_remaining[0])
- self.assertEqual(4, call_count[0])
+ assert 0 == intr_errors_remaining[0]
+ assert 4 == call_count[0]
def raises_ioerror_not_eintr():
raise IOError(errno.ENOENT, "file", "file not found")
@@ -242,273 +142,10 @@ Host *
lambda: paramiko.util.retry_on_signal(raises_other_exception),
)
- def test_proxycommand_config_equals_parsing(self):
- """
- ProxyCommand should not split on equals signs within the value.
- """
- conf = """
-Host space-delimited
- ProxyCommand foo bar=biz baz
-
-Host equals-delimited
- ProxyCommand=foo bar=biz baz
-"""
- f = StringIO(conf)
- config = paramiko.util.parse_ssh_config(f)
- for host in ("space-delimited", "equals-delimited"):
- self.assertEqual(
- host_config(host, config)["proxycommand"], "foo bar=biz baz"
- )
-
- def test_proxycommand_interpolation(self):
- """
- ProxyCommand should perform interpolation on the value
- """
- config = paramiko.util.parse_ssh_config(
- StringIO(
- """
-Host specific
- Port 37
- ProxyCommand host %h port %p lol
-
-Host portonly
- Port 155
-
-Host *
- Port 25
- ProxyCommand host %h port %p
-"""
- )
- )
- for host, val in (
- ("foo.com", "host foo.com port 25"),
- ("specific", "host specific port 37 lol"),
- ("portonly", "host portonly port 155"),
- ):
- self.assertEqual(host_config(host, config)["proxycommand"], val)
-
- def test_proxycommand_tilde_expansion(self):
- """
- Tilde (~) should be expanded inside ProxyCommand
- """
- config = paramiko.util.parse_ssh_config(
- StringIO(
- """
-Host test
- ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p
-"""
- )
- )
- self.assertEqual(
- "ssh -F %s/.ssh/test_config bastion nc test 22"
- % os.path.expanduser("~"),
- host_config("test", config)["proxycommand"],
- )
-
- def test_host_config_test_negation(self):
- test_config_file = """
-Host www13.* !*.example.com
- Port 22
-
-Host *.example.com !www13.*
- Port 2222
-
-Host www13.*
- Port 8080
-
-Host *
- Port 3333
- """
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- host = "www13.example.com"
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- {"hostname": host, "port": "8080"},
- )
-
- def test_host_config_test_proxycommand(self):
- test_config_file = """
-Host proxy-with-equal-divisor-and-space
-ProxyCommand = foo=bar
-
-Host proxy-with-equal-divisor-and-no-space
-ProxyCommand=foo=bar
-
-Host proxy-without-equal-divisor
-ProxyCommand foo=bar:%h-%p
- """
- for host, values in {
- "proxy-with-equal-divisor-and-space": {
- "hostname": "proxy-with-equal-divisor-and-space",
- "proxycommand": "foo=bar",
- },
- "proxy-with-equal-divisor-and-no-space": {
- "hostname": "proxy-with-equal-divisor-and-no-space",
- "proxycommand": "foo=bar",
- },
- "proxy-without-equal-divisor": {
- "hostname": "proxy-without-equal-divisor",
- "proxycommand": "foo=bar:proxy-without-equal-divisor-22",
- },
- }.items():
-
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_host_config_test_identityfile(self):
- test_config_file = """
-
-IdentityFile id_dsa0
-
-Host *
-IdentityFile id_dsa1
-
-Host dsa2
-IdentityFile id_dsa2
-
-Host dsa2*
-IdentityFile id_dsa22
- """
- for host, values in {
- "foo": {"hostname": "foo", "identityfile": ["id_dsa0", "id_dsa1"]},
- "dsa2": {
- "hostname": "dsa2",
- "identityfile": ["id_dsa0", "id_dsa1", "id_dsa2", "id_dsa22"],
- },
- "dsa22": {
- "hostname": "dsa22",
- "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"],
- },
- }.items():
-
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_config_addressfamily_and_lazy_fqdn(self):
- """
- Ensure the code path honoring non-'all' AddressFamily doesn't asplode
- """
- test_config = """
-AddressFamily inet
-IdentityFile something_%l_using_fqdn
-"""
- config = paramiko.util.parse_ssh_config(StringIO(test_config))
- assert config.lookup(
- "meh"
- ) # will die during lookup() if bug regresses
-
def test_clamp_value(self):
- self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
- self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769))
- self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769))
-
- def test_config_dos_crlf_succeeds(self):
- config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
- config = paramiko.SSHConfig()
- config.parse(config_file)
- self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")
-
- def test_get_hostnames(self):
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- config.get_hostnames(), {"*", "*.example.com", "spoo.example.com"}
- )
-
- def test_quoted_host_names(self):
- test_config_file = """\
-Host "param pam" param "pam"
- Port 1111
-
-Host "param2"
- Port 2222
-
-Host param3 parara
- Port 3333
-
-Host param4 "p a r" "p" "par" para
- Port 4444
-"""
- res = {
- "param pam": {"hostname": "param pam", "port": "1111"},
- "param": {"hostname": "param", "port": "1111"},
- "pam": {"hostname": "pam", "port": "1111"},
- "param2": {"hostname": "param2", "port": "2222"},
- "param3": {"hostname": "param3", "port": "3333"},
- "parara": {"hostname": "parara", "port": "3333"},
- "param4": {"hostname": "param4", "port": "4444"},
- "p a r": {"hostname": "p a r", "port": "4444"},
- "p": {"hostname": "p", "port": "4444"},
- "par": {"hostname": "par", "port": "4444"},
- "para": {"hostname": "para", "port": "4444"},
- }
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- for host, values in res.items():
- self.assertEquals(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_quoted_params_in_config(self):
- test_config_file = """\
-Host "param pam" param "pam"
- IdentityFile id_rsa
-
-Host "param2"
- IdentityFile "test rsa key"
-
-Host param3 parara
- IdentityFile id_rsa
- IdentityFile "test rsa key"
-"""
- res = {
- "param pam": {"hostname": "param pam", "identityfile": ["id_rsa"]},
- "param": {"hostname": "param", "identityfile": ["id_rsa"]},
- "pam": {"hostname": "pam", "identityfile": ["id_rsa"]},
- "param2": {"hostname": "param2", "identityfile": ["test rsa key"]},
- "param3": {
- "hostname": "param3",
- "identityfile": ["id_rsa", "test rsa key"],
- },
- "parara": {
- "hostname": "parara",
- "identityfile": ["id_rsa", "test rsa key"],
- },
- }
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- for host, values in res.items():
- self.assertEquals(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_quoted_host_in_config(self):
- conf = SSHConfig()
- correct_data = {
- "param": ["param"],
- '"param"': ["param"],
- "param pam": ["param", "pam"],
- '"param" "pam"': ["param", "pam"],
- '"param" pam': ["param", "pam"],
- 'param "pam"': ["param", "pam"],
- 'param "pam" p': ["param", "pam", "p"],
- '"param" pam "p"': ["param", "pam", "p"],
- '"pa ram"': ["pa ram"],
- '"pa ram" pam': ["pa ram", "pam"],
- 'param "p a m"': ["param", "p a m"],
- }
- incorrect_data = ['param"', '"param', 'param "pam', 'param "pam" "p a']
- for host, values in correct_data.items():
- self.assertEquals(conf._get_hosts(host), values)
- for host in incorrect_data:
- self.assertRaises(Exception, conf._get_hosts, host)
+ assert 32768 == paramiko.util.clamp_value(32767, 32768, 32769)
+ assert 32767 == paramiko.util.clamp_value(32767, 32765, 32769)
+ assert 32769 == paramiko.util.clamp_value(32767, 32770, 32769)
def test_safe_string(self):
vanilla = b"vanilla"
@@ -521,54 +158,3 @@ Host param3 parara
assert safe_vanilla == vanilla, msg
msg = err.format(safe_has_bytes, expected_bytes)
assert safe_has_bytes == expected_bytes, msg
-
- def test_proxycommand_none_issue_418(self):
- test_config_file = """
-Host proxycommand-standard-none
- ProxyCommand None
-
-Host proxycommand-with-equals-none
- ProxyCommand=None
- """
- for host, values in {
- "proxycommand-standard-none": {
- "hostname": "proxycommand-standard-none"
- },
- "proxycommand-with-equals-none": {
- "hostname": "proxycommand-with-equals-none"
- },
- }.items():
-
- f = StringIO(test_config_file)
- config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config), values
- )
-
- def test_proxycommand_none_masking(self):
- # Re: https://github.com/paramiko/paramiko/issues/670
- source_config = """
-Host specific-host
- ProxyCommand none
-
-Host other-host
- ProxyCommand other-proxy
-
-Host *
- ProxyCommand default-proxy
-"""
- config = paramiko.SSHConfig()
- config.parse(StringIO(source_config))
- # When bug is present, the full stripping-out of specific-host's
- # ProxyCommand means it actually appears to pick up the default
- # ProxyCommand value instead, due to cascading. It should (for
- # backwards compatibility reasons in 1.x/2.x) appear completely blank,
- # as if the host had no ProxyCommand whatsoever.
- # Threw another unrelated host in there just for sanity reasons.
- self.assertFalse("proxycommand" in config.lookup("specific-host"))
- self.assertEqual(
- config.lookup("other-host")["proxycommand"], "other-proxy"
- )
- self.assertEqual(
- config.lookup("some-random-host")["proxycommand"], "default-proxy"
- )
diff --git a/tests/util.py b/tests/util.py
index 4ca02374..9057f516 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,19 +1,28 @@
from os.path import dirname, realpath, join
+import os
+import sys
+import unittest
import pytest
from paramiko.py3compat import builtins
+from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
+
+
+tests_dir = dirname(realpath(__file__))
def _support(filename):
- return join(dirname(realpath(__file__)), filename)
+ return join(tests_dir, filename)
+
+def _config(name):
+ return join(tests_dir, "configs", name)
-# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
-# still need CLI configurability for the Kerberos parameters, though, so can't
-# JUST key off presence of GSSAPI optional dependency...
-# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
-needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+
+needs_gssapi = pytest.mark.skipif(
+ not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
+)
def needs_builtin(name):
@@ -25,3 +34,96 @@ def needs_builtin(name):
slow = pytest.mark.slow
+
+# GSSAPI / Kerberos related tests need a working Kerberos environment.
+# The class `KerberosTestCase` provides such an environment or skips all tests.
+# There are 3 distinct cases:
+#
+# - A Kerberos environment has already been created and the environment
+# contains the required information.
+#
+# - We can use the package 'k5test' to setup an working kerberos environment on
+# the fly.
+#
+# - We skip all tests.
+#
+# ToDo: add a Windows specific implementation?
+
+if (
+ os.environ.get("K5TEST_USER_PRINC", None)
+ and os.environ.get("K5TEST_HOSTNAME", None)
+ and os.environ.get("KRB5_KTNAME", None)
+): # add other vars as needed
+
+ # The environment provides the required information
+ class DummyK5Realm(object):
+ def __init__(self):
+ for k in os.environ:
+ if not k.startswith("K5TEST_"):
+ continue
+ setattr(self, k[7:].lower(), os.environ[k])
+ self.env = {}
+
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.realm = DummyK5Realm()
+
+ @classmethod
+ def tearDownClass(cls):
+ del cls.realm
+
+
+else:
+ try:
+ # Try to setup a kerberos environment
+ from k5test import KerberosTestCase
+ except Exception:
+ # Use a dummy, that skips all tests
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ raise unittest.SkipTest(
+ "Missing extension package k5test. "
+ 'Please run "pip install k5test" '
+ "to install it."
+ )
+
+
+def update_env(testcase, mapping, env=os.environ):
+ """Modify os.environ during a test case and restore during cleanup."""
+ saved_env = env.copy()
+
+ def replace(target, source):
+ target.update(source)
+ for k in list(target):
+ if k not in source:
+ target.pop(k, None)
+
+ testcase.addCleanup(replace, env, saved_env)
+ env.update(mapping)
+ return testcase
+
+
+def k5shell(args=None):
+ """Create a shell with an kerberos environment
+
+ This can be used to debug paramiko or to test the old GSSAPI.
+ To test a different GSSAPI, simply activate a suitable venv
+ within the shell.
+ """
+ import k5test
+ import atexit
+ import subprocess
+
+ k5 = k5test.K5Realm()
+ atexit.register(k5.stop)
+ os.environ.update(k5.env)
+ for n in ("realm", "user_princ", "hostname"):
+ os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
+
+ if not args:
+ args = sys.argv[1:]
+ if not args:
+ args = [os.environ.get("SHELL", "bash")]
+ sys.exit(subprocess.call(args))