# Copyright (C) 2005-2011 Canonical Ltd # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA """Tests for the osutils wrapper.""" from cStringIO import StringIO import errno import os import re import select import socket import sys import time from bzrlib import ( errors, lazy_regex, osutils, symbol_versioning, tests, trace, win32utils, ) from bzrlib.tests import ( features, file_utils, test__walkdirs_win32, ) from bzrlib.tests.scenarios import load_tests_apply_scenarios class _UTF8DirReaderFeature(features.Feature): def _probe(self): try: from bzrlib import _readdir_pyx self.reader = _readdir_pyx.UTF8DirReader return True except ImportError: return False def feature_name(self): return 'bzrlib._readdir_pyx' UTF8DirReaderFeature = features.ModuleAvailableFeature('bzrlib._readdir_pyx') term_ios_feature = features.ModuleAvailableFeature('termios') def _already_unicode(s): return s def _utf8_to_unicode(s): return s.decode('UTF-8') def dir_reader_scenarios(): # For each dir reader we define: # - native_to_unicode: a function converting the native_abspath as returned # by DirReader.read_dir to its unicode representation # UnicodeDirReader is the fallback, it should be tested on all platforms. scenarios = [('unicode', dict(_dir_reader_class=osutils.UnicodeDirReader, _native_to_unicode=_already_unicode))] # Some DirReaders are platform specific and even there they may not be # available. if UTF8DirReaderFeature.available(): from bzrlib import _readdir_pyx scenarios.append(('utf8', dict(_dir_reader_class=_readdir_pyx.UTF8DirReader, _native_to_unicode=_utf8_to_unicode))) if test__walkdirs_win32.win32_readdir_feature.available(): try: from bzrlib import _walkdirs_win32 scenarios.append( ('win32', dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir, _native_to_unicode=_already_unicode))) except ImportError: pass return scenarios load_tests = load_tests_apply_scenarios class TestContainsWhitespace(tests.TestCase): def test_contains_whitespace(self): self.assertTrue(osutils.contains_whitespace(u' ')) self.assertTrue(osutils.contains_whitespace(u'hello there')) self.assertTrue(osutils.contains_whitespace(u'hellothere\n')) self.assertTrue(osutils.contains_whitespace(u'hello\nthere')) self.assertTrue(osutils.contains_whitespace(u'hello\rthere')) self.assertTrue(osutils.contains_whitespace(u'hello\tthere')) # \xa0 is "Non-breaking-space" which on some python locales thinks it # is whitespace, but we do not. self.assertFalse(osutils.contains_whitespace(u'')) self.assertFalse(osutils.contains_whitespace(u'hellothere')) self.assertFalse(osutils.contains_whitespace(u'hello\xa0there')) class TestRename(tests.TestCaseInTempDir): def create_file(self, filename, content): f = open(filename, 'wb') try: f.write(content) finally: f.close() def _fancy_rename(self, a, b): osutils.fancy_rename(a, b, rename_func=os.rename, unlink_func=os.unlink) def test_fancy_rename(self): # This should work everywhere self.create_file('a', 'something in a\n') self._fancy_rename('a', 'b') self.assertPathDoesNotExist('a') self.assertPathExists('b') self.check_file_contents('b', 'something in a\n') self.create_file('a', 'new something in a\n') self._fancy_rename('b', 'a') self.check_file_contents('a', 'something in a\n') def test_fancy_rename_fails_source_missing(self): # An exception should be raised, and the target should be left in place self.create_file('target', 'data in target\n') self.assertRaises((IOError, OSError), self._fancy_rename, 'missingsource', 'target') self.assertPathExists('target') self.check_file_contents('target', 'data in target\n') def test_fancy_rename_fails_if_source_and_target_missing(self): self.assertRaises((IOError, OSError), self._fancy_rename, 'missingsource', 'missingtarget') def test_rename(self): # Rename should be semi-atomic on all platforms self.create_file('a', 'something in a\n') osutils.rename('a', 'b') self.assertPathDoesNotExist('a') self.assertPathExists('b') self.check_file_contents('b', 'something in a\n') self.create_file('a', 'new something in a\n') osutils.rename('b', 'a') self.check_file_contents('a', 'something in a\n') # TODO: test fancy_rename using a MemoryTransport def test_rename_change_case(self): # on Windows we should be able to change filename case by rename self.build_tree(['a', 'b/']) osutils.rename('a', 'A') osutils.rename('b', 'B') # we can't use failUnlessExists on case-insensitive filesystem # so try to check shape of the tree shape = sorted(os.listdir('.')) self.assertEquals(['A', 'B'], shape) def test_rename_exception(self): try: osutils.rename('nonexistent_path', 'different_nonexistent_path') except OSError, e: self.assertEqual(e.old_filename, 'nonexistent_path') self.assertEqual(e.new_filename, 'different_nonexistent_path') self.assertTrue('nonexistent_path' in e.strerror) self.assertTrue('different_nonexistent_path' in e.strerror) class TestRandChars(tests.TestCase): def test_01_rand_chars_empty(self): result = osutils.rand_chars(0) self.assertEqual(result, '') def test_02_rand_chars_100(self): result = osutils.rand_chars(100) self.assertEqual(len(result), 100) self.assertEqual(type(result), str) self.assertContainsRe(result, r'^[a-z0-9]{100}$') class TestIsInside(tests.TestCase): def test_is_inside(self): is_inside = osutils.is_inside self.assertTrue(is_inside('src', 'src/foo.c')) self.assertFalse(is_inside('src', 'srccontrol')) self.assertTrue(is_inside('src', 'src/a/a/a/foo.c')) self.assertTrue(is_inside('foo.c', 'foo.c')) self.assertFalse(is_inside('foo.c', '')) self.assertTrue(is_inside('', 'foo.c')) def test_is_inside_any(self): SRC_FOO_C = osutils.pathjoin('src', 'foo.c') for dirs, fn in [(['src', 'doc'], SRC_FOO_C), (['src'], SRC_FOO_C), (['src'], 'src'), ]: self.assert_(osutils.is_inside_any(dirs, fn)) for dirs, fn in [(['src'], 'srccontrol'), (['src'], 'srccontrol/foo')]: self.assertFalse(osutils.is_inside_any(dirs, fn)) def test_is_inside_or_parent_of_any(self): for dirs, fn in [(['src', 'doc'], 'src/foo.c'), (['src'], 'src/foo.c'), (['src/bar.c'], 'src'), (['src/bar.c', 'bla/foo.c'], 'src'), (['src'], 'src'), ]: self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn)) for dirs, fn in [(['src'], 'srccontrol'), (['srccontrol/foo.c'], 'src'), (['src'], 'srccontrol/foo')]: self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn)) class TestLstat(tests.TestCaseInTempDir): def test_lstat_matches_fstat(self): # On Windows, lstat and fstat don't always agree, primarily in the # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our # custom implementation. if sys.platform == 'win32': # We only have special lstat/fstat if we have the extension. # Without it, we may end up re-reading content when we don't have # to, but otherwise it doesn't effect correctness. self.requireFeature(test__walkdirs_win32.win32_readdir_feature) f = open('test-file.txt', 'wb') self.addCleanup(f.close) f.write('some content\n') f.flush() self.assertEqualStat(osutils.fstat(f.fileno()), osutils.lstat('test-file.txt')) class TestRmTree(tests.TestCaseInTempDir): def test_rmtree(self): # Check to remove tree with read-only files/dirs os.mkdir('dir') f = file('dir/file', 'w') f.write('spam') f.close() # would like to also try making the directory readonly, but at the # moment python shutil.rmtree doesn't handle that properly - it would # need to chmod the directory before removing things inside it - deferred # for now -- mbp 20060505 # osutils.make_readonly('dir') osutils.make_readonly('dir/file') osutils.rmtree('dir') self.assertPathDoesNotExist('dir/file') self.assertPathDoesNotExist('dir') class TestDeleteAny(tests.TestCaseInTempDir): def test_delete_any_readonly(self): # from self.build_tree(['d/', 'f']) osutils.make_readonly('d') osutils.make_readonly('f') osutils.delete_any('f') osutils.delete_any('d') class TestKind(tests.TestCaseInTempDir): def test_file_kind(self): self.build_tree(['file', 'dir/']) self.assertEquals('file', osutils.file_kind('file')) self.assertEquals('directory', osutils.file_kind('dir/')) if osutils.has_symlinks(): os.symlink('symlink', 'symlink') self.assertEquals('symlink', osutils.file_kind('symlink')) # TODO: jam 20060529 Test a block device try: os.lstat('/dev/null') except OSError, e: if e.errno not in (errno.ENOENT,): raise else: self.assertEquals('chardev', osutils.file_kind('/dev/null')) mkfifo = getattr(os, 'mkfifo', None) if mkfifo: mkfifo('fifo') try: self.assertEquals('fifo', osutils.file_kind('fifo')) finally: os.remove('fifo') AF_UNIX = getattr(socket, 'AF_UNIX', None) if AF_UNIX: s = socket.socket(AF_UNIX) s.bind('socket') try: self.assertEquals('socket', osutils.file_kind('socket')) finally: os.remove('socket') def test_kind_marker(self): self.assertEqual("", osutils.kind_marker("file")) self.assertEqual("/", osutils.kind_marker('directory')) self.assertEqual("/", osutils.kind_marker(osutils._directory_kind)) self.assertEqual("@", osutils.kind_marker("symlink")) self.assertEqual("+", osutils.kind_marker("tree-reference")) self.assertEqual("", osutils.kind_marker("fifo")) self.assertEqual("", osutils.kind_marker("socket")) self.assertEqual("", osutils.kind_marker("unknown")) class TestUmask(tests.TestCaseInTempDir): def test_get_umask(self): if sys.platform == 'win32': # umask always returns '0', no way to set it self.assertEqual(0, osutils.get_umask()) return orig_umask = osutils.get_umask() self.addCleanup(os.umask, orig_umask) os.umask(0222) self.assertEqual(0222, osutils.get_umask()) os.umask(0022) self.assertEqual(0022, osutils.get_umask()) os.umask(0002) self.assertEqual(0002, osutils.get_umask()) os.umask(0027) self.assertEqual(0027, osutils.get_umask()) class TestDateTime(tests.TestCase): def assertFormatedDelta(self, expected, seconds): """Assert osutils.format_delta formats as expected""" actual = osutils.format_delta(seconds) self.assertEqual(expected, actual) def test_format_delta(self): self.assertFormatedDelta('0 seconds ago', 0) self.assertFormatedDelta('1 second ago', 1) self.assertFormatedDelta('10 seconds ago', 10) self.assertFormatedDelta('59 seconds ago', 59) self.assertFormatedDelta('89 seconds ago', 89) self.assertFormatedDelta('1 minute, 30 seconds ago', 90) self.assertFormatedDelta('3 minutes, 0 seconds ago', 180) self.assertFormatedDelta('3 minutes, 1 second ago', 181) self.assertFormatedDelta('10 minutes, 15 seconds ago', 615) self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859) self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860) self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600) self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399) self.assertFormatedDelta('1 hour, 30 minutes ago', 5400) self.assertFormatedDelta('2 hours, 30 minutes ago', 9017) self.assertFormatedDelta('10 hours, 0 minutes ago', 36000) self.assertFormatedDelta('24 hours, 0 minutes ago', 86400) self.assertFormatedDelta('35 hours, 59 minutes ago', 129599) self.assertFormatedDelta('36 hours, 0 minutes ago', 129600) self.assertFormatedDelta('36 hours, 0 minutes ago', 129601) self.assertFormatedDelta('36 hours, 1 minute ago', 129660) self.assertFormatedDelta('36 hours, 1 minute ago', 129661) self.assertFormatedDelta('84 hours, 10 minutes ago', 303002) # We handle when time steps the wrong direction because computers # don't have synchronized clocks. self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002) self.assertFormatedDelta('1 second in the future', -1) self.assertFormatedDelta('2 seconds in the future', -2) def test_format_date(self): self.assertRaises(errors.UnsupportedTimezoneFormat, osutils.format_date, 0, timezone='foo') self.assertIsInstance(osutils.format_date(0), str) self.assertIsInstance(osutils.format_local_date(0), unicode) # Testing for the actual value of the local weekday without # duplicating the code from format_date is difficult. # Instead blackbox.test_locale should check for localized # dates once they do occur in output strings. def test_format_date_with_offset_in_original_timezone(self): self.assertEqual("Thu 1970-01-01 00:00:00 +0000", osutils.format_date_with_offset_in_original_timezone(0)) self.assertEqual("Fri 1970-01-02 03:46:40 +0000", osutils.format_date_with_offset_in_original_timezone(100000)) self.assertEqual("Fri 1970-01-02 05:46:40 +0200", osutils.format_date_with_offset_in_original_timezone(100000, 7200)) def test_local_time_offset(self): """Test that local_time_offset() returns a sane value.""" offset = osutils.local_time_offset() self.assertTrue(isinstance(offset, int)) # Test that the offset is no more than a eighteen hours in # either direction. # Time zone handling is system specific, so it is difficult to # do more specific tests, but a value outside of this range is # probably wrong. eighteen_hours = 18 * 3600 self.assertTrue(-eighteen_hours < offset < eighteen_hours) def test_local_time_offset_with_timestamp(self): """Test that local_time_offset() works with a timestamp.""" offset = osutils.local_time_offset(1000000000.1234567) self.assertTrue(isinstance(offset, int)) eighteen_hours = 18 * 3600 self.assertTrue(-eighteen_hours < offset < eighteen_hours) class TestLinks(tests.TestCaseInTempDir): def test_dereference_path(self): self.requireFeature(features.SymlinkFeature) cwd = osutils.realpath('.') os.mkdir('bar') bar_path = osutils.pathjoin(cwd, 'bar') # Using './' to avoid bug #1213894 (first path component not # dereferenced) in Python 2.4.1 and earlier self.assertEqual(bar_path, osutils.realpath('./bar')) os.symlink('bar', 'foo') self.assertEqual(bar_path, osutils.realpath('./foo')) # Does not dereference terminal symlinks foo_path = osutils.pathjoin(cwd, 'foo') self.assertEqual(foo_path, osutils.dereference_path('./foo')) # Dereferences parent symlinks os.mkdir('bar/baz') baz_path = osutils.pathjoin(bar_path, 'baz') self.assertEqual(baz_path, osutils.dereference_path('./foo/baz')) # Dereferences parent symlinks that are the first path element self.assertEqual(baz_path, osutils.dereference_path('foo/baz')) # Dereferences parent symlinks in absolute paths foo_baz_path = osutils.pathjoin(foo_path, 'baz') self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path)) def test_changing_access(self): f = file('file', 'w') f.write('monkey') f.close() # Make a file readonly osutils.make_readonly('file') mode = os.lstat('file').st_mode self.assertEqual(mode, mode & 0777555) # Make a file writable osutils.make_writable('file') mode = os.lstat('file').st_mode self.assertEqual(mode, mode | 0200) if osutils.has_symlinks(): # should not error when handed a symlink os.symlink('nonexistent', 'dangling') osutils.make_readonly('dangling') osutils.make_writable('dangling') def test_host_os_dereferences_symlinks(self): osutils.host_os_dereferences_symlinks() class TestCanonicalRelPath(tests.TestCaseInTempDir): _test_needs_features = [features.CaseInsCasePresFilenameFeature] def test_canonical_relpath_simple(self): f = file('MixedCaseName', 'w') f.close() actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename') self.assertEqual('work/MixedCaseName', actual) def test_canonical_relpath_missing_tail(self): os.mkdir('MixedCaseParent') actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcaseparent/nochild') self.assertEqual('work/MixedCaseParent/nochild', actual) class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport): def assertRelpath(self, expected, base, path): actual = osutils._cicp_canonical_relpath(base, path) self.assertEqual(expected, actual) def test_simple(self): self.build_tree(['MixedCaseName']) base = osutils.realpath(self.get_transport('.').local_abspath('.')) self.assertRelpath('MixedCaseName', base, 'mixedcAsename') def test_subdir_missing_tail(self): self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child']) base = osutils.realpath(self.get_transport('.').local_abspath('.')) self.assertRelpath('MixedCaseParent/a_child', base, 'MixedCaseParent/a_child') self.assertRelpath('MixedCaseParent/a_child', base, 'MixedCaseParent/A_Child') self.assertRelpath('MixedCaseParent/not_child', base, 'MixedCaseParent/not_child') def test_at_root_slash(self): # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH' # check... if osutils.MIN_ABS_PATHLENGTH > 1: raise tests.TestSkipped('relpath requires %d chars' % osutils.MIN_ABS_PATHLENGTH) self.assertRelpath('foo', '/', '/foo') def test_at_root_drive(self): if sys.platform != 'win32': raise tests.TestNotApplicable('we can only test drive-letter relative' ' paths on Windows where we have drive' ' letters.') # see bug #322807 # The specific issue is that when at the root of a drive, 'abspath' # returns "C:/" or just "/". However, the code assumes that abspath # always returns something like "C:/foo" or "/foo" (no trailing slash). self.assertRelpath('foo', 'C:/', 'C:/foo') self.assertRelpath('foo', 'X:/', 'X:/foo') self.assertRelpath('foo', 'X:/', 'X://foo') class TestPumpFile(tests.TestCase): """Test pumpfile method.""" def setUp(self): tests.TestCase.setUp(self) # create a test datablock self.block_size = 512 pattern = '0123456789ABCDEF' self.test_data = pattern * (3 * self.block_size / len(pattern)) self.test_data_len = len(self.test_data) def test_bracket_block_size(self): """Read data in blocks with the requested read size bracketing the block size.""" # make sure test data is larger than max read size self.assertTrue(self.test_data_len > self.block_size) from_file = file_utils.FakeReadFile(self.test_data) to_file = StringIO() # read (max / 2) bytes and verify read size wasn't affected num_bytes_to_read = self.block_size / 2 osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size) self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read) self.assertEqual(from_file.get_read_count(), 1) # read (max) bytes and verify read size wasn't affected num_bytes_to_read = self.block_size from_file.reset_read_count() osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size) self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read) self.assertEqual(from_file.get_read_count(), 1) # read (max + 1) bytes and verify read size was limited num_bytes_to_read = self.block_size + 1 from_file.reset_read_count() osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size) self.assertEqual(from_file.get_max_read_size(), self.block_size) self.assertEqual(from_file.get_read_count(), 2) # finish reading the rest of the data num_bytes_to_read = self.test_data_len - to_file.tell() osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size) # report error if the data wasn't equal (we only report the size due # to the length of the data) response_data = to_file.getvalue() if response_data != self.test_data: message = "Data not equal. Expected %d bytes, received %d." self.fail(message % (len(response_data), self.test_data_len)) def test_specified_size(self): """Request a transfer larger than the maximum block size and verify that the maximum read doesn't exceed the block_size.""" # make sure test data is larger than max read size self.assertTrue(self.test_data_len > self.block_size) # retrieve data in blocks from_file = file_utils.FakeReadFile(self.test_data) to_file = StringIO() osutils.pumpfile(from_file, to_file, self.test_data_len, self.block_size) # verify read size was equal to the maximum read size self.assertTrue(from_file.get_max_read_size() > 0) self.assertEqual(from_file.get_max_read_size(), self.block_size) self.assertEqual(from_file.get_read_count(), 3) # report error if the data wasn't equal (we only report the size due # to the length of the data) response_data = to_file.getvalue() if response_data != self.test_data: message = "Data not equal. Expected %d bytes, received %d." self.fail(message % (len(response_data), self.test_data_len)) def test_to_eof(self): """Read to end-of-file and verify that the reads are not larger than the maximum read size.""" # make sure test data is larger than max read size self.assertTrue(self.test_data_len > self.block_size) # retrieve data to EOF from_file = file_utils.FakeReadFile(self.test_data) to_file = StringIO() osutils.pumpfile(from_file, to_file, -1, self.block_size) # verify read size was equal to the maximum read size self.assertEqual(from_file.get_max_read_size(), self.block_size) self.assertEqual(from_file.get_read_count(), 4) # report error if the data wasn't equal (we only report the size due # to the length of the data) response_data = to_file.getvalue() if response_data != self.test_data: message = "Data not equal. Expected %d bytes, received %d." self.fail(message % (len(response_data), self.test_data_len)) def test_defaults(self): """Verifies that the default arguments will read to EOF -- this test verifies that any existing usages of pumpfile will not be broken with this new version.""" # retrieve data using default (old) pumpfile method from_file = file_utils.FakeReadFile(self.test_data) to_file = StringIO() osutils.pumpfile(from_file, to_file) # report error if the data wasn't equal (we only report the size due # to the length of the data) response_data = to_file.getvalue() if response_data != self.test_data: message = "Data not equal. Expected %d bytes, received %d." self.fail(message % (len(response_data), self.test_data_len)) def test_report_activity(self): activity = [] def log_activity(length, direction): activity.append((length, direction)) from_file = StringIO(self.test_data) to_file = StringIO() osutils.pumpfile(from_file, to_file, buff_size=500, report_activity=log_activity, direction='read') self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'), (36, 'read')], activity) from_file = StringIO(self.test_data) to_file = StringIO() del activity[:] osutils.pumpfile(from_file, to_file, buff_size=500, report_activity=log_activity, direction='write') self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'), (36, 'write')], activity) # And with a limited amount of data from_file = StringIO(self.test_data) to_file = StringIO() del activity[:] osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028, report_activity=log_activity, direction='read') self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity) class TestPumpStringFile(tests.TestCase): def test_empty(self): output = StringIO() osutils.pump_string_file("", output) self.assertEqual("", output.getvalue()) def test_more_than_segment_size(self): output = StringIO() osutils.pump_string_file("123456789", output, 2) self.assertEqual("123456789", output.getvalue()) def test_segment_size(self): output = StringIO() osutils.pump_string_file("12", output, 2) self.assertEqual("12", output.getvalue()) def test_segment_size_multiple(self): output = StringIO() osutils.pump_string_file("1234", output, 2) self.assertEqual("1234", output.getvalue()) class TestRelpath(tests.TestCase): def test_simple_relpath(self): cwd = osutils.getcwd() subdir = cwd + '/subdir' self.assertEqual('subdir', osutils.relpath(cwd, subdir)) def test_deep_relpath(self): cwd = osutils.getcwd() subdir = cwd + '/sub/subsubdir' self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir)) def test_not_relative(self): self.assertRaises(errors.PathNotChild, osutils.relpath, 'C:/path', 'H:/path') self.assertRaises(errors.PathNotChild, osutils.relpath, 'C:/', 'H:/path') class TestSafeUnicode(tests.TestCase): def test_from_ascii_string(self): self.assertEqual(u'foobar', osutils.safe_unicode('foobar')) def test_from_unicode_string_ascii_contents(self): self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam')) def test_from_unicode_string_unicode_contents(self): self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae')) def test_from_utf8_string(self): self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae')) def test_bad_utf8_string(self): self.assertRaises(errors.BzrBadParameterNotUnicode, osutils.safe_unicode, '\xbb\xbb') class TestSafeUtf8(tests.TestCase): def test_from_ascii_string(self): f = 'foobar' self.assertEqual('foobar', osutils.safe_utf8(f)) def test_from_unicode_string_ascii_contents(self): self.assertEqual('bargam', osutils.safe_utf8(u'bargam')) def test_from_unicode_string_unicode_contents(self): self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae')) def test_from_utf8_string(self): self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae')) def test_bad_utf8_string(self): self.assertRaises(errors.BzrBadParameterNotUnicode, osutils.safe_utf8, '\xbb\xbb') class TestSafeRevisionId(tests.TestCase): def test_from_ascii_string(self): # this shouldn't give a warning because it's getting an ascii string self.assertEqual('foobar', osutils.safe_revision_id('foobar')) def test_from_unicode_string_ascii_contents(self): self.assertEqual('bargam', osutils.safe_revision_id(u'bargam', warn=False)) def test_from_unicode_deprecated(self): self.assertEqual('bargam', self.callDeprecated([osutils._revision_id_warning], osutils.safe_revision_id, u'bargam')) def test_from_unicode_string_unicode_contents(self): self.assertEqual('bargam\xc2\xae', osutils.safe_revision_id(u'bargam\xae', warn=False)) def test_from_utf8_string(self): self.assertEqual('foo\xc2\xae', osutils.safe_revision_id('foo\xc2\xae')) def test_none(self): """Currently, None is a valid revision_id""" self.assertEqual(None, osutils.safe_revision_id(None)) class TestSafeFileId(tests.TestCase): def test_from_ascii_string(self): self.assertEqual('foobar', osutils.safe_file_id('foobar')) def test_from_unicode_string_ascii_contents(self): self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False)) def test_from_unicode_deprecated(self): self.assertEqual('bargam', self.callDeprecated([osutils._file_id_warning], osutils.safe_file_id, u'bargam')) def test_from_unicode_string_unicode_contents(self): self.assertEqual('bargam\xc2\xae', osutils.safe_file_id(u'bargam\xae', warn=False)) def test_from_utf8_string(self): self.assertEqual('foo\xc2\xae', osutils.safe_file_id('foo\xc2\xae')) def test_none(self): """Currently, None is a valid revision_id""" self.assertEqual(None, osutils.safe_file_id(None)) class TestPosixFuncs(tests.TestCase): """Test that the posix version of normpath returns an appropriate path when used with 2 leading slashes.""" def test_normpath(self): self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow')) self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow')) self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow')) class TestWin32Funcs(tests.TestCase): """Test that _win32 versions of os utilities return appropriate paths.""" def test_abspath(self): self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo')) self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo')) self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path')) self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path')) def test_realpath(self): self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo')) self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo')) def test_pathjoin(self): self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo')) self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo')) self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo')) self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo')) self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo')) self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo')) def test_normpath(self): self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo')) self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo')) def test_getcwd(self): cwd = osutils._win32_getcwd() os_cwd = os.getcwdu() self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:]) # win32 is inconsistent whether it returns lower or upper case # and even if it was consistent the user might type the other # so we force it to uppercase # running python.exe under cmd.exe return capital C:\\ # running win32 python inside a cygwin shell returns lowercase self.assertEqual(os_cwd[0].upper(), cwd[0]) def test_fixdrive(self): self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo')) self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo')) self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo')) def test_win98_abspath(self): # absolute path self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo')) self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo')) # UNC path self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path')) self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path')) # relative path cwd = osutils.getcwd().rstrip('/') drive = osutils.ntpath.splitdrive(cwd)[0] self.assertEqual(cwd+'/path', osutils._win98_abspath('path')) self.assertEqual(drive+'/path', osutils._win98_abspath('/path')) # unicode path u = u'\u1234' self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u)) class TestWin32FuncsDirs(tests.TestCaseInTempDir): """Test win32 functions that create files.""" def test_getcwd(self): self.requireFeature(features.UnicodeFilenameFeature) os.mkdir(u'mu-\xb5') os.chdir(u'mu-\xb5') # TODO: jam 20060427 This will probably fail on Mac OSX because # it will change the normalization of B\xe5gfors # Consider using a different unicode character, or make # osutils.getcwd() renormalize the path. self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5') def test_minimum_path_selection(self): self.assertEqual(set(), osutils.minimum_path_selection([])) self.assertEqual(set(['a']), osutils.minimum_path_selection(['a'])) self.assertEqual(set(['a', 'b']), osutils.minimum_path_selection(['a', 'b'])) self.assertEqual(set(['a/', 'b']), osutils.minimum_path_selection(['a/', 'b'])) self.assertEqual(set(['a/', 'b']), osutils.minimum_path_selection(['a/c', 'a/', 'b'])) self.assertEqual(set(['a-b', 'a', 'a0b']), osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a'])) def test_mkdtemp(self): tmpdir = osutils._win32_mkdtemp(dir='.') self.assertFalse('\\' in tmpdir) def test_rename(self): a = open('a', 'wb') a.write('foo\n') a.close() b = open('b', 'wb') b.write('baz\n') b.close() osutils._win32_rename('b', 'a') self.assertPathExists('a') self.assertPathDoesNotExist('b') self.assertFileEqual('baz\n', 'a') def test_rename_missing_file(self): a = open('a', 'wb') a.write('foo\n') a.close() try: osutils._win32_rename('b', 'a') except (IOError, OSError), e: self.assertEqual(errno.ENOENT, e.errno) self.assertFileEqual('foo\n', 'a') def test_rename_missing_dir(self): os.mkdir('a') try: osutils._win32_rename('b', 'a') except (IOError, OSError), e: self.assertEqual(errno.ENOENT, e.errno) def test_rename_current_dir(self): os.mkdir('a') os.chdir('a') # You can't rename the working directory # doing rename non-existant . usually # just raises ENOENT, since non-existant # doesn't exist. try: osutils._win32_rename('b', '.') except (IOError, OSError), e: self.assertEqual(errno.ENOENT, e.errno) def test_splitpath(self): def check(expected, path): self.assertEqual(expected, osutils.splitpath(path)) check(['a'], 'a') check(['a', 'b'], 'a/b') check(['a', 'b'], 'a/./b') check(['a', '.b'], 'a/.b') check(['a', '.b'], 'a\\.b') self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b') class TestParentDirectories(tests.TestCaseInTempDir): """Test osutils.parent_directories()""" def test_parent_directories(self): self.assertEqual([], osutils.parent_directories('a')) self.assertEqual(['a'], osutils.parent_directories('a/b')) self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c')) class TestMacFuncsDirs(tests.TestCaseInTempDir): """Test mac special functions that require directories.""" def test_getcwd(self): self.requireFeature(features.UnicodeFilenameFeature) os.mkdir(u'B\xe5gfors') os.chdir(u'B\xe5gfors') self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors') def test_getcwd_nonnorm(self): self.requireFeature(features.UnicodeFilenameFeature) # Test that _mac_getcwd() will normalize this path os.mkdir(u'Ba\u030agfors') os.chdir(u'Ba\u030agfors') self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors') class TestChunksToLines(tests.TestCase): def test_smoketest(self): self.assertEqual(['foo\n', 'bar\n', 'baz\n'], osutils.chunks_to_lines(['foo\nbar', '\nbaz\n'])) self.assertEqual(['foo\n', 'bar\n', 'baz\n'], osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n'])) def test_osutils_binding(self): from bzrlib.tests import test__chunks_to_lines if test__chunks_to_lines.compiled_chunkstolines_feature.available(): from bzrlib._chunks_to_lines_pyx import chunks_to_lines else: from bzrlib._chunks_to_lines_py import chunks_to_lines self.assertIs(chunks_to_lines, osutils.chunks_to_lines) class TestSplitLines(tests.TestCase): def test_split_unicode(self): self.assertEqual([u'foo\n', u'bar\xae'], osutils.split_lines(u'foo\nbar\xae')) self.assertEqual([u'foo\n', u'bar\xae\n'], osutils.split_lines(u'foo\nbar\xae\n')) def test_split_with_carriage_returns(self): self.assertEqual(['foo\rbar\n'], osutils.split_lines('foo\rbar\n')) class TestWalkDirs(tests.TestCaseInTempDir): def assertExpectedBlocks(self, expected, result): self.assertEqual(expected, [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result]) def test_walkdirs(self): tree = [ '.bzr', '0file', '1dir/', '1dir/0file', '1dir/1dir/', '2file' ] self.build_tree(tree) expected_dirblocks = [ (('', '.'), [('0file', '0file', 'file'), ('1dir', '1dir', 'directory'), ('2file', '2file', 'file'), ] ), (('1dir', './1dir'), [('1dir/0file', '0file', 'file'), ('1dir/1dir', '1dir', 'directory'), ] ), (('1dir/1dir', './1dir/1dir'), [ ] ), ] result = [] found_bzrdir = False for dirdetail, dirblock in osutils.walkdirs('.'): if len(dirblock) and dirblock[0][1] == '.bzr': # this tests the filtering of selected paths found_bzrdir = True del dirblock[0] result.append((dirdetail, dirblock)) self.assertTrue(found_bzrdir) self.assertExpectedBlocks(expected_dirblocks, result) # you can search a subdir only, with a supplied prefix. result = [] for dirblock in osutils.walkdirs('./1dir', '1dir'): result.append(dirblock) self.assertExpectedBlocks(expected_dirblocks[1:], result) def test_walkdirs_os_error(self): # # Pyrex readdir didn't raise useful messages if it had an error # reading the directory if sys.platform == 'win32': raise tests.TestNotApplicable( "readdir IOError not tested on win32") self.requireFeature(features.not_running_as_root) os.mkdir("test-unreadable") os.chmod("test-unreadable", 0000) # must chmod it back so that it can be removed self.addCleanup(os.chmod, "test-unreadable", 0700) # The error is not raised until the generator is actually evaluated. # (It would be ok if it happened earlier but at the moment it # doesn't.) e = self.assertRaises(OSError, list, osutils._walkdirs_utf8(".")) self.assertEquals('./test-unreadable', e.filename) self.assertEquals(errno.EACCES, e.errno) # Ensure the message contains the file name self.assertContainsRe(str(e), "\./test-unreadable") def test_walkdirs_encoding_error(self): # # walkdirs didn't raise a useful message when the filenames # are not using the filesystem's encoding # require a bytestring based filesystem self.requireFeature(features.ByteStringNamedFilesystem) tree = [ '.bzr', '0file', '1dir/', '1dir/0file', '1dir/1dir/', '1file' ] self.build_tree(tree) # rename the 1file to a latin-1 filename os.rename("./1file", "\xe8file") if "\xe8file" not in os.listdir("."): self.skip("Lack filesystem that preserves arbitrary bytes") self._save_platform_info() win32utils.winver = None # Avoid the win32 detection code osutils._fs_enc = 'UTF-8' # this should raise on error def attempt(): for dirdetail, dirblock in osutils.walkdirs('.'): pass self.assertRaises(errors.BadFilenameEncoding, attempt) def test__walkdirs_utf8(self): tree = [ '.bzr', '0file', '1dir/', '1dir/0file', '1dir/1dir/', '2file' ] self.build_tree(tree) expected_dirblocks = [ (('', '.'), [('0file', '0file', 'file'), ('1dir', '1dir', 'directory'), ('2file', '2file', 'file'), ] ), (('1dir', './1dir'), [('1dir/0file', '0file', 'file'), ('1dir/1dir', '1dir', 'directory'), ] ), (('1dir/1dir', './1dir/1dir'), [ ] ), ] result = [] found_bzrdir = False for dirdetail, dirblock in osutils._walkdirs_utf8('.'): if len(dirblock) and dirblock[0][1] == '.bzr': # this tests the filtering of selected paths found_bzrdir = True del dirblock[0] result.append((dirdetail, dirblock)) self.assertTrue(found_bzrdir) self.assertExpectedBlocks(expected_dirblocks, result) # you can search a subdir only, with a supplied prefix. result = [] for dirblock in osutils.walkdirs('./1dir', '1dir'): result.append(dirblock) self.assertExpectedBlocks(expected_dirblocks[1:], result) def _filter_out_stat(self, result): """Filter out the stat value from the walkdirs result""" for dirdetail, dirblock in result: new_dirblock = [] for info in dirblock: # Ignore info[3] which is the stat new_dirblock.append((info[0], info[1], info[2], info[4])) dirblock[:] = new_dirblock def _save_platform_info(self): self.overrideAttr(win32utils, 'winver') self.overrideAttr(osutils, '_fs_enc') self.overrideAttr(osutils, '_selected_dir_reader') def assertDirReaderIs(self, expected): """Assert the right implementation for _walkdirs_utf8 is chosen.""" # Force it to redetect osutils._selected_dir_reader = None # Nothing to list, but should still trigger the selection logic self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.'))) self.assertIsInstance(osutils._selected_dir_reader, expected) def test_force_walkdirs_utf8_fs_utf8(self): self.requireFeature(UTF8DirReaderFeature) self._save_platform_info() win32utils.winver = None # Avoid the win32 detection code osutils._fs_enc = 'utf-8' self.assertDirReaderIs( UTF8DirReaderFeature.module.UTF8DirReader) def test_force_walkdirs_utf8_fs_ascii(self): self.requireFeature(UTF8DirReaderFeature) self._save_platform_info() win32utils.winver = None # Avoid the win32 detection code osutils._fs_enc = 'ascii' self.assertDirReaderIs( UTF8DirReaderFeature.module.UTF8DirReader) def test_force_walkdirs_utf8_fs_latin1(self): self._save_platform_info() win32utils.winver = None # Avoid the win32 detection code osutils._fs_enc = 'iso-8859-1' self.assertDirReaderIs(osutils.UnicodeDirReader) def test_force_walkdirs_utf8_nt(self): # Disabled because the thunk of the whole walkdirs api is disabled. self.requireFeature(test__walkdirs_win32.win32_readdir_feature) self._save_platform_info() win32utils.winver = 'Windows NT' from bzrlib._walkdirs_win32 import Win32ReadDir self.assertDirReaderIs(Win32ReadDir) def test_force_walkdirs_utf8_98(self): self.requireFeature(test__walkdirs_win32.win32_readdir_feature) self._save_platform_info() win32utils.winver = 'Windows 98' self.assertDirReaderIs(osutils.UnicodeDirReader) def test_unicode_walkdirs(self): """Walkdirs should always return unicode paths.""" self.requireFeature(features.UnicodeFilenameFeature) name0 = u'0file-\xb6' name1 = u'1dir-\u062c\u0648' name2 = u'2file-\u0633' tree = [ name0, name1 + '/', name1 + '/' + name0, name1 + '/' + name1 + '/', name2, ] self.build_tree(tree) expected_dirblocks = [ ((u'', u'.'), [(name0, name0, 'file', './' + name0), (name1, name1, 'directory', './' + name1), (name2, name2, 'file', './' + name2), ] ), ((name1, './' + name1), [(name1 + '/' + name0, name0, 'file', './' + name1 + '/' + name0), (name1 + '/' + name1, name1, 'directory', './' + name1 + '/' + name1), ] ), ((name1 + '/' + name1, './' + name1 + '/' + name1), [ ] ), ] result = list(osutils.walkdirs('.')) self._filter_out_stat(result) self.assertEqual(expected_dirblocks, result) result = list(osutils.walkdirs(u'./'+name1, name1)) self._filter_out_stat(result) self.assertEqual(expected_dirblocks[1:], result) def test_unicode__walkdirs_utf8(self): """Walkdirs_utf8 should always return utf8 paths. The abspath portion might be in unicode or utf-8 """ self.requireFeature(features.UnicodeFilenameFeature) name0 = u'0file-\xb6' name1 = u'1dir-\u062c\u0648' name2 = u'2file-\u0633' tree = [ name0, name1 + '/', name1 + '/' + name0, name1 + '/' + name1 + '/', name2, ] self.build_tree(tree) name0 = name0.encode('utf8') name1 = name1.encode('utf8') name2 = name2.encode('utf8') expected_dirblocks = [ (('', '.'), [(name0, name0, 'file', './' + name0), (name1, name1, 'directory', './' + name1), (name2, name2, 'file', './' + name2), ] ), ((name1, './' + name1), [(name1 + '/' + name0, name0, 'file', './' + name1 + '/' + name0), (name1 + '/' + name1, name1, 'directory', './' + name1 + '/' + name1), ] ), ((name1 + '/' + name1, './' + name1 + '/' + name1), [ ] ), ] result = [] # For ease in testing, if walkdirs_utf8 returns Unicode, assert that # all abspaths are Unicode, and encode them back into utf8. for dirdetail, dirblock in osutils._walkdirs_utf8('.'): self.assertIsInstance(dirdetail[0], str) if isinstance(dirdetail[1], unicode): dirdetail = (dirdetail[0], dirdetail[1].encode('utf8')) dirblock = [list(info) for info in dirblock] for info in dirblock: self.assertIsInstance(info[4], unicode) info[4] = info[4].encode('utf8') new_dirblock = [] for info in dirblock: self.assertIsInstance(info[0], str) self.assertIsInstance(info[1], str) self.assertIsInstance(info[4], str) # Remove the stat information new_dirblock.append((info[0], info[1], info[2], info[4])) result.append((dirdetail, new_dirblock)) self.assertEqual(expected_dirblocks, result) def test__walkdirs_utf8_with_unicode_fs(self): """UnicodeDirReader should be a safe fallback everywhere The abspath portion should be in unicode """ self.requireFeature(features.UnicodeFilenameFeature) # Use the unicode reader. TODO: split into driver-and-driven unit # tests. self._save_platform_info() osutils._selected_dir_reader = osutils.UnicodeDirReader() name0u = u'0file-\xb6' name1u = u'1dir-\u062c\u0648' name2u = u'2file-\u0633' tree = [ name0u, name1u + '/', name1u + '/' + name0u, name1u + '/' + name1u + '/', name2u, ] self.build_tree(tree) name0 = name0u.encode('utf8') name1 = name1u.encode('utf8') name2 = name2u.encode('utf8') # All of the abspaths should be in unicode, all of the relative paths # should be in utf8 expected_dirblocks = [ (('', '.'), [(name0, name0, 'file', './' + name0u), (name1, name1, 'directory', './' + name1u), (name2, name2, 'file', './' + name2u), ] ), ((name1, './' + name1u), [(name1 + '/' + name0, name0, 'file', './' + name1u + '/' + name0u), (name1 + '/' + name1, name1, 'directory', './' + name1u + '/' + name1u), ] ), ((name1 + '/' + name1, './' + name1u + '/' + name1u), [ ] ), ] result = list(osutils._walkdirs_utf8('.')) self._filter_out_stat(result) self.assertEqual(expected_dirblocks, result) def test__walkdirs_utf8_win32readdir(self): self.requireFeature(test__walkdirs_win32.win32_readdir_feature) self.requireFeature(features.UnicodeFilenameFeature) from bzrlib._walkdirs_win32 import Win32ReadDir self._save_platform_info() osutils._selected_dir_reader = Win32ReadDir() name0u = u'0file-\xb6' name1u = u'1dir-\u062c\u0648' name2u = u'2file-\u0633' tree = [ name0u, name1u + '/', name1u + '/' + name0u, name1u + '/' + name1u + '/', name2u, ] self.build_tree(tree) name0 = name0u.encode('utf8') name1 = name1u.encode('utf8') name2 = name2u.encode('utf8') # All of the abspaths should be in unicode, all of the relative paths # should be in utf8 expected_dirblocks = [ (('', '.'), [(name0, name0, 'file', './' + name0u), (name1, name1, 'directory', './' + name1u), (name2, name2, 'file', './' + name2u), ] ), ((name1, './' + name1u), [(name1 + '/' + name0, name0, 'file', './' + name1u + '/' + name0u), (name1 + '/' + name1, name1, 'directory', './' + name1u + '/' + name1u), ] ), ((name1 + '/' + name1, './' + name1u + '/' + name1u), [ ] ), ] result = list(osutils._walkdirs_utf8(u'.')) self._filter_out_stat(result) self.assertEqual(expected_dirblocks, result) def assertStatIsCorrect(self, path, win32stat): os_stat = os.stat(path) self.assertEqual(os_stat.st_size, win32stat.st_size) self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4) self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4) self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4) self.assertEqual(os_stat.st_dev, win32stat.st_dev) self.assertEqual(os_stat.st_ino, win32stat.st_ino) self.assertEqual(os_stat.st_mode, win32stat.st_mode) def test__walkdirs_utf_win32_find_file_stat_file(self): """make sure our Stat values are valid""" self.requireFeature(test__walkdirs_win32.win32_readdir_feature) self.requireFeature(features.UnicodeFilenameFeature) from bzrlib._walkdirs_win32 import Win32ReadDir name0u = u'0file-\xb6' name0 = name0u.encode('utf8') self.build_tree([name0u]) # I hate to sleep() here, but I'm trying to make the ctime different # from the mtime time.sleep(2) f = open(name0u, 'ab') try: f.write('just a small update') finally: f.close() result = Win32ReadDir().read_dir('', u'.') entry = result[0] self.assertEqual((name0, name0, 'file'), entry[:3]) self.assertEqual(u'./' + name0u, entry[4]) self.assertStatIsCorrect(entry[4], entry[3]) self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime) def test__walkdirs_utf_win32_find_file_stat_directory(self): """make sure our Stat values are valid""" self.requireFeature(test__walkdirs_win32.win32_readdir_feature) self.requireFeature(features.UnicodeFilenameFeature) from bzrlib._walkdirs_win32 import Win32ReadDir name0u = u'0dir-\u062c\u0648' name0 = name0u.encode('utf8') self.build_tree([name0u + '/']) result = Win32ReadDir().read_dir('', u'.') entry = result[0] self.assertEqual((name0, name0, 'directory'), entry[:3]) self.assertEqual(u'./' + name0u, entry[4]) self.assertStatIsCorrect(entry[4], entry[3]) def assertPathCompare(self, path_less, path_greater): """check that path_less and path_greater compare correctly.""" self.assertEqual(0, osutils.compare_paths_prefix_order( path_less, path_less)) self.assertEqual(0, osutils.compare_paths_prefix_order( path_greater, path_greater)) self.assertEqual(-1, osutils.compare_paths_prefix_order( path_less, path_greater)) self.assertEqual(1, osutils.compare_paths_prefix_order( path_greater, path_less)) def test_compare_paths_prefix_order(self): # root before all else self.assertPathCompare("/", "/a") # alpha within a dir self.assertPathCompare("/a", "/b") self.assertPathCompare("/b", "/z") # high dirs before lower. self.assertPathCompare("/z", "/a/a") # except if the deeper dir should be output first self.assertPathCompare("/a/b/c", "/d/g") # lexical betwen dirs of the same height self.assertPathCompare("/a/z", "/z/z") self.assertPathCompare("/a/c/z", "/a/d/e") # this should also be consistent for no leading / paths # root before all else self.assertPathCompare("", "a") # alpha within a dir self.assertPathCompare("a", "b") self.assertPathCompare("b", "z") # high dirs before lower. self.assertPathCompare("z", "a/a") # except if the deeper dir should be output first self.assertPathCompare("a/b/c", "d/g") # lexical betwen dirs of the same height self.assertPathCompare("a/z", "z/z") self.assertPathCompare("a/c/z", "a/d/e") def test_path_prefix_sorting(self): """Doing a sort on path prefix should match our sample data.""" original_paths = [ 'a', 'a/b', 'a/b/c', 'b', 'b/c', 'd', 'd/e', 'd/e/f', 'd/f', 'd/g', 'g', ] dir_sorted_paths = [ 'a', 'b', 'd', 'g', 'a/b', 'a/b/c', 'b/c', 'd/e', 'd/f', 'd/g', 'd/e/f', ] self.assertEqual( dir_sorted_paths, sorted(original_paths, key=osutils.path_prefix_key)) # using the comparison routine shoudl work too: self.assertEqual( dir_sorted_paths, sorted(original_paths, cmp=osutils.compare_paths_prefix_order)) class TestCopyTree(tests.TestCaseInTempDir): def test_copy_basic_tree(self): self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c']) osutils.copy_tree('source', 'target') self.assertEqual(['a', 'b'], sorted(os.listdir('target'))) self.assertEqual(['c'], os.listdir('target/b')) def test_copy_tree_target_exists(self): self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c', 'target/']) osutils.copy_tree('source', 'target') self.assertEqual(['a', 'b'], sorted(os.listdir('target'))) self.assertEqual(['c'], os.listdir('target/b')) def test_copy_tree_symlinks(self): self.requireFeature(features.SymlinkFeature) self.build_tree(['source/']) os.symlink('a/generic/path', 'source/lnk') osutils.copy_tree('source', 'target') self.assertEqual(['lnk'], os.listdir('target')) self.assertEqual('a/generic/path', os.readlink('target/lnk')) def test_copy_tree_handlers(self): processed_files = [] processed_links = [] def file_handler(from_path, to_path): processed_files.append(('f', from_path, to_path)) def dir_handler(from_path, to_path): processed_files.append(('d', from_path, to_path)) def link_handler(from_path, to_path): processed_links.append((from_path, to_path)) handlers = {'file':file_handler, 'directory':dir_handler, 'symlink':link_handler, } self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c']) if osutils.has_symlinks(): os.symlink('a/generic/path', 'source/lnk') osutils.copy_tree('source', 'target', handlers=handlers) self.assertEqual([('d', 'source', 'target'), ('f', 'source/a', 'target/a'), ('d', 'source/b', 'target/b'), ('f', 'source/b/c', 'target/b/c'), ], processed_files) self.assertPathDoesNotExist('target') if osutils.has_symlinks(): self.assertEqual([('source/lnk', 'target/lnk')], processed_links) class TestSetUnsetEnv(tests.TestCase): """Test updating the environment""" def setUp(self): super(TestSetUnsetEnv, self).setUp() self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'), 'Environment was not cleaned up properly.' ' Variable BZR_TEST_ENV_VAR should not exist.') def cleanup(): if 'BZR_TEST_ENV_VAR' in os.environ: del os.environ['BZR_TEST_ENV_VAR'] self.addCleanup(cleanup) def test_set(self): """Test that we can set an env variable""" old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo') self.assertEqual(None, old) self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR')) def test_double_set(self): """Test that we get the old value out""" osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo') old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar') self.assertEqual('foo', old) self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR')) def test_unicode(self): """Environment can only contain plain strings So Unicode strings must be encoded. """ uni_val, env_val = tests.probe_unicode_in_user_encoding() if uni_val is None: raise tests.TestSkipped( 'Cannot find a unicode character that works in encoding %s' % (osutils.get_user_encoding(),)) old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val) self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR')) def test_unset(self): """Test that passing None will remove the env var""" osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo') old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None) self.assertEqual('foo', old) self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR')) self.assertFalse('BZR_TEST_ENV_VAR' in os.environ) class TestSizeShaFile(tests.TestCaseInTempDir): def test_sha_empty(self): self.build_tree_contents([('foo', '')]) expected_sha = osutils.sha_string('') f = open('foo') self.addCleanup(f.close) size, sha = osutils.size_sha_file(f) self.assertEqual(0, size) self.assertEqual(expected_sha, sha) def test_sha_mixed_endings(self): text = 'test\r\nwith\nall\rpossible line endings\r\n' self.build_tree_contents([('foo', text)]) expected_sha = osutils.sha_string(text) f = open('foo', 'rb') self.addCleanup(f.close) size, sha = osutils.size_sha_file(f) self.assertEqual(38, size) self.assertEqual(expected_sha, sha) class TestShaFileByName(tests.TestCaseInTempDir): def test_sha_empty(self): self.build_tree_contents([('foo', '')]) expected_sha = osutils.sha_string('') self.assertEqual(expected_sha, osutils.sha_file_by_name('foo')) def test_sha_mixed_endings(self): text = 'test\r\nwith\nall\rpossible line endings\r\n' self.build_tree_contents([('foo', text)]) expected_sha = osutils.sha_string(text) self.assertEqual(expected_sha, osutils.sha_file_by_name('foo')) class TestResourceLoading(tests.TestCaseInTempDir): def test_resource_string(self): # test resource in bzrlib text = osutils.resource_string('bzrlib', 'debug.py') self.assertContainsRe(text, "debug_flags = set()") # test resource under bzrlib text = osutils.resource_string('bzrlib.ui', 'text.py') self.assertContainsRe(text, "class TextUIFactory") # test unsupported package self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz', 'yyy.xx') # test unknown resource self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx') class TestReCompile(tests.TestCase): def _deprecated_re_compile_checked(self, *args, **kwargs): return self.applyDeprecated(symbol_versioning.deprecated_in((2, 2, 0)), osutils.re_compile_checked, *args, **kwargs) def test_re_compile_checked(self): r = self._deprecated_re_compile_checked(r'A*', re.IGNORECASE) self.assertTrue(r.match('aaaa')) self.assertTrue(r.match('aAaA')) def test_re_compile_checked_error(self): # like https://bugs.launchpad.net/bzr/+bug/251352 # Due to possible test isolation error, re.compile is not lazy at # this point. We re-install lazy compile. lazy_regex.install_lazy_compile() err = self.assertRaises( errors.BzrCommandError, self._deprecated_re_compile_checked, '*', re.IGNORECASE, 'test case') self.assertEqual( 'Invalid regular expression in test case: ' '"*" nothing to repeat', str(err)) class TestDirReader(tests.TestCaseInTempDir): scenarios = dir_reader_scenarios() # Set by load_tests _dir_reader_class = None _native_to_unicode = None def setUp(self): tests.TestCaseInTempDir.setUp(self) self.overrideAttr(osutils, '_selected_dir_reader', self._dir_reader_class()) def _get_ascii_tree(self): tree = [ '0file', '1dir/', '1dir/0file', '1dir/1dir/', '2file' ] expected_dirblocks = [ (('', '.'), [('0file', '0file', 'file'), ('1dir', '1dir', 'directory'), ('2file', '2file', 'file'), ] ), (('1dir', './1dir'), [('1dir/0file', '0file', 'file'), ('1dir/1dir', '1dir', 'directory'), ] ), (('1dir/1dir', './1dir/1dir'), [ ] ), ] return tree, expected_dirblocks def test_walk_cur_dir(self): tree, expected_dirblocks = self._get_ascii_tree() self.build_tree(tree) result = list(osutils._walkdirs_utf8('.')) # Filter out stat and abspath self.assertEqual(expected_dirblocks, [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result]) def test_walk_sub_dir(self): tree, expected_dirblocks = self._get_ascii_tree() self.build_tree(tree) # you can search a subdir only, with a supplied prefix. result = list(osutils._walkdirs_utf8('./1dir', '1dir')) # Filter out stat and abspath self.assertEqual(expected_dirblocks[1:], [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result]) def _get_unicode_tree(self): name0u = u'0file-\xb6' name1u = u'1dir-\u062c\u0648' name2u = u'2file-\u0633' tree = [ name0u, name1u + '/', name1u + '/' + name0u, name1u + '/' + name1u + '/', name2u, ] name0 = name0u.encode('UTF-8') name1 = name1u.encode('UTF-8') name2 = name2u.encode('UTF-8') expected_dirblocks = [ (('', '.'), [(name0, name0, 'file', './' + name0u), (name1, name1, 'directory', './' + name1u), (name2, name2, 'file', './' + name2u), ] ), ((name1, './' + name1u), [(name1 + '/' + name0, name0, 'file', './' + name1u + '/' + name0u), (name1 + '/' + name1, name1, 'directory', './' + name1u + '/' + name1u), ] ), ((name1 + '/' + name1, './' + name1u + '/' + name1u), [ ] ), ] return tree, expected_dirblocks def _filter_out(self, raw_dirblocks): """Filter out a walkdirs_utf8 result. stat field is removed, all native paths are converted to unicode """ filtered_dirblocks = [] for dirinfo, block in raw_dirblocks: dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1])) details = [] for line in block: details.append(line[0:3] + (self._native_to_unicode(line[4]), )) filtered_dirblocks.append((dirinfo, details)) return filtered_dirblocks def test_walk_unicode_tree(self): self.requireFeature(features.UnicodeFilenameFeature) tree, expected_dirblocks = self._get_unicode_tree() self.build_tree(tree) result = list(osutils._walkdirs_utf8('.')) self.assertEqual(expected_dirblocks, self._filter_out(result)) def test_symlink(self): self.requireFeature(features.SymlinkFeature) self.requireFeature(features.UnicodeFilenameFeature) target = u'target\N{Euro Sign}' link_name = u'l\N{Euro Sign}nk' os.symlink(target, link_name) target_utf8 = target.encode('UTF-8') link_name_utf8 = link_name.encode('UTF-8') expected_dirblocks = [ (('', '.'), [(link_name_utf8, link_name_utf8, 'symlink', './' + link_name),], )] result = list(osutils._walkdirs_utf8('.')) self.assertEqual(expected_dirblocks, self._filter_out(result)) class TestReadLink(tests.TestCaseInTempDir): """Exposes os.readlink() problems and the osutils solution. The only guarantee offered by os.readlink(), starting with 2.6, is that a unicode string will be returned if a unicode string is passed. But prior python versions failed to properly encode the passed unicode string. """ _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature] def setUp(self): super(tests.TestCaseInTempDir, self).setUp() self.link = u'l\N{Euro Sign}ink' self.target = u'targe\N{Euro Sign}t' os.symlink(self.target, self.link) def test_os_readlink_link_encoding(self): self.assertEquals(self.target, os.readlink(self.link)) def test_os_readlink_link_decoding(self): self.assertEquals(self.target.encode(osutils._fs_enc), os.readlink(self.link.encode(osutils._fs_enc))) class TestConcurrency(tests.TestCase): def setUp(self): super(TestConcurrency, self).setUp() self.overrideAttr(osutils, '_cached_local_concurrency') def test_local_concurrency(self): concurrency = osutils.local_concurrency() self.assertIsInstance(concurrency, int) def test_local_concurrency_environment_variable(self): self.overrideEnv('BZR_CONCURRENCY', '2') self.assertEqual(2, osutils.local_concurrency(use_cache=False)) self.overrideEnv('BZR_CONCURRENCY', '3') self.assertEqual(3, osutils.local_concurrency(use_cache=False)) self.overrideEnv('BZR_CONCURRENCY', 'foo') self.assertEqual(1, osutils.local_concurrency(use_cache=False)) def test_option_concurrency(self): self.overrideEnv('BZR_CONCURRENCY', '1') self.run_bzr('rocks --concurrency 42') # Command line overrides environment variable self.assertEquals('42', os.environ['BZR_CONCURRENCY']) self.assertEquals(42, osutils.local_concurrency(use_cache=False)) class TestFailedToLoadExtension(tests.TestCase): def _try_loading(self): try: import bzrlib._fictional_extension_py except ImportError, e: osutils.failed_to_load_extension(e) return True def setUp(self): super(TestFailedToLoadExtension, self).setUp() self.overrideAttr(osutils, '_extension_load_failures', []) def test_failure_to_load(self): self._try_loading() self.assertLength(1, osutils._extension_load_failures) self.assertEquals(osutils._extension_load_failures[0], "No module named _fictional_extension_py") def test_report_extension_load_failures_no_warning(self): self.assertTrue(self._try_loading()) warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures) # it used to give a Python warning; it no longer does self.assertLength(0, warnings) def test_report_extension_load_failures_message(self): log = StringIO() trace.push_log_file(log) self.assertTrue(self._try_loading()) osutils.report_extension_load_failures() self.assertContainsRe( log.getvalue(), r"bzr: warning: some compiled extensions could not be loaded; " "see \n" ) class TestTerminalWidth(tests.TestCase): def setUp(self): tests.TestCase.setUp(self) self._orig_terminal_size_state = osutils._terminal_size_state self._orig_first_terminal_size = osutils._first_terminal_size self.addCleanup(self.restore_osutils_globals) osutils._terminal_size_state = 'no_data' osutils._first_terminal_size = None def restore_osutils_globals(self): osutils._terminal_size_state = self._orig_terminal_size_state osutils._first_terminal_size = self._orig_first_terminal_size def replace_stdout(self, new): self.overrideAttr(sys, 'stdout', new) def replace__terminal_size(self, new): self.overrideAttr(osutils, '_terminal_size', new) def set_fake_tty(self): class I_am_a_tty(object): def isatty(self): return True self.replace_stdout(I_am_a_tty()) def test_default_values(self): self.assertEqual(80, osutils.default_terminal_width) def test_defaults_to_BZR_COLUMNS(self): # BZR_COLUMNS is set by the test framework self.assertNotEqual('12', os.environ['BZR_COLUMNS']) self.overrideEnv('BZR_COLUMNS', '12') self.assertEqual(12, osutils.terminal_width()) def test_BZR_COLUMNS_0_no_limit(self): self.overrideEnv('BZR_COLUMNS', '0') self.assertEqual(None, osutils.terminal_width()) def test_falls_back_to_COLUMNS(self): self.overrideEnv('BZR_COLUMNS', None) self.assertNotEqual('42', os.environ['COLUMNS']) self.set_fake_tty() self.overrideEnv('COLUMNS', '42') self.assertEqual(42, osutils.terminal_width()) def test_tty_default_without_columns(self): self.overrideEnv('BZR_COLUMNS', None) self.overrideEnv('COLUMNS', None) def terminal_size(w, h): return 42, 42 self.set_fake_tty() # We need to override the osutils definition as it depends on the # running environment that we can't control (PQM running without a # controlling terminal is one example). self.replace__terminal_size(terminal_size) self.assertEqual(42, osutils.terminal_width()) def test_non_tty_default_without_columns(self): self.overrideEnv('BZR_COLUMNS', None) self.overrideEnv('COLUMNS', None) self.replace_stdout(None) self.assertEqual(None, osutils.terminal_width()) def test_no_TIOCGWINSZ(self): self.requireFeature(term_ios_feature) termios = term_ios_feature.module # bug 63539 is about a termios without TIOCGWINSZ attribute try: orig = termios.TIOCGWINSZ except AttributeError: # We won't remove TIOCGWINSZ, because it doesn't exist anyway :) pass else: self.overrideAttr(termios, 'TIOCGWINSZ') del termios.TIOCGWINSZ self.overrideEnv('BZR_COLUMNS', None) self.overrideEnv('COLUMNS', None) # Whatever the result is, if we don't raise an exception, it's ok. osutils.terminal_width() class TestCreationOps(tests.TestCaseInTempDir): _test_needs_features = [features.chown_feature] def setUp(self): tests.TestCaseInTempDir.setUp(self) self.overrideAttr(os, 'chown', self._dummy_chown) # params set by call to _dummy_chown self.path = self.uid = self.gid = None def _dummy_chown(self, path, uid, gid): self.path, self.uid, self.gid = path, uid, gid def test_copy_ownership_from_path(self): """copy_ownership_from_path test with specified src.""" ownsrc = '/' f = open('test_file', 'wt') osutils.copy_ownership_from_path('test_file', ownsrc) s = os.stat(ownsrc) self.assertEquals(self.path, 'test_file') self.assertEquals(self.uid, s.st_uid) self.assertEquals(self.gid, s.st_gid) def test_copy_ownership_nonesrc(self): """copy_ownership_from_path test with src=None.""" f = open('test_file', 'wt') # should use parent dir for permissions osutils.copy_ownership_from_path('test_file') s = os.stat('..') self.assertEquals(self.path, 'test_file') self.assertEquals(self.uid, s.st_uid) self.assertEquals(self.gid, s.st_gid) class TestPathFromEnviron(tests.TestCase): def test_is_unicode(self): self.overrideEnv('BZR_TEST_PATH', './anywhere at all/') path = osutils.path_from_environ('BZR_TEST_PATH') self.assertIsInstance(path, unicode) self.assertEqual(u'./anywhere at all/', path) def test_posix_path_env_ascii(self): self.overrideEnv('BZR_TEST_PATH', '/tmp') home = osutils._posix_path_from_environ('BZR_TEST_PATH') self.assertIsInstance(home, unicode) self.assertEqual(u'/tmp', home) def test_posix_path_env_unicode(self): self.requireFeature(features.ByteStringNamedFilesystem) self.overrideEnv('BZR_TEST_PATH', '/home/\xa7test') self.overrideAttr(osutils, "_fs_enc", "iso8859-1") self.assertEqual(u'/home/\xa7test', osutils._posix_path_from_environ('BZR_TEST_PATH')) osutils._fs_enc = "iso8859-5" self.assertEqual(u'/home/\u0407test', osutils._posix_path_from_environ('BZR_TEST_PATH')) osutils._fs_enc = "utf-8" self.assertRaises(errors.BadFilenameEncoding, osutils._posix_path_from_environ, 'BZR_TEST_PATH') class TestGetHomeDir(tests.TestCase): def test_is_unicode(self): home = osutils._get_home_dir() self.assertIsInstance(home, unicode) def test_posix_homeless(self): self.overrideEnv('HOME', None) home = osutils._get_home_dir() self.assertIsInstance(home, unicode) def test_posix_home_ascii(self): self.overrideEnv('HOME', '/home/test') home = osutils._posix_get_home_dir() self.assertIsInstance(home, unicode) self.assertEqual(u'/home/test', home) def test_posix_home_unicode(self): self.requireFeature(features.ByteStringNamedFilesystem) self.overrideEnv('HOME', '/home/\xa7test') self.overrideAttr(osutils, "_fs_enc", "iso8859-1") self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir()) osutils._fs_enc = "iso8859-5" self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir()) osutils._fs_enc = "utf-8" self.assertRaises(errors.BadFilenameEncoding, osutils._posix_get_home_dir) class TestGetuserUnicode(tests.TestCase): def test_is_unicode(self): user = osutils.getuser_unicode() self.assertIsInstance(user, unicode) def envvar_to_override(self): if sys.platform == "win32": # Disable use of platform calls on windows so envvar is used self.overrideAttr(win32utils, 'has_ctypes', False) return 'USERNAME' # only variable used on windows return 'LOGNAME' # first variable checked by getpass.getuser() def test_ascii_user(self): self.overrideEnv(self.envvar_to_override(), 'jrandom') self.assertEqual(u'jrandom', osutils.getuser_unicode()) def test_unicode_user(self): ue = osutils.get_user_encoding() uni_val, env_val = tests.probe_unicode_in_user_encoding() if uni_val is None: raise tests.TestSkipped( 'Cannot find a unicode character that works in encoding %s' % (osutils.get_user_encoding(),)) uni_username = u'jrandom' + uni_val encoded_username = uni_username.encode(ue) self.overrideEnv(self.envvar_to_override(), encoded_username) self.assertEqual(uni_username, osutils.getuser_unicode()) class TestBackupNames(tests.TestCase): def setUp(self): super(TestBackupNames, self).setUp() self.backups = [] def backup_exists(self, name): return name in self.backups def available_backup_name(self, name): backup_name = osutils.available_backup_name(name, self.backup_exists) self.backups.append(backup_name) return backup_name def assertBackupName(self, expected, name): self.assertEqual(expected, self.available_backup_name(name)) def test_empty(self): self.assertBackupName('file.~1~', 'file') def test_existing(self): self.available_backup_name('file') self.available_backup_name('file') self.assertBackupName('file.~3~', 'file') # Empty slots are found, this is not a strict requirement and may be # revisited if we test against all implementations. self.backups.remove('file.~2~') self.assertBackupName('file.~2~', 'file') class TestFindExecutableInPath(tests.TestCase): def test_windows(self): if sys.platform != 'win32': raise tests.TestSkipped('test requires win32') self.assertTrue(osutils.find_executable_on_path('explorer') is not None) self.assertTrue( osutils.find_executable_on_path('explorer.exe') is not None) self.assertTrue( osutils.find_executable_on_path('EXPLORER.EXE') is not None) self.assertTrue( osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None) self.assertTrue(osutils.find_executable_on_path('file.txt') is None) def test_windows_app_path(self): if sys.platform != 'win32': raise tests.TestSkipped('test requires win32') # Override PATH env var so that exe can only be found on App Path self.overrideEnv('PATH', '') # Internt Explorer is always registered in the App Path self.assertTrue(osutils.find_executable_on_path('iexplore') is not None) def test_other(self): if sys.platform == 'win32': raise tests.TestSkipped('test requires non-win32') self.assertTrue(osutils.find_executable_on_path('sh') is not None) self.assertTrue( osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None) class TestEnvironmentErrors(tests.TestCase): """Test handling of environmental errors""" def test_is_oserror(self): self.assertTrue(osutils.is_environment_error( OSError(errno.EINVAL, "Invalid parameter"))) def test_is_ioerror(self): self.assertTrue(osutils.is_environment_error( IOError(errno.EINVAL, "Invalid parameter"))) def test_is_socket_error(self): self.assertTrue(osutils.is_environment_error( socket.error(errno.EINVAL, "Invalid parameter"))) def test_is_select_error(self): self.assertTrue(osutils.is_environment_error( select.error(errno.EINVAL, "Invalid parameter"))) def test_is_pywintypes_error(self): self.requireFeature(features.pywintypes) import pywintypes self.assertTrue(osutils.is_environment_error( pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))