summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-01-04 16:00:26 +0000
committerGerrit Code Review <review@openstack.org>2016-01-04 16:00:26 +0000
commit94d0f00b09044fcef36604a9c81accb0907c9705 (patch)
treeed924b72821fa65b4f9d6829e9a6e624091e83dd
parentffd80ec5792fe42d6a9680c0f3fa785eab82dc3e (diff)
parentb3235bdb74d32f57c43eb884f4ef3cd27cf9c457 (diff)
downloaddesignate-94d0f00b09044fcef36604a9c81accb0907c9705.tar.gz
Merge "Set zone in ERROR status on periodic sync fail" into stable/liberty
-rw-r--r--designate/pool_manager/service.py191
-rw-r--r--designate/tests/test_pool_manager/test_service.py104
-rw-r--r--designate/tests/unit/test_pool_manager/test_service.py58
3 files changed, 268 insertions, 85 deletions
diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py
index f9f92814..0d210703 100644
--- a/designate/pool_manager/service.py
+++ b/designate/pool_manager/service.py
@@ -137,18 +137,16 @@ class Service(service.RPCService, coordination.CoordinationMixin,
self._pool_election.start()
if CONF['service:pool_manager'].enable_recovery_timer:
- LOG.info(_LI('Starting periodic recovery timer'))
- self.tg.add_timer(
- CONF['service:pool_manager'].periodic_recovery_interval,
- self.periodic_recovery,
- CONF['service:pool_manager'].periodic_recovery_interval)
+ interval = CONF['service:pool_manager'].periodic_recovery_interval
+ LOG.info(_LI('Starting periodic recovery timer every'
+ ' %(interval)s s') % {'interval': interval})
+ self.tg.add_timer(interval, self.periodic_recovery, interval)
if CONF['service:pool_manager'].enable_sync_timer:
- LOG.info(_LI('Starting periodic synchronization timer'))
- self.tg.add_timer(
- CONF['service:pool_manager'].periodic_sync_interval,
- self.periodic_sync,
- CONF['service:pool_manager'].periodic_sync_interval)
+ interval = CONF['service:pool_manager'].periodic_sync_interval
+ LOG.info(_LI('Starting periodic synchronization timer every'
+ ' %(interval)s s') % {'interval': interval})
+ self.tg.add_timer(interval, self.periodic_sync, interval)
def stop(self):
self._pool_election.stop()
@@ -169,72 +167,81 @@ class Service(service.RPCService, coordination.CoordinationMixin,
# Periodioc Tasks
def periodic_recovery(self):
"""
+ Runs only on the pool leader
:return: None
"""
# NOTE(kiall): Only run this periodic task on the pool leader
- if self._pool_election.is_leader:
- context = DesignateContext.get_admin_context(all_tenants=True)
+ if not self._pool_election.is_leader:
+ return
- LOG.debug("Starting Periodic Recovery")
+ context = DesignateContext.get_admin_context(all_tenants=True)
+ LOG.debug("Starting Periodic Recovery")
- try:
- # Handle Deletion Failures
- domains = self._get_failed_domains(context, DELETE_ACTION)
+ try:
+ # Handle Deletion Failures
+ domains = self._get_failed_domains(context, DELETE_ACTION)
- for domain in domains:
- self.delete_domain(context, domain)
+ for domain in domains:
+ self.delete_domain(context, domain)
- # Handle Creation Failures
- domains = self._get_failed_domains(context, CREATE_ACTION)
+ # Handle Creation Failures
+ domains = self._get_failed_domains(context, CREATE_ACTION)
- for domain in domains:
- self.create_domain(context, domain)
+ for domain in domains:
+ self.create_domain(context, domain)
- # Handle Update Failures
- domains = self._get_failed_domains(context, UPDATE_ACTION)
+ # Handle Update Failures
+ domains = self._get_failed_domains(context, UPDATE_ACTION)
- for domain in domains:
- self.update_domain(context, domain)
+ for domain in domains:
+ self.update_domain(context, domain)
- except Exception:
- LOG.exception(_LE('An unhandled exception in periodic '
- 'recovery occurred'))
+ except Exception:
+ LOG.exception(_LE('An unhandled exception in periodic '
+ 'recovery occurred'))
def periodic_sync(self):
- """
+ """Periodically sync all the zones that are not in ERROR status
+ Runs only on the pool leader
:return: None
"""
- # NOTE(kiall): Only run this periodic task on the pool leader
- if self._pool_election.is_leader:
- context = DesignateContext.get_admin_context(all_tenants=True)
+ if not self._pool_election.is_leader:
+ return
- LOG.debug("Starting Periodic Synchronization")
+ context = DesignateContext.get_admin_context(all_tenants=True)
- criterion = {
- 'pool_id': CONF['service:pool_manager'].pool_id,
- 'status': '!%s' % ERROR_STATUS
- }
+ LOG.debug("Starting Periodic Synchronization")
- periodic_sync_seconds = \
- CONF['service:pool_manager'].periodic_sync_seconds
+ criterion = {
+ 'pool_id': CONF['service:pool_manager'].pool_id,
+ 'status': '!%s' % ERROR_STATUS
+ }
- if periodic_sync_seconds is not None:
- # Generate the current serial, will provide a UTC Unix TS.
- current = utils.increment_serial()
- criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
+ periodic_sync_seconds = \
+ CONF['service:pool_manager'].periodic_sync_seconds
- domains = self.central_api.find_domains(context, criterion)
+ if periodic_sync_seconds is not None:
+ # Generate the current serial, will provide a UTC Unix TS.
+ current = utils.increment_serial()
+ criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
- try:
- for domain in domains:
- # TODO(kiall): If the domain was created within the last
- # periodic_sync_seconds, attempt to recreate
- # to fill in targets which may have failed.
- self.update_domain(context, domain)
+ domains = self.central_api.find_domains(context, criterion)
- except Exception:
- LOG.exception(_LE('An unhandled exception in periodic '
- 'synchronization occurred.'))
+ try:
+ for domain in domains:
+ # TODO(kiall): If the zone was created within the last
+ # periodic_sync_seconds, attempt to recreate
+ # to fill in targets which may have failed.
+ success = self.update_domain(context, domain)
+ if not success:
+ self.central_api.update_status(
+ context, domain.id, ERROR_STATUS, domain.serial)
+
+ except Exception:
+ LOG.exception(_LE('An unhandled exception in periodic '
+ 'synchronization occurred.'))
+ self.central_api.update_status(context, domain.id, ERROR_STATUS,
+ domain.serial)
# Standard Create/Update/Delete Methods
@@ -303,32 +310,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return False
def update_domain(self, context, domain):
- """
+ """Update a domain across every pool target, check for consensus and
+ for propagation.
:param context: Security context information.
:param domain: Domain to be updated
- :return: None
+ :return: consensus reached (bool)
"""
LOG.info(_LI("Updating domain %s"), domain.name)
- results = []
-
# Update the domain on each of the Pool Targets
+ success_count = 0
for target in self.pool.targets:
- results.append(
- self._update_domain_on_target(context, target, domain))
-
- if self._exceed_or_meet_threshold(results.count(True)):
- LOG.debug('Consensus reached for updating domain %(domain)s '
- 'on pool targets' % {'domain': domain.name})
-
- else:
- LOG.warn(_LW('Consensus not reached for updating domain %(domain)s'
- ' on pool targets') % {'domain': domain.name})
-
- self.central_api.update_status(
- context, domain.id, ERROR_STATUS, domain.serial)
+ ok_status = self._update_domain_on_target(context, target, domain)
+ if ok_status:
+ success_count += 1
+
+ if not self._exceed_or_meet_threshold(success_count):
+ LOG.warn(_LW('Consensus not reached for updating zone %(zone)s'
+ ' on pool targets') % {'zone': domain.name})
+ self.central_api.update_status(context, domain.id, ERROR_STATUS,
+ domain.serial)
+ return False
- return
+ LOG.debug('Consensus reached for updating zone %(zone)s '
+ 'on pool targets' % {'zone': domain.name})
# Send a NOTIFY to each also-notifies
for also_notify in self.pool.also_notifies:
@@ -349,8 +354,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
context, domain, nameserver, self.timeout,
self.retry_interval, self.max_retries, self.delay)
+ return True
+
def _update_domain_on_target(self, context, target, domain):
- """
+ """Instruct the appropriate backend to update a zone on a target
:param context: Security context information.
:param target: Target to update Domain on
:param domain: Domain to be updated
@@ -515,7 +522,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
criterion = {
'pool_id': CONF['service:pool_manager'].pool_id,
'action': action,
- 'status': 'ERROR'
+ 'status': ERROR_STATUS
}
return self.central_api.find_domains(context, criterion)
@@ -528,10 +535,12 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return (Decimal(count) / Decimal(total_count)) * Decimal(100)
def _exceed_or_meet_threshold(self, count, threshold=None):
+ """Evaluate if count / the number of pool targets >= threshold
+ Used to implement consensus
+ """
threshold = threshold or self.threshold
-
- return self._percentage(
- count, len(self.pool.targets)) >= Decimal(threshold)
+ perc = self._percentage(count, len(self.pool.targets))
+ return perc >= Decimal(threshold)
@staticmethod
def _get_sorted_serials(pool_manager_statuses, descending=False):
@@ -548,6 +557,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return self._get_sorted_serials(pool_manager_statuses, descending=True)
def _is_consensus(self, context, domain, action, status, threshold=None):
+ """Fetch zone status across all nameservers through MiniDNS and compare
+ it with the expected `status`
+ :return: consensus reached (bool)
+ """
status_count = 0
pool_manager_statuses = self._retrieve_statuses(
context, domain, action)
@@ -589,6 +602,9 @@ class Service(service.RPCService, coordination.CoordinationMixin,
# value the nameserver
@staticmethod
def _build_status_object(nameserver, domain, action):
+ """
+ :return: :class:`objects.PoolManagerStatus`
+ """
values = {
'nameserver_id': nameserver.id,
'domain_id': domain.id,
@@ -623,6 +639,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
pass
def _retrieve_from_mdns(self, context, nameserver, domain, action):
+ """Instruct MiniDNS to get a zone serial number from a nameserver
+ Set error status if the zone is unexpectedly missing.
+ :return: :class:`objects.PoolManagerStatus` or None
+ """
try:
(status, actual_serial, retries) = \
self.mdns_api.get_serial_number(
@@ -641,16 +661,16 @@ class Service(service.RPCService, coordination.CoordinationMixin,
if status == NO_DOMAIN_STATUS:
if action == CREATE_ACTION:
- pool_manager_status.status = 'ERROR'
+ pool_manager_status.status = ERROR_STATUS
elif action == DELETE_ACTION:
- pool_manager_status.status = 'SUCCESS'
+ pool_manager_status.status = SUCCESS_STATUS
elif action == UPDATE_ACTION:
- pool_manager_status.action = 'CREATE'
- pool_manager_status.status = 'ERROR'
+ pool_manager_status.action = CREATE_ACTION
+ pool_manager_status.status = ERROR_STATUS
else:
pool_manager_status.status = status
- pool_manager_status.serial_number = actual_serial \
- if actual_serial is not None else 0
+
+ pool_manager_status.serial_number = actual_serial or 0
LOG.debug('Retrieved status %s and serial %s for domain %s '
'on nameserver %s with action %s from mdns.' %
(pool_manager_status.status,
@@ -661,6 +681,11 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return pool_manager_status
def _retrieve_statuses(self, context, domain, action):
+ """Instruct MiniDNS to get a zone serial number from all nameservers,
+ unless a cached value is available.
+ Set error status if the zone is unexpectedly missing.
+ :return: list of :class:`objects.PoolManagerStatus`
+ """
pool_manager_statuses = []
for nameserver in self.pool.nameservers:
try:
diff --git a/designate/tests/test_pool_manager/test_service.py b/designate/tests/test_pool_manager/test_service.py
index 5dee34d9..e7f7f514 100644
--- a/designate/tests/test_pool_manager/test_service.py
+++ b/designate/tests/test_pool_manager/test_service.py
@@ -13,9 +13,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import uuid
+
import oslo_messaging as messaging
from oslo_config import cfg
from mock import call
+from mock import Mock
from mock import patch
from designate import exceptions
@@ -101,9 +104,10 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.cache = self.service.cache
@staticmethod
- def _build_domain(name, action, status):
+ def _build_domain(name, action, status, id=None):
+ zid = id or '75ea1626-eea7-46b5-acb7-41e5897c2d40'
values = {
- 'id': '75ea1626-eea7-46b5-acb7-41e5897c2d40',
+ 'id': zid,
'name': name,
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842',
'action': action,
@@ -112,6 +116,13 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
}
return objects.Domain.from_dict(values)
+ def _build_domains(self, n, action, status):
+ return [
+ self._build_domain("zone%02X.example" % cnt, action,
+ status, id=str(uuid.uuid4()))
+ for cnt in range(n)
+ ]
+
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@@ -417,3 +428,92 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', 0)
+
+ @patch.object(central_rpcapi.CentralAPI, 'find_domains')
+ def test_periodic_sync_not_leader(self, mock_find_domains):
+ self.service._update_domain_on_target = Mock(return_value=False)
+ self.service._pool_election = Mock()
+ self.service._pool_election.is_leader = False
+ self.service.update_domain = Mock()
+
+ self.service.periodic_sync()
+ self.assertFalse(mock_find_domains.called)
+
+ @patch.object(central_rpcapi.CentralAPI, 'update_status')
+ def test_update_domain_no_consensus(self, mock_cent_update_status):
+ zone = self._build_domain('example.org.', 'UPDATE', 'PENDING')
+ self.service._update_domain_on_target = Mock(return_value=True)
+ self.service._exceed_or_meet_threshold = Mock(return_value=False)
+
+ ret = self.service.update_domain(self.admin_context, zone)
+ self.assertFalse(ret)
+
+ self.assertEqual(2, self.service._update_domain_on_target.call_count)
+ self.assertEqual(1, mock_cent_update_status.call_count)
+
+ @patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
+ def test_update_domain(self, mock_mdns_poll):
+ zone = self._build_domain('example.org.', 'UPDATE', 'PENDING')
+ self.service._update_domain_on_target = Mock(return_value=True)
+ self.service._update_domain_on_also_notify = Mock()
+ self.service.pool.also_notifies = ['bogus']
+ self.service._exceed_or_meet_threshold = Mock(return_value=True)
+
+ # cache.retrieve will throw exceptions.PoolManagerStatusNotFound
+ # mdns_api.poll_for_serial_number will be called twice
+ ret = self.service.update_domain(self.admin_context, zone)
+ self.assertTrue(ret)
+
+ self.assertEqual(2, self.service._update_domain_on_target.call_count)
+ self.assertEqual(1, self.service._update_domain_on_also_notify.call_count) # noqa
+ self.assertEqual(2, mock_mdns_poll.call_count)
+
+ @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
+ @patch.object(central_rpcapi.CentralAPI, 'update_status')
+ @patch.object(central_rpcapi.CentralAPI, 'find_domains')
+ def test_periodic_sync(self, mock_find_domains,
+ mock_cent_update_status, *a):
+ self.service.update_domain = Mock()
+ mock_find_domains.return_value = self._build_domains(2, 'UPDATE',
+ 'PENDING')
+ self.service.periodic_sync()
+
+ self.assertEqual(1, mock_find_domains.call_count)
+ criterion = mock_find_domains.call_args_list[0][0][1]
+ self.assertEqual('!ERROR', criterion['status'])
+ self.assertEqual(2, self.service.update_domain.call_count)
+ self.assertEqual(0, mock_cent_update_status.call_count)
+
+ @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
+ @patch.object(central_rpcapi.CentralAPI, 'update_status')
+ @patch.object(central_rpcapi.CentralAPI, 'find_domains')
+ def test_periodic_sync_with_failing_update(self, mock_find_domains,
+ mock_cent_update_status, *a):
+ self.service.update_domain = Mock(return_value=False) # fail update
+ mock_find_domains.return_value = self._build_domains(3, 'UPDATE',
+ 'PENDING')
+ self.service.periodic_sync()
+
+ self.assertEqual(1, mock_find_domains.call_count)
+ criterion = mock_find_domains.call_args_list[0][0][1]
+ self.assertEqual('!ERROR', criterion['status'])
+ # all zones are now in ERROR status
+ self.assertEqual(3, self.service.update_domain.call_count)
+ self.assertEqual(3, mock_cent_update_status.call_count)
+
+ @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
+ @patch.object(central_rpcapi.CentralAPI, 'update_status')
+ @patch.object(central_rpcapi.CentralAPI, 'find_domains')
+ def test_periodic_sync_with_failing_update_with_exception(
+ self, mock_find_domains, mock_cent_update_status, *a):
+ self.service.update_domain = Mock(side_effect=Exception)
+ mock_find_domains.return_value = self._build_domains(3, 'UPDATE',
+ 'PENDING')
+ self.service.periodic_sync()
+
+ self.assertEqual(1, mock_find_domains.call_count)
+ criterion = mock_find_domains.call_args_list[0][0][1]
+ self.assertEqual('!ERROR', criterion['status'])
+ # the first updated zone is now in ERROR status
+ self.assertEqual(1, self.service.update_domain.call_count)
+ self.assertEqual(1, mock_cent_update_status.call_count)
diff --git a/designate/tests/unit/test_pool_manager/test_service.py b/designate/tests/unit/test_pool_manager/test_service.py
new file mode 100644
index 00000000..4432737e
--- /dev/null
+++ b/designate/tests/unit/test_pool_manager/test_service.py
@@ -0,0 +1,58 @@
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+#
+# Author: Federico Ceratto <federico.ceratto@hpe.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from mock import Mock
+from mock import MagicMock
+from mock import patch
+from oslotest import base as test
+
+from designate import exceptions
+from designate import objects
+from designate.pool_manager.service import Service
+
+
+class PoolManagerTest(test.BaseTestCase):
+ def __setUp(self):
+ super(PoolManagerTest, self).setUp()
+
+ def test_init_no_pool_targets(self):
+ with patch.object(objects.Pool, 'from_config',
+ return_value=MagicMock()):
+ self.assertRaises(exceptions.NoPoolTargetsConfigured, Service)
+
+ def test_init(self):
+ with patch.object(objects.Pool, 'from_config',
+ return_value=Mock()):
+ Service._setup_target_backends = Mock()
+ Service()
+
+ def test_start(self):
+ with patch.object(objects.Pool, 'from_config',
+ return_value=Mock()):
+ Service._setup_target_backends = Mock()
+ pm = Service()
+ pm.pool.targets = ()
+ pm.tg.add_timer = Mock()
+ pm._pool_election = Mock()
+ with patch("designate.service.RPCService.start"):
+ pm.start()
+
+ call1 = pm.tg.add_timer.call_args_list[0][0]
+ self.assertEqual(120, call1[0])
+ self.assertEqual(120, call1[-1])
+ call2 = pm.tg.add_timer.call_args_list[1][0]
+ self.assertEqual(1800, call2[0])
+ self.assertEqual(1800, call2[-1])