From 721184fc6b98b5dc89b5dddcb3a848fa3d963b11 Mon Sep 17 00:00:00 2001 From: Erik Olof Gunnar Andersson Date: Mon, 11 Apr 2022 18:39:17 -0700 Subject: Validate worker actions before retrying poll This adds a check to validate if we need to keep retrying the current action. Change-Id: I4b2991499f33e65790388aea902a8b3e6023eb4e (cherry picked from commit dc45a03563bc075242b4d27dfbc6d340166c1c0c) --- designate/tests/unit/workers/test_base_task.py | 88 ++++++++++++++++++++++++++ designate/worker/tasks/base.py | 48 ++++++++++++++ designate/worker/tasks/zone.py | 30 +++++---- 3 files changed, 155 insertions(+), 11 deletions(-) diff --git a/designate/tests/unit/workers/test_base_task.py b/designate/tests/unit/workers/test_base_task.py index 9b37b52e..f419f4f4 100644 --- a/designate/tests/unit/workers/test_base_task.py +++ b/designate/tests/unit/workers/test_base_task.py @@ -14,17 +14,105 @@ # License for the specific language governing permissions and limitations # under the License.mport threading import oslotest.base +from unittest import mock +from designate import exceptions +from designate import objects from designate.worker.tasks import base class TestTask(oslotest.base.BaseTestCase): def setUp(self): super(TestTask, self).setUp() + self.context = mock.Mock() self.task = base.Task(None) + self.storage = self.task._storage = mock.Mock() def test_constructor(self): self.assertTrue(self.task) def test_call(self): self.assertRaises(NotImplementedError, self.task) + + def test_current_action_is_valid(self): + self.storage.get_zone = mock.Mock( + return_value=objects.Zone(action='UPDATE') + ) + self.assertTrue( + self.task.is_current_action_valid( + self.context, 'UPDATE', objects.Zone(action='UPDATE')) + ) + + self.storage.get_zone = mock.Mock( + return_value=objects.Zone(action='CREATE') + ) + self.assertTrue( + self.task.is_current_action_valid( + self.context, 'CREATE', objects.Zone(action='CREATE')) + ) + + self.storage.get_zone = mock.Mock( + return_value=objects.Zone(action='UPDATE') + ) + self.assertTrue( + self.task.is_current_action_valid( + self.context, 'CREATE', objects.Zone(action='CREATE')) + ) + + self.storage.get_zone = mock.Mock( + return_value=objects.Zone(action='DELETE') + ) + self.assertTrue( + self.task.is_current_action_valid( + self.context, 'DELETE', objects.Zone(action='DELETE')) + ) + + def test_current_action_delete_always_valid(self): + self.assertTrue( + self.task.is_current_action_valid( + self.context, 'DELETE', None) + ) + + def test_current_action_bad_storage_always_valid(self): + self.storage.get_zone = mock.Mock( + side_effect=exceptions.DesignateException() + ) + self.assertTrue( + self.task.is_current_action_valid( + self.context, 'CREATE', objects.Zone(action='CREATE')) + ) + + def test_current_action_is_not_valid_none(self): + self.storage.get_zone = mock.Mock( + return_value=objects.Zone(action='NONE') + ) + self.assertFalse( + self.task.is_current_action_valid( + self.context, 'UPDATE', objects.Zone(action='UPDATE')) + ) + + def test_current_action_is_not_valid_deleted(self): + self.storage.get_zone = mock.Mock( + return_value=objects.Zone(action='DELETE') + ) + self.assertFalse( + self.task.is_current_action_valid( + self.context, 'UPDATE', objects.Zone(action='UPDATE')) + ) + + def test_current_action_is_not_found(self): + self.storage.get_zone = mock.Mock( + side_effect=exceptions.ZoneNotFound() + ) + self.assertTrue( + self.task.is_current_action_valid( + self.context, 'CREATE', objects.Zone(action='CREATE')) + ) + + self.storage.get_zone = mock.Mock( + side_effect=exceptions.ZoneNotFound() + ) + self.assertFalse( + self.task.is_current_action_valid( + self.context, 'UPDATE', objects.Zone(action='UPDATE')) + ) diff --git a/designate/worker/tasks/base.py b/designate/worker/tasks/base.py index b6959391..5c3c8294 100644 --- a/designate/worker/tasks/base.py +++ b/designate/worker/tasks/base.py @@ -18,6 +18,7 @@ from oslo_config import cfg from oslo_log import log as logging from designate.central import rpcapi as central_rpcapi +from designate import exceptions from designate import quota from designate import storage from designate import utils @@ -139,5 +140,52 @@ class Task(TaskConfig): self._worker_api = worker_rpcapi.WorkerAPI.get_instance() return self._worker_api + def is_current_action_valid(self, context, action, zone): + """Is our current action still valid?""" + + # We always allow for DELETE operations. + if action == 'DELETE': + return True + + try: + zone = self.storage.get_zone(context, zone.id) + + # If the zone is either in a DELETE or NONE state, + # we don't need to continue with the current action. + if zone.action in ['DELETE', 'NONE']: + LOG.info( + 'Failed to %(action)s zone_name=%(zone_name)s ' + 'zone_id=%(zone_id)s action state has changed ' + 'to %(current_action)s, not retrying action', + { + 'action': action, + 'zone_name': zone.name, + 'zone_id': zone.id, + 'current_action': zone.action, + } + ) + return False + except exceptions.ZoneNotFound: + if action != 'CREATE': + LOG.info( + 'Failed to %(action)s zone_name=%(zone_name)s ' + 'zone_id=%(zone_id)s Error=ZoneNotFound', + { + 'action': action, + 'zone_name': zone.name, + 'zone_id': zone.id, + } + ) + return False + except Exception as e: + LOG.warning( + 'Error trying to get zone action. Error=%(error)s', + { + 'error': str(e), + } + ) + + return True + def __call__(self): raise NotImplementedError diff --git a/designate/worker/tasks/zone.py b/designate/worker/tasks/zone.py index 3189de27..906a16c7 100644 --- a/designate/worker/tasks/zone.py +++ b/designate/worker/tasks/zone.py @@ -88,17 +88,21 @@ class ZoneActionOnTarget(base.Task): self.action, self.zone.name, self.target) return True except Exception as e: - LOG.info('Failed to %(action)s zone %(zone)s on ' - 'target %(target)s on attempt %(attempt)d, ' - 'Error: %(error)s.', - { - 'action': self.action, - 'zone': self.zone.name, - 'target': self.target.id, - 'attempt': retry + 1, - 'error': str(e) - }) - time.sleep(self.retry_interval) + LOG.info( + 'Failed to %(action)s zone_name=%(zone_name)s ' + 'zone_id=%(zone_id)s on target=%(target)s on ' + 'attempt=%(attempt)d Error=%(error)s', + { + 'action': self.action, + 'zone_name': self.zone.name, + 'zone_id': self.zone.id, + 'target': self.target, + 'attempt': retry + 1, + 'error': str(e), + } + ) + + time.sleep(self.retry_interval) return False @@ -404,6 +408,10 @@ class ZonePoller(base.Task, ThresholdMixin): {'zone': self.zone.name, 'n': retry + 1}) time.sleep(retry_interval) + if not self.is_current_action_valid(self.context, self.zone.action, + self.zone): + break + return query_result def _on_failure(self, error_status): -- cgit v1.2.1