summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2016-03-15 10:50:00 -0700
committerJoshua Harlow <jxharlow@godaddy.com>2016-05-31 17:58:34 -0700
commit35a9305f172ed8970caf1ff5cec261df7d3fe9ce (patch)
treea68d3e20f477d7b1d5fa8827e65954b5c3b61784
parent1bc8dd9bcab110d06fb36da756b8acc19febd065 (diff)
downloadtaskflow-35a9305f172ed8970caf1ff5cec261df7d3fe9ce.tar.gz
Ensure the fetching jobs does not fetch anything when in bad state
When the underlying connection is in LOST or SUSPENDED mode do not allow jobs to be iterated over (and clear the local cache when the connection has been LOST). Related-Bug: #1557107 Change-Id: Ic0a2ab2519ff8a7386d80d9092a0e24579883681
-rw-r--r--taskflow/jobs/backends/impl_zookeeper.py74
1 files changed, 66 insertions, 8 deletions
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 098108e..b3b72bf 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import contextlib
import functools
import sys
@@ -23,6 +24,7 @@ import fasteners
import futurist
from kazoo import exceptions as k_exceptions
from kazoo.protocol import paths as k_paths
+from kazoo.protocol import states as k_states
from kazoo.recipe import watchers
from oslo_serialization import jsonutils
from oslo_utils import excutils
@@ -261,6 +263,19 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
#: Default znode path used for jobs (data, locks...).
DEFAULT_PATH = "/taskflow/jobs"
+ STATE_HISTORY_LENGTH = 2
+ """
+ Number of prior state changes to keep a history of, mainly useful
+ for history tracking and debugging connectivity issues.
+ """
+
+ NO_FETCH_STATES = (k_states.KazooState.LOST, k_states.KazooState.SUSPENDED)
+ """
+ Client states underwhich we return empty lists from fetching routines,
+ during these states the underlying connection either is being recovered
+ or may be recovered (aka, it has not full disconnected).
+ """
+
def __init__(self, name, conf,
client=None, persistence=None, emit_notifications=True):
super(ZookeeperJobBoard, self).__init__(name, conf)
@@ -298,6 +313,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._worker = None
self._emit_notifications = bool(emit_notifications)
self._connected = False
+ self._suspended = False
+ self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH)
def _try_emit(self, state, details):
# Submit the work to the executor to avoid blocking the kazoo threads
@@ -334,10 +351,25 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
return len(self._known_jobs)
def _fetch_jobs(self, ensure_fresh=False):
- if ensure_fresh:
- self._force_refresh()
- with self._job_cond:
- return sorted(six.itervalues(self._known_jobs), reverse=True)
+ try:
+ last_state = self._last_states[0]
+ except IndexError:
+ last_state = None
+ if last_state in self.NO_FETCH_STATES:
+ # NOTE(harlowja): on lost clear out all known jobs (from the
+ # in-memory mapping) as we can not safely assume there are any
+ # jobs to continue working on in this state.
+ if last_state == k_states.KazooState.LOST and self._known_jobs:
+ # This will force the jobboard to drop all (in-memory) jobs
+ # that are not in this list (pretty much simulating what
+ # would happen if a jobboard data directory was emptied).
+ self._on_job_posting([], delayed=False)
+ return []
+ else:
+ if ensure_fresh:
+ self._force_refresh()
+ with self._job_cond:
+ return sorted(six.itervalues(self._known_jobs))
def _force_refresh(self):
try:
@@ -364,12 +396,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
def _remove_job(self, path):
if path not in self._known_jobs:
- return
+ return False
with self._job_cond:
job = self._known_jobs.pop(path, None)
if job is not None:
LOG.debug("Removed job that was at path '%s'", path)
self._try_emit(base.REMOVAL, details={'job': job})
+ return True
+ else:
+ return False
def _process_child(self, path, request, quiet=True):
"""Receives the result of a child data fetch request."""
@@ -456,8 +491,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
investigate_paths.append(path)
if pending_removals:
with self._job_cond:
- for path in pending_removals:
- self._remove_job(path)
+ am_removed = 0
+ try:
+ for path in pending_removals:
+ am_removed += int(self._remove_job(path))
+ finally:
+ if am_removed:
+ self._job_cond.notify_all()
for path in investigate_paths:
# Fire off the request to populate this job.
#
@@ -694,7 +734,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
kazoo_utils.checked_commit(txn)
def _state_change_listener(self, state):
- LOG.debug("Kazoo client has changed to state: %s", state)
+ if self._last_states:
+ LOG.debug("Kazoo client has changed to"
+ " state '%s' from prior states '%s'", state,
+ self._last_states)
+ else:
+ LOG.debug("Kazoo client has changed to state '%s' (from"
+ " its initial/uninitialized state)", state)
+ self._last_states.appendleft(state)
+ if state == k_states.KazooState.LOST:
+ self._connected = False
+ LOG.warn("Connection to zookeeper has been lost")
+ elif state == k_states.KazooState.SUSPENDED:
+ LOG.warn("Connection to zookeeper has been suspended")
+ self._suspended = True
+ else:
+ # Must be CONNECTED then (as there are only 3 enums)
+ if self._suspended:
+ self._suspended = False
def wait(self, timeout=None):
# Wait until timeout expires (or forever) for jobs to appear.
@@ -738,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._known_jobs.clear()
LOG.debug("Stopped & cleared local state")
self._connected = False
+ self._last_states.clear()
@fasteners.locked(lock='_open_close_lock')
def connect(self, timeout=10.0):