summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-01-18 14:08:51 +0000
committerGerrit Code Review <review@openstack.org>2016-01-18 14:08:51 +0000
commit040747d228d9905a3ae4746f37fe44fca2a1bb78 (patch)
tree958ac7b1b3920334a8b850800b78724c6aed0678
parenta26a6fa0191126300018c21ee8e9b78b47971c80 (diff)
parent9b5f4c1f3d9d827252ae6b8f253fc572df827cea (diff)
downloaddesignate-040747d228d9905a3ae4746f37fe44fca2a1bb78.tar.gz
Merge "Add retry logic on periodic_sync to stable/liberty" into stable/liberty
-rw-r--r--designate/pool_manager/__init__.py4
-rw-r--r--designate/pool_manager/service.py91
-rw-r--r--designate/tests/test_pool_manager/test_service.py159
-rw-r--r--designate/tests/unit/__init__.py54
-rw-r--r--designate/tests/unit/test_pool_manager/__init__.py0
-rw-r--r--designate/tests/unit/test_pool_manager/test_service.py98
-rw-r--r--etc/designate/designate.conf.sample5
7 files changed, 365 insertions, 46 deletions
diff --git a/designate/pool_manager/__init__.py b/designate/pool_manager/__init__.py
index 17c45036..a987dea2 100644
--- a/designate/pool_manager/__init__.py
+++ b/designate/pool_manager/__init__.py
@@ -54,6 +54,10 @@ OPTS = [
cfg.IntOpt('periodic-sync-seconds', default=21600,
help='Zones Updated within last N seconds will be syncd. Use '
'None to sync all zones.'),
+ cfg.IntOpt('periodic-sync-max-attempts', default=3,
+ help='Number of attempts to update a zone during sync'),
+ cfg.IntOpt('periodic-sync-retry-interval', default=30,
+ help='Interval between zone update attempts during sync'),
cfg.StrOpt('cache-driver', default='memcache',
help='The cache driver to use'),
]
diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py
index 0d210703..8fbdb3b6 100644
--- a/designate/pool_manager/service.py
+++ b/designate/pool_manager/service.py
@@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import time
from contextlib import contextmanager
from decimal import Decimal
@@ -61,6 +62,20 @@ def wrap_backend_call():
raise exceptions.Backend('Unknown backend failure: %r' % e)
+def _constant_retries(num_attempts, sleep_interval):
+ """Generate a sequence of False terminated by a True
+ Sleep `sleep_interval` between cycles but not at the end.
+ """
+ for cnt in range(num_attempts):
+ if cnt != 0:
+ LOG.debug(_LI("Executing retry n. %d"), cnt)
+ if cnt < num_attempts - 1:
+ yield False
+ time.sleep(sleep_interval)
+ else:
+ yield True
+
+
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
"""
@@ -91,6 +106,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
self.retry_interval = CONF['service:pool_manager'].poll_retry_interval
self.max_retries = CONF['service:pool_manager'].poll_max_retries
self.delay = CONF['service:pool_manager'].poll_delay
+ self._periodic_sync_max_attempts = \
+ CONF['service:pool_manager'].periodic_sync_max_attempts
+ self._periodic_sync_retry_interval = \
+ CONF['service:pool_manager'].periodic_sync_retry_interval
# Create the necessary Backend instances for each target
self._setup_target_backends()
@@ -200,18 +219,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
LOG.exception(_LE('An unhandled exception in periodic '
'recovery occurred'))
- def periodic_sync(self):
- """Periodically sync all the zones that are not in ERROR status
- Runs only on the pool leader
- :return: None
+ def _fetch_healthy_zones(self, context):
+ """Fetch all zones not in error
+ :return: :class:`ZoneList` zones
"""
- if not self._pool_election.is_leader:
- return
-
- context = DesignateContext.get_admin_context(all_tenants=True)
-
- LOG.debug("Starting Periodic Synchronization")
-
criterion = {
'pool_id': CONF['service:pool_manager'].pool_id,
'status': '!%s' % ERROR_STATUS
@@ -225,23 +236,51 @@ class Service(service.RPCService, coordination.CoordinationMixin,
current = utils.increment_serial()
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
- domains = self.central_api.find_domains(context, criterion)
+ zones = self.central_api.find_domains(context, criterion)
+ return zones
- try:
- for domain in domains:
- # TODO(kiall): If the zone was created within the last
- # periodic_sync_seconds, attempt to recreate
- # to fill in targets which may have failed.
- success = self.update_domain(context, domain)
- if not success:
- self.central_api.update_status(
- context, domain.id, ERROR_STATUS, domain.serial)
+ def periodic_sync(self):
+ """Periodically sync all the zones that are not in ERROR status
+ Runs only on the pool leader
+ :return: None
+ """
+ if not self._pool_election.is_leader:
+ return
- except Exception:
- LOG.exception(_LE('An unhandled exception in periodic '
- 'synchronization occurred.'))
- self.central_api.update_status(context, domain.id, ERROR_STATUS,
- domain.serial)
+ context = DesignateContext.get_admin_context(all_tenants=True)
+
+ LOG.debug("Starting Periodic Synchronization")
+ context = DesignateContext.get_admin_context(all_tenants=True)
+ zones = self._fetch_healthy_zones(context)
+ zones = set(zones)
+
+ # TODO(kiall): If the zone was created within the last
+ # periodic_sync_seconds, attempt to recreate
+ # to fill in targets which may have failed.
+ retry_gen = _constant_retries(
+ self._periodic_sync_max_attempts,
+ self._periodic_sync_retry_interval
+ )
+ for is_last_cycle in retry_gen:
+ zones_in_error = []
+ for zone in zones:
+ try:
+ success = self.update_domain(context, zone)
+ if not success:
+ zones_in_error.append(zone)
+ except Exception:
+ LOG.exception(_LE('An unhandled exception in periodic '
+ 'synchronization occurred.'))
+ zones_in_error.append(zone)
+
+ if not zones_in_error:
+ return
+
+ zones = zones_in_error
+
+ for zone in zones_in_error:
+ self.central_api.update_status(context, zone.id, ERROR_STATUS,
+ zone.serial)
# Standard Create/Update/Delete Methods
diff --git a/designate/tests/test_pool_manager/test_service.py b/designate/tests/test_pool_manager/test_service.py
index e7f7f514..e223fc8a 100644
--- a/designate/tests/test_pool_manager/test_service.py
+++ b/designate/tests/test_pool_manager/test_service.py
@@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import logging
import uuid
import oslo_messaging as messaging
@@ -24,9 +25,13 @@ from mock import patch
from designate import exceptions
from designate import objects
from designate.backend import impl_fake
+from designate.storage.impl_sqlalchemy import tables
from designate.central import rpcapi as central_rpcapi
from designate.mdns import rpcapi as mdns_rpcapi
from designate.tests.test_pool_manager import PoolManagerTestCase
+import designate.pool_manager.service as pm_module
+
+LOG = logging.getLogger(__name__)
class PoolManagerServiceNoopTest(PoolManagerTestCase):
@@ -119,7 +124,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
def _build_domains(self, n, action, status):
return [
self._build_domain("zone%02X.example" % cnt, action,
- status, id=str(uuid.uuid4()))
+ status, id=str(uuid.uuid4()))
for cnt in range(n)
]
@@ -475,7 +480,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
mock_cent_update_status, *a):
self.service.update_domain = Mock()
mock_find_domains.return_value = self._build_domains(2, 'UPDATE',
- 'PENDING')
+ 'PENDING')
self.service.periodic_sync()
self.assertEqual(1, mock_find_domains.call_count)
@@ -484,36 +489,166 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.assertEqual(2, self.service.update_domain.call_count)
self.assertEqual(0, mock_cent_update_status.call_count)
+ @patch.object(pm_module.time, 'sleep')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_domains')
- def test_periodic_sync_with_failing_update(self, mock_find_domains,
- mock_cent_update_status, *a):
+ def test_periodic_sync_with_failing_update(
+ self, mock_find_domains, mock_cent_update_status, *mocks):
self.service.update_domain = Mock(return_value=False) # fail update
mock_find_domains.return_value = self._build_domains(3, 'UPDATE',
- 'PENDING')
+ 'PENDING')
self.service.periodic_sync()
self.assertEqual(1, mock_find_domains.call_count)
criterion = mock_find_domains.call_args_list[0][0][1]
self.assertEqual('!ERROR', criterion['status'])
- # all zones are now in ERROR status
- self.assertEqual(3, self.service.update_domain.call_count)
+
+ # 3 zones, all failing, with 3 attempts: 9 calls
+ self.assertEqual(9, self.service.update_domain.call_count)
+
+ # the zones have been put in ERROR status
self.assertEqual(3, mock_cent_update_status.call_count)
+ @patch.object(pm_module.time, 'sleep')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_domains')
def test_periodic_sync_with_failing_update_with_exception(
- self, mock_find_domains, mock_cent_update_status, *a):
+ self, mock_find_domains, mock_cent_update_status, *mocks):
self.service.update_domain = Mock(side_effect=Exception)
mock_find_domains.return_value = self._build_domains(3, 'UPDATE',
- 'PENDING')
+ 'PENDING')
self.service.periodic_sync()
self.assertEqual(1, mock_find_domains.call_count)
criterion = mock_find_domains.call_args_list[0][0][1]
self.assertEqual('!ERROR', criterion['status'])
- # the first updated zone is now in ERROR status
- self.assertEqual(1, self.service.update_domain.call_count)
- self.assertEqual(1, mock_cent_update_status.call_count)
+
+ # 3 zones, all failing, with 3 attempts: 9 calls
+ self.assertEqual(9, self.service.update_domain.call_count)
+
+ # the zones have been put in ERROR status
+ self.assertEqual(3, mock_cent_update_status.call_count)
+
+ # Periodic recovery
+
+ @patch.object(pm_module.time, 'sleep')
+ @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
+ @patch.object(central_rpcapi.CentralAPI, 'update_status')
+ def test_periodic_recovery(self, mock_find_domains,
+ mock_cent_update_status, *mocks):
+
+ def mock_get_failed_domains(ctx, action):
+ if action == pm_module.DELETE_ACTION:
+ return self._build_domains(3, 'DELETE', 'ERROR')
+ if action == pm_module.CREATE_ACTION:
+ return self._build_domains(4, 'CREATE', 'ERROR')
+ if action == pm_module.UPDATE_ACTION:
+ return self._build_domains(5, 'UPDATE', 'ERROR')
+
+ self.service._get_failed_domains = mock_get_failed_domains
+ self.service.delete_domain = Mock()
+ self.service.create_domain = Mock()
+ self.service.update_domain = Mock()
+
+ self.service.periodic_recovery()
+
+ self.assertEqual(3, self.service.delete_domain.call_count)
+ self.assertEqual(4, self.service.create_domain.call_count)
+ self.assertEqual(5, self.service.update_domain.call_count)
+
+
+class PoolManagerServiceEndToEndTest(PoolManagerServiceNoopTest):
+
+ def setUp(self):
+ super(PoolManagerServiceEndToEndTest, self).setUp()
+
+ def _fetch_all_domains(self):
+ """Fetch all zones including deleted ones
+ """
+ query = tables.zones.select()
+ return self.storage.session.execute(query).fetchall()
+
+ def _log_all_domains(self, zones, msg=None):
+ """Log out a summary of zones
+ """
+ if msg:
+ LOG.debug("--- %s ---" % msg)
+ cols = ('name', 'status', 'action', 'deleted', 'deleted_at',
+ 'parent_domain_id')
+ tpl = "%-35s | %-11s | %-11s | %-32s | %-20s | %s"
+ LOG.debug(tpl % cols)
+ for z in zones:
+ LOG.debug(tpl % tuple(z[k] for k in cols))
+
+ def _assert_count_all_domains(self, n):
+ """Assert count ALL zones including deleted ones
+ """
+ zones = self._fetch_all_domains()
+ if len(zones) == n:
+ return
+
+ msg = "failed: %d zones expected, %d found" % (n, len(zones))
+ self._log_all_domains(zones, msg=msg)
+ raise Exception("Unexpected number of zones")
+
+ def _assert_num_failed_domains(self, action, n):
+ zones = self.service._get_failed_domains(
+ self.admin_context, action)
+ if len(zones) != n:
+ LOG.error("Expected %d failed zones, got %d", n, len(zones))
+ self._log_all_domains(zones, msg='listing zones')
+ self.assertEqual(n, len(zones))
+
+ def _assert_num_healthy_domains(self, action, n):
+ criterion = {
+ 'action': action,
+ 'pool_id': pm_module.CONF['service:pool_manager'].pool_id,
+ 'status': '!%s' % pm_module.ERROR_STATUS
+ }
+ zones = self.service.central_api.find_domains(self.admin_context,
+ criterion)
+ if len(zones) != n:
+ LOG.error("Expected %d healthy zones, got %d", n, len(zones))
+ self._log_all_domains(zones, msg='listing zones')
+ self.assertEqual(n, len(zones))
+
+ @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
+ def test_periodic_sync_and_recovery(
+ self, mock_cent_update_status, *a):
+ # Periodic sync + recovery
+ self.service._periodic_sync_retry_interval = 0
+
+ # Create healthy zones, run a periodic sync that will fail
+ self.create_domain(name='created.example.com.')
+ self._assert_num_healthy_domains(pm_module.CREATE_ACTION, 1)
+
+ z = self.create_domain(name='updated.example.net.')
+ z.email = 'info@example.net'
+ self.service.central_api.update_domain(self.admin_context, z)
+ self._assert_num_healthy_domains(pm_module.UPDATE_ACTION, 1)
+
+ with patch.object(self.service, '_update_domain_on_target',
+ return_value=False):
+ self.service.periodic_sync()
+
+ zones = self.service._fetch_healthy_zones(self.admin_context)
+ self.assertEqual(0, len(zones))
+ self._assert_num_failed_domains(pm_module.CREATE_ACTION, 1)
+ self._assert_num_failed_domains(pm_module.UPDATE_ACTION, 1)
+
+ # Now run a periodic_recovery that will fix the zones
+
+ backends = self.service.target_backends
+ for tid in self.service.target_backends:
+ backends[tid].create_domain = Mock()
+ backends[tid].update_domain = Mock()
+ backends[tid].delete_domain = Mock()
+
+ self.service.periodic_recovery()
+
+ # There are 2 pool targets in use
+ for backend in self.service.target_backends.itervalues():
+ self.assertEqual(1, backend.create_domain.call_count)
+ self.assertEqual(1, backend.update_domain.call_count)
diff --git a/designate/tests/unit/__init__.py b/designate/tests/unit/__init__.py
index e69de29b..22512526 100644
--- a/designate/tests/unit/__init__.py
+++ b/designate/tests/unit/__init__.py
@@ -0,0 +1,54 @@
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+#
+# Author: Federico Ceratto <federico.ceratto@hpe.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit test utilities
+"""
+
+import six
+
+
+class RoObject(object):
+ """Read-only object: raise exception on unexpected
+ __setitem__ or __setattr__
+ """
+ def __init__(self, d=None, **kw):
+ if d:
+ kw.update(d)
+
+ self.__dict__.update(kw)
+
+ def __getitem__(self, k):
+ try:
+ return self.__dict__[k]
+ except KeyError:
+ raise NotImplementedError(
+ "Attempt to perform __getitem__"
+ " %r on RoObject %r" % (k, self.__dict__)
+ )
+
+ def __setitem__(self, k, v):
+ raise NotImplementedError(
+ "Attempt to perform __setitem__ or __setattr__"
+ " %r on RoObject %r" % (k, self.__dict__)
+ )
+
+ def __setattr__(self, k, v):
+ self.__setitem__(k, v)
+
+ def __iter__(self):
+ for k in six.iterkeys(self.__dict__):
+ yield k, self.__dict__[k]
diff --git a/designate/tests/unit/test_pool_manager/__init__.py b/designate/tests/unit/test_pool_manager/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/designate/tests/unit/test_pool_manager/__init__.py
diff --git a/designate/tests/unit/test_pool_manager/test_service.py b/designate/tests/unit/test_pool_manager/test_service.py
index 4432737e..9f294572 100644
--- a/designate/tests/unit/test_pool_manager/test_service.py
+++ b/designate/tests/unit/test_pool_manager/test_service.py
@@ -15,24 +15,19 @@
# under the License.
from mock import Mock
-from mock import MagicMock
from mock import patch
from oslotest import base as test
-from designate import exceptions
from designate import objects
from designate.pool_manager.service import Service
+import designate.pool_manager.service as pm_module
+from designate.tests.unit import RoObject
-class PoolManagerTest(test.BaseTestCase):
+class PoolManagerInitTest(test.BaseTestCase):
def __setUp(self):
super(PoolManagerTest, self).setUp()
- def test_init_no_pool_targets(self):
- with patch.object(objects.Pool, 'from_config',
- return_value=MagicMock()):
- self.assertRaises(exceptions.NoPoolTargetsConfigured, Service)
-
def test_init(self):
with patch.object(objects.Pool, 'from_config',
return_value=Mock()):
@@ -56,3 +51,90 @@ class PoolManagerTest(test.BaseTestCase):
call2 = pm.tg.add_timer.call_args_list[1][0]
self.assertEqual(1800, call2[0])
self.assertEqual(1800, call2[-1])
+
+ def test_constant_retries(self):
+ with patch.object(pm_module.time, 'sleep') as mock_zzz:
+ gen = pm_module._constant_retries(5, 2)
+ out = list(gen)
+ self.assertEqual(
+ [False, False, False, False, True],
+ out
+ )
+ self.assertEqual(4, mock_zzz.call_count)
+ mock_zzz.assert_called_with(2)
+
+
+class PoolManagerTest(test.BaseTestCase):
+
+ @patch.object(pm_module.DesignateContext, 'get_admin_context')
+ @patch.object(pm_module.central_api.CentralAPI, 'get_instance')
+ @patch.object(objects.Pool, 'from_config')
+ @patch.object(Service, '_setup_target_backends')
+ def setUp(self, *mocks):
+ super(PoolManagerTest, self).setUp()
+ self.pm = Service()
+ self.pm.pool.targets = ()
+ self.pm.tg.add_timer = Mock()
+ self.pm._pool_election = Mock()
+ self.pm.target_backends = {}
+
+ def test_get_failed_domains(self, *mocks):
+ with patch.object(self.pm.central_api, 'find_domains') as \
+ mock_find_domains:
+ self.pm._get_failed_domains('ctx', pm_module.DELETE_ACTION)
+
+ mock_find_domains.assert_called_once_with(
+ 'ctx', {'action': 'DELETE', 'status': 'ERROR', 'pool_id':
+ '794ccc2c-d751-44fe-b57f-8894c9f5c842'})
+
+ @patch.object(pm_module.DesignateContext, 'get_admin_context')
+ def test_periodic_recover(self, mock_get_ctx, *mocks):
+ z = RoObject(name='a_domain')
+
+ def mock_get_failed_domains(ctx, action):
+ if action == pm_module.DELETE_ACTION:
+ return [z] * 3
+ if action == pm_module.CREATE_ACTION:
+ return [z] * 4
+ if action == pm_module.UPDATE_ACTION:
+ return [z] * 5
+
+ self.pm._get_failed_domains = mock_get_failed_domains
+ self.pm.delete_domain = Mock()
+ self.pm.create_domain = Mock()
+ self.pm.update_domain = Mock()
+ mock_ctx = mock_get_ctx.return_value
+
+ self.pm.periodic_recovery()
+
+ self.pm.delete_domain.assert_called_with(mock_ctx, z)
+ self.assertEqual(3, self.pm.delete_domain.call_count)
+ self.pm.create_domain.assert_called_with(mock_ctx, z)
+ self.assertEqual(4, self.pm.create_domain.call_count)
+ self.pm.update_domain.assert_called_with(mock_ctx, z)
+ self.assertEqual(5, self.pm.update_domain.call_count)
+
+ @patch.object(pm_module.DesignateContext, 'get_admin_context')
+ def test_periodic_recover_exception(self, mock_get_ctx, *mocks):
+ z = RoObject(name='a_domain')
+ # Raise an exception half through the recovery
+
+ def mock_get_failed_domains(ctx, action):
+ if action == pm_module.DELETE_ACTION:
+ return [z] * 3
+ if action == pm_module.CREATE_ACTION:
+ return [z] * 4
+
+ self.pm._get_failed_domains = mock_get_failed_domains
+ self.pm.delete_domain = Mock()
+ self.pm.create_domain = Mock(side_effect=Exception('oops'))
+ self.pm.update_domain = Mock()
+ mock_ctx = mock_get_ctx.return_value
+
+ self.pm.periodic_recovery()
+
+ self.pm.delete_domain.assert_called_with(mock_ctx, z)
+ self.assertEqual(3, self.pm.delete_domain.call_count)
+ self.pm.create_domain.assert_called_with(mock_ctx, z)
+ self.assertEqual(1, self.pm.create_domain.call_count)
+ self.assertEqual(0, self.pm.update_domain.call_count)
diff --git a/etc/designate/designate.conf.sample b/etc/designate/designate.conf.sample
index 3a017200..5f0256df 100644
--- a/etc/designate/designate.conf.sample
+++ b/etc/designate/designate.conf.sample
@@ -285,6 +285,11 @@ debug = False
# Zones Updated within last N seconds will be syncd. Use None to sync all zones
#periodic_sync_seconds = None
+# Perform multiple update attempts during periodic_sync
+#periodic_sync_max_attempts = 3
+#periodic_sync_retry_interval = 30
+
+
# The cache driver to use
#cache_driver = memcache