summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--heat_cfntools/cfntools/cfn_helper.py19
-rw-r--r--heat_cfntools/tests/test_cfn_helper.py56
2 files changed, 10 insertions, 65 deletions
diff --git a/heat_cfntools/cfntools/cfn_helper.py b/heat_cfntools/cfntools/cfn_helper.py
index 2351c1d..7e75cf9 100644
--- a/heat_cfntools/cfntools/cfn_helper.py
+++ b/heat_cfntools/cfntools/cfn_helper.py
@@ -180,23 +180,7 @@ class CommandRunner(object):
self
"""
LOG.debug("Running command: %s" % self._command)
-
- cmd = self._command
- if isinstance(cmd, str):
- cmd = cmd.split()
-
- if user != 'root':
- # lower the privileges
- try:
- real = pwd.getpwnam(user)
- os.setuid(real.pw_uid)
- except Exception as e:
- LOG.error("Error setting privileges for user '%s': %s"
- % (user, e))
- # 126: command invoked cannot be executed
- self._status = 126
- return
-
+ cmd = ['su', user, '-c', self._command]
subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, cwd=cwd, env=env)
output = subproc.communicate()
@@ -576,6 +560,7 @@ class PackagesHandler(object):
* if a version array is supplied, choose the highest version from the
array and follow same logic for version string above
"""
+
cmd = CommandRunner("which yum").run()
if cmd.status == 1:
# yum not available, use DNF if available
diff --git a/heat_cfntools/tests/test_cfn_helper.py b/heat_cfntools/tests/test_cfn_helper.py
index 15641bc..59020e2 100644
--- a/heat_cfntools/tests/test_cfn_helper.py
+++ b/heat_cfntools/tests/test_cfn_helper.py
@@ -29,7 +29,7 @@ from heat_cfntools.cfntools import cfn_helper
def popen_root_calls(calls):
kwargs = {'env': None, 'cwd': None, 'stderr': -1, 'stdout': -1}
return [
- mock.call(call.split(), **kwargs)
+ mock.call(['su', 'root', '-c', call], **kwargs)
for call in calls
]
@@ -51,9 +51,9 @@ class TestCommandRunner(testtools.TestCase):
def test_command_runner(self):
def returns(*args, **kwargs):
- if args[0][0] == '/bin/command1':
+ if args[0][3] == '/bin/command1':
return FakePOpen('All good')
- elif args[0][0] == '/bin/command2':
+ elif args[0][3] == '/bin/command2':
return FakePOpen('Doing something', 'error', -1)
else:
raise Exception('This should never happen')
@@ -73,53 +73,13 @@ class TestCommandRunner(testtools.TestCase):
calls = popen_root_calls(['/bin/command1', '/bin/command2'])
mock_popen.assert_has_calls(calls)
- def test_runner_runs_command_as_list(self):
- with mock.patch('subprocess.Popen') as mock_popen:
- command = '/bin/command --option=value arg1 arg2'
- expected_cmd = ['/bin/command', '--option=value', 'arg1', 'arg2']
- cmd = cfn_helper.CommandRunner(command)
- cmd.run()
- self.assertTrue(mock_popen.called)
- actual_cmd = mock_popen.call_args[0][0]
- self.assertTrue(isinstance(actual_cmd, list))
- self.assertItemsEqual(expected_cmd, actual_cmd)
-
- @mock.patch.object(cfn_helper.pwd, 'getpwnam')
- @mock.patch.object(cfn_helper.os, 'setuid')
- def test_privileges_are_lowered_for_non_root_user(self, mock_setuid,
- mock_getpwnam):
- pw_entry = mock.MagicMock()
- pw_entry.pw_uid = 1001
- mock_getpwnam.return_value = pw_entry
- with mock.patch('subprocess.Popen') as mock_popen:
- command = '/bin/command --option=value arg1 arg2'
- cmd = cfn_helper.CommandRunner(command)
- cmd.run(user='nonroot')
- mock_getpwnam.assert_called_once_with('nonroot')
- mock_setuid.assert_called_once_with(1001)
- self.assertTrue(mock_popen.called)
-
- @mock.patch.object(cfn_helper.pwd, 'getpwnam')
- @mock.patch.object(cfn_helper.os, 'setuid')
- def test_run_returns_when_cannot_set_privileges(self, mock_setuid,
- mock_getpwnam):
- mock_setuid.side_effect = Exception('[Error 1] Permission Denied')
- with mock.patch('subprocess.Popen') as mock_popen:
- command = '/bin/command2'
- cmd = cfn_helper.CommandRunner(command)
- cmd.run(user='nonroot')
- self.assertTrue(mock_getpwnam.called)
- self.assertTrue(mock_setuid.called)
- self.assertFalse(mock_popen.called)
- self.assertEqual(126, cmd.status)
-
class TestPackages(testtools.TestCase):
def test_yum_install(self):
def returns(*args, **kwargs):
- if (args[0][0] == 'rpm' and args[0][1] == '-q'):
+ if args[0][3].startswith('rpm -q '):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
@@ -146,8 +106,8 @@ class TestPackages(testtools.TestCase):
def test_dnf_install_yum_unavailable(self):
def returns(*args, **kwargs):
- if ((args[0][0] == 'rpm' and args[0][1] == '-q')
- or (args[0][0] == 'which' and args[0][1] == 'yum')):
+ if args[0][3].startswith('rpm -q ') \
+ or args[0][3] == 'which yum':
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
@@ -174,7 +134,7 @@ class TestPackages(testtools.TestCase):
def test_dnf_install(self):
def returns(*args, **kwargs):
- if (args[0][0] == 'rpm' and args[0][1] == '-q'):
+ if args[0][3].startswith('rpm -q '):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)
@@ -201,7 +161,7 @@ class TestPackages(testtools.TestCase):
def test_zypper_install(self):
def returns(*args, **kwargs):
- if (args[0][0] == 'rpm' and args[0][1] == '-q'):
+ if args[0][3].startswith('rpm -q '):
return FakePOpen(returncode=1)
else:
return FakePOpen(returncode=0)