summaryrefslogtreecommitdiff
path: root/tests/unittests/test_ssh_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_ssh_util.py')
-rw-r--r--tests/unittests/test_ssh_util.py1565
1 files changed, 1565 insertions, 0 deletions
diff --git a/tests/unittests/test_ssh_util.py b/tests/unittests/test_ssh_util.py
new file mode 100644
index 00000000..d6a72dc1
--- /dev/null
+++ b/tests/unittests/test_ssh_util.py
@@ -0,0 +1,1565 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+import stat
+from functools import partial
+from typing import NamedTuple
+from unittest import mock
+from unittest.mock import patch
+
+import pytest
+
+from cloudinit import ssh_util, util
+
+M_PATH = "cloudinit.ssh_util."
+
+
+class FakePwEnt(NamedTuple):
+ pw_name: str = "UNSET_pw_name"
+ pw_passwd: str = "UNSET_w_passwd"
+ pw_uid: str = "UNSET_pw_uid"
+ pw_gid: str = "UNSET_pw_gid"
+ pw_gecos: str = "UNSET_pw_gecos"
+ pw_dir: str = "UNSET_pw_dir"
+ pw_shell: str = "UNSET_pw_shell"
+
+
+def mock_get_owner(updated_permissions, value):
+ try:
+ return updated_permissions[value][0]
+ except ValueError:
+ return util.get_owner(value)
+
+
+def mock_get_group(updated_permissions, value):
+ try:
+ return updated_permissions[value][1]
+ except ValueError:
+ return util.get_group(value)
+
+
+def mock_get_user_groups(username):
+ return username
+
+
+def mock_get_permissions(updated_permissions, value):
+ try:
+ return updated_permissions[value][2]
+ except ValueError:
+ return util.get_permissions(value)
+
+
+def mock_getpwnam(users, username):
+ return users[username]
+
+
+# Do not use these public keys, most of them are fetched from
+# the testdata for OpenSSH, and their private keys are available
+# https://github.com/openssh/openssh-portable/tree/master/regress/unittests/sshkey/testdata
+VALID_CONTENT = {
+ "dsa": (
+ "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF"
+ "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa"
+ "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa"
+ "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv"
+ "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4"
+ "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7"
+ "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P"
+ "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE"
+ "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/"
+ "5z7u2rVAlDw=="
+ ),
+ "ecdsa": (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ"
+ "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/"
+ "Ql7lc2leWL7CY="
+ ),
+ "rsa": (
+ "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz"
+ "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD"
+ "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q"
+ "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT"
+ "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07"
+ "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw=="
+ ),
+ "ed25519": (
+ "AAAAC3NzaC1lZDI1NTE5AAAAIA1J77+CrJ8p6/vWCEzuylqJNMHUP/XmeYyGVWb8lnDd"
+ ),
+ "ecdsa-sha2-nistp256-cert-v01@openssh.com": (
+ "AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
+ "gQIfwT/+UX68/hlKsdKuaOuAVB6ftTg03SlP/uH4OBEwAAAAIbmlzdHAyNTYAAA"
+ "BBBEjA0gjJmPM6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXw"
+ "HvNxplYBwdPlk2zEecvf9Cs2BMAAAAAAAAAAAAAAAEAAAAYa2V5cy9lY2RzYS1z"
+ "aGEyLW5pc3RwMjU2AAAAAAAAAAAAAAAA//////////8AAAAAAAAAggAAABVwZXJ"
+ "taXQtWDExLWZvcndhcmRpbmcAAAAAAAAAF3Blcm1pdC1hZ2VudC1mb3J3YXJkaW"
+ "5nAAAAAAAAABZwZXJtaXQtcG9ydC1mb3J3YXJkaW5nAAAAAAAAAApwZXJtaXQtc"
+ "HR5AAAAAAAAAA5wZXJtaXQtdXNlci1yYwAAAAAAAAAAAAAAaAAAABNlY2RzYS1z"
+ "aGEyLW5pc3RwMjU2AAAACG5pc3RwMjU2AAAAQQRH6Y9Q1+ocQ8ETKW3LjQqtxg7"
+ "OuSSDacxmmQatQVaIawwjCbmntyEAqmVj3v9ElDSXnO5m7TyYMBQu4+vsh76RAA"
+ "AAZQAAABNlY2RzYS1zaGEyLW5pc3RwMjU2AAAASgAAACEA47Cl2MMhr+glPGuxx"
+ "2tM3QXkDcwdP0SxSEW5yy4XV5oAAAAhANNMm1cdVlAt3hmycQgdD82zPlg5YvVO"
+ "iN0SQTbgVD8i"
+ ),
+ "ecdsa-sha2-nistp256": (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEjA0gjJmPM"
+ "6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXwHvNxplYBwdPlk"
+ "2zEecvf9Cs2BM="
+ ),
+ "ecdsa-sha2-nistp384-cert-v01@openssh.com": (
+ "AAAAKGVjZHNhLXNoYTItbmlzdHAzODQtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
+ "grnSvDsK1EnCZndO1IyGWcGkVgVSkPWi/XO2ybPFyLVUAAAAIbmlzdHAzODQAAA"
+ "BhBAaYSQs+8TT0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaY"
+ "ByhXtAJiPOMqLU5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm"
+ "7q6BOAAAAAAAAAAAAAAAAQAAABhrZXlzL2VjZHNhLXNoYTItbmlzdHAzODQAAAA"
+ "AAAAAAAAAAAD//////////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZG"
+ "luZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pd"
+ "C1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1p"
+ "dC11c2VyLXJjAAAAAAAAAAAAAACIAAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAA"
+ "IbmlzdHAzODQAAABhBLWbubcMzcWc7lMTCMGVXZlaVvUOHLjpr6SOOScFFrd8K9"
+ "Gl8nYELST5HZ1gym65m+MG6/tbrUWIY/flLWNIe+WtqxrdPPGdIhFruCwNw2peZ"
+ "SbQOa/o3AGnJ/vO6EKEGAAAAIQAAAATZWNkc2Etc2hhMi1uaXN0cDM4NAAAAGkA"
+ "AAAxAL10JHd5bvnbpD+fet/k1YE1BEIrqGXaoIIJ9ReE5H4nTK1uQJzMD7+wwGK"
+ "RVYqYQgAAADAiit0UCMDAUbjD+R2x4LvU3x/t8G3sdqDLRNfMRpjZpvcS8AwC+Y"
+ "VFVSQNn0AyzW0="
+ ),
+ "ecdsa-sha2-nistp384": (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBAaYSQs+8TT"
+ "0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaYByhXtAJiPOMqL"
+ "U5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm7q6BOA=="
+ ),
+ "ecdsa-sha2-nistp521-cert-v01@openssh.com": (
+ "AAAAKGVjZHNhLXNoYTItbmlzdHA1MjEtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
+ "gGmRzkkMvRFk1V5U3m3mQ2nfW20SJVXk1NKnT5iZGDcEAAAAIbmlzdHA1MjEAAA"
+ "CFBAHosAOHAI1ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94C"
+ "Q8yyNHcby87zFZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnR"
+ "nxprkcQ0rfCCdagkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxAAAAAAAAAA"
+ "AAAAAAQAAABhrZXlzL2VjZHNhLXNoYTItbmlzdHA1MjEAAAAAAAAAAAAAAAD///"
+ "///////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXc"
+ "GVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndh"
+ "cmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAA"
+ "AAAAAAAAAAACsAAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAA"
+ "CFBAC6hFVXM1XEg/7qKkp5sLZuANGQVW88b5pPn2ZcK0td9IQstLH6BwWuZ6MPE"
+ "ogiDlvx9HD1BaKGBBfkxgOY8NGFzQHbjU9eTWH3gt0RATDbZsij1pSkFPnAXdU9"
+ "SjfogYloI2xdHaTCgWp3zgsUV+BBQ0QGGv2MqqcOmrF0f5YEJeOffAAAAKcAAAA"
+ "TZWNkc2Etc2hhMi1uaXN0cDUyMQAAAIwAAABCAT+vSOYPuYVTDopDW08576d5Sb"
+ "edXQMOu1op4CQIm98VKtAXvu5dfioi5VYAqpte8M+UxEMOMiQWJp+U9exYf6LuA"
+ "AAAQgEzkIpX3yKXPaPcK17mNx40ujEDitm4ARmbhAge0sFhZtf7YIgI55b6vkI8"
+ "JvMJkzQCBF1cpNOaIpVh1nFZNBphMQ=="
+ ),
+ "ecdsa-sha2-nistp521": (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBAHosAOHAI1"
+ "ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94CQ8yyNHcby87zF"
+ "ZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnRnxprkcQ0rfCCd"
+ "agkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxA=="
+ ),
+ "sk-ecdsa-sha2-nistp256-cert-v01@openssh.com": (
+ "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
+ "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
+ "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
+ "DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz"
+ "c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA"
+ "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
+ "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
+ ),
+ "sk-ecdsa-sha2-nistp256@openssh.com": (
+ "AAAAInNrLWVjZHNhLXNoYTItbmlzdHAyNTZAb3BlbnNzaC5jb20AAAAIbmlzdHA"
+ "yNTYAAABBBIELQJ2DgvaX1yQlKFokfWM2suuaCFI2qp0eJodHyg6O4ifxc3XpRK"
+ "d1OS8dNYQtE/YjdXSrA+AOnMF5ns2Nkx4AAAAEc3NoOg=="
+ ),
+ "sk-ssh-ed25519-cert-v01@openssh.com": (
+ "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
+ "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
+ "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
+ "DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz"
+ "c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA"
+ "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
+ "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
+ ),
+ "sk-ssh-ed25519@openssh.com": (
+ "AAAAGnNrLXNzaC1lZDI1NTE5QG9wZW5zc2guY29tAAAAICFo/k5LU8863u66YC9"
+ "eUO2170QduohPURkQnbLa/dczAAAABHNzaDo="
+ ),
+ "ssh-dss-cert-v01@openssh.com": (
+ "AAAAHHNzaC1kc3MtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgdTlbNU9Hn9Qng3F"
+ "HxwH971bxCIoq1ern/QWFFDWXgmYAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0c"
+ "Fn1zYd/JGvtabKnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4"
+ "yLB+6vCtHcJF7rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DD"
+ "jMF0k5emWKCsa3ZfAAAAFQCjA/+dKkMu4/CWjJPtfl7YNaStNQAAAIEA7uX1BVV"
+ "tJKjLmWrpw62+l/xSXA5rr7MHBuWjiCYV3VHBfXJaQDyRDtGuEJKDwdzqYgacpG"
+ "ApGWL/cuBtJ9nShsUl6GRG0Ra03g+Hx9VR5LviJBsjAVB4qVgciU1NGga0Bt2Le"
+ "cd1X4EGQRBzVXeuOpiqGM6jP/I2yDMs0Pboet0AAACBAOdXpyfmobEBaOqZAuvg"
+ "j1P0uhjG2P31Ufurv22FWPBU3A9qrkxbOXwE0LwvjCvrsQV/lrYhJz/tiys40Ve"
+ "ahulWZE5SAHMXGIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1Uuy"
+ "QMcUtb34+I0u9Ycnyhp2mSFsQtAAAAAAAAAAYAAAACAAAABmp1bGl1cwAAABIAA"
+ "AAFaG9zdDEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAA"
+ "MwAAAAtzc2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN"
+ "+F7SMGQAAAFMAAAALc3NoLWVkMjU1MTkAAABAh/z1LIdNL1b66tQ8t9DY9BTB3B"
+ "QKpTKmc7ezyFKLwl96yaIniZwD9Ticdbe/8i/Li3uCFE3EAt8NAIv9zff8Bg=="
+ ),
+ "ssh-dss": (
+ "AAAAB3NzaC1kc3MAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0cFn1zYd/JGvtab"
+ "KnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4yLB+6vCtHcJF7"
+ "rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DDjMF0k5emWKCsa"
+ "3ZfAAAAFQCjA/+dKkMu4/CWjJPtfl7YNaStNQAAAIEA7uX1BVVtJKjLmWrpw62+"
+ "l/xSXA5rr7MHBuWjiCYV3VHBfXJaQDyRDtGuEJKDwdzqYgacpGApGWL/cuBtJ9n"
+ "ShsUl6GRG0Ra03g+Hx9VR5LviJBsjAVB4qVgciU1NGga0Bt2Lecd1X4EGQRBzVX"
+ "euOpiqGM6jP/I2yDMs0Pboet0AAACBAOdXpyfmobEBaOqZAuvgj1P0uhjG2P31U"
+ "furv22FWPBU3A9qrkxbOXwE0LwvjCvrsQV/lrYhJz/tiys40VeahulWZE5SAHMX"
+ "GIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1UuyQMcUtb34+I0u9"
+ "Ycnyhp2mSFsQt"
+ ),
+ "ssh-ed25519-cert-v01@openssh.com": (
+ "AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
+ "wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
+ "ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
+ "DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz"
+ "c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA"
+ "AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
+ "0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
+ ),
+ "ssh-ed25519": (
+ "AAAAC3NzaC1lZDI1NTE5AAAAIFOG6kY7Rf4UtCFvPwKgo/BztXck2xC4a2WyA34XtIwZ"
+ ),
+ "ssh-rsa-cert-v01@openssh.com": (
+ "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAg98LhS2EHxLOWCLo"
+ "pZPwHdg/RJXusnkOqQXSc9R7aITkAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGh"
+ "EZzpoojjEW5y8+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yM"
+ "rW6wb84gbq8C31Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXec"
+ "XylxIUOL0jE+u0rU1967pDJx+wAAAAAAAAAFAAAAAgAAAAZqdWxpdXMAAAASAAA"
+ "ABWhvc3QxAAAABWhvc3QyAAAAADaMAfAAAAAATR5gcAAAAAAAAAAAAAAAAAAAAD"
+ "MAAAALc3NoLWVkMjU1MTkAAAAgU4bqRjtF/hS0IW8/AqCj8HO1dyTbELhrZbIDf"
+ "he0jBkAAABTAAAAC3NzaC1lZDI1NTE5AAAAQI3QGlUCzC07KorupxpDkkGy6tni"
+ "aZ8EvBflzvv+itXWNchGvfUeHmVT6aX0sRqehdz/lR+GmXRoZBhofwh0qAM="
+ ),
+ "ssh-rsa": (
+ "AAAAB3NzaC1yc2EAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGhEZzpoojjEW5y8"
+ "+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yMrW6wb84gbq8C3"
+ "1Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXecXylxIUOL0jE+u"
+ "0rU1967pDJx+w=="
+ ),
+ "ssh-xmss-cert-v01@openssh.com": (
+ "AAAAHXNzaC14bXNzLWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIM2UD0IH+Igsekq"
+ "xjTO5f36exX4WGRMCtDGPjwfbXblxAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0gxMA"
+ "AAAEDI83/K5JMOy0BMJgQypRdz35ApAnoQinMJ8ZMoZPaEJF8Z4rANQlfzaAXum"
+ "N3RDU5CGIUGGw+WJ904G/wwEq9CAAAAAAAAAAAAAAABAAAACWtleXMveG1zcwAA"
+ "AAAAAAAAAAAAAP//////////AAAAAAAAAIIAAAAVcGVybWl0LVgxMS1mb3J3YXJ"
+ "kaW5nAAAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAAAAAWcGVybW"
+ "l0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGVybWl0LXB0eQAAAAAAAAAOcGVyb"
+ "Wl0LXVzZXItcmMAAAAAAAAAAAAAAHUAAAAUc3NoLXhtc3NAb3BlbnNzaC5jb20A"
+ "AAAVWE1TU19TSEEyLTI1Nl9XMTZfSDEwAAAAQA+irIyT2kaOd07YWZT/QItzNBZ"
+ "kUYwnqZJihQ7BxuyiDP4HEFbnfYnnIZXx9Asyi7vDyZRvi+AMSOzmMSq4JnkAAA"
+ "ngAAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAJxAAAAAAFjaKTDc+7Hu2uFGIab"
+ "3NAku8HbbGtrq/uGXOxmqxu4RaLqmwofl5iXk3nMwWEhQAb99vAc9D9ZFtfxJO4"
+ "STYUTjbj4BxToov/uvbYfE5VeO6sMvkGglgh9YHkCTAItsG8EmGT1SIPfKYzLlN"
+ "jvUlbcv0PaPFMJ0wzS9mNfuRf+KUhf3dxQ6zaMrBH3KEJ8Me2kNjhnh6rNPROeI"
+ "N+IcStSKsydYuiySGKS/orsH38XysuK5QqLizbHJY3cqLbkW9LsIijb+pfEJh4Y"
+ "bOoAbraWAv9ySnWCyRhvw2x8uJ0ZM+p5WSRiZfB3JxCpOhHgiKa9TdmdjnAtnED"
+ "zqKOj/gM7y9mesn5ydQI0bENOGymlw0ThUGKbXMxn87Hc9dDPURUBmoO3NGjPDf"
+ "7meS39A1ZEGtCe/pbZU9iwxqGx4wJYvB4lutRP2tYC1pA6hjQCcHibvxl5iqj+1"
+ "jRjwPr8dbTm4PdETW/7JDSVQXKjxOT0kRLHLelJNeviGx5zSHR5PtnUP3nOBMme"
+ "hk9DwcQW9vfKeWSnu9CMnF8xvYJxoPKQwmz0TKo+YVOUnc9/Ma+Ykseof9/W+rk"
+ "USQGELc4x7XE5XBKYZZP2PmtxirQ3qTWFw+CeTX2Oa+jPYkzOa7jgmHJ3Fi9Xqw"
+ "3L844vRl97e28GmwS0M1SXH+ohES0mO4EcrGh5OLyXBaRTV5QMo+4Bg6FH/HwEn"
+ "gG1mdEOAqvctK2QC70c4lHGzfexqwQ2U6WUADPcd/BLOE8Noj1EiXYwZrSA1okZ"
+ "FYnS/b89Uo51D2FE4A33V4gcxAglGzVNtrPulkguNT9B4jjNgdIwkTBL9k3ujkG"
+ "og6pyYjZ0J5Jp5XPBn+y0LqrpOdZijzrc1OJbX59tTeIbDkM7Fw8As4a03hQPDU"
+ "FTOdyMHgLnuLhLXOcqIjvW5axZL/Kx3UET8wrSHizPoa6NErCG4v5mC2M4kBSOW"
+ "In1QV27QMaHkL/ZAa3mPsW5iFZtOVEGzw2BW4MZs0qOrcloCENZzOHiMBroKEkH"
+ "AbzX6D1FLwml2JpXq4JXlCrdIiFm4+co5ygnWPqb4QGzMlcbjW/x/A16TthNuok"
+ "wwlmK5ndKZ76LahyGKEwx2Nv0D+0xilEC1EldtiYRdBNlcGbU/A5EhH5bQ9KVIH"
+ "wjWm35pRPLl5224//nqvQKhwFCn9otsR35XHXev3IQ0or3HmQxIvSDOwir1l66z"
+ "FFrkyHMWexoucbTBxw1MN3hLb247lcVYJ5+hspJgyoYbfR5RkQVDzhpzskogP7l"
+ "K5t0bphu+f+hpvrca7DAiiIZkcR4R1UUQoRnJPRXyXOxlxwS10b51cP9p9jzvZj"
+ "d2LUs8yx1KXWSxNHo6WmtYONNaUfdX2OB5+QCvPULfLfFeBrqpX6Yp5wQMM5Cup"
+ "k8FEfV07eEgQkVE9nDGKHglWo3kUdOF+XCqWAnXn0b/2bNS9/SSAz6gB1GTFcN/"
+ "QsFGlC0QgbCJbQ7LQM6hilRWupWvN5zZ/+HJyyRHuSs5VnQnKiGbIa6AIhx7mP7"
+ "8T82gKjU3mHLJWMGKcT3cY8R958Gs+w4OT71VJRMw3kK6qk02WCbD5OtbFeC6ib"
+ "KRJKdLK3BzjVs/Fzu3mHVucVby3jpvG1Z8HKspKFhvV7gjFEPu8qHKi4MdAlif/"
+ "KakyPk8yZB/dMfaxh7Kv/WpJuSwWNs7RNh29e+ZG+POxqRPWiHqiVw7P17a4dN7"
+ "nkVOawdBEyxI4NAY+4zW+0r0bAy6zNBitBvkq3IXfr3De6Upex52sPHvK04PXoV"
+ "RI6gjnpPSbLLjpSpcHPKgB7DWefLfhd63BUQbc57D8zm8Jd6qtmzcSKn+wz5/zT"
+ "0I6v9I4a+DOjjyqpPpzzNU76pt+Y8SuBgHzMm1vcAdNWlbQrqtScvm0T9AkYni6"
+ "47vSh77uwRZKDtMCMSU151tVUavXhtLYLZ6/ll5NhMXkkx8//i7pk1OBjN5LHVQ"
+ "0QeimRmavlXU1dJ2rwsFAV+9dDdJXUNOq3VLTo9FrbOzZiWtzzjkJpVJAFREnBn"
+ "yIDBK5AXtXE1RzfzaBHzbI2e2kO3t+CSNLWYMFYHBDqaeICYQ9+I9aO/8hnzVSo"
+ "fp+8IfWO8iJhppqynUniicW2oCzrn4oczzYNEjImt8CGY7g90GxWfX+ZgXMJfy/"
+ "bQiFQL3dZvVypDHEbFoIGz+sxkL83xrP4MZV1V9Wwa64lDXYv01Kp4kQXmmnAZY"
+ "KlxBoWqYDXLeLLguSOZxDSCIDpd+YPm39wQ3wOysHW2fmsWtp6FPPlQRUYjsGIP"
+ "lfrkJzpoeaPKDtF1m+mOULfEh9kvTKCmKRi385T9ON39D97eWqaM4CCfUGImvdR"
+ "DlZLXvjmaAh5BVJ8VJxk75OkP14vWFFlTMv0/k4BYLDKsrNqCREC/G9nQBGcD2D"
+ "CLwC2zPNaX2Y9dnyDs2csjN1ibsYttUMnXMgBcnCOkIkVS496Bpc0jQMf35GUgb"
+ "PSyliwqCoXjEBP/2eyq0VLFKQ0fXGsHWvElT+Y/7RYNTiYVWttFMxN5H/2EGcgn"
+ "lfNHLpQvXH9u/3YminS9GX30hQ7jFhpHXxkK8gZ1mpHL9K3pfKS3lG6EF9wQ23O"
+ "qS8m995SG3dp3MzmywxXen/ukXx6bDiEl5VaOvdRUcbhr5Eb3exVDfdWiaJdTYF"
+ "WfIfJOWx88drB3J9vFwjmuaoNEOjFsoNAMYthYOxXraXaJblvmUKz6tJ3T8/G7x"
+ "B9QGYNBsOqBolKoKHBtsWCosLdWhEZr9VFFh2AJrOW1fx24CIkHnvfTtwYORvQq"
+ "Ckuq2bZS1EOdsFkU/X5gwPl6gSUTNhV3IooXkBFL3iBEbfZ6JpQHVVyIuNWjIyN"
+ "b2liCn9Nn0VHeNMMRLl7uyw4eKlOX2ogom8SLvihYxcJoqlCwtehpLsKsU4iwME"
+ "PmDteW5GBGf4GbnqPFkpIT5ed1jGhdZt/dpsp+v6QhYH1uX4pPxdkdnuc84/yb9"
+ "k4SQdKBJ+l3KZkfIxApNWOZqicJfz/eWwS/15hiamRKRuiiUV2zS1V+l8bV7g9O"
+ "gy5scPBMONxtfFlGEKikZKurFmzboCOGQKRBEUCpsY44IAp443h59pQdVIb0YAS"
+ "kfp2xKHwYij6ELRNdH5MrlFa3bNTskGO4k5XDR4cl/Sma2SXgBKb5XjTtlNmCQG"
+ "Gv6lOW7pGXNhs5wfd8K9Ukm6KeLTIlYn1iiKM37YQpa+4JQYljCYhumbqNCkPTZ"
+ "rNYClh8fQEQ8XuOCDpomMWu58YOTfbZNMDWs/Ou7RfCjX+VNwjPShDK9joMwWKc"
+ "Jy3QalZbaoWtcyyvXxR2sqhVR9F7Cmasq4="
+ ),
+ "ssh-xmss@openssh.com": (
+ "AAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0g"
+ "xMAAAAECqptWnK94d+Sj2xcdTu8gz+75lawZoLSZFqC5IhbYuT/Z3oBZCim6yt+"
+ "HAmk6MKldl3Fg+74v4sR/SII0I0Jv/"
+ ),
+}
+
+KEY_TYPES = list(VALID_CONTENT.keys())
+
+TEST_OPTIONS = (
+ "no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
+ 'command="echo \'Please login as the user "ubuntu" rather than the'
+ 'user "root".\';echo;sleep 10"'
+)
+
+
+class TestAuthKeyLineParser:
+ @pytest.mark.parametrize("with_options", [True, False])
+ @pytest.mark.parametrize("with_comment", [True, False])
+ @pytest.mark.parametrize("ktype", KEY_TYPES)
+ def test_parse(self, ktype, with_comment, with_options):
+ content = VALID_CONTENT[ktype]
+ comment = "user-%s@host" % ktype
+ options = TEST_OPTIONS
+
+ line_args = []
+ if with_options:
+ line_args.append(options)
+ line_args.extend(
+ [
+ ktype,
+ content,
+ ]
+ )
+ if with_comment:
+ line_args.append(comment)
+ line = " ".join(line_args)
+
+ key = ssh_util.AuthKeyLineParser().parse(line)
+
+ assert key.base64 == content
+ assert key.keytype == ktype
+ if with_options:
+ assert key.options == options
+ else:
+ assert key.options is None
+ if with_comment:
+ assert key.comment == comment
+ else:
+ assert key.comment == ""
+
+ def test_parse_with_options_passed_in(self):
+ # test key line with key type and base64 only
+ parser = ssh_util.AuthKeyLineParser()
+
+ baseline = " ".join(("rsa", VALID_CONTENT["rsa"], "user@host"))
+ myopts = "no-port-forwarding,no-agent-forwarding"
+
+ key = parser.parse("allowedopt" + " " + baseline)
+ assert key.options == "allowedopt"
+
+ key = parser.parse("overridden_opt " + baseline, options=myopts)
+ assert key.options == myopts
+
+ def test_parse_invalid_keytype(self):
+ parser = ssh_util.AuthKeyLineParser()
+ key = parser.parse(" ".join(["badkeytype", VALID_CONTENT["rsa"]]))
+
+ assert not key.valid()
+
+
+class TestUpdateAuthorizedKeys:
+ @pytest.mark.parametrize(
+ "new_entries",
+ [
+ (
+ [
+ " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")),
+ ]
+ ),
+ pytest.param(
+ [
+ " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")),
+ "xxx-invalid-thing1",
+ "xxx-invalid-blob2",
+ ],
+ id="skip-invalid-entries",
+ ),
+ ],
+ )
+ def test_new_keys_replace(self, new_entries):
+ """new entries with the same base64 should replace old."""
+ orig_entries = [
+ " ".join(("rsa", VALID_CONTENT["rsa"], "orig_comment1")),
+ " ".join(("dsa", VALID_CONTENT["dsa"], "orig_comment2")),
+ ]
+
+ expected = "\n".join([new_entries[0], orig_entries[1]]) + "\n"
+
+ parser = ssh_util.AuthKeyLineParser()
+ found = ssh_util.update_authorized_keys(
+ [parser.parse(p) for p in orig_entries],
+ [parser.parse(p) for p in new_entries],
+ )
+
+ assert expected == found
+
+
+@mock.patch(M_PATH + "util.load_file")
+@mock.patch(M_PATH + "os.path.isfile")
+class TestParseSSHConfig:
+ @pytest.mark.parametrize(
+ "is_file, file_content",
+ [
+ pytest.param(True, ("",), id="empty-file"),
+ pytest.param(False, IOError, id="not-a-file"),
+ ],
+ )
+ def test_dummy_file(self, m_is_file, m_load_file, is_file, file_content):
+ m_is_file.return_value = is_file
+ m_load_file.side_effect = file_content
+ ret = ssh_util.parse_ssh_config("notmatter")
+ assert [] == ret
+
+ @pytest.mark.parametrize(
+ "file_content",
+ [
+ pytest.param(["# This is a comment"], id="comment_line"),
+ pytest.param(
+ ["# This is a comment", "# This is another comment"],
+ id="two-comment_lines",
+ ),
+ ],
+ )
+ def test_comment_line(self, m_is_file, m_load_file, file_content):
+ m_is_file.return_value = True
+ m_load_file.return_value = "\n".join(file_content)
+ ret = ssh_util.parse_ssh_config("some real file")
+ assert len(file_content) == len(ret)
+ assert file_content[0] == ret[0].line
+
+ def test_blank_lines(self, m_is_file, m_load_file):
+ m_is_file.return_value = True
+ lines = ["", "\t", " "]
+ m_load_file.return_value = "\n".join(lines)
+ ret = ssh_util.parse_ssh_config("some real file")
+ assert len(lines) == len(ret)
+ for line in ret:
+ assert "" == line.line
+
+ @pytest.mark.parametrize(
+ "file_content, expected_key, expected_value",
+ [
+ pytest.param("foo bar", "foo", "bar", id="lower-case"),
+ pytest.param("Foo Bar", "foo", "Bar", id="upper-case"),
+ pytest.param("foo=bar", "foo", "bar", id="lower-case-with-equals"),
+ pytest.param("Foo=bar", "foo", "bar", id="upper-case-with-equals"),
+ ],
+ )
+ def test_case_config(
+ self,
+ m_is_file,
+ m_load_file,
+ file_content,
+ expected_key,
+ expected_value,
+ ):
+ m_is_file.return_value = True
+ m_load_file.return_value = file_content
+ ret = ssh_util.parse_ssh_config("some real file")
+ assert 1 == len(ret)
+ assert expected_key == ret[0].key
+ assert expected_value == ret[0].value
+
+
+class TestUpdateSshConfigLines:
+ """Test the update_ssh_config_lines method."""
+
+ exlines = [
+ "#PasswordAuthentication yes",
+ "UsePAM yes",
+ "# Comment line",
+ "AcceptEnv LANG LC_*",
+ "X11Forwarding no",
+ ]
+ pwauth = "PasswordAuthentication"
+
+ def check_line(self, line, opt, val):
+ assert line.key == opt.lower()
+ assert line.value == val
+ assert opt in str(line)
+ assert val in str(line)
+
+ @pytest.mark.parametrize(
+ "key, value",
+ [
+ pytest.param("MyKey", "MyVal", id="new_option_added"),
+ pytest.param(
+ pwauth, "no", id="commented_out_not_updated_but_appended"
+ ),
+ ],
+ )
+ def test_update_ssh_config_lines(self, key, value):
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, {key: value})
+ assert [key] == result
+ self.check_line(lines[-1], key, value)
+
+ def test_option_without_value(self):
+ """Implementation only accepts key-value pairs."""
+ extended_exlines = self.exlines.copy()
+ denyusers_opt = "DenyUsers"
+ extended_exlines.append(denyusers_opt)
+ lines = ssh_util.parse_ssh_config_lines(list(extended_exlines))
+ assert denyusers_opt not in str(lines)
+
+ def test_single_option_updated(self):
+ """A single update should have change made and line updated."""
+ opt, val = ("UsePAM", "no")
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, {opt: val})
+ assert [opt] == result
+ self.check_line(lines[1], opt, val)
+
+ def test_multiple_updates_with_add(self):
+ """Verify multiple updates some added some changed, some not."""
+ updates = {
+ "UsePAM": "no",
+ "X11Forwarding": "no",
+ "NewOpt": "newval",
+ "AcceptEnv": "LANG ADD LC_*",
+ }
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, updates)
+ assert set(["UsePAM", "NewOpt", "AcceptEnv"]) == set(result)
+ self.check_line(lines[3], "AcceptEnv", updates["AcceptEnv"])
+
+ def test_return_empty_if_no_changes(self):
+ """If there are no changes, then return should be empty list."""
+ updates = {"UsePAM": "yes"}
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, updates)
+ assert [] == result
+ assert self.exlines == [str(line) for line in lines]
+
+ def test_keycase_not_modified(self):
+ """Original case of key should not be changed on update.
+ This behavior is to keep original config as much intact as can be."""
+ updates = {"usepam": "no"}
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, updates)
+ assert ["usepam"] == result
+ assert "UsePAM no" == str(lines[1])
+
+
+class TestUpdateSshConfig:
+ cfgdata = "\n".join(["#Option val", "MyKey ORIG_VAL", ""])
+
+ def test_modified(self, tmpdir):
+ mycfg = tmpdir.join("ssh_config_1")
+ util.write_file(mycfg, self.cfgdata)
+ ret = ssh_util.update_ssh_config({"MyKey": "NEW_VAL"}, mycfg)
+ assert True is ret
+ found = util.load_file(mycfg)
+ assert self.cfgdata.replace("ORIG_VAL", "NEW_VAL") == found
+ # assert there is a newline at end of file (LP: #1677205)
+ assert "\n" == found[-1]
+
+ def test_not_modified(self, tmpdir):
+ mycfg = tmpdir.join("ssh_config_2")
+ util.write_file(mycfg, self.cfgdata)
+ with patch("cloudinit.ssh_util.util.write_file") as m_write_file:
+ ret = ssh_util.update_ssh_config({"MyKey": "ORIG_VAL"}, mycfg)
+ assert False is ret
+ assert self.cfgdata == util.load_file(mycfg)
+ m_write_file.assert_not_called()
+
+ def test_without_include(self, tmpdir):
+ mycfg = tmpdir.join("sshd_config")
+ cfg = "X Y"
+ util.write_file(mycfg, cfg)
+ assert ssh_util.update_ssh_config({"key": "value"}, mycfg)
+ assert "X Y\nkey value\n" == util.load_file(mycfg)
+ expected_conf_file = f"{mycfg}.d/50-cloud-init.conf"
+ assert not os.path.isfile(expected_conf_file)
+
+ @pytest.mark.parametrize(
+ "cfg",
+ ["Include {mycfg}.d/*.conf", "Include {mycfg}.d/*.conf # comment"],
+ )
+ def test_with_include(self, cfg, tmpdir):
+ mycfg = tmpdir.join("sshd_config")
+ util.write_file(mycfg, cfg.format(mycfg=mycfg))
+ assert ssh_util.update_ssh_config({"key": "value"}, mycfg)
+ expected_conf_file = f"{mycfg}.d/50-cloud-init.conf"
+ assert os.path.isfile(expected_conf_file)
+ assert 0o600 == stat.S_IMODE(os.stat(expected_conf_file).st_mode)
+ assert "key value\n" == util.load_file(expected_conf_file)
+
+ def test_with_commented_include(self, tmpdir):
+ mycfg = tmpdir.join("sshd_config")
+ cfg = f"# Include {mycfg}.d/*.conf"
+ util.write_file(mycfg, cfg)
+ assert ssh_util.update_ssh_config({"key": "value"}, mycfg)
+ assert f"{cfg}\nkey value\n" == util.load_file(mycfg)
+ expected_conf_file = f"{mycfg}.d/50-cloud-init.conf"
+ assert not os.path.isfile(expected_conf_file)
+
+ def test_with_other_include(self, tmpdir):
+ mycfg = tmpdir.join("sshd_config")
+ cfg = f"Include other_{mycfg}.d/*.conf"
+ util.write_file(mycfg, cfg)
+ assert ssh_util.update_ssh_config({"key": "value"}, mycfg)
+ assert f"{cfg}\nkey value\n" == util.load_file(mycfg)
+ expected_conf_file = f"{mycfg}.d/50-cloud-init.conf"
+ assert not os.path.isfile(expected_conf_file)
+ assert not os.path.isfile(f"other_{mycfg}.d/50-cloud-init.conf")
+
+
+class TestBasicAuthorizedKeyParse:
+ @pytest.mark.parametrize(
+ "value, homedir, username, expected_rendered",
+ [
+ pytest.param(
+ "/opt/%u/keys",
+ "/home/bobby",
+ "bobby",
+ ["/opt/bobby/keys"],
+ id="user",
+ ),
+ pytest.param(
+ "/opt/%u",
+ "/home/bobby",
+ "bobby",
+ ["/opt/bobby"],
+ id="user_file",
+ ),
+ pytest.param(
+ "/opt/%u/%u",
+ "/home/bobby",
+ "bobby",
+ ["/opt/bobby/bobby"],
+ id="user_file_2",
+ ),
+ pytest.param(
+ "/keys/path1 /keys/path2",
+ "/home/bobby",
+ "bobby",
+ ["/keys/path1", "/keys/path2"],
+ id="multiple",
+ ),
+ pytest.param(
+ "/keys/path1 /keys/%u",
+ "/home/bobby",
+ "bobby",
+ ["/keys/path1", "/keys/bobby"],
+ id="multiple_2",
+ ),
+ pytest.param(
+ ".secret/keys",
+ "/home/bobby",
+ "bobby",
+ ["/home/bobby/.secret/keys"],
+ id="relative",
+ ),
+ pytest.param(
+ "%h/.keys",
+ "/homedirs/bobby",
+ "bobby",
+ ["/homedirs/bobby/.keys"],
+ id="home",
+ ),
+ pytest.param(
+ "%h/.keys .secret/keys /keys/path1 /opt/%u/keys",
+ "/homedirs/bobby",
+ "bobby",
+ [
+ "/homedirs/bobby/.keys",
+ "/homedirs/bobby/.secret/keys",
+ "/keys/path1",
+ "/opt/bobby/keys",
+ ],
+ id="all",
+ ),
+ ],
+ )
+ def test_render_authorizedkeysfile_paths(
+ self, value, homedir, username, expected_rendered
+ ):
+ assert expected_rendered == ssh_util.render_authorizedkeysfile_paths(
+ value, homedir, username
+ )
+
+
+class TestMultipleSshAuthorizedKeysFile:
+ def create_fake_users(
+ self,
+ names,
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ ):
+ homes = []
+
+ root = str(tmpdir.join("root"))
+ fpw = FakePwEnt(pw_name="root", pw_dir=root)
+ users["root"] = fpw
+
+ for name in names:
+ home = str(tmpdir.join("home", name))
+ fpw = FakePwEnt(pw_name=name, pw_dir=home)
+ users[name] = fpw
+ homes.append(home)
+
+ m_get_permissions.side_effect = partial(
+ mock_get_permissions, mock_permissions
+ )
+ m_get_owner.side_effect = partial(mock_get_owner, mock_permissions)
+ m_get_group.side_effect = partial(mock_get_group, mock_permissions)
+ m_getpwnam.side_effect = partial(mock_getpwnam, users)
+ return homes
+
+ def create_user_authorized_file(self, home, filename, content_key, keys):
+ user_ssh_folder = os.path.join(home, ".ssh")
+ # /tmp/home/<user>/.ssh/authorized_keys = content_key
+ authorized_keys = str(os.path.join(user_ssh_folder, filename))
+ util.write_file(authorized_keys, VALID_CONTENT[content_key])
+ keys[authorized_keys] = content_key
+ return authorized_keys
+
+ def create_global_authorized_file(
+ self, filename, content_key, keys, tmpdir
+ ):
+ authorized_keys = str(tmpdir.join(filename))
+ util.write_file(authorized_keys, VALID_CONTENT[content_key])
+ keys[authorized_keys] = content_key
+ return authorized_keys
+
+ def create_sshd_config(self, authorized_keys_files, tmpdir):
+ sshd_config = str(tmpdir.join("sshd_config"))
+ util.write_file(
+ sshd_config, "AuthorizedKeysFile " + authorized_keys_files
+ )
+ return sshd_config
+
+ def execute_and_check(self, user, sshd_config, solution, keys):
+ (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys(
+ user, sshd_config
+ )
+ content = ssh_util.update_authorized_keys(auth_key_entries, [])
+
+ assert auth_key_fn == solution
+ for path, key in keys.items():
+ if path == solution:
+ assert VALID_CONTENT[key] in content
+ else:
+ assert VALID_CONTENT[key] not in content
+
+ @pytest.mark.parametrize("inverted", [False, True])
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_two_local_files(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ inverted,
+ tmpdir,
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "user_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home = homes[0]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/bobby/.ssh/user_keys = dsa
+ user_keys = self.create_user_authorized_file(
+ home, "user_keys", "dsa", keys
+ )
+
+ # /tmp/sshd_config
+ if not inverted:
+ options = f"{authorized_keys} {user_keys}"
+ else:
+ options = f"{user_keys} {authorized_keys}"
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ if not inverted:
+ exec_args = (user_bobby, sshd_config, authorized_keys, keys)
+ else:
+ exec_args = (user_bobby, sshd_config, user_keys, keys)
+
+ self.execute_and_check(*exec_args)
+
+ @pytest.mark.parametrize("inverted", [False, True])
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_local_global_files(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ inverted,
+ tmpdir,
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "user_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home = homes[0]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/bobby/.ssh/user_keys = dsa
+ user_keys = self.create_user_authorized_file(
+ home, "user_keys", "dsa", keys
+ )
+
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys", "ecdsa", keys, tmpdir
+ )
+
+ if not inverted:
+ options = f"{authorized_keys_global} {user_keys} {authorized_keys}"
+ else:
+ options = f"{authorized_keys_global} {authorized_keys} {user_keys}"
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ if not inverted:
+ exec_args = (user_bobby, sshd_config, user_keys, keys)
+ else:
+ exec_args = (user_bobby, sshd_config, authorized_keys, keys)
+ self.execute_and_check(*exec_args)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_single_user_global_file(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
+ ):
+ user_bobby = "bobby"
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ }
+
+ homes = self.create_fake_users(
+ [user_bobby],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home = homes[0]
+
+ # /tmp/etc/ssh/authorized_keys = rsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys", "rsa", keys, tmpdir
+ )
+
+ options = "%s" % authorized_keys_global
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ default = "%s/.ssh/authorized_keys" % home
+ self.execute_and_check(user_bobby, sshd_config, default, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_file_standard(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): (
+ "suzie",
+ "suzie",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys = rsa
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ options = ".ssh/authorized_keys"
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_file_custom(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh", "authorized_keys2"): (
+ "suzie",
+ "suzie",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys2", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys2 = rsa
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys
+ )
+
+ options = ".ssh/authorized_keys2"
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_global_files(
+ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "bobby", ".ssh", "user_keys3"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh", "authorized_keys2"): (
+ "suzie",
+ "suzie",
+ 0o600,
+ ),
+ tmpdir.join("home", "suzie", ".ssh", "user_keys3"): (
+ "suzie",
+ "suzie",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+ self.create_user_authorized_file(
+ home_bobby, "authorized_keys2", "rsa", keys
+ )
+ # /tmp/home/bobby/.ssh/user_keys3 = dsa
+ user_keys = self.create_user_authorized_file(
+ home_bobby, "user_keys3", "dsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys2 = rsa
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys2", "ecdsa", keys, tmpdir
+ )
+
+ options = "%s %s %%h/.ssh/authorized_keys2" % (
+ authorized_keys_global,
+ user_keys,
+ )
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ self.execute_and_check(user_bobby, sshd_config, user_keys, keys)
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_local_global_files_badguy(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ tmpdir,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "bobby", ".ssh", "user_keys3"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "badguy"): ("root", "root", 0o755),
+ tmpdir.join("home", "badguy", "home"): ("root", "root", 0o755),
+ tmpdir.join("home", "badguy", "home", "bobby"): (
+ "root",
+ "root",
+ 0o655,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_badguy = "badguy"
+ home_bobby, *_ = self.create_fake_users(
+ [user_bobby, user_badguy],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ m_get_user_groups.side_effect = mock_get_user_groups
+
+ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys2", "rsa", keys
+ )
+ # /tmp/home/bobby/.ssh/user_keys3 = dsa
+ user_keys = self.create_user_authorized_file(
+ home_bobby, "user_keys3", "dsa", keys
+ )
+
+ # /tmp/home/badguy/home/bobby = ""
+ authorized_keys2 = str(tmpdir.join("home", "badguy", "home", "bobby"))
+ util.write_file(authorized_keys2, "")
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys2", "ecdsa", keys, tmpdir
+ )
+
+ # /tmp/sshd_config
+ options = "%s %%h/.ssh/authorized_keys2 %s %s" % (
+ authorized_keys2,
+ authorized_keys_global,
+ user_keys,
+ )
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+ self.execute_and_check(
+ user_badguy, sshd_config, authorized_keys2, keys
+ )
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_unaccessible_file(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ tmpdir,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("etc"): ("root", "root", 0o755),
+ tmpdir.join("etc", "ssh"): ("root", "root", 0o755),
+ tmpdir.join("etc", "ssh", "userkeys"): ("root", "root", 0o700),
+ tmpdir.join("etc", "ssh", "userkeys", "bobby"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("etc", "ssh", "userkeys", "badguy"): (
+ "badguy",
+ "badguy",
+ 0o600,
+ ),
+ tmpdir.join("home", "badguy"): ("badguy", "badguy", 0o700),
+ tmpdir.join("home", "badguy", ".ssh"): ("badguy", "badguy", 0o700),
+ tmpdir.join("home", "badguy", ".ssh", "authorized_keys"): (
+ "badguy",
+ "badguy",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_badguy = "badguy"
+ homes = self.create_fake_users(
+ [user_bobby, user_badguy],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ m_get_user_groups.side_effect = mock_get_user_groups
+ home_bobby = homes[0]
+ home_badguy = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+ # /tmp/etc/ssh/userkeys/bobby = dsa
+ # assume here that we can bypass userkeys, despite permissions
+ self.create_global_authorized_file(
+ "etc/ssh/userkeys/bobby", "dsa", keys, tmpdir
+ )
+
+ # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com
+ authorized_keys2 = self.create_user_authorized_file(
+ home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/userkeys/badguy = ecdsa
+ self.create_global_authorized_file(
+ "etc/ssh/userkeys/badguy", "ecdsa", keys, tmpdir
+ )
+
+ # /tmp/sshd_config
+ options = str(
+ tmpdir.join("etc", "ssh", "userkeys", "%u .ssh", "authorized_keys")
+ )
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+ self.execute_and_check(
+ user_badguy, sshd_config, authorized_keys2, keys
+ )
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_accessible_file(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ tmpdir,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("etc"): ("root", "root", 0o755),
+ tmpdir.join("etc", "ssh"): ("root", "root", 0o755),
+ tmpdir.join("etc", "ssh", "userkeys"): ("root", "root", 0o755),
+ tmpdir.join("etc", "ssh", "userkeys", "bobby"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("etc", "ssh", "userkeys", "badguy"): (
+ "badguy",
+ "badguy",
+ 0o600,
+ ),
+ tmpdir.join("home", "badguy"): ("badguy", "badguy", 0o700),
+ tmpdir.join("home", "badguy", ".ssh"): ("badguy", "badguy", 0o700),
+ tmpdir.join("home", "badguy", ".ssh", "authorized_keys"): (
+ "badguy",
+ "badguy",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_badguy = "badguy"
+ homes = self.create_fake_users(
+ [user_bobby, user_badguy],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ m_get_user_groups.side_effect = mock_get_user_groups
+ home_bobby = homes[0]
+ home_badguy = homes[1]
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+ # /tmp/etc/ssh/userkeys/bobby = dsa
+ # assume here that we can bypass userkeys, despite permissions
+ authorized_keys = self.create_global_authorized_file(
+ "etc/ssh/userkeys/bobby", "dsa", keys, tmpdir
+ )
+
+ # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com
+ self.create_user_authorized_file(
+ home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/userkeys/badguy = ecdsa
+ authorized_keys2 = self.create_global_authorized_file(
+ "etc/ssh/userkeys/badguy", "ecdsa", keys, tmpdir
+ )
+
+ # /tmp/sshd_config
+ options = str(
+ tmpdir.join("etc", "ssh", "userkeys", "%u .ssh", "authorized_keys")
+ )
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+ self.execute_and_check(
+ user_badguy, sshd_config, authorized_keys2, keys
+ )
+
+ @pytest.mark.parametrize("inverted", [False, True])
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_hardcoded_single_user_file(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ inverted,
+ tmpdir,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): (
+ "suzie",
+ "suzie",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+ m_get_user_groups.side_effect = mock_get_user_groups
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/sshd_config
+ if not inverted:
+ expected_keys = authorized_keys
+ else:
+ expected_keys = authorized_keys2
+ options = "%s" % (expected_keys)
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ if not inverted:
+ expected_bobby = expected_keys
+ expected_suzie = "%s/.ssh/authorized_keys" % home_suzie
+ else:
+ expected_bobby = "%s/.ssh/authorized_keys" % home_bobby
+ expected_suzie = expected_keys
+ self.execute_and_check(user_bobby, sshd_config, expected_bobby, keys)
+ self.execute_and_check(user_suzie, sshd_config, expected_suzie, keys)
+
+ @patch("cloudinit.util.get_user_groups")
+ @patch("cloudinit.ssh_util.pwd.getpwnam")
+ @patch("cloudinit.util.get_permissions")
+ @patch("cloudinit.util.get_owner")
+ @patch("cloudinit.util.get_group")
+ def test_two_users_hardcoded_user_files(
+ self,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ m_get_user_groups,
+ tmpdir,
+ ):
+ keys = {}
+ users = {}
+ mock_permissions = {
+ tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
+ tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
+ "bobby",
+ "bobby",
+ 0o600,
+ ),
+ tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
+ tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): (
+ "suzie",
+ "suzie",
+ 0o600,
+ ),
+ }
+
+ user_bobby = "bobby"
+ user_suzie = "suzie"
+ homes = self.create_fake_users(
+ [user_bobby, user_suzie],
+ mock_permissions,
+ m_get_group,
+ m_get_owner,
+ m_get_permissions,
+ m_getpwnam,
+ users,
+ tmpdir,
+ )
+ home_bobby = homes[0]
+ home_suzie = homes[1]
+ m_get_user_groups.side_effect = mock_get_user_groups
+
+ # /tmp/home/bobby/.ssh/authorized_keys = rsa
+ authorized_keys = self.create_user_authorized_file(
+ home_bobby, "authorized_keys", "rsa", keys
+ )
+
+ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
+ authorized_keys2 = self.create_user_authorized_file(
+ home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
+ )
+
+ # /tmp/etc/ssh/authorized_keys = ecdsa
+ authorized_keys_global = self.create_global_authorized_file(
+ "etc/ssh/authorized_keys", "ecdsa", keys, tmpdir
+ )
+
+ # /tmp/sshd_config
+ options = "%s %s %s" % (
+ authorized_keys_global,
+ authorized_keys,
+ authorized_keys2,
+ )
+ sshd_config = self.create_sshd_config(options, tmpdir)
+
+ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
+ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
+
+
+# vi: ts=4 expandtab