From c32454213e5aa1b6cda770f56ddef18f06c2c3c2 Mon Sep 17 00:00:00 2001 From: Ann Taraday Date: Fri, 31 Jul 2020 16:58:23 +0400 Subject: Avoid endless loop on StorageFailure If an error occures with writing atom detail in database( persistence backend) flow execution enters an endless loop throwing errors and retrying to save details. Avoid this situation and log exception message. Change-Id: Ic6b0a78d20124cc027468ecc6aeff189c25d1a8a Closes-bug: 1889773 --- .../fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml | 7 +++++++ taskflow/storage.py | 11 +++++++++++ 2 files changed, 18 insertions(+) create mode 100644 releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml diff --git a/releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml b/releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml new file mode 100644 index 0000000..35a6ca4 --- /dev/null +++ b/releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Limit retries for storage failures on saving flow/task state in the storage. + Previously on StorageFailure exception may cause an endless loop during + execution of flows throwing errors and retrying to save details. + diff --git a/taskflow/storage.py b/taskflow/storage.py index 76d059b..7ddd7ef 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -21,6 +21,7 @@ import fasteners from oslo_utils import reflection from oslo_utils import uuidutils import six +import tenacity from taskflow import exceptions from taskflow import logging @@ -33,6 +34,8 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) +RETRY_ATTEMPTS = 3 +RETRY_WAIT_TIMEOUT = 5 _EXECUTE_STATES_WITH_RESULTS = ( # The atom ``execute`` worked out :) @@ -449,6 +452,10 @@ class Storage(object): # This never changes (so no read locking needed). return self._backend + @tenacity.retry(retry=tenacity.retry_if_exception_type( + exception_types=exceptions.StorageFailure), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS), + wait=tenacity.wait_fixed(RETRY_WAIT_TIMEOUT)) def _save_flow_detail(self, conn, original_flow_detail, flow_detail): # NOTE(harlowja): we need to update our contained flow detail if # the result of the update actually added more (aka another process @@ -482,6 +489,10 @@ class Storage(object): else: return (ad, ad) + @tenacity.retry(retry=tenacity.retry_if_exception_type( + exception_types=exceptions.StorageFailure), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS), + wait=tenacity.wait_fixed(RETRY_WAIT_TIMEOUT)) def _save_atom_detail(self, conn, original_atom_detail, atom_detail): # NOTE(harlowja): we need to update our contained atom detail if # the result of the update actually added more (aka another process -- cgit v1.2.1