diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-01-18 14:08:51 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-01-18 14:08:51 +0000 |
commit | 040747d228d9905a3ae4746f37fe44fca2a1bb78 (patch) | |
tree | 958ac7b1b3920334a8b850800b78724c6aed0678 | |
parent | a26a6fa0191126300018c21ee8e9b78b47971c80 (diff) | |
parent | 9b5f4c1f3d9d827252ae6b8f253fc572df827cea (diff) | |
download | designate-040747d228d9905a3ae4746f37fe44fca2a1bb78.tar.gz |
Merge "Add retry logic on periodic_sync to stable/liberty" into stable/liberty
-rw-r--r-- | designate/pool_manager/__init__.py | 4 | ||||
-rw-r--r-- | designate/pool_manager/service.py | 91 | ||||
-rw-r--r-- | designate/tests/test_pool_manager/test_service.py | 159 | ||||
-rw-r--r-- | designate/tests/unit/__init__.py | 54 | ||||
-rw-r--r-- | designate/tests/unit/test_pool_manager/__init__.py | 0 | ||||
-rw-r--r-- | designate/tests/unit/test_pool_manager/test_service.py | 98 | ||||
-rw-r--r-- | etc/designate/designate.conf.sample | 5 |
7 files changed, 365 insertions, 46 deletions
diff --git a/designate/pool_manager/__init__.py b/designate/pool_manager/__init__.py index 17c45036..a987dea2 100644 --- a/designate/pool_manager/__init__.py +++ b/designate/pool_manager/__init__.py @@ -54,6 +54,10 @@ OPTS = [ cfg.IntOpt('periodic-sync-seconds', default=21600, help='Zones Updated within last N seconds will be syncd. Use ' 'None to sync all zones.'), + cfg.IntOpt('periodic-sync-max-attempts', default=3, + help='Number of attempts to update a zone during sync'), + cfg.IntOpt('periodic-sync-retry-interval', default=30, + help='Interval between zone update attempts during sync'), cfg.StrOpt('cache-driver', default='memcache', help='The cache driver to use'), ] diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py index 0d210703..8fbdb3b6 100644 --- a/designate/pool_manager/service.py +++ b/designate/pool_manager/service.py @@ -13,6 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import time from contextlib import contextmanager from decimal import Decimal @@ -61,6 +62,20 @@ def wrap_backend_call(): raise exceptions.Backend('Unknown backend failure: %r' % e) +def _constant_retries(num_attempts, sleep_interval): + """Generate a sequence of False terminated by a True + Sleep `sleep_interval` between cycles but not at the end. + """ + for cnt in range(num_attempts): + if cnt != 0: + LOG.debug(_LI("Executing retry n. %d"), cnt) + if cnt < num_attempts - 1: + yield False + time.sleep(sleep_interval) + else: + yield True + + class Service(service.RPCService, coordination.CoordinationMixin, service.Service): """ @@ -91,6 +106,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, self.retry_interval = CONF['service:pool_manager'].poll_retry_interval self.max_retries = CONF['service:pool_manager'].poll_max_retries self.delay = CONF['service:pool_manager'].poll_delay + self._periodic_sync_max_attempts = \ + CONF['service:pool_manager'].periodic_sync_max_attempts + self._periodic_sync_retry_interval = \ + CONF['service:pool_manager'].periodic_sync_retry_interval # Create the necessary Backend instances for each target self._setup_target_backends() @@ -200,18 +219,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, LOG.exception(_LE('An unhandled exception in periodic ' 'recovery occurred')) - def periodic_sync(self): - """Periodically sync all the zones that are not in ERROR status - Runs only on the pool leader - :return: None + def _fetch_healthy_zones(self, context): + """Fetch all zones not in error + :return: :class:`ZoneList` zones """ - if not self._pool_election.is_leader: - return - - context = DesignateContext.get_admin_context(all_tenants=True) - - LOG.debug("Starting Periodic Synchronization") - criterion = { 'pool_id': CONF['service:pool_manager'].pool_id, 'status': '!%s' % ERROR_STATUS @@ -225,23 +236,51 @@ class Service(service.RPCService, coordination.CoordinationMixin, current = utils.increment_serial() criterion['serial'] = ">%s" % (current - periodic_sync_seconds) - domains = self.central_api.find_domains(context, criterion) + zones = self.central_api.find_domains(context, criterion) + return zones - try: - for domain in domains: - # TODO(kiall): If the zone was created within the last - # periodic_sync_seconds, attempt to recreate - # to fill in targets which may have failed. - success = self.update_domain(context, domain) - if not success: - self.central_api.update_status( - context, domain.id, ERROR_STATUS, domain.serial) + def periodic_sync(self): + """Periodically sync all the zones that are not in ERROR status + Runs only on the pool leader + :return: None + """ + if not self._pool_election.is_leader: + return - except Exception: - LOG.exception(_LE('An unhandled exception in periodic ' - 'synchronization occurred.')) - self.central_api.update_status(context, domain.id, ERROR_STATUS, - domain.serial) + context = DesignateContext.get_admin_context(all_tenants=True) + + LOG.debug("Starting Periodic Synchronization") + context = DesignateContext.get_admin_context(all_tenants=True) + zones = self._fetch_healthy_zones(context) + zones = set(zones) + + # TODO(kiall): If the zone was created within the last + # periodic_sync_seconds, attempt to recreate + # to fill in targets which may have failed. + retry_gen = _constant_retries( + self._periodic_sync_max_attempts, + self._periodic_sync_retry_interval + ) + for is_last_cycle in retry_gen: + zones_in_error = [] + for zone in zones: + try: + success = self.update_domain(context, zone) + if not success: + zones_in_error.append(zone) + except Exception: + LOG.exception(_LE('An unhandled exception in periodic ' + 'synchronization occurred.')) + zones_in_error.append(zone) + + if not zones_in_error: + return + + zones = zones_in_error + + for zone in zones_in_error: + self.central_api.update_status(context, zone.id, ERROR_STATUS, + zone.serial) # Standard Create/Update/Delete Methods diff --git a/designate/tests/test_pool_manager/test_service.py b/designate/tests/test_pool_manager/test_service.py index e7f7f514..e223fc8a 100644 --- a/designate/tests/test_pool_manager/test_service.py +++ b/designate/tests/test_pool_manager/test_service.py @@ -13,6 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import logging import uuid import oslo_messaging as messaging @@ -24,9 +25,13 @@ from mock import patch from designate import exceptions from designate import objects from designate.backend import impl_fake +from designate.storage.impl_sqlalchemy import tables from designate.central import rpcapi as central_rpcapi from designate.mdns import rpcapi as mdns_rpcapi from designate.tests.test_pool_manager import PoolManagerTestCase +import designate.pool_manager.service as pm_module + +LOG = logging.getLogger(__name__) class PoolManagerServiceNoopTest(PoolManagerTestCase): @@ -119,7 +124,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): def _build_domains(self, n, action, status): return [ self._build_domain("zone%02X.example" % cnt, action, - status, id=str(uuid.uuid4())) + status, id=str(uuid.uuid4())) for cnt in range(n) ] @@ -475,7 +480,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): mock_cent_update_status, *a): self.service.update_domain = Mock() mock_find_domains.return_value = self._build_domains(2, 'UPDATE', - 'PENDING') + 'PENDING') self.service.periodic_sync() self.assertEqual(1, mock_find_domains.call_count) @@ -484,36 +489,166 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): self.assertEqual(2, self.service.update_domain.call_count) self.assertEqual(0, mock_cent_update_status.call_count) + @patch.object(pm_module.time, 'sleep') @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') @patch.object(central_rpcapi.CentralAPI, 'update_status') @patch.object(central_rpcapi.CentralAPI, 'find_domains') - def test_periodic_sync_with_failing_update(self, mock_find_domains, - mock_cent_update_status, *a): + def test_periodic_sync_with_failing_update( + self, mock_find_domains, mock_cent_update_status, *mocks): self.service.update_domain = Mock(return_value=False) # fail update mock_find_domains.return_value = self._build_domains(3, 'UPDATE', - 'PENDING') + 'PENDING') self.service.periodic_sync() self.assertEqual(1, mock_find_domains.call_count) criterion = mock_find_domains.call_args_list[0][0][1] self.assertEqual('!ERROR', criterion['status']) - # all zones are now in ERROR status - self.assertEqual(3, self.service.update_domain.call_count) + + # 3 zones, all failing, with 3 attempts: 9 calls + self.assertEqual(9, self.service.update_domain.call_count) + + # the zones have been put in ERROR status self.assertEqual(3, mock_cent_update_status.call_count) + @patch.object(pm_module.time, 'sleep') @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') @patch.object(central_rpcapi.CentralAPI, 'update_status') @patch.object(central_rpcapi.CentralAPI, 'find_domains') def test_periodic_sync_with_failing_update_with_exception( - self, mock_find_domains, mock_cent_update_status, *a): + self, mock_find_domains, mock_cent_update_status, *mocks): self.service.update_domain = Mock(side_effect=Exception) mock_find_domains.return_value = self._build_domains(3, 'UPDATE', - 'PENDING') + 'PENDING') self.service.periodic_sync() self.assertEqual(1, mock_find_domains.call_count) criterion = mock_find_domains.call_args_list[0][0][1] self.assertEqual('!ERROR', criterion['status']) - # the first updated zone is now in ERROR status - self.assertEqual(1, self.service.update_domain.call_count) - self.assertEqual(1, mock_cent_update_status.call_count) + + # 3 zones, all failing, with 3 attempts: 9 calls + self.assertEqual(9, self.service.update_domain.call_count) + + # the zones have been put in ERROR status + self.assertEqual(3, mock_cent_update_status.call_count) + + # Periodic recovery + + @patch.object(pm_module.time, 'sleep') + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + def test_periodic_recovery(self, mock_find_domains, + mock_cent_update_status, *mocks): + + def mock_get_failed_domains(ctx, action): + if action == pm_module.DELETE_ACTION: + return self._build_domains(3, 'DELETE', 'ERROR') + if action == pm_module.CREATE_ACTION: + return self._build_domains(4, 'CREATE', 'ERROR') + if action == pm_module.UPDATE_ACTION: + return self._build_domains(5, 'UPDATE', 'ERROR') + + self.service._get_failed_domains = mock_get_failed_domains + self.service.delete_domain = Mock() + self.service.create_domain = Mock() + self.service.update_domain = Mock() + + self.service.periodic_recovery() + + self.assertEqual(3, self.service.delete_domain.call_count) + self.assertEqual(4, self.service.create_domain.call_count) + self.assertEqual(5, self.service.update_domain.call_count) + + +class PoolManagerServiceEndToEndTest(PoolManagerServiceNoopTest): + + def setUp(self): + super(PoolManagerServiceEndToEndTest, self).setUp() + + def _fetch_all_domains(self): + """Fetch all zones including deleted ones + """ + query = tables.zones.select() + return self.storage.session.execute(query).fetchall() + + def _log_all_domains(self, zones, msg=None): + """Log out a summary of zones + """ + if msg: + LOG.debug("--- %s ---" % msg) + cols = ('name', 'status', 'action', 'deleted', 'deleted_at', + 'parent_domain_id') + tpl = "%-35s | %-11s | %-11s | %-32s | %-20s | %s" + LOG.debug(tpl % cols) + for z in zones: + LOG.debug(tpl % tuple(z[k] for k in cols)) + + def _assert_count_all_domains(self, n): + """Assert count ALL zones including deleted ones + """ + zones = self._fetch_all_domains() + if len(zones) == n: + return + + msg = "failed: %d zones expected, %d found" % (n, len(zones)) + self._log_all_domains(zones, msg=msg) + raise Exception("Unexpected number of zones") + + def _assert_num_failed_domains(self, action, n): + zones = self.service._get_failed_domains( + self.admin_context, action) + if len(zones) != n: + LOG.error("Expected %d failed zones, got %d", n, len(zones)) + self._log_all_domains(zones, msg='listing zones') + self.assertEqual(n, len(zones)) + + def _assert_num_healthy_domains(self, action, n): + criterion = { + 'action': action, + 'pool_id': pm_module.CONF['service:pool_manager'].pool_id, + 'status': '!%s' % pm_module.ERROR_STATUS + } + zones = self.service.central_api.find_domains(self.admin_context, + criterion) + if len(zones) != n: + LOG.error("Expected %d healthy zones, got %d", n, len(zones)) + self._log_all_domains(zones, msg='listing zones') + self.assertEqual(n, len(zones)) + + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + def test_periodic_sync_and_recovery( + self, mock_cent_update_status, *a): + # Periodic sync + recovery + self.service._periodic_sync_retry_interval = 0 + + # Create healthy zones, run a periodic sync that will fail + self.create_domain(name='created.example.com.') + self._assert_num_healthy_domains(pm_module.CREATE_ACTION, 1) + + z = self.create_domain(name='updated.example.net.') + z.email = 'info@example.net' + self.service.central_api.update_domain(self.admin_context, z) + self._assert_num_healthy_domains(pm_module.UPDATE_ACTION, 1) + + with patch.object(self.service, '_update_domain_on_target', + return_value=False): + self.service.periodic_sync() + + zones = self.service._fetch_healthy_zones(self.admin_context) + self.assertEqual(0, len(zones)) + self._assert_num_failed_domains(pm_module.CREATE_ACTION, 1) + self._assert_num_failed_domains(pm_module.UPDATE_ACTION, 1) + + # Now run a periodic_recovery that will fix the zones + + backends = self.service.target_backends + for tid in self.service.target_backends: + backends[tid].create_domain = Mock() + backends[tid].update_domain = Mock() + backends[tid].delete_domain = Mock() + + self.service.periodic_recovery() + + # There are 2 pool targets in use + for backend in self.service.target_backends.itervalues(): + self.assertEqual(1, backend.create_domain.call_count) + self.assertEqual(1, backend.update_domain.call_count) diff --git a/designate/tests/unit/__init__.py b/designate/tests/unit/__init__.py index e69de29b..22512526 100644 --- a/designate/tests/unit/__init__.py +++ b/designate/tests/unit/__init__.py @@ -0,0 +1,54 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Federico Ceratto <federico.ceratto@hpe.com> +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit test utilities +""" + +import six + + +class RoObject(object): + """Read-only object: raise exception on unexpected + __setitem__ or __setattr__ + """ + def __init__(self, d=None, **kw): + if d: + kw.update(d) + + self.__dict__.update(kw) + + def __getitem__(self, k): + try: + return self.__dict__[k] + except KeyError: + raise NotImplementedError( + "Attempt to perform __getitem__" + " %r on RoObject %r" % (k, self.__dict__) + ) + + def __setitem__(self, k, v): + raise NotImplementedError( + "Attempt to perform __setitem__ or __setattr__" + " %r on RoObject %r" % (k, self.__dict__) + ) + + def __setattr__(self, k, v): + self.__setitem__(k, v) + + def __iter__(self): + for k in six.iterkeys(self.__dict__): + yield k, self.__dict__[k] diff --git a/designate/tests/unit/test_pool_manager/__init__.py b/designate/tests/unit/test_pool_manager/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/designate/tests/unit/test_pool_manager/__init__.py diff --git a/designate/tests/unit/test_pool_manager/test_service.py b/designate/tests/unit/test_pool_manager/test_service.py index 4432737e..9f294572 100644 --- a/designate/tests/unit/test_pool_manager/test_service.py +++ b/designate/tests/unit/test_pool_manager/test_service.py @@ -15,24 +15,19 @@ # under the License. from mock import Mock -from mock import MagicMock from mock import patch from oslotest import base as test -from designate import exceptions from designate import objects from designate.pool_manager.service import Service +import designate.pool_manager.service as pm_module +from designate.tests.unit import RoObject -class PoolManagerTest(test.BaseTestCase): +class PoolManagerInitTest(test.BaseTestCase): def __setUp(self): super(PoolManagerTest, self).setUp() - def test_init_no_pool_targets(self): - with patch.object(objects.Pool, 'from_config', - return_value=MagicMock()): - self.assertRaises(exceptions.NoPoolTargetsConfigured, Service) - def test_init(self): with patch.object(objects.Pool, 'from_config', return_value=Mock()): @@ -56,3 +51,90 @@ class PoolManagerTest(test.BaseTestCase): call2 = pm.tg.add_timer.call_args_list[1][0] self.assertEqual(1800, call2[0]) self.assertEqual(1800, call2[-1]) + + def test_constant_retries(self): + with patch.object(pm_module.time, 'sleep') as mock_zzz: + gen = pm_module._constant_retries(5, 2) + out = list(gen) + self.assertEqual( + [False, False, False, False, True], + out + ) + self.assertEqual(4, mock_zzz.call_count) + mock_zzz.assert_called_with(2) + + +class PoolManagerTest(test.BaseTestCase): + + @patch.object(pm_module.DesignateContext, 'get_admin_context') + @patch.object(pm_module.central_api.CentralAPI, 'get_instance') + @patch.object(objects.Pool, 'from_config') + @patch.object(Service, '_setup_target_backends') + def setUp(self, *mocks): + super(PoolManagerTest, self).setUp() + self.pm = Service() + self.pm.pool.targets = () + self.pm.tg.add_timer = Mock() + self.pm._pool_election = Mock() + self.pm.target_backends = {} + + def test_get_failed_domains(self, *mocks): + with patch.object(self.pm.central_api, 'find_domains') as \ + mock_find_domains: + self.pm._get_failed_domains('ctx', pm_module.DELETE_ACTION) + + mock_find_domains.assert_called_once_with( + 'ctx', {'action': 'DELETE', 'status': 'ERROR', 'pool_id': + '794ccc2c-d751-44fe-b57f-8894c9f5c842'}) + + @patch.object(pm_module.DesignateContext, 'get_admin_context') + def test_periodic_recover(self, mock_get_ctx, *mocks): + z = RoObject(name='a_domain') + + def mock_get_failed_domains(ctx, action): + if action == pm_module.DELETE_ACTION: + return [z] * 3 + if action == pm_module.CREATE_ACTION: + return [z] * 4 + if action == pm_module.UPDATE_ACTION: + return [z] * 5 + + self.pm._get_failed_domains = mock_get_failed_domains + self.pm.delete_domain = Mock() + self.pm.create_domain = Mock() + self.pm.update_domain = Mock() + mock_ctx = mock_get_ctx.return_value + + self.pm.periodic_recovery() + + self.pm.delete_domain.assert_called_with(mock_ctx, z) + self.assertEqual(3, self.pm.delete_domain.call_count) + self.pm.create_domain.assert_called_with(mock_ctx, z) + self.assertEqual(4, self.pm.create_domain.call_count) + self.pm.update_domain.assert_called_with(mock_ctx, z) + self.assertEqual(5, self.pm.update_domain.call_count) + + @patch.object(pm_module.DesignateContext, 'get_admin_context') + def test_periodic_recover_exception(self, mock_get_ctx, *mocks): + z = RoObject(name='a_domain') + # Raise an exception half through the recovery + + def mock_get_failed_domains(ctx, action): + if action == pm_module.DELETE_ACTION: + return [z] * 3 + if action == pm_module.CREATE_ACTION: + return [z] * 4 + + self.pm._get_failed_domains = mock_get_failed_domains + self.pm.delete_domain = Mock() + self.pm.create_domain = Mock(side_effect=Exception('oops')) + self.pm.update_domain = Mock() + mock_ctx = mock_get_ctx.return_value + + self.pm.periodic_recovery() + + self.pm.delete_domain.assert_called_with(mock_ctx, z) + self.assertEqual(3, self.pm.delete_domain.call_count) + self.pm.create_domain.assert_called_with(mock_ctx, z) + self.assertEqual(1, self.pm.create_domain.call_count) + self.assertEqual(0, self.pm.update_domain.call_count) diff --git a/etc/designate/designate.conf.sample b/etc/designate/designate.conf.sample index 3a017200..5f0256df 100644 --- a/etc/designate/designate.conf.sample +++ b/etc/designate/designate.conf.sample @@ -285,6 +285,11 @@ debug = False # Zones Updated within last N seconds will be syncd. Use None to sync all zones #periodic_sync_seconds = None +# Perform multiple update attempts during periodic_sync +#periodic_sync_max_attempts = 3 +#periodic_sync_retry_interval = 30 + + # The cache driver to use #cache_driver = memcache |