summaryrefslogtreecommitdiff
path: root/keystoneclient
diff options
context:
space:
mode:
Diffstat (limited to 'keystoneclient')
-rw-r--r--keystoneclient/common/cms.py21
-rw-r--r--keystoneclient/tests/unit/test_cms.py54
2 files changed, 64 insertions, 11 deletions
diff --git a/keystoneclient/common/cms.py b/keystoneclient/common/cms.py
index 1bd0f41..21cd394 100644
--- a/keystoneclient/common/cms.py
+++ b/keystoneclient/common/cms.py
@@ -85,9 +85,7 @@ def _check_files_accessible(files):
except IOError as e:
# Catching IOError means there is an issue with
# the given file.
- err = _('Hit OSError in _process_communicate_handle_oserror()\n'
- 'Likely due to %(file)s: %(error)s') % {'file': try_file,
- 'error': e.strerror}
+ err = try_file, e.strerror
# Emulate openssl behavior, which returns with code 2 when
# access to a file failed.
retcode = OpensslCmsExitStatus.INPUT_FILE_READ_ERROR
@@ -111,12 +109,25 @@ def _process_communicate_handle_oserror(process, data, files):
retcode, err = _check_files_accessible(files)
if process.stderr:
msg = process.stderr.read()
- err = err + msg.decode('utf-8')
+ if isinstance(msg, six.binary_type):
+ msg = msg.decode('utf-8')
+ if err:
+ err = (_('Hit OSError in '
+ '_process_communicate_handle_oserror(): '
+ '%(stderr)s\nLikely due to %(file)s: %(error)s') %
+ {'stderr': msg,
+ 'file': err[0],
+ 'error': err[1]})
+ else:
+ err = (_('Hit OSError in '
+ '_process_communicate_handle_oserror(): %s') % msg)
+
output = ''
else:
retcode = process.poll()
if err is not None:
- err = err.decode('utf-8')
+ if isinstance(err, six.binary_type):
+ err = err.decode('utf-8')
return output, err, retcode
diff --git a/keystoneclient/tests/unit/test_cms.py b/keystoneclient/tests/unit/test_cms.py
index 019730d..dc1d0d1 100644
--- a/keystoneclient/tests/unit/test_cms.py
+++ b/keystoneclient/tests/unit/test_cms.py
@@ -42,6 +42,11 @@ class CMSTest(utils.TestCase, testresources.ResourcedTestCase):
raise Exception('Your version of OpenSSL is not supported. '
'You will need to update it to 1.0 or later.')
+ def _raise_OSError(*args):
+ e = OSError()
+ e.errno = errno.EPIPE
+ raise e
+
def test_cms_verify(self):
self.assertRaises(exceptions.CertificateConfigError,
cms.cms_verify,
@@ -90,12 +95,8 @@ class CMSTest(utils.TestCase, testresources.ResourcedTestCase):
'/no/such/file', '/no/such/key')
def test_cms_verify_token_no_oserror(self):
- def raise_OSError(*args):
- e = OSError()
- e.errno = errno.EPIPE
- raise e
-
- with mock.patch('subprocess.Popen.communicate', new=raise_OSError):
+ with mock.patch('subprocess.Popen.communicate',
+ new=self._raise_OSError):
try:
cms.cms_verify("x", '/no/such/file', '/no/such/key')
except exceptions.CertificateConfigError as e:
@@ -155,6 +156,47 @@ class CMSTest(utils.TestCase, testresources.ResourcedTestCase):
# sha256 hash is 64 chars.
self.assertThat(token_id, matchers.HasLength(64))
+ @mock.patch('keystoneclient.common.cms._check_files_accessible')
+ def test_process_communicate_handle_oserror_epipe(self, files_acc_mock):
+ process_mock = mock.Mock()
+ process_mock.communicate = self._raise_OSError
+ process_mock.stderr = mock.Mock()
+ process_mock.stderr.read = mock.Mock(return_value='proc stderr')
+ files_acc_mock.return_value = 1, ('file_path', 'fileerror')
+ output, err, retcode = cms._process_communicate_handle_oserror(
+ process_mock, '', [])
+
+ self.assertEqual((output, retcode), ('', 1))
+ self.assertIn('file_path', err)
+ self.assertIn('fileerror', err)
+ self.assertIn('proc stderr', err)
+
+ @mock.patch('keystoneclient.common.cms._check_files_accessible')
+ def test_process_communicate_handle_oserror_epipe_files_ok(
+ self, files_acc_mock):
+ process_mock = mock.Mock()
+ process_mock.communicate = self._raise_OSError
+ process_mock.stderr = mock.Mock()
+ process_mock.stderr.read = mock.Mock(return_value='proc stderr')
+ files_acc_mock.return_value = -1, None
+ output, err, retcode = cms._process_communicate_handle_oserror(
+ process_mock, '', [])
+
+ self.assertEqual((output, retcode), ('', -1))
+ self.assertIn('proc stderr', err)
+
+ def test_process_communicate_handle_oserror_no_exception(self):
+ process_mock = mock.Mock()
+ process_mock.communicate.return_value = 'out', 'err'
+ process_mock.poll.return_value = 0
+
+ output, err, retcode = cms._process_communicate_handle_oserror(
+ process_mock, '', [])
+
+ self.assertEqual(output, 'out')
+ self.assertEqual(err, 'err')
+ self.assertEqual(retcode, 0)
+
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)