diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-01-04 16:00:26 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-01-04 16:00:26 +0000 |
commit | 94d0f00b09044fcef36604a9c81accb0907c9705 (patch) | |
tree | ed924b72821fa65b4f9d6829e9a6e624091e83dd | |
parent | ffd80ec5792fe42d6a9680c0f3fa785eab82dc3e (diff) | |
parent | b3235bdb74d32f57c43eb884f4ef3cd27cf9c457 (diff) | |
download | designate-94d0f00b09044fcef36604a9c81accb0907c9705.tar.gz |
Merge "Set zone in ERROR status on periodic sync fail" into stable/liberty
-rw-r--r-- | designate/pool_manager/service.py | 191 | ||||
-rw-r--r-- | designate/tests/test_pool_manager/test_service.py | 104 | ||||
-rw-r--r-- | designate/tests/unit/test_pool_manager/test_service.py | 58 |
3 files changed, 268 insertions, 85 deletions
diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py index f9f92814..0d210703 100644 --- a/designate/pool_manager/service.py +++ b/designate/pool_manager/service.py @@ -137,18 +137,16 @@ class Service(service.RPCService, coordination.CoordinationMixin, self._pool_election.start() if CONF['service:pool_manager'].enable_recovery_timer: - LOG.info(_LI('Starting periodic recovery timer')) - self.tg.add_timer( - CONF['service:pool_manager'].periodic_recovery_interval, - self.periodic_recovery, - CONF['service:pool_manager'].periodic_recovery_interval) + interval = CONF['service:pool_manager'].periodic_recovery_interval + LOG.info(_LI('Starting periodic recovery timer every' + ' %(interval)s s') % {'interval': interval}) + self.tg.add_timer(interval, self.periodic_recovery, interval) if CONF['service:pool_manager'].enable_sync_timer: - LOG.info(_LI('Starting periodic synchronization timer')) - self.tg.add_timer( - CONF['service:pool_manager'].periodic_sync_interval, - self.periodic_sync, - CONF['service:pool_manager'].periodic_sync_interval) + interval = CONF['service:pool_manager'].periodic_sync_interval + LOG.info(_LI('Starting periodic synchronization timer every' + ' %(interval)s s') % {'interval': interval}) + self.tg.add_timer(interval, self.periodic_sync, interval) def stop(self): self._pool_election.stop() @@ -169,72 +167,81 @@ class Service(service.RPCService, coordination.CoordinationMixin, # Periodioc Tasks def periodic_recovery(self): """ + Runs only on the pool leader :return: None """ # NOTE(kiall): Only run this periodic task on the pool leader - if self._pool_election.is_leader: - context = DesignateContext.get_admin_context(all_tenants=True) + if not self._pool_election.is_leader: + return - LOG.debug("Starting Periodic Recovery") + context = DesignateContext.get_admin_context(all_tenants=True) + LOG.debug("Starting Periodic Recovery") - try: - # Handle Deletion Failures - domains = self._get_failed_domains(context, DELETE_ACTION) + try: + # Handle Deletion Failures + domains = self._get_failed_domains(context, DELETE_ACTION) - for domain in domains: - self.delete_domain(context, domain) + for domain in domains: + self.delete_domain(context, domain) - # Handle Creation Failures - domains = self._get_failed_domains(context, CREATE_ACTION) + # Handle Creation Failures + domains = self._get_failed_domains(context, CREATE_ACTION) - for domain in domains: - self.create_domain(context, domain) + for domain in domains: + self.create_domain(context, domain) - # Handle Update Failures - domains = self._get_failed_domains(context, UPDATE_ACTION) + # Handle Update Failures + domains = self._get_failed_domains(context, UPDATE_ACTION) - for domain in domains: - self.update_domain(context, domain) + for domain in domains: + self.update_domain(context, domain) - except Exception: - LOG.exception(_LE('An unhandled exception in periodic ' - 'recovery occurred')) + except Exception: + LOG.exception(_LE('An unhandled exception in periodic ' + 'recovery occurred')) def periodic_sync(self): - """ + """Periodically sync all the zones that are not in ERROR status + Runs only on the pool leader :return: None """ - # NOTE(kiall): Only run this periodic task on the pool leader - if self._pool_election.is_leader: - context = DesignateContext.get_admin_context(all_tenants=True) + if not self._pool_election.is_leader: + return - LOG.debug("Starting Periodic Synchronization") + context = DesignateContext.get_admin_context(all_tenants=True) - criterion = { - 'pool_id': CONF['service:pool_manager'].pool_id, - 'status': '!%s' % ERROR_STATUS - } + LOG.debug("Starting Periodic Synchronization") - periodic_sync_seconds = \ - CONF['service:pool_manager'].periodic_sync_seconds + criterion = { + 'pool_id': CONF['service:pool_manager'].pool_id, + 'status': '!%s' % ERROR_STATUS + } - if periodic_sync_seconds is not None: - # Generate the current serial, will provide a UTC Unix TS. - current = utils.increment_serial() - criterion['serial'] = ">%s" % (current - periodic_sync_seconds) + periodic_sync_seconds = \ + CONF['service:pool_manager'].periodic_sync_seconds - domains = self.central_api.find_domains(context, criterion) + if periodic_sync_seconds is not None: + # Generate the current serial, will provide a UTC Unix TS. + current = utils.increment_serial() + criterion['serial'] = ">%s" % (current - periodic_sync_seconds) - try: - for domain in domains: - # TODO(kiall): If the domain was created within the last - # periodic_sync_seconds, attempt to recreate - # to fill in targets which may have failed. - self.update_domain(context, domain) + domains = self.central_api.find_domains(context, criterion) - except Exception: - LOG.exception(_LE('An unhandled exception in periodic ' - 'synchronization occurred.')) + try: + for domain in domains: + # TODO(kiall): If the zone was created within the last + # periodic_sync_seconds, attempt to recreate + # to fill in targets which may have failed. + success = self.update_domain(context, domain) + if not success: + self.central_api.update_status( + context, domain.id, ERROR_STATUS, domain.serial) + + except Exception: + LOG.exception(_LE('An unhandled exception in periodic ' + 'synchronization occurred.')) + self.central_api.update_status(context, domain.id, ERROR_STATUS, + domain.serial) # Standard Create/Update/Delete Methods @@ -303,32 +310,30 @@ class Service(service.RPCService, coordination.CoordinationMixin, return False def update_domain(self, context, domain): - """ + """Update a domain across every pool target, check for consensus and + for propagation. :param context: Security context information. :param domain: Domain to be updated - :return: None + :return: consensus reached (bool) """ LOG.info(_LI("Updating domain %s"), domain.name) - results = [] - # Update the domain on each of the Pool Targets + success_count = 0 for target in self.pool.targets: - results.append( - self._update_domain_on_target(context, target, domain)) - - if self._exceed_or_meet_threshold(results.count(True)): - LOG.debug('Consensus reached for updating domain %(domain)s ' - 'on pool targets' % {'domain': domain.name}) - - else: - LOG.warn(_LW('Consensus not reached for updating domain %(domain)s' - ' on pool targets') % {'domain': domain.name}) - - self.central_api.update_status( - context, domain.id, ERROR_STATUS, domain.serial) + ok_status = self._update_domain_on_target(context, target, domain) + if ok_status: + success_count += 1 + + if not self._exceed_or_meet_threshold(success_count): + LOG.warn(_LW('Consensus not reached for updating zone %(zone)s' + ' on pool targets') % {'zone': domain.name}) + self.central_api.update_status(context, domain.id, ERROR_STATUS, + domain.serial) + return False - return + LOG.debug('Consensus reached for updating zone %(zone)s ' + 'on pool targets' % {'zone': domain.name}) # Send a NOTIFY to each also-notifies for also_notify in self.pool.also_notifies: @@ -349,8 +354,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, context, domain, nameserver, self.timeout, self.retry_interval, self.max_retries, self.delay) + return True + def _update_domain_on_target(self, context, target, domain): - """ + """Instruct the appropriate backend to update a zone on a target :param context: Security context information. :param target: Target to update Domain on :param domain: Domain to be updated @@ -515,7 +522,7 @@ class Service(service.RPCService, coordination.CoordinationMixin, criterion = { 'pool_id': CONF['service:pool_manager'].pool_id, 'action': action, - 'status': 'ERROR' + 'status': ERROR_STATUS } return self.central_api.find_domains(context, criterion) @@ -528,10 +535,12 @@ class Service(service.RPCService, coordination.CoordinationMixin, return (Decimal(count) / Decimal(total_count)) * Decimal(100) def _exceed_or_meet_threshold(self, count, threshold=None): + """Evaluate if count / the number of pool targets >= threshold + Used to implement consensus + """ threshold = threshold or self.threshold - - return self._percentage( - count, len(self.pool.targets)) >= Decimal(threshold) + perc = self._percentage(count, len(self.pool.targets)) + return perc >= Decimal(threshold) @staticmethod def _get_sorted_serials(pool_manager_statuses, descending=False): @@ -548,6 +557,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, return self._get_sorted_serials(pool_manager_statuses, descending=True) def _is_consensus(self, context, domain, action, status, threshold=None): + """Fetch zone status across all nameservers through MiniDNS and compare + it with the expected `status` + :return: consensus reached (bool) + """ status_count = 0 pool_manager_statuses = self._retrieve_statuses( context, domain, action) @@ -589,6 +602,9 @@ class Service(service.RPCService, coordination.CoordinationMixin, # value the nameserver @staticmethod def _build_status_object(nameserver, domain, action): + """ + :return: :class:`objects.PoolManagerStatus` + """ values = { 'nameserver_id': nameserver.id, 'domain_id': domain.id, @@ -623,6 +639,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, pass def _retrieve_from_mdns(self, context, nameserver, domain, action): + """Instruct MiniDNS to get a zone serial number from a nameserver + Set error status if the zone is unexpectedly missing. + :return: :class:`objects.PoolManagerStatus` or None + """ try: (status, actual_serial, retries) = \ self.mdns_api.get_serial_number( @@ -641,16 +661,16 @@ class Service(service.RPCService, coordination.CoordinationMixin, if status == NO_DOMAIN_STATUS: if action == CREATE_ACTION: - pool_manager_status.status = 'ERROR' + pool_manager_status.status = ERROR_STATUS elif action == DELETE_ACTION: - pool_manager_status.status = 'SUCCESS' + pool_manager_status.status = SUCCESS_STATUS elif action == UPDATE_ACTION: - pool_manager_status.action = 'CREATE' - pool_manager_status.status = 'ERROR' + pool_manager_status.action = CREATE_ACTION + pool_manager_status.status = ERROR_STATUS else: pool_manager_status.status = status - pool_manager_status.serial_number = actual_serial \ - if actual_serial is not None else 0 + + pool_manager_status.serial_number = actual_serial or 0 LOG.debug('Retrieved status %s and serial %s for domain %s ' 'on nameserver %s with action %s from mdns.' % (pool_manager_status.status, @@ -661,6 +681,11 @@ class Service(service.RPCService, coordination.CoordinationMixin, return pool_manager_status def _retrieve_statuses(self, context, domain, action): + """Instruct MiniDNS to get a zone serial number from all nameservers, + unless a cached value is available. + Set error status if the zone is unexpectedly missing. + :return: list of :class:`objects.PoolManagerStatus` + """ pool_manager_statuses = [] for nameserver in self.pool.nameservers: try: diff --git a/designate/tests/test_pool_manager/test_service.py b/designate/tests/test_pool_manager/test_service.py index 5dee34d9..e7f7f514 100644 --- a/designate/tests/test_pool_manager/test_service.py +++ b/designate/tests/test_pool_manager/test_service.py @@ -13,9 +13,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import uuid + import oslo_messaging as messaging from oslo_config import cfg from mock import call +from mock import Mock from mock import patch from designate import exceptions @@ -101,9 +104,10 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): self.cache = self.service.cache @staticmethod - def _build_domain(name, action, status): + def _build_domain(name, action, status, id=None): + zid = id or '75ea1626-eea7-46b5-acb7-41e5897c2d40' values = { - 'id': '75ea1626-eea7-46b5-acb7-41e5897c2d40', + 'id': zid, 'name': name, 'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842', 'action': action, @@ -112,6 +116,13 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): } return objects.Domain.from_dict(values) + def _build_domains(self, n, action, status): + return [ + self._build_domain("zone%02X.example" % cnt, action, + status, id=str(uuid.uuid4())) + for cnt in range(n) + ] + @patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number', side_effect=messaging.MessagingException) @patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number') @@ -417,3 +428,92 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): mock_update_status.assert_called_once_with( self.admin_context, domain.id, 'ERROR', 0) + + @patch.object(central_rpcapi.CentralAPI, 'find_domains') + def test_periodic_sync_not_leader(self, mock_find_domains): + self.service._update_domain_on_target = Mock(return_value=False) + self.service._pool_election = Mock() + self.service._pool_election.is_leader = False + self.service.update_domain = Mock() + + self.service.periodic_sync() + self.assertFalse(mock_find_domains.called) + + @patch.object(central_rpcapi.CentralAPI, 'update_status') + def test_update_domain_no_consensus(self, mock_cent_update_status): + zone = self._build_domain('example.org.', 'UPDATE', 'PENDING') + self.service._update_domain_on_target = Mock(return_value=True) + self.service._exceed_or_meet_threshold = Mock(return_value=False) + + ret = self.service.update_domain(self.admin_context, zone) + self.assertFalse(ret) + + self.assertEqual(2, self.service._update_domain_on_target.call_count) + self.assertEqual(1, mock_cent_update_status.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number') + def test_update_domain(self, mock_mdns_poll): + zone = self._build_domain('example.org.', 'UPDATE', 'PENDING') + self.service._update_domain_on_target = Mock(return_value=True) + self.service._update_domain_on_also_notify = Mock() + self.service.pool.also_notifies = ['bogus'] + self.service._exceed_or_meet_threshold = Mock(return_value=True) + + # cache.retrieve will throw exceptions.PoolManagerStatusNotFound + # mdns_api.poll_for_serial_number will be called twice + ret = self.service.update_domain(self.admin_context, zone) + self.assertTrue(ret) + + self.assertEqual(2, self.service._update_domain_on_target.call_count) + self.assertEqual(1, self.service._update_domain_on_also_notify.call_count) # noqa + self.assertEqual(2, mock_mdns_poll.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + @patch.object(central_rpcapi.CentralAPI, 'find_domains') + def test_periodic_sync(self, mock_find_domains, + mock_cent_update_status, *a): + self.service.update_domain = Mock() + mock_find_domains.return_value = self._build_domains(2, 'UPDATE', + 'PENDING') + self.service.periodic_sync() + + self.assertEqual(1, mock_find_domains.call_count) + criterion = mock_find_domains.call_args_list[0][0][1] + self.assertEqual('!ERROR', criterion['status']) + self.assertEqual(2, self.service.update_domain.call_count) + self.assertEqual(0, mock_cent_update_status.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + @patch.object(central_rpcapi.CentralAPI, 'find_domains') + def test_periodic_sync_with_failing_update(self, mock_find_domains, + mock_cent_update_status, *a): + self.service.update_domain = Mock(return_value=False) # fail update + mock_find_domains.return_value = self._build_domains(3, 'UPDATE', + 'PENDING') + self.service.periodic_sync() + + self.assertEqual(1, mock_find_domains.call_count) + criterion = mock_find_domains.call_args_list[0][0][1] + self.assertEqual('!ERROR', criterion['status']) + # all zones are now in ERROR status + self.assertEqual(3, self.service.update_domain.call_count) + self.assertEqual(3, mock_cent_update_status.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + @patch.object(central_rpcapi.CentralAPI, 'find_domains') + def test_periodic_sync_with_failing_update_with_exception( + self, mock_find_domains, mock_cent_update_status, *a): + self.service.update_domain = Mock(side_effect=Exception) + mock_find_domains.return_value = self._build_domains(3, 'UPDATE', + 'PENDING') + self.service.periodic_sync() + + self.assertEqual(1, mock_find_domains.call_count) + criterion = mock_find_domains.call_args_list[0][0][1] + self.assertEqual('!ERROR', criterion['status']) + # the first updated zone is now in ERROR status + self.assertEqual(1, self.service.update_domain.call_count) + self.assertEqual(1, mock_cent_update_status.call_count) diff --git a/designate/tests/unit/test_pool_manager/test_service.py b/designate/tests/unit/test_pool_manager/test_service.py new file mode 100644 index 00000000..4432737e --- /dev/null +++ b/designate/tests/unit/test_pool_manager/test_service.py @@ -0,0 +1,58 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Federico Ceratto <federico.ceratto@hpe.com> +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from mock import Mock +from mock import MagicMock +from mock import patch +from oslotest import base as test + +from designate import exceptions +from designate import objects +from designate.pool_manager.service import Service + + +class PoolManagerTest(test.BaseTestCase): + def __setUp(self): + super(PoolManagerTest, self).setUp() + + def test_init_no_pool_targets(self): + with patch.object(objects.Pool, 'from_config', + return_value=MagicMock()): + self.assertRaises(exceptions.NoPoolTargetsConfigured, Service) + + def test_init(self): + with patch.object(objects.Pool, 'from_config', + return_value=Mock()): + Service._setup_target_backends = Mock() + Service() + + def test_start(self): + with patch.object(objects.Pool, 'from_config', + return_value=Mock()): + Service._setup_target_backends = Mock() + pm = Service() + pm.pool.targets = () + pm.tg.add_timer = Mock() + pm._pool_election = Mock() + with patch("designate.service.RPCService.start"): + pm.start() + + call1 = pm.tg.add_timer.call_args_list[0][0] + self.assertEqual(120, call1[0]) + self.assertEqual(120, call1[-1]) + call2 = pm.tg.add_timer.call_args_list[1][0] + self.assertEqual(1800, call2[0]) + self.assertEqual(1800, call2[-1]) |