summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/configs/match-exec2
-rw-r--r--tests/test_config.py30
-rw-r--r--tests/test_pkey.py60
-rw-r--r--tests/test_transport.py8
4 files changed, 91 insertions, 9 deletions
diff --git a/tests/configs/match-exec b/tests/configs/match-exec
index 763346ea..62a147aa 100644
--- a/tests/configs/match-exec
+++ b/tests/configs/match-exec
@@ -12,5 +12,5 @@ Host target
User intermediate
HostName configured
-Match exec "%d %h %L %l %n %p %r %u"
+Match exec "%C %d %h %L %l %n %p %r %u"
Port 1337
diff --git a/tests/test_config.py b/tests/test_config.py
index 2095061f..fcc47734 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -46,6 +46,7 @@ def socket():
# Patch out getfqdn to return some real string for when it gets called;
# some code (eg tokenization) gets mad w/ MagicMocks
mocket.getfqdn.return_value = "some.fake.fqdn"
+ mocket.gethostname.return_value = "local.fake.fqdn"
yield mocket
@@ -210,7 +211,7 @@ Host test
assert got == expected
@patch("paramiko.config.getpass")
- def test_controlpath_token_expansion(self, getpass):
+ def test_controlpath_token_expansion(self, getpass, socket):
getpass.getuser.return_value = "gandalf"
config = SSHConfig.from_text(
"""
@@ -221,6 +222,9 @@ Host explicit_user
Host explicit_host
HostName ohai
ControlPath remoteuser %r host %h orighost %n
+
+Host hashbrowns
+ ControlPath %C
"""
)
result = config.lookup("explicit_user")["controlpath"]
@@ -229,6 +233,9 @@ Host explicit_host
result = config.lookup("explicit_host")["controlpath"]
# Remote user falls back to local user; host and orighost may differ
assert result == "remoteuser gandalf host ohai orighost explicit_host"
+ # Supports %C
+ result = config.lookup("hashbrowns")["controlpath"]
+ assert result == "a438e7dbf5308b923aba9db8fe2ca63447ac8688"
def test_negation(self):
config = SSHConfig.from_text(
@@ -280,10 +287,11 @@ ProxyCommand foo=bar:%h-%p
assert config.lookup(host) == values
- def test_identityfile(self):
+ @patch("paramiko.config.getpass")
+ def test_identityfile(self, getpass, socket):
+ getpass.getuser.return_value = "gandalf"
config = SSHConfig.from_text(
"""
-
IdentityFile id_dsa0
Host *
@@ -294,6 +302,9 @@ IdentityFile id_dsa2
Host dsa2*
IdentityFile id_dsa22
+
+Host hashbrowns
+IdentityFile %C
"""
)
for host, values in {
@@ -306,8 +317,15 @@ IdentityFile id_dsa22
"hostname": "dsa22",
"identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"],
},
+ "hashbrowns": {
+ "hostname": "hashbrowns",
+ "identityfile": [
+ "id_dsa0",
+ "id_dsa1",
+ "a438e7dbf5308b923aba9db8fe2ca63447ac8688",
+ ],
+ },
}.items():
-
assert config.lookup(host) == values
def test_config_addressfamily_and_lazy_fqdn(self):
@@ -744,10 +762,10 @@ class TestMatchExec(object):
@patch("paramiko.config.getpass")
@patch("paramiko.config.invoke.run")
def test_tokenizes_argument(self, run, getpass, socket):
- socket.gethostname.return_value = "local.fqdn"
getpass.getuser.return_value = "gandalf"
- # Actual exec value is "%d %h %L %l %n %p %r %u"
+ # Actual exec value is "%C %d %h %L %l %n %p %r %u"
parts = (
+ "bf5ba06778434a9384ee4217e462f64888bd0cd2",
expanduser("~"),
"configured",
"local",
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 6acb9121..f8b7eb42 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -23,6 +23,7 @@ Some unit tests for public/private key objects.
import unittest
import os
+import stat
from binascii import hexlify
from hashlib import md5
@@ -36,10 +37,11 @@ from paramiko import (
SSHException,
)
from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
+from paramiko.common import o600
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers
-from mock import patch
+from mock import patch, Mock
import pytest
from .util import _support, is_low_entropy
@@ -707,6 +709,62 @@ class KeyTest(unittest.TestCase):
_support("test_rsa.key-cert.pub"),
)
+ @patch("paramiko.pkey.os")
+ def _test_keyfile_race(self, os_, exists):
+ # Re: CVE-2022-24302
+ password = "television"
+ newpassword = "radio"
+ source = _support("test_ecdsa_384.key")
+ new = source + ".new"
+ # Mock setup
+ os_.path.exists.return_value = exists
+ # Attach os flag values to mock
+ for attr, value in vars(os).items():
+ if attr.startswith("O_"):
+ setattr(os_, attr, value)
+ # Load fixture key
+ key = ECDSAKey(filename=source, password=password)
+ key._write_private_key = Mock()
+ # Write out in new location
+ key.write_private_key_file(new, password=newpassword)
+ # Expected open via os module
+ os_.open.assert_called_once_with(
+ new, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, o600
+ )
+ os_.fdopen.assert_called_once_with(os_.open.return_value, "w")
+ # Old chmod still around for backwards compat
+ os_.chmod.assert_called_once_with(new, o600)
+ assert (
+ key._write_private_key.call_args[0][0]
+ == os_.fdopen.return_value.__enter__.return_value
+ )
+
+ def test_new_keyfiles_avoid_file_descriptor_race_on_chmod(self):
+ self._test_keyfile_race(exists=False)
+
+ def test_existing_keyfiles_still_work_ok(self):
+ self._test_keyfile_race(exists=True)
+
+ def test_new_keyfiles_avoid_descriptor_race_integration(self):
+ # Integration-style version of above
+ password = "television"
+ newpassword = "radio"
+ source = _support("test_ecdsa_384.key")
+ new = source + ".new"
+ # Load fixture key
+ key = ECDSAKey(filename=source, password=password)
+ try:
+ # Write out in new location
+ key.write_private_key_file(new, password=newpassword)
+ # Test mode
+ assert stat.S_IMODE(os.stat(new).st_mode) == o600
+ # Prove can open with new password
+ reloaded = ECDSAKey(filename=new, password=newpassword)
+ assert reloaded == key
+ finally:
+ if os.path.exists(new):
+ os.unlink(new)
+
def test_sign_rsa_with_certificate(self):
data = b"ice weasels"
key_path = _support(os.path.join("cert_support", "test_rsa.key"))
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 737ef705..fa7a3c1a 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -1121,7 +1121,12 @@ class AlgorithmDisablingTests(unittest.TestCase):
t = Transport(sock=Mock())
assert t.preferred_ciphers == t._preferred_ciphers
assert t.preferred_macs == t._preferred_macs
- assert t.preferred_keys == t._preferred_keys
+ assert t.preferred_keys == tuple(
+ t._preferred_keys
+ + tuple(
+ "{}-cert-v01@openssh.com".format(x) for x in t._preferred_keys
+ )
+ )
assert t.preferred_kex == t._preferred_kex
def test_preferred_lists_filter_disabled_algorithms(self):
@@ -1140,6 +1145,7 @@ class AlgorithmDisablingTests(unittest.TestCase):
assert "hmac-md5" not in t.preferred_macs
assert "ssh-dss" in t._preferred_keys
assert "ssh-dss" not in t.preferred_keys
+ assert "ssh-dss-cert-v01@openssh.com" not in t.preferred_keys
assert "diffie-hellman-group14-sha256" in t._preferred_kex
assert "diffie-hellman-group14-sha256" not in t.preferred_kex