# Copyright (C) 2006, 2008-2011 Canonical Ltd # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA """Tests for the Registry classes""" import os import sys from bzrlib import ( branch, osutils, registry, tests, ) class TestRegistry(tests.TestCase): def register_stuff(self, a_registry): a_registry.register('one', 1) a_registry.register('two', 2) a_registry.register('four', 4) a_registry.register('five', 5) def test_registry(self): a_registry = registry.Registry() self.register_stuff(a_registry) self.assertTrue(a_registry.default_key is None) # test get() (self.default_key is None) self.assertRaises(KeyError, a_registry.get) self.assertRaises(KeyError, a_registry.get, None) self.assertEqual(2, a_registry.get('two')) self.assertRaises(KeyError, a_registry.get, 'three') # test _set_default_key a_registry.default_key = 'five' self.assertTrue(a_registry.default_key == 'five') self.assertEqual(5, a_registry.get()) self.assertEqual(5, a_registry.get(None)) # If they ask for a specific entry, they should get KeyError # not the default value. They can always pass None if they prefer self.assertRaises(KeyError, a_registry.get, 'six') self.assertRaises(KeyError, a_registry._set_default_key, 'six') # test keys() self.assertEqual(['five', 'four', 'one', 'two'], a_registry.keys()) def test_registry_funcs(self): a_registry = registry.Registry() self.register_stuff(a_registry) self.assertTrue('one' in a_registry) a_registry.remove('one') self.assertFalse('one' in a_registry) self.assertRaises(KeyError, a_registry.get, 'one') a_registry.register('one', 'one') self.assertEqual(['five', 'four', 'one', 'two'], sorted(a_registry.keys())) self.assertEqual([('five', 5), ('four', 4), ('one', 'one'), ('two', 2)], sorted(a_registry.iteritems())) def test_register_override(self): a_registry = registry.Registry() a_registry.register('one', 'one') self.assertRaises(KeyError, a_registry.register, 'one', 'two') self.assertRaises(KeyError, a_registry.register, 'one', 'two', override_existing=False) a_registry.register('one', 'two', override_existing=True) self.assertEqual('two', a_registry.get('one')) self.assertRaises(KeyError, a_registry.register_lazy, 'one', 'three', 'four') a_registry.register_lazy('one', 'module', 'member', override_existing=True) def test_registry_help(self): a_registry = registry.Registry() a_registry.register('one', 1, help='help text for one') # We should not have to import the module to return the help # information a_registry.register_lazy('two', 'nonexistent_module', 'member', help='help text for two') # We should be able to handle a callable to get information help_calls = [] def generic_help(reg, key): help_calls.append(key) return 'generic help for %s' % (key,) a_registry.register('three', 3, help=generic_help) a_registry.register_lazy('four', 'nonexistent_module', 'member2', help=generic_help) a_registry.register('five', 5) def help_from_object(reg, key): obj = reg.get(key) return obj.help() class SimpleObj(object): def help(self): return 'this is my help' a_registry.register('six', SimpleObj(), help=help_from_object) self.assertEqual('help text for one', a_registry.get_help('one')) self.assertEqual('help text for two', a_registry.get_help('two')) self.assertEqual('generic help for three', a_registry.get_help('three')) self.assertEqual(['three'], help_calls) self.assertEqual('generic help for four', a_registry.get_help('four')) self.assertEqual(['three', 'four'], help_calls) self.assertEqual(None, a_registry.get_help('five')) self.assertEqual('this is my help', a_registry.get_help('six')) self.assertRaises(KeyError, a_registry.get_help, None) self.assertRaises(KeyError, a_registry.get_help, 'seven') a_registry.default_key = 'one' self.assertEqual('help text for one', a_registry.get_help(None)) self.assertRaises(KeyError, a_registry.get_help, 'seven') self.assertEqual([('five', None), ('four', 'generic help for four'), ('one', 'help text for one'), ('six', 'this is my help'), ('three', 'generic help for three'), ('two', 'help text for two'), ], sorted((key, a_registry.get_help(key)) for key in a_registry.keys())) # We don't know what order it was called in, but we should get # 2 more calls to three and four self.assertEqual(['four', 'four', 'three', 'three'], sorted(help_calls)) def test_registry_info(self): a_registry = registry.Registry() a_registry.register('one', 1, info='string info') # We should not have to import the module to return the info a_registry.register_lazy('two', 'nonexistent_module', 'member', info=2) # We should be able to handle a callable to get information a_registry.register('three', 3, info=['a', 'list']) obj = object() a_registry.register_lazy('four', 'nonexistent_module', 'member2', info=obj) a_registry.register('five', 5) self.assertEqual('string info', a_registry.get_info('one')) self.assertEqual(2, a_registry.get_info('two')) self.assertEqual(['a', 'list'], a_registry.get_info('three')) self.assertIs(obj, a_registry.get_info('four')) self.assertIs(None, a_registry.get_info('five')) self.assertRaises(KeyError, a_registry.get_info, None) self.assertRaises(KeyError, a_registry.get_info, 'six') a_registry.default_key = 'one' self.assertEqual('string info', a_registry.get_info(None)) self.assertRaises(KeyError, a_registry.get_info, 'six') self.assertEqual([('five', None), ('four', obj), ('one', 'string info'), ('three', ['a', 'list']), ('two', 2), ], sorted((key, a_registry.get_info(key)) for key in a_registry.keys())) def test_get_prefix(self): my_registry = registry.Registry() http_object = object() sftp_object = object() my_registry.register('http:', http_object) my_registry.register('sftp:', sftp_object) found_object, suffix = my_registry.get_prefix('http://foo/bar') self.assertEqual('//foo/bar', suffix) self.assertIs(http_object, found_object) self.assertIsNot(sftp_object, found_object) found_object, suffix = my_registry.get_prefix('sftp://baz/qux') self.assertEqual('//baz/qux', suffix) self.assertIs(sftp_object, found_object) class TestRegistryIter(tests.TestCase): """Test registry iteration behaviors. There are dark corner cases here when the registered objects trigger addition in the iterated registry. """ def setUp(self): super(TestRegistryIter, self).setUp() # We create a registry with "official" objects and "hidden" # objects. The later represent the side effects that led to bug #277048 # and #430510 _registry = registry.Registry() def register_more(): _registry.register('hidden', None) # Avoid closing over self by binding local variable self.registry = _registry self.registry.register('passive', None) self.registry.register('active', register_more) self.registry.register('passive-too', None) class InvasiveGetter(registry._ObjectGetter): def get_obj(inner_self): # Surprise ! Getting a registered object (think lazy loaded # module) register yet another object ! _registry.register('more hidden', None) return inner_self._obj self.registry.register('hacky', None) # We peek under the covers because the alternative is to use lazy # registration and create a module that can reference our test registry # it's too much work for such a corner case -- vila 090916 self.registry._dict['hacky'] = InvasiveGetter(None) def _iter_them(self, iter_func_name): iter_func = getattr(self.registry, iter_func_name, None) self.assertIsNot(None, iter_func) count = 0 for name, func in iter_func(): count += 1 self.assertFalse(name in ('hidden', 'more hidden')) if func is not None: # Using an object register another one as a side effect func() self.assertEqual(4, count) def test_iteritems(self): # the dict is modified during the iteration self.assertRaises(RuntimeError, self._iter_them, 'iteritems') def test_items(self): # we should be able to iterate even if one item modify the dict self._iter_them('items') class TestRegistryWithDirs(tests.TestCaseInTempDir): """Registry tests that require temporary dirs""" def create_plugin_file(self, contents): """Create a file to be used as a plugin. This is created in a temporary directory, so that we are sure that it doesn't start in the plugin path. """ os.mkdir('tmp') plugin_name = 'bzr_plugin_a_%s' % (osutils.rand_chars(4),) with open('tmp/'+plugin_name+'.py', 'wb') as f: f.write(contents) return plugin_name def create_simple_plugin(self): return self.create_plugin_file( 'object1 = "foo"\n' '\n\n' 'def function(a,b,c):\n' ' return a,b,c\n' '\n\n' 'class MyClass(object):\n' ' def __init__(self, a):\n' ' self.a = a\n' '\n\n' ) def test_lazy_import_registry_foo(self): a_registry = registry.Registry() a_registry.register_lazy('foo', 'bzrlib.branch', 'Branch') a_registry.register_lazy('bar', 'bzrlib.branch', 'Branch.hooks') self.assertEqual(branch.Branch, a_registry.get('foo')) self.assertEqual(branch.Branch.hooks, a_registry.get('bar')) def test_lazy_import_registry(self): plugin_name = self.create_simple_plugin() a_registry = registry.Registry() a_registry.register_lazy('obj', plugin_name, 'object1') a_registry.register_lazy('function', plugin_name, 'function') a_registry.register_lazy('klass', plugin_name, 'MyClass') a_registry.register_lazy('module', plugin_name, None) self.assertEqual(['function', 'klass', 'module', 'obj'], sorted(a_registry.keys())) # The plugin should not be loaded until we grab the first object self.assertFalse(plugin_name in sys.modules) # By default the plugin won't be in the search path self.assertRaises(ImportError, a_registry.get, 'obj') plugin_path = os.getcwd() + '/tmp' sys.path.append(plugin_path) try: obj = a_registry.get('obj') self.assertEqual('foo', obj) self.assertTrue(plugin_name in sys.modules) # Now grab another object func = a_registry.get('function') self.assertEqual(plugin_name, func.__module__) self.assertEqual('function', func.__name__) self.assertEqual((1, [], '3'), func(1, [], '3')) # And finally a class klass = a_registry.get('klass') self.assertEqual(plugin_name, klass.__module__) self.assertEqual('MyClass', klass.__name__) inst = klass(1) self.assertIsInstance(inst, klass) self.assertEqual(1, inst.a) module = a_registry.get('module') self.assertIs(obj, module.object1) self.assertIs(func, module.function) self.assertIs(klass, module.MyClass) finally: sys.path.remove(plugin_path) def test_lazy_import_get_module(self): a_registry = registry.Registry() a_registry.register_lazy('obj', "bzrlib.tests.test_registry", 'object1') self.assertEquals("bzrlib.tests.test_registry", a_registry._get_module("obj")) def test_normal_get_module(self): class AThing(object): """Something""" a_registry = registry.Registry() a_registry.register("obj", AThing()) self.assertEquals("bzrlib.tests.test_registry", a_registry._get_module("obj"))