summaryrefslogtreecommitdiff
path: root/taskflow/persistence
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-01-09 01:27:54 +0000
committerGerrit Code Review <review@openstack.org>2016-01-09 01:27:54 +0000
commit8bd70574db1233dcc612ec2c899b998060bb4a26 (patch)
treec6a6be41340edc1c1baf5d8f12cda1d6ef1104f6 /taskflow/persistence
parenta741aa2d76d6c75e86e4668bf133dea671c03d92 (diff)
parent25dca8eb593caf821b29a27f120bdcbd0e223024 (diff)
downloadtaskflow-8bd70574db1233dcc612ec2c899b998060bb4a26.tar.gz
Merge "Use the retrying lib. to do basic sqlalchemy engine validation"
Diffstat (limited to 'taskflow/persistence')
-rw-r--r--taskflow/persistence/backends/impl_sqlalchemy.py88
1 files changed, 41 insertions, 47 deletions
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index e223e02..8016b38 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -23,6 +23,7 @@ import functools
import time
from oslo_utils import strutils
+import retrying
import six
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
@@ -35,7 +36,6 @@ from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends.sqlalchemy import tables
from taskflow.persistence import base
from taskflow.persistence import models
-from taskflow.types import failure
from taskflow.utils import eventlet_utils
from taskflow.utils import misc
@@ -246,6 +246,10 @@ class SQLAlchemyBackend(base.Backend):
self._engine = self._create_engine(self._conf)
self._owns_engine = True
self._validated = False
+ try:
+ self._max_retries = misc.as_int(self._conf.get('max_retries'))
+ except TypeError:
+ self._max_retries = 0
@staticmethod
def _create_engine(conf):
@@ -326,11 +330,7 @@ class SQLAlchemyBackend(base.Backend):
def get_connection(self):
conn = Connection(self)
if not self._validated:
- try:
- max_retries = misc.as_int(self._conf.get('max_retries', None))
- except TypeError:
- max_retries = 0
- conn.validate(max_retries=max_retries)
+ conn.validate(max_retries=self._max_retries)
self._validated = True
return conn
@@ -356,47 +356,41 @@ class Connection(base.Connection):
return self._backend
def validate(self, max_retries=0):
-
- def verify_connect(failures):
- try:
- # See if we can make a connection happen.
- #
- # NOTE(harlowja): note that even though we are connecting
- # once it does not mean that we will be able to connect in
- # the future, so this is more of a sanity test and is not
- # complete connection insurance.
- with contextlib.closing(self._engine.connect()):
- pass
- except sa_exc.OperationalError as ex:
- if _is_db_connection_error(six.text_type(ex.args[0])):
- failures.append(failure.Failure())
- return False
- return True
-
- failures = []
- if verify_connect(failures):
- return
-
- # Sorry it didn't work out...
- if max_retries <= 0:
- failures[-1].reraise()
-
- # Go through the exponential backoff loop and see if we can connect
- # after a given number of backoffs (with a backoff sleeping period
- # between each attempt)...
- attempts_left = max_retries
- for sleepy_secs in misc.ExponentialBackoff(max_retries):
- LOG.warn("SQL connection failed due to '%s', %s attempts left.",
- failures[-1].exc, attempts_left)
- LOG.info("Attempting to test the connection again in %s seconds.",
- sleepy_secs)
- time.sleep(sleepy_secs)
- if verify_connect(failures):
- return
- attempts_left -= 1
-
- # Sorry it didn't work out...
- failures[-1].reraise()
+ """Performs basic **connection** validation of a sqlalchemy engine."""
+
+ def _retry_on_exception(exc):
+ LOG.warn("Engine connection (validate) failed due to '%s'", exc)
+ if isinstance(exc, sa_exc.OperationalError) and \
+ _is_db_connection_error(six.text_type(exc.args[0])):
+ # We may be able to fix this by retrying...
+ return True
+ if isinstance(exc, (sa_exc.TimeoutError,
+ sa_exc.ResourceClosedError,
+ sa_exc.DisconnectionError)):
+ # We may be able to fix this by retrying...
+ return True
+ # Other failures we likely can't fix by retrying...
+ return False
+
+ @retrying.retry(stop_max_attempt_number=max(0, int(max_retries)),
+ # Ensure that the 2 ** retry number
+ # is converted into milliseconds (thus why this
+ # multiplies by 1000.0) because thats what retrying
+ # lib. uses internally for whatever reason.
+ wait_exponential_multiplier=1000.0,
+ wrap_exception=False,
+ retry_on_exception=_retry_on_exception)
+ def _try_connect(engine):
+ # See if we can make a connection happen.
+ #
+ # NOTE(harlowja): note that even though we are connecting
+ # once it does not mean that we will be able to connect in
+ # the future, so this is more of a sanity test and is not
+ # complete connection insurance.
+ with contextlib.closing(engine.connect()):
+ pass
+
+ _try_connect(self._engine)
def upgrade(self):
try: