summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorErik Olof Gunnar Andersson <eandersson@blizzard.com>2019-06-09 16:15:15 -0700
committerErik Olof Gunnar Andersson <eandersson@blizzard.com>2019-06-28 10:29:38 -0700
commitee04eaf2af1f0a39df2feadac8b882b26414a714 (patch)
tree023cfd63424c07e7ceaa45b5b59826b3d5d524b8
parent08684a2c8e1f33362e142ae6e50f5067e9424860 (diff)
downloaddesignate-ee04eaf2af1f0a39df2feadac8b882b26414a714.tar.gz
Remove unused code in utils and improved utils testing
* Added some additional tests on previously untested code. * Cleaned up existing tests. * Moved existing utils tests to designate.tests.unit. * Moved test utility code into a separate utility file. * Removed utility code no longer referenced anywhere. Change-Id: I9af77adb8a0b5c2ee076eeffe993100d8c65196c
-rw-r--r--designate/tests/test_utils.py194
-rw-r--r--designate/tests/unit/test_utils.py418
-rw-r--r--designate/tests/unit/utils.py78
-rw-r--r--designate/tests/unit/workers/test_zone_tasks.py2
-rw-r--r--designate/utils.py95
5 files changed, 495 insertions, 292 deletions
diff --git a/designate/tests/test_utils.py b/designate/tests/test_utils.py
deleted file mode 100644
index 7ffacee5..00000000
--- a/designate/tests/test_utils.py
+++ /dev/null
@@ -1,194 +0,0 @@
-# Copyright 2012 Managed I.T.
-#
-# Author: Kiall Mac Innes <kiall@managedit.ie>
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import os
-import functools
-import tempfile
-
-import six
-import testtools
-from mock import Mock
-from jinja2 import Template
-
-from designate.tests import TestCase
-from designate import exceptions
-from designate import utils
-
-
-class TestUtils(TestCase):
- def test_resource_string(self):
- name = ['templates', 'bind9-zone.jinja2']
-
- resource_string = utils.resource_string(*name)
-
- self.assertIsNotNone(resource_string)
-
- def test_resource_string_missing(self):
- name = 'invalid.jinja2'
-
- with testtools.ExpectedException(exceptions.ResourceNotFound):
- utils.resource_string(name)
-
- def test_resource_string_empty_args(self):
- with testtools.ExpectedException(ValueError):
- utils.resource_string()
-
- def test_load_schema_missing(self):
- with testtools.ExpectedException(exceptions.ResourceNotFound):
- utils.load_schema('v1', 'missing')
-
- def test_load_template(self):
- name = 'bind9-zone.jinja2'
-
- template = utils.load_template(name)
-
- self.assertIsInstance(template, Template)
-
- def test_load_template_keep_trailing_newline(self):
- name = 'bind9-zone.jinja2'
- template = utils.load_template(name)
- self.assertTrue(template.environment.keep_trailing_newline)
-
- def test_load_template_missing(self):
- name = 'invalid.jinja2'
-
- with testtools.ExpectedException(exceptions.ResourceNotFound):
- utils.load_template(name)
-
- def test_render_template(self):
- template = Template("Hello {{name}}")
-
- result = utils.render_template(template, name="World")
-
- self.assertEqual('Hello World', result)
-
- def test_render_template_to_file(self):
- output_path = tempfile.mktemp()
-
- template = Template("Hello {{name}}")
-
- utils.render_template_to_file(template, output_path=output_path,
- name="World")
-
- self.assertTrue(os.path.exists(output_path))
-
- try:
- with open(output_path, 'r') as fh:
- self.assertEqual('Hello World', fh.read())
- finally:
- os.unlink(output_path)
-
- def test_increment_serial(self):
- ret_serial = utils.increment_serial(serial=20)
- self.assertGreater(ret_serial, 20)
-
- def test_is_uuid_like(self):
- uuid_str = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
- self.assertTrue(utils.is_uuid_like(uuid_str))
- uuid_str = '678'
- self.assertFalse(utils.is_uuid_like(uuid_str))
-
- def test_split_host_port(self):
- host_port = "abc:abc"
- host, port = utils.split_host_port(host_port)
- self.assertEqual((host, port), ("abc:abc", 53))
-
- host_port = "abc:25"
- host, port = utils.split_host_port(host_port)
- self.assertEqual((host, port), ("abc", 25))
-
- def test_get_paging_params_invalid_limit(self):
- context = Mock()
- for value in [9223372036854775809, -1]:
- with testtools.ExpectedException(exceptions.InvalidLimit):
- utils.get_paging_params(context, {'limit': value}, [])
-
- def test_get_paging_params_max_limit(self):
- context = Mock()
- self.config(max_limit_v2=1000, group='service:api')
- result = utils.get_paging_params(context, {'limit': "max"}, [])
- self.assertEqual(result[1], 1000)
-
- def test_get_paging_params_invalid_sort_dir(self):
- context = Mock()
- with testtools.ExpectedException(exceptions.InvalidSortDir):
- utils.get_paging_params(context, {'sort_dir': "dsc"}, [])
-
- def test_get_paging_params_invalid_sort_key(self):
- context = Mock()
- with testtools.ExpectedException(exceptions.InvalidSortKey):
- utils.get_paging_params(context, {'sort_key': "dsc"},
- ['asc', 'desc'])
-
-
-def def_method(f, *args, **kwargs):
- @functools.wraps(f)
- def new_method(self):
- return f(self, *args, **kwargs)
- return new_method
-
-
-def parameterized_class(cls):
- """A class decorator for running parameterized test cases.
- Mark your class with @parameterized_class.
- Mark your test cases with @parameterized.
- """
- test_functions = {
- k: v for k, v in vars(cls).items() if k.startswith('test')
- }
- for name, f in test_functions.items():
- if not hasattr(f, '_test_data'):
- continue
-
- # remove the original test function from the class
- delattr(cls, name)
-
- # add a new test function to the class for each entry in f._test_data
- for tag, args in f._test_data.items():
- new_name = "{0}_{1}".format(f.__name__, tag)
- if hasattr(cls, new_name):
- raise Exception(
- "Parameterized test case '{0}.{1}' created from '{0}.{2}' "
- "already exists".format(cls.__name__, new_name, name))
-
- # Using `def new_method(self): f(self, **args)` is not sufficient
- # (all new_methods use the same args value due to late binding).
- # Instead, use this factory function.
- new_method = def_method(f, **args)
-
- # To add a method to a class, available for all instances:
- # MyClass.method = types.MethodType(f, None, MyClass)
- setattr(cls, new_name, six.create_unbound_method(new_method, cls))
- return cls
-
-
-def parameterized(data):
- """A function decorator for parameterized test cases.
- Example:
- @parameterized({
- 'zero': dict(val=0),
- 'one': dict(val=1),
- })
- def test_val(self, val):
- self.assertEqual(self.get_val(), val)
- The above will generate two test cases:
- `test_val_zero` which runs with val=0
- `test_val_one` which runs with val=1
- :param data: A dictionary that looks like {tag: {arg1: val1, ...}}
- """
- def wrapped(f):
- f._test_data = data
- return f
- return wrapped
diff --git a/designate/tests/unit/test_utils.py b/designate/tests/unit/test_utils.py
index 14d69695..0285e18e 100644
--- a/designate/tests/unit/test_utils.py
+++ b/designate/tests/unit/test_utils.py
@@ -11,19 +11,407 @@
# under the License.
import random
+import jinja2
import mock
import oslotest.base
+from oslo_concurrency import processutils
+from oslo_config import cfg
+from oslo_config import fixture as cfg_fixture
+from oslo_utils import timeutils
+from designate import exceptions
from designate import utils
from designate.tests import fixtures
+CONF = cfg.CONF
-class TestSocket(oslotest.base.BaseTestCase):
+
+class TestUtils(oslotest.base.BaseTestCase):
def setUp(self):
- super(TestSocket, self).setUp()
+ super(TestUtils, self).setUp()
self.stdlog = fixtures.StandardLogging()
+ self.useFixture(cfg_fixture.Config(CONF))
self.useFixture(self.stdlog)
+ @mock.patch('os.path.exists')
+ @mock.patch('os.path.abspath')
+ def test_find_config(self, mock_abspath, mock_path_exists):
+ CONF.set_override('pybasedir', '/tmp/workspace/designate')
+
+ mock_path_exists.side_effect = [True, False, False, False, False]
+ mock_abspath.return_value = '/tmp/designate/designate.conf'
+
+ config_files = utils.find_config('designate.conf')
+
+ self.assertEqual(['/tmp/designate/designate.conf'], config_files)
+ mock_abspath.assert_called_once()
+
+ @mock.patch.object(processutils, 'execute')
+ def test_execute(self, mock_execute):
+ mock_execute.return_value = ('designate.conf\npools.yaml\n', '')
+
+ out, err = utils.execute(
+ '/bin/ls', '/etc/designate/',
+ run_as_root=False
+ )
+
+ mock_execute.assert_called_once_with(
+ '/bin/ls', '/etc/designate/',
+ root_helper='sudo designate-rootwrap /etc/designate/rootwrap.conf',
+ run_as_root=False
+ )
+
+ self.assertEqual('designate.conf\npools.yaml\n', out)
+ self.assertFalse(err)
+
+ @mock.patch.object(processutils, 'execute')
+ def test_execute_with_rootwrap(self, mock_execute):
+ CONF.set_override('root_helper', 'sudo designate-test')
+
+ mock_execute.return_value = ('designate.conf\npools.yaml\n', '')
+
+ out, err = utils.execute(
+ '/bin/ls', '/etc/designate/',
+ run_as_root=True
+ )
+
+ mock_execute.assert_called_once_with(
+ '/bin/ls', '/etc/designate/',
+ root_helper='sudo designate-test',
+ run_as_root=True
+ )
+
+ self.assertEqual('designate.conf\npools.yaml\n', out)
+ self.assertFalse(err)
+
+ def test_deep_dict_merge(self):
+ a = {
+ 'a': {'dns': 'record'},
+ 'b': 'b',
+ 'c': 'c',
+ }
+
+ b = {
+ 'a': {'domain': 'zone'},
+ 'c': 1,
+ 'd': 'd',
+ }
+
+ self.assertEqual(
+ {
+ 'a': {
+ 'dns': 'record', 'domain': 'zone'
+ },
+ 'b': 'b', 'c': 1, 'd': 'd'
+ },
+ utils.deep_dict_merge(a, b)
+ )
+
+ def test_deep_dict_merge_not_dict(self):
+ result = utils.deep_dict_merge(dict(), list())
+
+ self.assertIsInstance(result, list)
+
+ def test_get_proxies(self):
+ CONF.set_override('no_proxy', 'example.com', 'proxy')
+ CONF.set_override('http_proxy', 'example.org', 'proxy')
+ CONF.set_override('https_proxy', 'example.net', 'proxy')
+
+ result = utils.get_proxies()
+
+ self.assertEqual(['example.com'], result.get('no_proxy'))
+ self.assertEqual('example.org', result.get('http'))
+ self.assertEqual('example.net', result.get('https'))
+
+ def test_get_proxies_default_values(self):
+ result = utils.get_proxies()
+
+ self.assertIsNone(result.get('no_proxy'))
+ self.assertIsNone(result.get('http'))
+ self.assertIsNone(result.get('https'))
+
+ def test_get_proxies_with_no_proxy(self):
+ CONF.set_override('no_proxy', 'example.org', 'proxy')
+
+ result = utils.get_proxies()
+
+ self.assertEqual(['example.org'], result.get('no_proxy'))
+ self.assertIsNone(result.get('http'))
+ self.assertIsNone(result.get('https'))
+
+ def test_get_proxies_with_http_proxy(self):
+ CONF.set_override('http_proxy', 'example.org', 'proxy')
+
+ result = utils.get_proxies()
+
+ self.assertIsNone(result.get('no_proxy'))
+ self.assertEqual('example.org', result.get('http'))
+ self.assertEqual('example.org', result.get('https'))
+
+ def test_get_proxies_with_https_proxy(self):
+ CONF.set_override('https_proxy', 'example.org', 'proxy')
+
+ result = utils.get_proxies()
+
+ self.assertIsNone(result.get('no_proxy'))
+ self.assertIsNone(result.get('http'))
+ self.assertEqual('example.org', result.get('https'))
+
+ def test_resource_string(self):
+ resource_string = utils.resource_string(
+ 'templates', 'bind9-zone.jinja2'
+ )
+
+ self.assertIsNotNone(resource_string)
+
+ def test_resource_string_missing(self):
+ self.assertRaisesRegex(
+ exceptions.ResourceNotFound,
+ 'Could not find the requested resource',
+ utils.resource_string, 'invalid.jinja2'
+ )
+
+ def test_resource_string_empty_args(self):
+ self.assertRaises(
+ ValueError,
+ utils.resource_string
+ )
+
+ def test_load_schema_missing(self):
+ self.assertRaisesRegex(
+ exceptions.ResourceNotFound,
+ 'Could not find the requested resource',
+ utils.load_schema, 'v1', 'missing'
+ )
+
+ @mock.patch.object(utils, 'resource_string')
+ def test_load_template(self, mock_resource_string):
+ mock_resource_string.return_value = 'Hello {{name}}'.encode('utf-8')
+
+ template = utils.load_template('bind9-zone.jinja2')
+
+ self.assertIsInstance(template, jinja2.Template)
+
+ @mock.patch.object(utils, 'resource_string')
+ def test_load_template_keep_trailing_newline(self, mock_resource_string):
+ mock_resource_string.return_value = 'Hello {{name}}'.encode('utf-8')
+
+ template = utils.load_template('bind9-zone.jinja2')
+
+ self.assertTrue(template.environment.keep_trailing_newline)
+
+ def test_load_template_missing(self):
+ self.assertRaises(
+ exceptions.ResourceNotFound,
+ utils.load_template, 'invalid.jinja2'
+ )
+
+ def test_render_template(self):
+ template = jinja2.Template('Hello {{name}}')
+
+ result = utils.render_template(template, name='World')
+
+ self.assertEqual('Hello World', result)
+
+ @mock.patch('six.moves.builtins.open', new_callable=mock.mock_open)
+ @mock.patch('os.path.exists')
+ def test_render_template_to_file(self, mock_exists, mock_open):
+ mock_exists.return_value = True
+
+ output_path = '/tmp/designate/resources/templates/hello.jinja2'
+
+ template = jinja2.Template('Hello {{name}}')
+
+ utils.render_template_to_file(
+ template, makedirs=False, output_path=output_path, name='World'
+ )
+
+ mock_open.assert_called_once_with(output_path, 'w')
+ mock_open().write.assert_called_once_with('Hello World')
+
+ @mock.patch('six.moves.builtins.open', new_callable=mock.mock_open)
+ @mock.patch('os.path.exists')
+ @mock.patch('os.makedirs')
+ def test_render_template_to_file_with_makedirs(self, mock_makedirs,
+ mock_exists,
+ mock_open):
+ mock_exists.return_value = False
+
+ output_path = '/tmp/designate/resources/templates/hello.jinja2'
+
+ template = jinja2.Template('Hello {{name}}')
+
+ utils.render_template_to_file(
+ template, makedirs=True, output_path=output_path, name='World'
+ )
+
+ mock_makedirs.assert_called_once_with(
+ '/tmp/designate/resources/templates'
+ )
+ mock_open.assert_called_once_with(output_path, 'w')
+ mock_open().write.assert_called_once_with('Hello World')
+
+ @mock.patch.object(timeutils, 'utcnow_ts')
+ def test_increment_serial_lower_than_ts(self, mock_utcnow_ts):
+ mock_utcnow_ts.return_value = 1561698354
+
+ ret_serial = utils.increment_serial(serial=1)
+
+ self.assertEqual(1561698354, ret_serial)
+
+ @mock.patch.object(timeutils, 'utcnow_ts')
+ def test_increment_serial_higher_than_ts(self, mock_utcnow_ts):
+ mock_utcnow_ts.return_value = 1561698354
+
+ ret_serial = utils.increment_serial(serial=1561698354 * 2)
+
+ self.assertEqual(1561698354 * 2 + 1, ret_serial)
+
+ def test_is_uuid_like(self):
+ self.assertTrue(
+ utils.is_uuid_like('ce9fcd6b-d546-4397-8a49-8ceaec37cb64')
+ )
+
+ def test_is_not_uuid_like(self):
+ self.assertFalse(utils.is_uuid_like('678'))
+
+ def test_split_host_port(self):
+ host, port = utils.split_host_port('abc:25')
+ self.assertEqual(('abc', 25), (host, port))
+
+ def test_split_host_port_with_invalid_port(self):
+ host, port = utils.split_host_port('abc:abc')
+ self.assertEqual(('abc:abc', 53), (host, port))
+
+ def test_get_paging_params(self):
+ CONF.set_override('default_limit_v2', 100, 'service:api')
+
+ context = mock.Mock()
+ params = {
+ 'updated_at': None,
+ 'created_at': '2019-06-28T04:17:34.000000',
+ 'pattern': 'blacklisted.com.',
+ 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
+ }
+
+ marker, limit, sort_key, sort_dir = utils.get_paging_params(
+ context, params, ['created_at', 'id', 'updated_at', 'pattern']
+ )
+
+ self.assertIsNone(marker)
+ self.assertEqual(100, limit)
+ self.assertIsNone(sort_key)
+ self.assertIsNone(sort_dir)
+
+ def test_get_paging_params_without_sort_keys(self):
+ CONF.set_override('default_limit_v2', 0, 'service:api')
+
+ context = mock.Mock()
+ params = {
+ 'updated_at': None,
+ 'created_at': '2019-06-28T04:17:34.000000',
+ 'pattern': 'blacklisted.com.',
+ 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
+ }
+
+ marker, limit, sort_key, sort_dir = utils.get_paging_params(
+ context, params, sort_keys=None
+ )
+
+ self.assertIsNone(marker)
+ self.assertEqual(0, limit)
+ self.assertIsNone(sort_key)
+ self.assertIsNone(sort_dir)
+
+ def test_get_paging_params_sort_by_tenant_id(self):
+ CONF.set_override('default_limit_v2', 100, 'service:api')
+
+ context = mock.Mock()
+ context.all_tenants = True
+ params = {
+ 'updated_at': None,
+ 'created_at': '2019-06-28T04:17:34.000000',
+ 'pattern': 'blacklisted.com.',
+ 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
+ 'sort_key': 'tenant_id',
+ }
+
+ marker, limit, sort_key, sort_dir = utils.get_paging_params(
+ context, params,
+ ['created_at', 'id', 'updated_at', 'pattern', 'tenant_id']
+ )
+
+ self.assertIsNone(marker)
+ self.assertEqual(100, limit)
+ self.assertEqual('tenant_id', sort_key)
+ self.assertIsNone(sort_dir)
+
+ def test_get_paging_params_sort_tenant_without_all_tenants(self):
+ CONF.set_override('default_limit_v2', 100, 'service:api')
+
+ context = mock.Mock()
+ context.all_tenants = False
+ params = {
+ 'updated_at': None,
+ 'created_at': '2019-06-28T04:17:34.000000',
+ 'pattern': 'blacklisted.com.',
+ 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
+ 'sort_key': 'tenant_id',
+ }
+
+ marker, limit, sort_key, sort_dir = utils.get_paging_params(
+ context, params,
+ ['created_at', 'id', 'updated_at', 'pattern', 'tenant_id']
+ )
+
+ self.assertIsNone(marker)
+ self.assertEqual(100, limit)
+ self.assertIsNone(sort_key)
+ self.assertIsNone(sort_dir)
+
+ def test_get_paging_params_invalid_limit(self):
+ context = mock.Mock()
+
+ self.assertRaises(
+ exceptions.InvalidLimit,
+ utils.get_paging_params,
+ context, {'limit': 9223372036854775809}, []
+ )
+
+ self.assertRaises(
+ exceptions.InvalidLimit,
+ utils.get_paging_params,
+ context, {'limit': -1}, []
+ )
+
+ def test_get_paging_params_max_limit(self):
+ CONF.set_override('max_limit_v2', 1000, 'service:api')
+
+ context = mock.Mock()
+
+ result = utils.get_paging_params(context, {'limit': 'max'}, [])
+
+ self.assertEqual(result[1], 1000)
+
+ def test_get_paging_params_invalid_sort_dir(self):
+ context = mock.Mock()
+
+ self.assertRaisesRegex(
+ exceptions.InvalidSortDir,
+ 'Unknown sort direction, must be',
+ utils.get_paging_params, context, {'sort_dir': 'dsc'}, []
+ )
+
+ def test_get_paging_params_invalid_sort_key(self):
+ context = mock.Mock()
+
+ self.assertRaisesRegex(
+ exceptions.InvalidSortKey,
+ 'sort key must be one of',
+ utils.get_paging_params, context, {'sort_key': 'dsc'},
+ ['asc', 'desc']
+ )
+
@mock.patch('socket.socket')
def test_bind_tcp(self, mock_sock_impl):
mock_sock = mock.MagicMock()
@@ -58,6 +446,19 @@ class TestSocket(oslotest.base.BaseTestCase):
)
@mock.patch('socket.socket')
+ def test_bind_tcp_without_reuse_port(self, mock_sock_impl):
+ mock_sock = mock.MagicMock()
+ mock_sock_impl.return_value = mock_sock
+ mock_sock.setsockopt.side_effect = [None, None, AttributeError, None]
+
+ utils.bind_tcp('127.0.0.1', 53, 100, 1)
+
+ self.assertIn(
+ 'SO_REUSEPORT not available, ignoring.',
+ self.stdlog.logger.output
+ )
+
+ @mock.patch('socket.socket')
def test_bind_udp(self, mock_sock_impl):
mock_sock = mock.MagicMock()
mock_sock_impl.return_value = mock_sock
@@ -87,3 +488,16 @@ class TestSocket(oslotest.base.BaseTestCase):
'Listening on UDP port %(port)d' % {'port': random_port},
self.stdlog.logger.output
)
+
+ @mock.patch('socket.socket')
+ def test_bind_udp_without_reuse_port(self, mock_sock_impl):
+ mock_sock = mock.MagicMock()
+ mock_sock_impl.return_value = mock_sock
+ mock_sock.setsockopt.side_effect = [None, AttributeError]
+
+ utils.bind_udp('127.0.0.1', 53)
+
+ self.assertIn(
+ 'SO_REUSEPORT not available, ignoring.',
+ self.stdlog.logger.output
+ )
diff --git a/designate/tests/unit/utils.py b/designate/tests/unit/utils.py
new file mode 100644
index 00000000..8226f62c
--- /dev/null
+++ b/designate/tests/unit/utils.py
@@ -0,0 +1,78 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import functools
+
+import six
+
+
+def def_method(f, *args, **kwargs):
+ @functools.wraps(f)
+ def new_method(self):
+ return f(self, *args, **kwargs)
+
+ return new_method
+
+
+def parameterized_class(cls):
+ """A class decorator for running parameterized test cases.
+ Mark your class with @parameterized_class.
+ Mark your test cases with @parameterized.
+ """
+ test_functions = {
+ k: v for k, v in vars(cls).items() if k.startswith('test')
+ }
+ for name, f in test_functions.items():
+ if not hasattr(f, '_test_data'):
+ continue
+
+ # remove the original test function from the class
+ delattr(cls, name)
+
+ # add a new test function to the class for each entry in f._test_data
+ for tag, args in f._test_data.items():
+ new_name = "{0}_{1}".format(f.__name__, tag)
+ if hasattr(cls, new_name):
+ raise Exception(
+ "Parameterized test case '{0}.{1}' created from '{0}.{2}' "
+ "already exists".format(cls.__name__, new_name, name))
+
+ # Using `def new_method(self): f(self, **args)` is not sufficient
+ # (all new_methods use the same args value due to late binding).
+ # Instead, use this factory function.
+ new_method = def_method(f, **args)
+
+ # To add a method to a class, available for all instances:
+ # MyClass.method = types.MethodType(f, None, MyClass)
+ setattr(cls, new_name, six.create_unbound_method(new_method, cls))
+ return cls
+
+
+def parameterized(data):
+ """A function decorator for parameterized test cases.
+ Example:
+ @parameterized({
+ 'zero': dict(val=0),
+ 'one': dict(val=1),
+ })
+ def test_val(self, val):
+ self.assertEqual(self.get_val(), val)
+ The above will generate two test cases:
+ `test_val_zero` which runs with val=0
+ `test_val_one` which runs with val=1
+ :param data: A dictionary that looks like {tag: {arg1: val1, ...}}
+ """
+
+ def wrapped(f):
+ f._test_data = data
+ return f
+
+ return wrapped
diff --git a/designate/tests/unit/workers/test_zone_tasks.py b/designate/tests/unit/workers/test_zone_tasks.py
index d1b4df5d..020792ea 100644
--- a/designate/tests/unit/workers/test_zone_tasks.py
+++ b/designate/tests/unit/workers/test_zone_tasks.py
@@ -19,9 +19,9 @@ import oslotest.base
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
-import designate.tests.test_utils as utils
from designate import exceptions
from designate import objects
+from designate.tests.unit import utils
from designate.worker import processing
from designate.worker import utils as wutils
from designate.worker.tasks import zone
diff --git a/designate/utils.py b/designate/utils.py
index fe32fb8f..84e4f2ea 100644
--- a/designate/utils.py
+++ b/designate/utils.py
@@ -140,55 +140,6 @@ def execute(*cmd, **kw):
root_helper=root_helper, **kw)
-def get_item_properties(item, fields, mixed_case_fields=None, formatters=None):
- """Return a tuple containing the item properties.
-
- :param item: a single item resource (e.g. Server, Tenant, etc)
- :param fields: tuple of strings with the desired field names
- :param mixed_case_fields: tuple of field names to preserve case
- :param formatters: dictionary mapping field names to callables
- to format the values
- """
- row = []
- mixed_case_fields = mixed_case_fields or []
- formatters = formatters or {}
-
- for field in fields:
- if field in formatters:
- row.append(formatters[field](item))
- else:
- if field in mixed_case_fields:
- field_name = field.replace(' ', '_')
- else:
- field_name = field.lower().replace(' ', '_')
- if not hasattr(item, field_name) and \
- (isinstance(item, dict) and field_name in item):
- data = item[field_name]
- else:
- data = getattr(item, field_name, '')
- if data is None:
- data = ''
- row.append(data)
- return tuple(row)
-
-
-def get_columns(data):
- """
- Some row's might have variable count of columns, ensure that we have the
- same.
-
- :param data: Results in [{}, {]}]
- """
- columns = set()
-
- def _seen(col):
- columns.add(str(col))
-
- six.moves.map(lambda item: six.moves.map(_seen,
- list(six.iterkeys(item))), data)
- return list(columns)
-
-
def increment_serial(serial=0):
# This provides for *roughly* unix timestamp based serial numbers
new_serial = timeutils.utcnow_ts()
@@ -199,44 +150,6 @@ def increment_serial(serial=0):
return new_serial
-def quote_string(string):
- inparts = string.split(' ')
- outparts = []
- tmp = None
-
- for part in inparts:
- if part == '':
- continue
- elif part[0] == '"' and part[-1:] == '"' and part[-2:] != '\\"':
- # Handle Quoted Words
- outparts.append(part.strip('"'))
- elif part[0] == '"':
- # Handle Start of Quoted Sentance
- tmp = part[1:]
- elif tmp is not None and part[-1:] == '"' and part[-2:] != '\\"':
- # Handle End of Quoted Sentance
- tmp += " " + part.strip('"')
- outparts.append(tmp)
- tmp = None
- elif tmp is not None:
- # Handle Middle of Quoted Sentance
- tmp += " " + part
- else:
- # Handle Standalone words
- outparts.append(part)
-
- if tmp is not None:
- # Handle unclosed quoted strings
- outparts.append(tmp)
-
- # This looks odd, but both calls are necessary to ensure the end results
- # is always consistent.
- outparts = [o.replace('\\"', '"') for o in outparts]
- outparts = [o.replace('"', '\\"') for o in outparts]
-
- return '"' + '" "'.join(outparts) + '"'
-
-
def deep_dict_merge(a, b):
if not isinstance(b, dict):
return b
@@ -324,14 +237,6 @@ def get_proxies():
return proxies
-def extract_priority_from_data(recordset_type, record):
- priority, data = None, record['data']
- if recordset_type in ('MX', 'SRV'):
- priority, _, data = record['data'].partition(" ")
- priority = int(priority)
- return priority, data
-
-
def cache_result(function):
"""A function decorator to cache the result of the first call, every
additional call will simply return the cached value.