summaryrefslogtreecommitdiff
path: root/tests/unittests/test_sshutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_sshutil.py')
-rw-r--r--tests/unittests/test_sshutil.py246
1 files changed, 235 insertions, 11 deletions
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
index fd1d1bac..bcb8044f 100644
--- a/tests/unittests/test_sshutil.py
+++ b/tests/unittests/test_sshutil.py
@@ -570,20 +570,33 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase):
ssh_util.render_authorizedkeysfile_paths(
"%h/.keys", "/homedirs/bobby", "bobby"))
+ def test_all(self):
+ self.assertEqual(
+ ["/homedirs/bobby/.keys", "/homedirs/bobby/.secret/keys",
+ "/keys/path1", "/opt/bobby/keys"],
+ ssh_util.render_authorizedkeysfile_paths(
+ "%h/.keys .secret/keys /keys/path1 /opt/%u/keys",
+ "/homedirs/bobby", "bobby"))
+
class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase):
@patch("cloudinit.ssh_util.pwd.getpwnam")
def test_multiple_authorizedkeys_file_order1(self, m_getpwnam):
- fpw = FakePwEnt(pw_name='bobby', pw_dir='/home2/bobby')
+ fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
m_getpwnam.return_value = fpw
- authorized_keys = self.tmp_path('authorized_keys')
+ user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+
+ # /tmp/home2/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder)
util.write_file(authorized_keys, VALID_CONTENT['rsa'])
- user_keys = self.tmp_path('user_keys')
+ # /tmp/home2/bobby/.ssh/user_keys = dsa
+ user_keys = self.tmp_path('user_keys', dir=user_ssh_folder)
util.write_file(user_keys, VALID_CONTENT['dsa'])
- sshd_config = self.tmp_path('sshd_config')
+ # /tmp/sshd_config
+ sshd_config = self.tmp_path('sshd_config', dir="/tmp")
util.write_file(
sshd_config,
"AuthorizedKeysFile %s %s" % (authorized_keys, user_keys)
@@ -593,33 +606,244 @@ class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase):
fpw.pw_name, sshd_config)
content = ssh_util.update_authorized_keys(auth_key_entries, [])
- self.assertEqual("%s/.ssh/authorized_keys" % fpw.pw_dir, auth_key_fn)
+ self.assertEqual(user_keys, auth_key_fn)
self.assertTrue(VALID_CONTENT['rsa'] in content)
self.assertTrue(VALID_CONTENT['dsa'] in content)
@patch("cloudinit.ssh_util.pwd.getpwnam")
def test_multiple_authorizedkeys_file_order2(self, m_getpwnam):
- fpw = FakePwEnt(pw_name='suzie', pw_dir='/home/suzie')
+ fpw = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie')
m_getpwnam.return_value = fpw
- authorized_keys = self.tmp_path('authorized_keys')
+ user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+
+ # /tmp/home/suzie/.ssh/authorized_keys = rsa
+ authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder)
util.write_file(authorized_keys, VALID_CONTENT['rsa'])
- user_keys = self.tmp_path('user_keys')
+ # /tmp/home/suzie/.ssh/user_keys = dsa
+ user_keys = self.tmp_path('user_keys', dir=user_ssh_folder)
util.write_file(user_keys, VALID_CONTENT['dsa'])
- sshd_config = self.tmp_path('sshd_config')
+ # /tmp/sshd_config
+ sshd_config = self.tmp_path('sshd_config', dir="/tmp")
util.write_file(
sshd_config,
- "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys)
+ "AuthorizedKeysFile %s %s" % (user_keys, authorized_keys)
)
(auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
- fpw.pw_name, sshd_config
+ fpw.pw_name, sshd_config)
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ self.assertEqual(authorized_keys, auth_key_fn)
+ self.assertTrue(VALID_CONTENT['rsa'] in content)
+ self.assertTrue(VALID_CONTENT['dsa'] in content)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ def test_multiple_authorizedkeys_file_local_global(self, m_getpwnam):
+ fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
+ m_getpwnam.return_value = fpw
+ user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+
+ # /tmp/home2/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder)
+ util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+
+ # /tmp/home2/bobby/.ssh/user_keys = dsa
+ user_keys = self.tmp_path('user_keys', dir=user_ssh_folder)
+ util.write_file(user_keys, VALID_CONTENT['dsa'])
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys',
+ dir="/tmp")
+ util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+
+ # /tmp/sshd_config
+ sshd_config = self.tmp_path('sshd_config', dir="/tmp")
+ util.write_file(
+ sshd_config,
+ "AuthorizedKeysFile %s %s %s" % (authorized_keys_global,
+ user_keys, authorized_keys)
+ )
+
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ fpw.pw_name, sshd_config)
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ self.assertEqual(authorized_keys, auth_key_fn)
+ self.assertTrue(VALID_CONTENT['rsa'] in content)
+ self.assertTrue(VALID_CONTENT['ecdsa'] in content)
+ self.assertTrue(VALID_CONTENT['dsa'] in content)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ def test_multiple_authorizedkeys_file_local_global2(self, m_getpwnam):
+ fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
+ m_getpwnam.return_value = fpw
+ user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+
+ # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa
+ authorized_keys = self.tmp_path('authorized_keys2',
+ dir=user_ssh_folder)
+ util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+
+ # /tmp/home2/bobby/.ssh/user_keys3 = dsa
+ user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder)
+ util.write_file(user_keys, VALID_CONTENT['dsa'])
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys',
+ dir="/tmp")
+ util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+
+ # /tmp/sshd_config
+ sshd_config = self.tmp_path('sshd_config', dir="/tmp")
+ util.write_file(
+ sshd_config,
+ "AuthorizedKeysFile %s %s %s" % (authorized_keys_global,
+ authorized_keys, user_keys)
+ )
+
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ fpw.pw_name, sshd_config)
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ self.assertEqual(user_keys, auth_key_fn)
+ self.assertTrue(VALID_CONTENT['rsa'] in content)
+ self.assertTrue(VALID_CONTENT['ecdsa'] in content)
+ self.assertTrue(VALID_CONTENT['dsa'] in content)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ def test_multiple_authorizedkeys_file_global(self, m_getpwnam):
+ fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
+ m_getpwnam.return_value = fpw
+
+ # /tmp/etc/ssh/authorized_keys = rsa
+ authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys',
+ dir="/tmp")
+ util.write_file(authorized_keys_global, VALID_CONTENT['rsa'])
+
+ # /tmp/sshd_config
+ sshd_config = self.tmp_path('sshd_config')
+ util.write_file(
+ sshd_config,
+ "AuthorizedKeysFile %s" % (authorized_keys_global)
)
+
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ fpw.pw_name, sshd_config)
content = ssh_util.update_authorized_keys(auth_key_entries, [])
self.assertEqual("%s/.ssh/authorized_keys" % fpw.pw_dir, auth_key_fn)
self.assertTrue(VALID_CONTENT['rsa'] in content)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ def test_multiple_authorizedkeys_file_multiuser(self, m_getpwnam):
+ fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby')
+ m_getpwnam.return_value = fpw
+ user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+ # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa
+ authorized_keys = self.tmp_path('authorized_keys2',
+ dir=user_ssh_folder)
+ util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+ # /tmp/home2/bobby/.ssh/user_keys3 = dsa
+ user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder)
+ util.write_file(user_keys, VALID_CONTENT['dsa'])
+
+ fpw2 = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie')
+ user_ssh_folder = "%s/.ssh" % fpw2.pw_dir
+ # /tmp/home/suzie/.ssh/authorized_keys2 = ssh-xmss@openssh.com
+ authorized_keys2 = self.tmp_path('authorized_keys2',
+ dir=user_ssh_folder)
+ util.write_file(authorized_keys2,
+ VALID_CONTENT['ssh-xmss@openssh.com'])
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2',
+ dir="/tmp")
+ util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+
+ # /tmp/sshd_config
+ sshd_config = self.tmp_path('sshd_config', dir="/tmp")
+ util.write_file(
+ sshd_config,
+ "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s" %
+ (authorized_keys_global, user_keys)
+ )
+
+ # process first user
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ fpw.pw_name, sshd_config)
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ self.assertEqual(user_keys, auth_key_fn)
+ self.assertTrue(VALID_CONTENT['rsa'] in content)
+ self.assertTrue(VALID_CONTENT['ecdsa'] in content)
+ self.assertTrue(VALID_CONTENT['dsa'] in content)
+ self.assertFalse(VALID_CONTENT['ssh-xmss@openssh.com'] in content)
+
+ m_getpwnam.return_value = fpw2
+ # process second user
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ fpw2.pw_name, sshd_config)
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ self.assertEqual(authorized_keys2, auth_key_fn)
+ self.assertTrue(VALID_CONTENT['ssh-xmss@openssh.com'] in content)
+ self.assertTrue(VALID_CONTENT['ecdsa'] in content)
+ self.assertTrue(VALID_CONTENT['dsa'] in content)
+ self.assertFalse(VALID_CONTENT['rsa'] in content)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ def test_multiple_authorizedkeys_file_multiuser2(self, m_getpwnam):
+ fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home/bobby')
+ m_getpwnam.return_value = fpw
+ user_ssh_folder = "%s/.ssh" % fpw.pw_dir
+ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+ authorized_keys = self.tmp_path('authorized_keys2',
+ dir=user_ssh_folder)
+ util.write_file(authorized_keys, VALID_CONTENT['rsa'])
+ # /tmp/home/bobby/.ssh/user_keys3 = dsa
+ user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder)
+ util.write_file(user_keys, VALID_CONTENT['dsa'])
+
+ fpw2 = FakePwEnt(pw_name='badguy', pw_dir='/tmp/home/badguy')
+ user_ssh_folder = "%s/.ssh" % fpw2.pw_dir
+ # /tmp/home/badguy/home/bobby = ""
+ authorized_keys2 = self.tmp_path('home/bobby', dir="/tmp/home/badguy")
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2',
+ dir="/tmp")
+ util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa'])
+
+ # /tmp/sshd_config
+ sshd_config = self.tmp_path('sshd_config', dir="/tmp")
+ util.write_file(
+ sshd_config,
+ "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s %s" %
+ (authorized_keys_global, user_keys, authorized_keys2)
+ )
+
+ # process first user
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ fpw.pw_name, sshd_config)
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ self.assertEqual(user_keys, auth_key_fn)
+ self.assertTrue(VALID_CONTENT['rsa'] in content)
+ self.assertTrue(VALID_CONTENT['ecdsa'] in content)
+ self.assertTrue(VALID_CONTENT['dsa'] in content)
+
+ m_getpwnam.return_value = fpw2
+ # process second user
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ fpw2.pw_name, sshd_config)
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ # badguy should not take the key from the other user!
+ self.assertEqual(authorized_keys2, auth_key_fn)
+ self.assertTrue(VALID_CONTENT['ecdsa'] in content)
self.assertTrue(VALID_CONTENT['dsa'] in content)
+ self.assertFalse(VALID_CONTENT['rsa'] in content)
# vi: ts=4 expandtab