summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-02-10 01:15:27 +0000
committerGerrit Code Review <review@openstack.org>2018-02-10 01:15:27 +0000
commit98636290c5a4ed9c7d608a1944124b5d92678107 (patch)
treefc789ca3c2fd54d6029bc06bf68aad450fa1519e
parentbee961a00eb838ce94019b5619a5dc46dc72f1cd (diff)
parentbc83d86255a61b5af7864889cd90960069ade598 (diff)
downloadheat-98636290c5a4ed9c7d608a1944124b5d92678107.tar.gz
Merge "Support tenacity exponential backoff retry on resource sync"10.0.0.0rc1
-rw-r--r--heat/engine/sync_point.py53
-rw-r--r--heat/tests/engine/test_sync_point.py7
2 files changed, 45 insertions, 15 deletions
diff --git a/heat/engine/sync_point.py b/heat/engine/sync_point.py
index 0ea8dd204..09f92365d 100644
--- a/heat/engine/sync_point.py
+++ b/heat/engine/sync_point.py
@@ -13,9 +13,8 @@
# limitations under the License.
import ast
-import eventlet
-import random
import six
+import tenacity
from oslo_log import log as logging
@@ -116,25 +115,55 @@ def serialize_input_data(input_data):
return {'input_data': _serialize(input_data)}
+class wait_random_exponential(tenacity.wait_exponential):
+ """Random wait strategy with a geometrically increasing amount of jitter.
+
+ Implements the truncated binary exponential backoff algorithm as used in
+ e.g. CSMA media access control. The retry occurs at a random time in a
+ (geometrically) expanding interval constrained by minimum and maximum
+ limits.
+ """
+ def __init__(self, min=0, multiplier=1, max=tenacity._utils.MAX_WAIT,
+ exp_base=2):
+ super(wait_random_exponential, self).__init__(multiplier=multiplier,
+ max=(max-min),
+ exp_base=exp_base)
+ self._random = tenacity.wait_random(min=min, max=(min + multiplier))
+
+ def __call__(self, previous_attempt_number, delay_since_first_attempt):
+ jitter = super(wait_random_exponential,
+ self).__call__(previous_attempt_number,
+ delay_since_first_attempt)
+ self._random.wait_random_max = self._random.wait_random_min + jitter
+ return self._random(previous_attempt_number, delay_since_first_attempt)
+
+
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
predecessors, new_data):
- rows_updated = None
- sync_point = None
- input_data = None
- nconflicts = max(0, len(predecessors) - 2)
- # limit to 10 seconds
- max_wt = min(nconflicts * 0.01, 10)
- while not rows_updated:
+ # Retry waits up to 60 seconds at most, with exponentially increasing
+ # amounts of jitter per resource still outstanding
+ wait_strategy = wait_random_exponential(max=60)
+
+ def init_jitter(existing_input_data):
+ nconflicts = max(0, len(predecessors) - len(existing_input_data) - 1)
+ # 10ms per potential conflict, up to a max of 10s in total
+ return min(nconflicts, 1000) * 0.01
+
+ @tenacity.retry(
+ retry=tenacity.retry_if_result(lambda r: r is None),
+ wait=wait_strategy
+ )
+ def _sync():
sync_point = get(cnxt, entity_id, current_traversal, is_update)
input_data = deserialize_input_data(sync_point.input_data)
+ wait_strategy.multiplier = init_jitter(input_data)
input_data.update(new_data)
rows_updated = update_input_data(
cnxt, entity_id, current_traversal, is_update,
sync_point.atomic_key, serialize_input_data(input_data))
- # don't aggressively spin; induce some sleep
- if not rows_updated:
- eventlet.sleep(random.uniform(0, max_wt))
+ return input_data if rows_updated else None
+ input_data = _sync()
waiting = predecessors - set(input_data)
key = make_key(entity_id, current_traversal, is_update)
if waiting:
diff --git a/heat/tests/engine/test_sync_point.py b/heat/tests/engine/test_sync_point.py
index 615f70cd3..cbe03d164 100644
--- a/heat/tests/engine/test_sync_point.py
+++ b/heat/tests/engine/test_sync_point.py
@@ -74,15 +74,16 @@ class SyncPointTestCase(common.HeatTestCase):
self.assertEqual({'input_data': {u'tuple:(3, 8)': None}}, res)
@mock.patch('heat.engine.sync_point.update_input_data', return_value=None)
- @mock.patch('eventlet.sleep', side_effect=exception.DBError)
+ @mock.patch('time.sleep', side_effect=exception.DBError)
def sync_with_sleep(self, ctx, stack, mock_sleep_time, mock_uid):
resource = stack['C']
graph = stack.convergence_dependencies.graph()
mock_callback = mock.Mock()
+ sender = (3, True)
self.assertRaises(exception.DBError, sync_point.sync, ctx, resource.id,
stack.current_traversal, True, mock_callback,
- set(graph[(resource.id, True)]), {})
+ set(graph[(resource.id, True)]), {sender: None})
return mock_sleep_time
def test_sync_with_time_throttle(self):
@@ -92,4 +93,4 @@ class SyncPointTestCase(common.HeatTestCase):
convergence=True)
stack.converge_stack(stack.t, action=stack.CREATE)
mock_sleep_time = self.sync_with_sleep(ctx, stack)
- mock_sleep_time.assert_called_once_with(mock.ANY)
+ self.assertTrue(mock_sleep_time.called)