# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import errno
import hashlib
import os
import shutil
import stat
import tempfile
import time
import uuid

import mock
from oslotest import base as test_base
import six

from oslo_utils import fileutils

TEST_PERMISSIONS = stat.S_IRWXU


def _create_temp_file():
    """Create an empty temporary file and return its path.

    ``tempfile.mkstemp`` creates the file atomically, so the name cannot be
    claimed by someone else between choosing it and creating it (which is
    possible with ``tempfile.mktemp`` followed by ``open``).  The descriptor
    is closed right away so no file handle is leaked by the test.
    """
    fd, path = tempfile.mkstemp()
    os.close(fd)
    return path


class EnsureTree(test_base.BaseTestCase):
    """Tests for fileutils.ensure_tree()."""

    def test_ensure_tree(self):
        """Nested directories are created with the requested mode."""
        tmpdir = tempfile.mkdtemp()
        # Registered up front so the tree is removed even if an assertion
        # below fails.
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)

        testdir = os.path.join(tmpdir, 'foo', 'bar', 'baz')
        fileutils.ensure_tree(testdir, TEST_PERMISSIONS)
        self.assertTrue(os.path.isdir(testdir))
        self.assertEqual(os.stat(testdir).st_mode,
                         TEST_PERMISSIONS | stat.S_IFDIR)


class DeleteIfExists(test_base.BaseTestCase):
    """Tests for fileutils.delete_if_exists()."""

    def test_file_present(self):
        """An existing file is gone after the call."""
        tmpfile = _create_temp_file()

        fileutils.delete_if_exists(tmpfile)
        self.assertFalse(os.path.exists(tmpfile))

    def test_file_absent(self):
        """Calling with a path that does not exist does not raise."""
        # A fresh private directory guarantees the child name is unused.
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)
        tmpfile = os.path.join(tmpdir, 'absent')

        fileutils.delete_if_exists(tmpfile)
        self.assertFalse(os.path.exists(tmpfile))

    def test_dir_present(self):
        """A directory is removed when os.rmdir is passed as ``remove``."""
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)

        fileutils.delete_if_exists(tmpdir, remove=os.rmdir)
        self.assertFalse(os.path.exists(tmpdir))

    def test_file_error(self):
        """An OSError with errno EINVAL from ``remove`` reaches the caller."""
        def errm(path):
            raise OSError(errno.EINVAL, '')

        tmpfile = _create_temp_file()

        self.assertRaises(OSError, fileutils.delete_if_exists, tmpfile, errm)
        # errm never deletes anything, so the file must still be there.
        os.unlink(tmpfile)


class RemovePathOnError(test_base.BaseTestCase):
    """Tests for the fileutils.remove_path_on_error() context manager."""

    def test_error(self):
        """The path is removed when the managed block raises."""
        tmpfile = _create_temp_file()
        # Safety net in case the code under test fails to remove the file.
        self.addCleanup(fileutils.delete_if_exists, tmpfile)

        try:
            with fileutils.remove_path_on_error(tmpfile):
                raise Exception
        except Exception:
            self.assertFalse(os.path.exists(tmpfile))

    def test_no_error(self):
        """The path is left in place when the managed block succeeds."""
        tmpfile = _create_temp_file()

        with fileutils.remove_path_on_error(tmpfile):
            pass
        self.assertTrue(os.path.exists(tmpfile))
        os.unlink(tmpfile)

    def test_remove(self):
        """A custom ``remove`` callable is used instead of the default."""
        tmpfile = _create_temp_file()

        try:
            # The no-op remover leaves the file alone, so it must survive.
            with fileutils.remove_path_on_error(tmpfile, remove=lambda x: x):
                raise Exception
        except Exception:
            self.assertTrue(os.path.exists(tmpfile))
        os.unlink(tmpfile)

    def test_remove_dir(self):
        """A directory can be removed by supplying an rmdir-based remover."""
        tmpdir = tempfile.mkdtemp()
        # Safety net in case the code under test fails to remove the dir.
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)

        try:
            with fileutils.remove_path_on_error(
                    tmpdir,
                    lambda path: fileutils.delete_if_exists(path, os.rmdir)):
                raise Exception
        except Exception:
            self.assertFalse(os.path.exists(tmpdir))


class WriteToTempfileTestCase(test_base.BaseTestCase):
    """Tests for fileutils.write_to_tempfile()."""

    def setUp(self):
        super(WriteToTempfileTestCase, self).setUp()
        self.content = 'testing123'.encode('ascii')

    def check_file_content(self, path):
        """Assert that the file at ``path`` holds exactly self.content."""
        with open(path, 'r') as fd:
            ans = fd.read()
            self.assertEqual(self.content, six.b(ans))

    def test_file_without_path_and_suffix(self):
        res = fileutils.write_to_tempfile(self.content)
        self.addCleanup(fileutils.delete_if_exists, res)
        self.assertTrue(os.path.exists(res))

        (basepath, tmpfile) = os.path.split(res)
        self.assertTrue(basepath.startswith(tempfile.gettempdir()))
        self.assertTrue(tmpfile.startswith('tmp'))

        self.check_file_content(res)

    def test_file_with_not_existing_path(self):
        # Use the platform temp dir rather than a hard-coded '/tmp', and
        # register the cleanup first so the tree is removed on failure too.
        base_dir = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex)
        self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
        path = os.path.join(base_dir, 'test1')

        res = fileutils.write_to_tempfile(self.content, path=path)
        self.assertTrue(os.path.exists(res))

        (basepath, tmpfile) = os.path.split(res)
        self.assertEqual(basepath, path)
        self.assertTrue(tmpfile.startswith('tmp'))

        self.check_file_content(res)

    def test_file_with_not_default_suffix(self):
        suffix = '.conf'

        res = fileutils.write_to_tempfile(self.content, suffix=suffix)
        self.addCleanup(fileutils.delete_if_exists, res)
        self.assertTrue(os.path.exists(res))

        (basepath, tmpfile) = os.path.split(res)
        self.assertTrue(basepath.startswith(tempfile.gettempdir()))
        self.assertTrue(tmpfile.startswith('tmp'))
        self.assertTrue(tmpfile.endswith('.conf'))

        self.check_file_content(res)

    def test_file_with_not_existing_path_and_not_default_suffix(self):
        suffix = '.txt'
        base_dir = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex)
        self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
        path = os.path.join(base_dir, 'test2')

        res = fileutils.write_to_tempfile(self.content,
                                          path=path,
                                          suffix=suffix)
        self.assertTrue(os.path.exists(res))

        (basepath, tmpfile) = os.path.split(res)
        self.assertTrue(tmpfile.startswith('tmp'))
        self.assertEqual(basepath, path)
        self.assertTrue(tmpfile.endswith(suffix))

        self.check_file_content(res)

    def test_file_with_not_default_prefix(self):
        prefix = 'test'

        res = fileutils.write_to_tempfile(self.content, prefix=prefix)
        self.addCleanup(fileutils.delete_if_exists, res)
        self.assertTrue(os.path.exists(res))

        (basepath, tmpfile) = os.path.split(res)
        self.assertTrue(tmpfile.startswith(prefix))
        self.assertTrue(basepath.startswith(tempfile.gettempdir()))

        self.check_file_content(res)


class TestComputeFileChecksum(test_base.BaseTestCase):
    """Tests for fileutils.compute_file_checksum()."""

    def setUp(self):
        super(TestComputeFileChecksum, self).setUp()
        self.content = 'fake_content'.encode('ascii')

    def check_file_content(self, content, path):
        """Assert that the file at ``path`` holds exactly ``content``."""
        with open(path, 'r') as fd:
            ans = fd.read()
            self.assertEqual(content, six.b(ans))

    def test_compute_checksum_default_algorithm(self):
        """Without an explicit algorithm the digest matches SHA-256."""
        path = fileutils.write_to_tempfile(self.content)
        self.addCleanup(fileutils.delete_if_exists, path)
        self.assertTrue(os.path.exists(path))
        self.check_file_content(self.content, path)

        expected_checksum = hashlib.sha256()
        expected_checksum.update(self.content)

        actual_checksum = fileutils.compute_file_checksum(path)

        self.assertEqual(expected_checksum.hexdigest(), actual_checksum)

    def test_compute_checksum_sleep_0_called(self):
        """time.sleep(0) is called three times for 12 bytes in 4-byte chunks.
        """
        path = fileutils.write_to_tempfile(self.content)
        self.addCleanup(fileutils.delete_if_exists, path)
        self.assertTrue(os.path.exists(path))
        self.check_file_content(self.content, path)

        expected_checksum = hashlib.sha256()
        expected_checksum.update(self.content)

        with mock.patch.object(time, "sleep") as sleep_mock:
            actual_checksum = fileutils.compute_file_checksum(
                path, read_chunksize=4)

        sleep_mock.assert_has_calls([mock.call(0)] * 3)
        # Just to make sure that there were exactly 3 calls
        self.assertEqual(3, sleep_mock.call_count)
        self.assertEqual(expected_checksum.hexdigest(), actual_checksum)

    def test_compute_checksum_named_algorithm(self):
        """An explicitly named algorithm (sha512) is honoured."""
        path = fileutils.write_to_tempfile(self.content)
        self.addCleanup(fileutils.delete_if_exists, path)
        self.assertTrue(os.path.exists(path))
        self.check_file_content(self.content, path)

        expected_checksum = hashlib.sha512()
        expected_checksum.update(self.content)

        actual_checksum = fileutils.compute_file_checksum(path,
                                                          algorithm='sha512')

        self.assertEqual(expected_checksum.hexdigest(), actual_checksum)

    def test_compute_checksum_invalid_algorithm(self):
        """An unknown algorithm name raises ValueError."""
        path = fileutils.write_to_tempfile(self.content)
        self.addCleanup(fileutils.delete_if_exists, path)
        self.assertTrue(os.path.exists(path))
        self.check_file_content(self.content, path)

        self.assertRaises(ValueError, fileutils.compute_file_checksum,
                          path, algorithm='foo')

    def test_file_does_not_exist(self):
        """A missing file raises IOError."""
        random_file_name = uuid.uuid4().hex
        path = os.path.join(tempfile.gettempdir(), random_file_name)
        self.assertRaises(IOError, fileutils.compute_file_checksum, path)

    def test_generic_io_error(self):
        """Passing a directory instead of a file raises IOError."""
        tempdir = tempfile.mkdtemp()
        # The directory was previously left behind after the test.
        self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
        self.assertRaises(IOError, fileutils.compute_file_checksum, tempdir)


class LastBytesTestCase(test_base.BaseTestCase):
    """Test the last_bytes() utility method."""

    def setUp(self):
        super(LastBytesTestCase, self).setUp()
        self.content = b'1234567890'

    def test_truncated(self):
        """Asking for fewer bytes than the file holds returns only the tail.
        """
        res = fileutils.write_to_tempfile(self.content)
        self.addCleanup(fileutils.delete_if_exists, res)
        self.assertTrue(os.path.exists(res))
        out, unread_bytes = fileutils.last_bytes(res, 5)
        self.assertEqual(b'67890', out)
        self.assertGreater(unread_bytes, 0)

    def test_read_all(self):
        """Asking for more bytes than the file holds returns everything."""
        res = fileutils.write_to_tempfile(self.content)
        self.addCleanup(fileutils.delete_if_exists, res)
        self.assertTrue(os.path.exists(res))
        out, unread_bytes = fileutils.last_bytes(res, 1000)
        self.assertEqual(b'1234567890', out)
        self.assertEqual(0, unread_bytes)

    def test_non_exist_file(self):
        """A missing file raises IOError."""
        self.assertRaises(IOError, fileutils.last_bytes,
                          'non_exist_file', 1000)