summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnn Taraday <akamyshnikova@mirantis.com>2020-07-31 16:58:23 +0400
committerAnn Taraday <akamyshnikova@mirantis.com>2020-08-24 13:16:54 +0000
commitc32454213e5aa1b6cda770f56ddef18f06c2c3c2 (patch)
treef700980a1e1397ad80a9543c41d75b37f6cd6e73
parentcf327a2e2d4e2c504b5080fbf7bd48421fe7b4c7 (diff)
downloadtaskflow-c32454213e5aa1b6cda770f56ddef18f06c2c3c2.tar.gz
Avoid endless loop on StorageFailure
If an error occurs while writing atom detail to the database (persistence backend), flow execution enters an endless loop, throwing errors and retrying to save details. Avoid this situation and log the exception message. Change-Id: Ic6b0a78d20124cc027468ecc6aeff189c25d1a8a Closes-bug: 1889773
-rw-r--r--releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml7
-rw-r--r--taskflow/storage.py11
2 files changed, 18 insertions, 0 deletions
diff --git a/releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml b/releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml
new file mode 100644
index 0000000..35a6ca4
--- /dev/null
+++ b/releasenotes/notes/fix-endless-loop-on-storage-error-dd4467f0bbc66abf.yaml
@@ -0,0 +1,7 @@
+---
+fixes:
+ - |
+ Limit retries for storage failures when saving flow/task state in the storage.
+ Previously a StorageFailure exception could cause an endless loop during
+ execution of flows, throwing errors and retrying to save details.
+
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 76d059b..7ddd7ef 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -21,6 +21,7 @@ import fasteners
from oslo_utils import reflection
from oslo_utils import uuidutils
import six
+import tenacity
from taskflow import exceptions
from taskflow import logging
@@ -33,6 +34,8 @@ from taskflow.utils import misc
LOG = logging.getLogger(__name__)
+RETRY_ATTEMPTS = 3
+RETRY_WAIT_TIMEOUT = 5
_EXECUTE_STATES_WITH_RESULTS = (
# The atom ``execute`` worked out :)
@@ -449,6 +452,10 @@ class Storage(object):
# This never changes (so no read locking needed).
return self._backend
+ @tenacity.retry(retry=tenacity.retry_if_exception_type(
+ exception_types=exceptions.StorageFailure),
+ stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS),
+ wait=tenacity.wait_fixed(RETRY_WAIT_TIMEOUT))
def _save_flow_detail(self, conn, original_flow_detail, flow_detail):
# NOTE(harlowja): we need to update our contained flow detail if
# the result of the update actually added more (aka another process
@@ -482,6 +489,10 @@ class Storage(object):
else:
return (ad, ad)
+ @tenacity.retry(retry=tenacity.retry_if_exception_type(
+ exception_types=exceptions.StorageFailure),
+ stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS),
+ wait=tenacity.wait_fixed(RETRY_WAIT_TIMEOUT))
def _save_atom_detail(self, conn, original_atom_detail, atom_detail):
# NOTE(harlowja): we need to update our contained atom detail if
# the result of the update actually added more (aka another process