summaryrefslogtreecommitdiff
path: root/bzrlib/tests/test_lazy_import.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/tests/test_lazy_import.py')
-rw-r--r--bzrlib/tests/test_lazy_import.py1217
1 files changed, 1217 insertions, 0 deletions
diff --git a/bzrlib/tests/test_lazy_import.py b/bzrlib/tests/test_lazy_import.py
new file mode 100644
index 0000000..079c66b
--- /dev/null
+++ b/bzrlib/tests/test_lazy_import.py
@@ -0,0 +1,1217 @@
+# Copyright (C) 2006-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Test the lazy_import functionality."""
+
+import linecache
+import os
+import re
+import sys
+
+from bzrlib import (
+ errors,
+ lazy_import,
+ osutils,
+ )
+from bzrlib.tests import (
+ TestCase,
+ TestCaseInTempDir,
+ )
+
+
+class InstrumentedReplacer(lazy_import.ScopeReplacer):
+ """Track what actions are done"""
+
+ @staticmethod
+ def use_actions(actions):
+ InstrumentedReplacer.actions = actions
+
+ def __getattribute__(self, attr):
+ InstrumentedReplacer.actions.append(('__getattribute__', attr))
+ return lazy_import.ScopeReplacer.__getattribute__(self, attr)
+
+ def __call__(self, *args, **kwargs):
+ InstrumentedReplacer.actions.append(('__call__', args, kwargs))
+ return lazy_import.ScopeReplacer.__call__(self, *args, **kwargs)
+
+
+class InstrumentedImportReplacer(lazy_import.ImportReplacer):
+
+ @staticmethod
+ def use_actions(actions):
+ InstrumentedImportReplacer.actions = actions
+
+ def _import(self, scope, name):
+ InstrumentedImportReplacer.actions.append(('_import', name))
+ return lazy_import.ImportReplacer._import(self, scope, name)
+
+ def __getattribute__(self, attr):
+ InstrumentedImportReplacer.actions.append(('__getattribute__', attr))
+ return lazy_import.ScopeReplacer.__getattribute__(self, attr)
+
+ def __call__(self, *args, **kwargs):
+ InstrumentedImportReplacer.actions.append(('__call__', args, kwargs))
+ return lazy_import.ScopeReplacer.__call__(self, *args, **kwargs)
+
+
+class TestClass(object):
+ """Just a simple test class instrumented for the test cases"""
+
+ class_member = 'class_member'
+
+ @staticmethod
+ def use_actions(actions):
+ TestClass.actions = actions
+
+ def __init__(self):
+ TestClass.actions.append('init')
+
+ def foo(self, x):
+ TestClass.actions.append(('foo', x))
+ return 'foo'
+
+
+class TestScopeReplacer(TestCase):
+ """Test the ability of the replacer to put itself into the correct scope.
+
+ In these tests we use the global scope, because we cannot replace
+ variables in the local scope. This means that we need to be careful
+ and not have the replacing objects use the same name, or we would
+ get collisions.
+ """
+
+ def setUp(self):
+ TestCase.setUp(self)
+ # These tests assume we will not be proxying, so make sure proxying is
+ # disabled.
+ orig_proxy = lazy_import.ScopeReplacer._should_proxy
+ def restore():
+ lazy_import.ScopeReplacer._should_proxy = orig_proxy
+ lazy_import.ScopeReplacer._should_proxy = False
+
+ def test_object(self):
+ """ScopeReplacer can create an instance in local scope.
+
+ An object should appear in globals() by constructing a ScopeReplacer,
+ and it will be replaced with the real object upon the first request.
+ """
+ actions = []
+ InstrumentedReplacer.use_actions(actions)
+ TestClass.use_actions(actions)
+
+ def factory(replacer, scope, name):
+ actions.append('factory')
+ return TestClass()
+
+ try:
+ test_obj1
+ except NameError:
+ # test_obj1 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_obj1 was not supposed to exist yet')
+
+ InstrumentedReplacer(scope=globals(), name='test_obj1',
+ factory=factory)
+
+ # We can't use isinstance() because that uses test_obj1.__class__
+ # and that goes through __getattribute__ which would activate
+ # the replacement
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj1, '__class__'))
+ self.assertEqual('foo', test_obj1.foo(1))
+ self.assertIsInstance(test_obj1, TestClass)
+ self.assertEqual('foo', test_obj1.foo(2))
+ self.assertEqual([('__getattribute__', 'foo'),
+ 'factory',
+ 'init',
+ ('foo', 1),
+ ('foo', 2),
+ ], actions)
+
+ def test_setattr_replaces(self):
+ """ScopeReplacer can create an instance in local scope.
+
+ An object should appear in globals() by constructing a ScopeReplacer,
+ and it will be replaced with the real object upon the first request.
+ """
+ actions = []
+ TestClass.use_actions(actions)
+ def factory(replacer, scope, name):
+ return TestClass()
+ try:
+ test_obj6
+ except NameError:
+ # test_obj6 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_obj6 was not supposed to exist yet')
+
+ lazy_import.ScopeReplacer(scope=globals(), name='test_obj6',
+ factory=factory)
+
+ # We can't use isinstance() because that uses test_obj6.__class__
+ # and that goes through __getattribute__ which would activate
+ # the replacement
+ self.assertEqual(lazy_import.ScopeReplacer,
+ object.__getattribute__(test_obj6, '__class__'))
+ test_obj6.bar = 'test'
+ self.assertNotEqual(lazy_import.ScopeReplacer,
+ object.__getattribute__(test_obj6, '__class__'))
+ self.assertEqual('test', test_obj6.bar)
+
+ def test_replace_side_effects(self):
+ """Creating a new object should only create one entry in globals.
+
+ And only that entry even after replacement.
+ """
+ try:
+ test_scope1
+ except NameError:
+ # test_scope1 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_scope1 was not supposed to exist yet')
+
+ # ignore the logged actions
+ TestClass.use_actions([])
+
+ def factory(replacer, scope, name):
+ return TestClass()
+
+ orig_globals = set(globals().keys())
+
+ lazy_import.ScopeReplacer(scope=globals(), name='test_scope1',
+ factory=factory)
+
+ new_globals = set(globals().keys())
+
+ self.assertEqual(lazy_import.ScopeReplacer,
+ object.__getattribute__(test_scope1, '__class__'))
+ self.assertEqual('foo', test_scope1.foo(1))
+ self.assertIsInstance(test_scope1, TestClass)
+
+ final_globals = set(globals().keys())
+
+ self.assertEqual(set(['test_scope1']), new_globals - orig_globals)
+ self.assertEqual(set(), orig_globals - new_globals)
+ self.assertEqual(set(), final_globals - new_globals)
+ self.assertEqual(set(), new_globals - final_globals)
+
+ def test_class(self):
+ actions = []
+ InstrumentedReplacer.use_actions(actions)
+ TestClass.use_actions(actions)
+
+ def factory(replacer, scope, name):
+ actions.append('factory')
+ return TestClass
+
+ try:
+ test_class1
+ except NameError:
+ # test_class2 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_class1 was not supposed to exist yet')
+
+ InstrumentedReplacer(scope=globals(), name='test_class1',
+ factory=factory)
+
+ self.assertEqual('class_member', test_class1.class_member)
+ self.assertEqual(test_class1, TestClass)
+ self.assertEqual([('__getattribute__', 'class_member'),
+ 'factory',
+ ], actions)
+
+ def test_call_class(self):
+ actions = []
+ InstrumentedReplacer.use_actions(actions)
+ TestClass.use_actions(actions)
+
+ def factory(replacer, scope, name):
+ actions.append('factory')
+ return TestClass
+
+ try:
+ test_class2
+ except NameError:
+ # test_class2 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_class2 was not supposed to exist yet')
+
+ InstrumentedReplacer(scope=globals(), name='test_class2',
+ factory=factory)
+
+ self.assertFalse(test_class2 is TestClass)
+ obj = test_class2()
+ self.assertIs(test_class2, TestClass)
+ self.assertIsInstance(obj, TestClass)
+ self.assertEqual('class_member', obj.class_member)
+ self.assertEqual([('__call__', (), {}),
+ 'factory',
+ 'init',
+ ], actions)
+
+ def test_call_func(self):
+ actions = []
+ InstrumentedReplacer.use_actions(actions)
+
+ def func(a, b, c=None):
+ actions.append('func')
+ return (a, b, c)
+
+ def factory(replacer, scope, name):
+ actions.append('factory')
+ return func
+
+ try:
+ test_func1
+ except NameError:
+ # test_func1 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_func1 was not supposed to exist yet')
+ InstrumentedReplacer(scope=globals(), name='test_func1',
+ factory=factory)
+
+ self.assertFalse(test_func1 is func)
+ val = test_func1(1, 2, c='3')
+ self.assertIs(test_func1, func)
+
+ self.assertEqual((1,2,'3'), val)
+ self.assertEqual([('__call__', (1,2), {'c':'3'}),
+ 'factory',
+ 'func',
+ ], actions)
+
+ def test_other_variable(self):
+ """Test when a ScopeReplacer is assigned to another variable.
+
+ This test could be updated if we find a way to trap '=' rather
+ than just giving a belated exception.
+ ScopeReplacer only knows about the variable it was created as,
+ so until the object is replaced, it is illegal to pass it to
+ another variable. (Though discovering this may take a while)
+ """
+ actions = []
+ InstrumentedReplacer.use_actions(actions)
+ TestClass.use_actions(actions)
+
+ def factory(replacer, scope, name):
+ actions.append('factory')
+ return TestClass()
+
+ try:
+ test_obj2
+ except NameError:
+ # test_obj2 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_obj2 was not supposed to exist yet')
+
+ InstrumentedReplacer(scope=globals(), name='test_obj2',
+ factory=factory)
+
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj2, '__class__'))
+ # This is technically not allowed, but we don't have a way to
+ # test it until later.
+ test_obj3 = test_obj2
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj2, '__class__'))
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj3, '__class__'))
+
+ # The first use of the alternate variable causes test_obj2 to
+ # be replaced.
+ self.assertEqual('foo', test_obj3.foo(1))
+ # test_obj2 has been replaced, but the ScopeReplacer has no
+ # idea of test_obj3
+ self.assertEqual(TestClass,
+ object.__getattribute__(test_obj2, '__class__'))
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj3, '__class__'))
+ # We should be able to access test_obj2 attributes normally
+ self.assertEqual('foo', test_obj2.foo(2))
+ self.assertEqual('foo', test_obj2.foo(3))
+
+ # However, the next access on test_obj3 should raise an error
+ # because only now are we able to detect the problem.
+ self.assertRaises(errors.IllegalUseOfScopeReplacer,
+ getattr, test_obj3, 'foo')
+
+ self.assertEqual([('__getattribute__', 'foo'),
+ 'factory',
+ 'init',
+ ('foo', 1),
+ ('foo', 2),
+ ('foo', 3),
+ ('__getattribute__', 'foo'),
+ ], actions)
+
+ def test_enable_proxying(self):
+ """Test that we can allow ScopeReplacer to proxy."""
+ actions = []
+ InstrumentedReplacer.use_actions(actions)
+ TestClass.use_actions(actions)
+
+ def factory(replacer, scope, name):
+ actions.append('factory')
+ return TestClass()
+
+ try:
+ test_obj4
+ except NameError:
+ # test_obj4 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_obj4 was not supposed to exist yet')
+
+ lazy_import.ScopeReplacer._should_proxy = True
+ InstrumentedReplacer(scope=globals(), name='test_obj4',
+ factory=factory)
+
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj4, '__class__'))
+ test_obj5 = test_obj4
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj4, '__class__'))
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj5, '__class__'))
+
+ # The first use of the alternate variable causes test_obj2 to
+ # be replaced.
+ self.assertEqual('foo', test_obj4.foo(1))
+ self.assertEqual(TestClass,
+ object.__getattribute__(test_obj4, '__class__'))
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj5, '__class__'))
+ # We should be able to access test_obj4 attributes normally
+ self.assertEqual('foo', test_obj4.foo(2))
+ # because we enabled proxying, test_obj5 can access its members as well
+ self.assertEqual('foo', test_obj5.foo(3))
+ self.assertEqual('foo', test_obj5.foo(4))
+
+ # However, it cannot be replaced by the ScopeReplacer
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj5, '__class__'))
+
+ self.assertEqual([('__getattribute__', 'foo'),
+ 'factory',
+ 'init',
+ ('foo', 1),
+ ('foo', 2),
+ ('__getattribute__', 'foo'),
+ ('foo', 3),
+ ('__getattribute__', 'foo'),
+ ('foo', 4),
+ ], actions)
+
+ def test_replacing_from_own_scope_fails(self):
+ """If a ScopeReplacer tries to replace itself a nice error is given"""
+ actions = []
+ InstrumentedReplacer.use_actions(actions)
+ TestClass.use_actions(actions)
+
+ def factory(replacer, scope, name):
+ actions.append('factory')
+ # return the name in given scope, which is currently the replacer
+ return scope[name]
+
+ try:
+ test_obj7
+ except NameError:
+ # test_obj7 shouldn't exist yet
+ pass
+ else:
+ self.fail('test_obj7 was not supposed to exist yet')
+
+ InstrumentedReplacer(scope=globals(), name='test_obj7',
+ factory=factory)
+
+ self.assertEqual(InstrumentedReplacer,
+ object.__getattribute__(test_obj7, '__class__'))
+ e = self.assertRaises(errors.IllegalUseOfScopeReplacer, test_obj7)
+ self.assertIn("replace itself", e.msg)
+ self.assertEqual([('__call__', (), {}),
+ 'factory'], actions)
+
+
+class ImportReplacerHelper(TestCaseInTempDir):
+ """Test the ability to have a lazily imported module or object"""
+
+ def setUp(self):
+ TestCaseInTempDir.setUp(self)
+ self.create_modules()
+ base_path = self.test_dir + '/base'
+
+ self.actions = []
+ InstrumentedImportReplacer.use_actions(self.actions)
+
+ sys.path.append(base_path)
+ self.addCleanup(sys.path.remove, base_path)
+
+ original_import = __import__
+ def instrumented_import(mod, scope1, scope2, fromlist, level):
+ self.actions.append(('import', mod, fromlist, level))
+ return original_import(mod, scope1, scope2, fromlist, level)
+ def cleanup():
+ __builtins__['__import__'] = original_import
+ self.addCleanup(cleanup)
+ __builtins__['__import__'] = instrumented_import
+
+ def create_modules(self):
+ """Create some random modules to be imported.
+
+ Each entry has a random suffix, and the full names are saved
+
+ These are setup as follows:
+ base/ <= used to ensure not in default search path
+ root-XXX/
+ __init__.py <= This will contain var1, func1
+ mod-XXX.py <= This will contain var2, func2
+ sub-XXX/
+ __init__.py <= Contains var3, func3
+ submoda-XXX.py <= contains var4, func4
+ submodb-XXX.py <= containse var5, func5
+ """
+ rand_suffix = osutils.rand_chars(4)
+ root_name = 'root_' + rand_suffix
+ mod_name = 'mod_' + rand_suffix
+ sub_name = 'sub_' + rand_suffix
+ submoda_name = 'submoda_' + rand_suffix
+ submodb_name = 'submodb_' + rand_suffix
+
+ os.mkdir('base')
+ root_path = osutils.pathjoin('base', root_name)
+ os.mkdir(root_path)
+ root_init = osutils.pathjoin(root_path, '__init__.py')
+ f = open(osutils.pathjoin(root_path, '__init__.py'), 'wb')
+ try:
+ f.write('var1 = 1\ndef func1(a):\n return a\n')
+ finally:
+ f.close()
+ mod_path = osutils.pathjoin(root_path, mod_name + '.py')
+ f = open(mod_path, 'wb')
+ try:
+ f.write('var2 = 2\ndef func2(a):\n return a\n')
+ finally:
+ f.close()
+
+ sub_path = osutils.pathjoin(root_path, sub_name)
+ os.mkdir(sub_path)
+ f = open(osutils.pathjoin(sub_path, '__init__.py'), 'wb')
+ try:
+ f.write('var3 = 3\ndef func3(a):\n return a\n')
+ finally:
+ f.close()
+ submoda_path = osutils.pathjoin(sub_path, submoda_name + '.py')
+ f = open(submoda_path, 'wb')
+ try:
+ f.write('var4 = 4\ndef func4(a):\n return a\n')
+ finally:
+ f.close()
+ submodb_path = osutils.pathjoin(sub_path, submodb_name + '.py')
+ f = open(submodb_path, 'wb')
+ try:
+ f.write('var5 = 5\ndef func5(a):\n return a\n')
+ finally:
+ f.close()
+ self.root_name = root_name
+ self.mod_name = mod_name
+ self.sub_name = sub_name
+ self.submoda_name = submoda_name
+ self.submodb_name = submodb_name
+
+
+class TestImportReplacerHelper(ImportReplacerHelper):
+
+ def test_basic_import(self):
+ """Test that a real import of these modules works"""
+ sub_mod_path = '.'.join([self.root_name, self.sub_name,
+ self.submoda_name])
+ root = __import__(sub_mod_path, globals(), locals(), [], 0)
+ self.assertEqual(1, root.var1)
+ self.assertEqual(3, getattr(root, self.sub_name).var3)
+ self.assertEqual(4, getattr(getattr(root, self.sub_name),
+ self.submoda_name).var4)
+
+ mod_path = '.'.join([self.root_name, self.mod_name])
+ root = __import__(mod_path, globals(), locals(), [], 0)
+ self.assertEqual(2, getattr(root, self.mod_name).var2)
+
+ self.assertEqual([('import', sub_mod_path, [], 0),
+ ('import', mod_path, [], 0),
+ ], self.actions)
+
+
+class TestImportReplacer(ImportReplacerHelper):
+
+ def test_import_root(self):
+ """Test 'import root-XXX as root1'"""
+ try:
+ root1
+ except NameError:
+ # root1 shouldn't exist yet
+ pass
+ else:
+ self.fail('root1 was not supposed to exist yet')
+
+ # This should replicate 'import root-xxyyzz as root1'
+ InstrumentedImportReplacer(scope=globals(), name='root1',
+ module_path=[self.root_name],
+ member=None, children={})
+
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root1, '__class__'))
+ self.assertEqual(1, root1.var1)
+ self.assertEqual('x', root1.func1('x'))
+
+ self.assertEqual([('__getattribute__', 'var1'),
+ ('_import', 'root1'),
+ ('import', self.root_name, [], 0),
+ ], self.actions)
+
+ def test_import_mod(self):
+ """Test 'import root-XXX.mod-XXX as mod2'"""
+ try:
+ mod1
+ except NameError:
+ # mod1 shouldn't exist yet
+ pass
+ else:
+ self.fail('mod1 was not supposed to exist yet')
+
+ mod_path = self.root_name + '.' + self.mod_name
+ InstrumentedImportReplacer(scope=globals(), name='mod1',
+ module_path=[self.root_name, self.mod_name],
+ member=None, children={})
+
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(mod1, '__class__'))
+ self.assertEqual(2, mod1.var2)
+ self.assertEqual('y', mod1.func2('y'))
+
+ self.assertEqual([('__getattribute__', 'var2'),
+ ('_import', 'mod1'),
+ ('import', mod_path, [], 0),
+ ], self.actions)
+
+ def test_import_mod_from_root(self):
+ """Test 'from root-XXX import mod-XXX as mod2'"""
+ try:
+ mod2
+ except NameError:
+ # mod2 shouldn't exist yet
+ pass
+ else:
+ self.fail('mod2 was not supposed to exist yet')
+
+ InstrumentedImportReplacer(scope=globals(), name='mod2',
+ module_path=[self.root_name],
+ member=self.mod_name, children={})
+
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(mod2, '__class__'))
+ self.assertEqual(2, mod2.var2)
+ self.assertEqual('y', mod2.func2('y'))
+
+ self.assertEqual([('__getattribute__', 'var2'),
+ ('_import', 'mod2'),
+ ('import', self.root_name, [self.mod_name], 0),
+ ], self.actions)
+
+ def test_import_root_and_mod(self):
+ """Test 'import root-XXX.mod-XXX' remapping both to root3.mod3"""
+ try:
+ root3
+ except NameError:
+ # root3 shouldn't exist yet
+ pass
+ else:
+ self.fail('root3 was not supposed to exist yet')
+
+ InstrumentedImportReplacer(scope=globals(),
+ name='root3', module_path=[self.root_name], member=None,
+ children={'mod3':([self.root_name, self.mod_name], None, {})})
+
+ # So 'root3' should be a lazy import
+ # and once it is imported, mod3 should also be lazy until
+ # actually accessed.
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root3, '__class__'))
+ self.assertEqual(1, root3.var1)
+
+ # There is a mod3 member, but it is also lazy
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root3.mod3, '__class__'))
+ self.assertEqual(2, root3.mod3.var2)
+
+ mod_path = self.root_name + '.' + self.mod_name
+ self.assertEqual([('__getattribute__', 'var1'),
+ ('_import', 'root3'),
+ ('import', self.root_name, [], 0),
+ ('__getattribute__', 'var2'),
+ ('_import', 'mod3'),
+ ('import', mod_path, [], 0),
+ ], self.actions)
+
+ def test_import_root_and_root_mod(self):
+ """Test that 'import root, root.mod' can be done.
+
+ The second import should re-use the first one, and just add
+ children to be imported.
+ """
+ try:
+ root4
+ except NameError:
+ # root4 shouldn't exist yet
+ pass
+ else:
+ self.fail('root4 was not supposed to exist yet')
+
+ InstrumentedImportReplacer(scope=globals(),
+ name='root4', module_path=[self.root_name], member=None,
+ children={})
+
+ # So 'root4' should be a lazy import
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root4, '__class__'))
+
+ # Lets add a new child to be imported on demand
+ # This syntax of using object.__getattribute__ is the correct method
+ # for accessing the _import_replacer_children member
+ children = object.__getattribute__(root4, '_import_replacer_children')
+ children['mod4'] = ([self.root_name, self.mod_name], None, {})
+
+ # Accessing root4.mod4 should import root, but mod should stay lazy
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root4.mod4, '__class__'))
+ self.assertEqual(2, root4.mod4.var2)
+
+ mod_path = self.root_name + '.' + self.mod_name
+ self.assertEqual([('__getattribute__', 'mod4'),
+ ('_import', 'root4'),
+ ('import', self.root_name, [], 0),
+ ('__getattribute__', 'var2'),
+ ('_import', 'mod4'),
+ ('import', mod_path, [], 0),
+ ], self.actions)
+
+ def test_import_root_sub_submod(self):
+ """Test import root.mod, root.sub.submoda, root.sub.submodb
+ root should be a lazy import, with multiple children, who also
+ have children to be imported.
+ And when root is imported, the children should be lazy, and
+ reuse the intermediate lazy object.
+ """
+ try:
+ root5
+ except NameError:
+ # root5 shouldn't exist yet
+ pass
+ else:
+ self.fail('root5 was not supposed to exist yet')
+
+ InstrumentedImportReplacer(scope=globals(),
+ name='root5', module_path=[self.root_name], member=None,
+ children={'mod5':([self.root_name, self.mod_name], None, {}),
+ 'sub5':([self.root_name, self.sub_name], None,
+ {'submoda5':([self.root_name, self.sub_name,
+ self.submoda_name], None, {}),
+ 'submodb5':([self.root_name, self.sub_name,
+ self.submodb_name], None, {})
+ }),
+ })
+
+ # So 'root5' should be a lazy import
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root5, '__class__'))
+
+ # Accessing root5.mod5 should import root, but mod should stay lazy
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root5.mod5, '__class__'))
+ # root5.sub5 should still be lazy, but not re-import root5
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root5.sub5, '__class__'))
+
+ # Accessing root5.sub5.submoda5 should import sub5, but not either
+ # of the sub objects (they should be available as lazy objects
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root5.sub5.submoda5, '__class__'))
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root5.sub5.submodb5, '__class__'))
+
+ # This should import mod5
+ self.assertEqual(2, root5.mod5.var2)
+ # These should import submoda5 and submodb5
+ self.assertEqual(4, root5.sub5.submoda5.var4)
+ self.assertEqual(5, root5.sub5.submodb5.var5)
+
+ mod_path = self.root_name + '.' + self.mod_name
+ sub_path = self.root_name + '.' + self.sub_name
+ submoda_path = sub_path + '.' + self.submoda_name
+ submodb_path = sub_path + '.' + self.submodb_name
+
+ self.assertEqual([('__getattribute__', 'mod5'),
+ ('_import', 'root5'),
+ ('import', self.root_name, [], 0),
+ ('__getattribute__', 'submoda5'),
+ ('_import', 'sub5'),
+ ('import', sub_path, [], 0),
+ ('__getattribute__', 'var2'),
+ ('_import', 'mod5'),
+ ('import', mod_path, [], 0),
+ ('__getattribute__', 'var4'),
+ ('_import', 'submoda5'),
+ ('import', submoda_path, [], 0),
+ ('__getattribute__', 'var5'),
+ ('_import', 'submodb5'),
+ ('import', submodb_path, [], 0),
+ ], self.actions)
+
+
+class TestConvertImportToMap(TestCase):
+ """Directly test the conversion from import strings to maps"""
+
+ def check(self, expected, import_strings):
+ proc = lazy_import.ImportProcessor()
+ for import_str in import_strings:
+ proc._convert_import_str(import_str)
+ self.assertEqual(expected, proc.imports,
+ 'Import of %r was not converted correctly'
+ ' %s != %s' % (import_strings, expected,
+ proc.imports))
+
+ def test_import_one(self):
+ self.check({'one':(['one'], None, {}),
+ }, ['import one'])
+
+ def test_import_one_two(self):
+ one_two_map = {'one':(['one'], None,
+ {'two':(['one', 'two'], None, {}),
+ }),
+ }
+ self.check(one_two_map, ['import one.two'])
+ self.check(one_two_map, ['import one, one.two'])
+ self.check(one_two_map, ['import one', 'import one.two'])
+ self.check(one_two_map, ['import one.two', 'import one'])
+
+ def test_import_one_two_three(self):
+ one_two_three_map = {
+ 'one':(['one'], None,
+ {'two':(['one', 'two'], None,
+ {'three':(['one', 'two', 'three'], None, {}),
+ }),
+ }),
+ }
+ self.check(one_two_three_map, ['import one.two.three'])
+ self.check(one_two_three_map, ['import one, one.two.three'])
+ self.check(one_two_three_map, ['import one',
+ 'import one.two.three'])
+ self.check(one_two_three_map, ['import one.two.three',
+ 'import one'])
+
+ def test_import_one_as_x(self):
+ self.check({'x':(['one'], None, {}),
+ }, ['import one as x'])
+
+ def test_import_one_two_as_x(self):
+ self.check({'x':(['one', 'two'], None, {}),
+ }, ['import one.two as x'])
+
+ def test_import_mixed(self):
+ mixed = {'x':(['one', 'two'], None, {}),
+ 'one':(['one'], None,
+ {'two':(['one', 'two'], None, {}),
+ }),
+ }
+ self.check(mixed, ['import one.two as x, one.two'])
+ self.check(mixed, ['import one.two as x', 'import one.two'])
+ self.check(mixed, ['import one.two', 'import one.two as x'])
+
+ def test_import_with_as(self):
+ self.check({'fast':(['fast'], None, {})}, ['import fast'])
+
+
+class TestFromToMap(TestCase):
+ """Directly test the conversion of 'from foo import bar' syntax"""
+
+ def check_result(self, expected, from_strings):
+ proc = lazy_import.ImportProcessor()
+ for from_str in from_strings:
+ proc._convert_from_str(from_str)
+ self.assertEqual(expected, proc.imports,
+ 'Import of %r was not converted correctly'
+ ' %s != %s' % (from_strings, expected, proc.imports))
+
+ def test_from_one_import_two(self):
+ self.check_result({'two':(['one'], 'two', {})},
+ ['from one import two'])
+
+ def test_from_one_import_two_as_three(self):
+ self.check_result({'three':(['one'], 'two', {})},
+ ['from one import two as three'])
+
+ def test_from_one_import_two_three(self):
+ two_three_map = {'two':(['one'], 'two', {}),
+ 'three':(['one'], 'three', {}),
+ }
+ self.check_result(two_three_map,
+ ['from one import two, three'])
+ self.check_result(two_three_map,
+ ['from one import two',
+ 'from one import three'])
+
+ def test_from_one_two_import_three(self):
+ self.check_result({'three':(['one', 'two'], 'three', {})},
+ ['from one.two import three'])
+
+
+class TestCanonicalize(TestCase):
+ """Test that we can canonicalize import texts"""
+
+ def check(self, expected, text):
+ proc = lazy_import.ImportProcessor()
+ parsed = proc._canonicalize_import_text(text)
+ self.assertEqual(expected, parsed,
+ 'Incorrect parsing of text:\n%s\n%s\n!=\n%s'
+ % (text, expected, parsed))
+
+ def test_import_one(self):
+ self.check(['import one'], 'import one')
+ self.check(['import one'], '\nimport one\n\n')
+
+ def test_import_one_two(self):
+ self.check(['import one, two'], 'import one, two')
+ self.check(['import one, two'], '\nimport one, two\n\n')
+
+ def test_import_one_as_two_as(self):
+ self.check(['import one as x, two as y'], 'import one as x, two as y')
+ self.check(['import one as x, two as y'],
+ '\nimport one as x, two as y\n')
+
+ def test_from_one_import_two(self):
+ self.check(['from one import two'], 'from one import two')
+ self.check(['from one import two'], '\nfrom one import two\n\n')
+ self.check(['from one import two'], '\nfrom one import (two)\n')
+ self.check(['from one import two '], '\nfrom one import (\n\ttwo\n)\n')
+
+ def test_multiple(self):
+ self.check(['import one', 'import two, three', 'from one import four'],
+ 'import one\nimport two, three\nfrom one import four')
+ self.check(['import one', 'import two, three', 'from one import four'],
+ 'import one\nimport (two, three)\nfrom one import four')
+ self.check(['import one', 'import two, three', 'from one import four'],
+ 'import one\n'
+ 'import two, three\n'
+ 'from one import four')
+ self.check(['import one',
+ 'import two, three', 'from one import four, '],
+ 'import one\n'
+ 'import two, three\n'
+ 'from one import (\n'
+ ' four,\n'
+ ' )\n'
+ )
+
+ def test_missing_trailing(self):
+ proc = lazy_import.ImportProcessor()
+ self.assertRaises(errors.InvalidImportLine,
+ proc._canonicalize_import_text,
+ "from foo import (\n bar\n")
+
+
+class TestImportProcessor(TestCase):
+ """Test that ImportProcessor can turn import texts into lazy imports"""
+
+ def check(self, expected, text):
+ proc = lazy_import.ImportProcessor()
+ proc._build_map(text)
+ self.assertEqual(expected, proc.imports,
+ 'Incorrect processing of:\n%s\n%s\n!=\n%s'
+ % (text, expected, proc.imports))
+
+ def test_import_one(self):
+ exp = {'one':(['one'], None, {})}
+ self.check(exp, 'import one')
+ self.check(exp, '\nimport one\n')
+
+ def test_import_one_two(self):
+ exp = {'one':(['one'], None,
+ {'two':(['one', 'two'], None, {}),
+ }),
+ }
+ self.check(exp, 'import one.two')
+ self.check(exp, 'import one, one.two')
+ self.check(exp, 'import one\nimport one.two')
+
+ def test_import_as(self):
+ exp = {'two':(['one'], None, {})}
+ self.check(exp, 'import one as two')
+
+ def test_import_many(self):
+ exp = {'one':(['one'], None,
+ {'two':(['one', 'two'], None,
+ {'three':(['one', 'two', 'three'], None, {}),
+ }),
+ 'four':(['one', 'four'], None, {}),
+ }),
+ 'five':(['one', 'five'], None, {}),
+ }
+ self.check(exp, 'import one.two.three, one.four, one.five as five')
+ self.check(exp, 'import one.five as five\n'
+ 'import one\n'
+ 'import one.two.three\n'
+ 'import one.four\n')
+
+ def test_from_one_import_two(self):
+ exp = {'two':(['one'], 'two', {})}
+ self.check(exp, 'from one import two\n')
+ self.check(exp, 'from one import (\n'
+ ' two,\n'
+ ' )\n')
+
+ def test_from_one_import_two(self):
+ exp = {'two':(['one'], 'two', {})}
+ self.check(exp, 'from one import two\n')
+ self.check(exp, 'from one import (two)\n')
+ self.check(exp, 'from one import (two,)\n')
+ self.check(exp, 'from one import two as two\n')
+ self.check(exp, 'from one import (\n'
+ ' two,\n'
+ ' )\n')
+
+ def test_from_many(self):
+ exp = {'two':(['one'], 'two', {}),
+ 'three':(['one', 'two'], 'three', {}),
+ 'five':(['one', 'two'], 'four', {}),
+ }
+ self.check(exp, 'from one import two\n'
+ 'from one.two import three, four as five\n')
+ self.check(exp, 'from one import two\n'
+ 'from one.two import (\n'
+ ' three,\n'
+ ' four as five,\n'
+ ' )\n')
+
+ def test_mixed(self):
+ exp = {'two':(['one'], 'two', {}),
+ 'three':(['one', 'two'], 'three', {}),
+ 'five':(['one', 'two'], 'four', {}),
+ 'one':(['one'], None,
+ {'two':(['one', 'two'], None, {}),
+ }),
+ }
+ self.check(exp, 'from one import two\n'
+ 'from one.two import three, four as five\n'
+ 'import one.two')
+ self.check(exp, 'from one import two\n'
+ 'from one.two import (\n'
+ ' three,\n'
+ ' four as five,\n'
+ ' )\n'
+ 'import one\n'
+ 'import one.two\n')
+
+ def test_incorrect_line(self):
+ proc = lazy_import.ImportProcessor()
+ self.assertRaises(errors.InvalidImportLine,
+ proc._build_map, 'foo bar baz')
+ self.assertRaises(errors.InvalidImportLine,
+ proc._build_map, 'improt foo')
+ self.assertRaises(errors.InvalidImportLine,
+ proc._build_map, 'importfoo')
+ self.assertRaises(errors.InvalidImportLine,
+ proc._build_map, 'fromimport')
+
+ def test_name_collision(self):
+ proc = lazy_import.ImportProcessor()
+ proc._build_map('import foo')
+
+ # All of these would try to create an object with the
+ # same name as an existing object.
+ self.assertRaises(errors.ImportNameCollision,
+ proc._build_map, 'import bar as foo')
+ self.assertRaises(errors.ImportNameCollision,
+ proc._build_map, 'from foo import bar as foo')
+ self.assertRaises(errors.ImportNameCollision,
+ proc._build_map, 'from bar import foo')
+
+
+class TestLazyImportProcessor(ImportReplacerHelper):
+
+ def test_root(self):
+ try:
+ root6
+ except NameError:
+ pass # root6 should not be defined yet
+ else:
+ self.fail('root6 was not supposed to exist yet')
+
+ text = 'import %s as root6' % (self.root_name,)
+ proc = lazy_import.ImportProcessor(InstrumentedImportReplacer)
+ proc.lazy_import(scope=globals(), text=text)
+
+ # So 'root6' should be a lazy import
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root6, '__class__'))
+
+ self.assertEqual(1, root6.var1)
+ self.assertEqual('x', root6.func1('x'))
+
+ self.assertEqual([('__getattribute__', 'var1'),
+ ('_import', 'root6'),
+ ('import', self.root_name, [], 0),
+ ], self.actions)
+
+ def test_import_deep(self):
+ """Test import root.mod, root.sub.submoda, root.sub.submodb
+ root should be a lazy import, with multiple children, who also
+ have children to be imported.
+ And when root is imported, the children should be lazy, and
+ reuse the intermediate lazy object.
+ """
+ try:
+ submoda7
+ except NameError:
+ pass # submoda7 should not be defined yet
+ else:
+ self.fail('submoda7 was not supposed to exist yet')
+
+ text = """\
+import %(root_name)s.%(sub_name)s.%(submoda_name)s as submoda7
+""" % self.__dict__
+ proc = lazy_import.ImportProcessor(InstrumentedImportReplacer)
+ proc.lazy_import(scope=globals(), text=text)
+
+ # So 'submoda7' should be a lazy import
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(submoda7, '__class__'))
+
+ # This should import submoda7
+ self.assertEqual(4, submoda7.var4)
+
+ sub_path = self.root_name + '.' + self.sub_name
+ submoda_path = sub_path + '.' + self.submoda_name
+
+ self.assertEqual([('__getattribute__', 'var4'),
+ ('_import', 'submoda7'),
+ ('import', submoda_path, [], 0),
+ ], self.actions)
+
+ def test_lazy_import(self):
+ """Smoke test that lazy_import() does the right thing"""
+ try:
+ root8
+ except NameError:
+ pass # root8 should not be defined yet
+ else:
+ self.fail('root8 was not supposed to exist yet')
+ lazy_import.lazy_import(globals(),
+ 'import %s as root8' % (self.root_name,),
+ lazy_import_class=InstrumentedImportReplacer)
+
+ self.assertEqual(InstrumentedImportReplacer,
+ object.__getattribute__(root8, '__class__'))
+
+ self.assertEqual(1, root8.var1)
+ self.assertEqual(1, root8.var1)
+ self.assertEqual(1, root8.func1(1))
+
+ self.assertEqual([('__getattribute__', 'var1'),
+ ('_import', 'root8'),
+ ('import', self.root_name, [], 0),
+ ], self.actions)
+
+
+class TestScopeReplacerReentrance(TestCase):
+ """The ScopeReplacer should be reentrant.
+
+ Invoking a replacer while an invocation was already on-going leads to a
+ race to see which invocation will be the first to call _replace.
+ The losing caller used to see an exception (bugs 396819 and 702914).
+
+ These tests set up a tracer that stops at a suitable moment (upon
+ entry of a specified method) and starts another call to the
+ functionality in question (__call__, __getattribute__, __setattr_)
+ in order to win the race, setting up the original caller to lose.
+ """
+
+ def tracer(self, frame, event, arg):
+ if event != 'call':
+ return self.tracer
+ # Grab the name of the file that contains the code being executed.
+ code = frame.f_code
+ filename = code.co_filename
+ # Convert ".pyc" and ".pyo" file names to their ".py" equivalent.
+ filename = re.sub(r'\.py[co]$', '.py', filename)
+ function_name = code.co_name
+ # If we're executing a line of code from the right module...
+ if (filename.endswith('lazy_import.py') and
+ function_name == self.method_to_trace):
+ # We don't need to trace any more.
+ sys.settrace(None)
+ # Run another racer. This one will "win" the race.
+ self.racer()
+ return self.tracer
+
+ def run_race(self, racer, method_to_trace='_resolve'):
+ self.overrideAttr(lazy_import.ScopeReplacer, '_should_proxy', True)
+ self.racer = racer
+ self.method_to_trace = method_to_trace
+ sys.settrace(self.tracer)
+ self.racer() # Should not raise any exception
+ # Make sure the tracer actually found the code it was
+ # looking for. If not, maybe the code was refactored in
+ # such a way that these tests aren't needed any more.
+ self.assertEqual(None, sys.gettrace())
+
+ def test_call(self):
+ def factory(*args):
+ return factory
+ replacer = lazy_import.ScopeReplacer({}, factory, 'name')
+ self.run_race(replacer)
+
+ def test_setattr(self):
+ class Replaced:
+ pass
+
+ def factory(*args):
+ return Replaced()
+
+ replacer = lazy_import.ScopeReplacer({}, factory, 'name')
+
+ def racer():
+ replacer.foo = 42
+
+ self.run_race(racer)
+
+ def test_getattribute(self):
+ class Replaced:
+ foo = 'bar'
+
+ def factory(*args):
+ return Replaced()
+
+ replacer = lazy_import.ScopeReplacer({}, factory, 'name')
+
+ def racer():
+ replacer.foo
+
+ self.run_race(racer)