summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-24 16:22:00 +0000
committerGerrit Code Review <review@openstack.org>2016-02-24 16:22:00 +0000
commit59c496041765337704df2f7a9f105d76c4e20839 (patch)
treeee87a7534e3db4d908fdbe75e792093cf1b1147b
parent2db07bab906a89b71090af96b3daf6018cce0222 (diff)
parent3429e3824c060071e59a117c19c95659c78e4c8b (diff)
downloadironic-59c496041765337704df2f7a9f105d76c4e20839.tar.gz
Merge "Switch to Futurist library for asynchronous execution and periodic tasks"
-rw-r--r--doc/source/dev/architecture.rst25
-rw-r--r--etc/ironic/ironic.conf.sample7
-rw-r--r--ironic/common/service.py9
-rw-r--r--ironic/conductor/base_manager.py120
-rw-r--r--ironic/conductor/manager.py23
-rw-r--r--ironic/conductor/task_manager.py14
-rw-r--r--ironic/drivers/base.py65
-rw-r--r--ironic/drivers/modules/inspector.py5
-rw-r--r--ironic/tests/unit/conductor/mgr_utils.py9
-rw-r--r--ironic/tests/unit/conductor/test_base_manager.py62
-rw-r--r--ironic/tests/unit/conductor/test_manager.py125
-rw-r--r--ironic/tests/unit/conductor/test_task_manager.py93
-rw-r--r--ironic/tests/unit/drivers/test_base.py21
-rw-r--r--releasenotes/notes/futurist-e9c55699f479f97a.yaml16
-rw-r--r--requirements.txt1
15 files changed, 285 insertions, 310 deletions
diff --git a/doc/source/dev/architecture.rst b/doc/source/dev/architecture.rst
index e055c2479..26dda4dfc 100644
--- a/doc/source/dev/architecture.rst
+++ b/doc/source/dev/architecture.rst
@@ -77,12 +77,14 @@ Driver-Specific Periodic Tasks
Drivers may run their own periodic tasks, i.e. actions run repeatedly after
a certain amount of time. Such task is created by decorating a method on the
-driver itself or on any interface with driver_periodic_task_ decorator, e.g.
+driver itself or on any interface with periodic_ decorator, e.g.
::
+ from futurist import periodics
+
class FakePower(base.PowerInterface):
- @base.driver_periodic_task(spacing=42)
+ @periodics.periodic(spacing=42)
def task(self, manager, context):
pass # do something
@@ -90,7 +92,7 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g.
def __init__(self):
self.power = FakePower()
- @base.driver_periodic_task(spacing=42)
+ @periodics.periodic(spacing=42)
def task2(self, manager, context):
pass # do something
@@ -98,21 +100,6 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g.
Here the ``spacing`` argument is a period in seconds for a given periodic task.
For example 'spacing=5' means every 5 seconds.
-.. note::
- The ``parallel`` argument may be passed to driver_periodic_task_.
- If it's set to False, this task will be run in the periodic task loop,
- rather than a separate greenthread.
-
- This is deprecated as of Liberty release, and the parallel argument will be
- ignored starting in the Mitaka cycle, as such task would prevent all other
- periodic tasks from starting while it is running.
-
-.. note::
- By default periodic task names are derived from method names,
- so they should be unique within a Python module.
- Use ``name`` argument to driver_periodic_task_ to override
- automatically generated name.
-
Message Routing
===============
@@ -137,4 +124,4 @@ driver actions such as take-over or clean-up.
.. _DB API: ../api/ironic.db.api.html
.. _diskimage-builder: https://github.com/openstack/diskimage-builder
.. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html
-.. _driver_periodic_task: ../api/ironic.drivers.base.html#ironic.drivers.base.driver_periodic_task
+.. _periodic: http://docs.openstack.org/developer/futurist/api.html#futurist.periodics.periodic
diff --git a/etc/ironic/ironic.conf.sample b/etc/ironic/ironic.conf.sample
index 60f180b66..30cbb6551 100644
--- a/etc/ironic/ironic.conf.sample
+++ b/etc/ironic/ironic.conf.sample
@@ -117,7 +117,8 @@
# Options defined in ironic.common.service
#
-# Seconds between running periodic tasks. (integer value)
+# Default interval for running driver periodic tasks. (integer
+# value)
#periodic_interval=60
# Name of this node. This can be an opaque identifier. It is
@@ -596,7 +597,9 @@
# Options defined in ironic.conductor.base_manager
#
-# The size of the workers greenthread pool. (integer value)
+# The size of the workers greenthread pool. Note that 2
+# threads will be reserved by the conductor itself for
+# handling heart beats and periodic tasks. (integer value)
#workers_pool_size=100
# Seconds between conductor heart beats. (integer value)
diff --git a/ironic/common/service.py b/ironic/common/service.py
index 01333c964..71c8e30ab 100644
--- a/ironic/common/service.py
+++ b/ironic/common/service.py
@@ -40,7 +40,8 @@ from ironic.objects import base as objects_base
service_opts = [
cfg.IntOpt('periodic_interval',
default=60,
- help=_('Seconds between running periodic tasks.')),
+ help=_('Default interval for running driver periodic tasks.'),
+ deprecated_for_removal=True),
cfg.StrOpt('host',
default=socket.getfqdn(),
help=_('Name of this node. This can be an opaque identifier. '
@@ -79,11 +80,7 @@ class RPCService(service.Service):
self.rpcserver.start()
self.handle_signal()
- self.manager.init_host()
- self.tg.add_dynamic_timer(
- self.manager.periodic_tasks,
- periodic_interval_max=CONF.periodic_interval,
- context=admin_context)
+ self.manager.init_host(admin_context)
LOG.info(_LI('Created RPC server for service %(service)s on host '
'%(host)s.'),
diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py
index 48abbbe4b..e4ace43cc 100644
--- a/ironic/conductor/base_manager.py
+++ b/ironic/conductor/base_manager.py
@@ -15,13 +15,13 @@
import inspect
import threading
-from eventlet import greenpool
-from oslo_concurrency import lockutils
+import futurist
+from futurist import periodics
+from futurist import rejection
from oslo_config import cfg
from oslo_context import context as ironic_context
from oslo_db import exception as db_exception
from oslo_log import log
-from oslo_service import periodic_task
from oslo_utils import excutils
from ironic.common import driver_factory
@@ -40,8 +40,10 @@ from ironic.db import api as dbapi
conductor_opts = [
cfg.IntOpt('workers_pool_size',
- default=100,
- help=_('The size of the workers greenthread pool.')),
+ default=100, min=3,
+ help=_('The size of the workers greenthread pool. '
+ 'Note that 2 threads will be reserved by the conductor '
+ 'itself for handling heart beats and periodic tasks.')),
cfg.IntOpt('heartbeat_interval',
default=10,
help=_('Seconds between conductor heart beats.')),
@@ -51,18 +53,18 @@ conductor_opts = [
CONF = cfg.CONF
CONF.register_opts(conductor_opts, 'conductor')
LOG = log.getLogger(__name__)
-WORKER_SPAWN_lOCK = "conductor_worker_spawn"
-class BaseConductorManager(periodic_task.PeriodicTasks):
+class BaseConductorManager(object):
def __init__(self, host, topic):
- super(BaseConductorManager, self).__init__(CONF)
+ super(BaseConductorManager, self).__init__()
if not host:
host = CONF.host
self.host = host
self.topic = topic
self.notifier = rpc.get_notifier()
+ self._started = False
def _get_driver(self, driver_name):
"""Get the driver.
@@ -78,15 +80,29 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
except KeyError:
raise exception.DriverNotFound(driver_name=driver_name)
- def init_host(self):
+ def init_host(self, admin_context=None):
+ """Initialize the conductor host.
+
+ :param admin_context: the admin context to pass to periodic tasks.
+ :raises: RuntimeError when conductor is already running
+ :raises: NoDriversLoaded when no drivers are enabled on the conductor
+ """
+ if self._started:
+ raise RuntimeError(_('Attempt to start an already running '
+ 'conductor manager'))
+
self.dbapi = dbapi.get_instance()
self._keepalive_evt = threading.Event()
"""Event for the keepalive thread."""
- self._worker_pool = greenpool.GreenPool(
- size=CONF.conductor.workers_pool_size)
- """GreenPool of background workers for performing tasks async."""
+ # TODO(dtantsur): make the threshold configurable?
+ rejection_func = rejection.reject_when_reached(
+ CONF.conductor.workers_pool_size)
+ self._executor = futurist.GreenThreadPoolExecutor(
+ max_workers=CONF.conductor.workers_pool_size,
+ check_and_reject=rejection_func)
+ """Executor for performing tasks async."""
self.ring_manager = hash.HashRingManager()
"""Consistent hash ring which maps drivers to conductors."""
@@ -106,15 +122,36 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
LOG.error(msg, self.host)
raise exception.NoDriversLoaded(conductor=self.host)
- # Collect driver-specific periodic tasks
+ # Collect driver-specific periodic tasks.
+ # Conductor periodic tasks accept context argument, driver periodic
+ # tasks accept this manager and context. We have to ensure that the
+ # same driver interface class is not traversed twice, otherwise
+ # we'll have several instances of the same task.
+ LOG.debug('Collecting periodic tasks')
+ self._periodic_task_callables = []
+ periodic_task_classes = set()
+ self._collect_periodic_tasks(self, (admin_context,))
for driver_obj in driver_factory.drivers().values():
- self._collect_periodic_tasks(driver_obj)
+ self._collect_periodic_tasks(driver_obj, (self, admin_context))
for iface_name in (driver_obj.core_interfaces +
driver_obj.standard_interfaces +
['vendor']):
iface = getattr(driver_obj, iface_name, None)
- if iface:
- self._collect_periodic_tasks(iface)
+ if iface and iface.__class__ not in periodic_task_classes:
+ self._collect_periodic_tasks(iface, (self, admin_context))
+ periodic_task_classes.add(iface.__class__)
+
+ if (len(self._periodic_task_callables) >
+ CONF.conductor.workers_pool_size):
+ LOG.warning(_LW('This conductor has %(tasks)d periodic tasks '
+ 'enabled, but only %(workers)d task workers '
+ 'allowed by [conductor]workers_pool_size option'),
+ {'tasks': len(self._periodic_task_callables),
+ 'workers': CONF.conductor.workers_pool_size})
+
+ self._periodic_tasks = periodics.PeriodicWorker(
+ self._periodic_task_callables,
+ executor_factory=periodics.ExistingExecutor(self._executor))
# clear all locks held by this conductor before registering
self.dbapi.clear_node_reservations_for_conductor(self.host)
@@ -134,6 +171,12 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
update_existing=True)
self.conductor = cdr
+ # Start periodic tasks
+ self._periodic_tasks_worker = self._executor.submit(
+ self._periodic_tasks.start, allow_empty=True)
+ self._periodic_tasks_worker.add_done_callback(
+ self._on_periodic_tasks_stop)
+
# NOTE(lucasagomes): If the conductor server dies abruptly
# mid deployment (OMM Killer, power outage, etc...) we
# can not resume the deployment even if the conductor
@@ -161,10 +204,7 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
LOG.critical(_LC('Failed to start keepalive'))
self.del_host()
- def _collect_periodic_tasks(self, obj):
- for n, method in inspect.getmembers(obj, inspect.ismethod):
- if getattr(method, '_periodic_enabled', False):
- self.add_periodic_task(method)
+ self._started = True
def del_host(self, deregister=True):
# Conductor deregistration fails if called on non-initialized
@@ -190,11 +230,34 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
# Waiting here to give workers the chance to finish. This has the
# benefit of releasing locks workers placed on nodes, as well as
# having work complete normally.
- self._worker_pool.waitall()
+ self._periodic_tasks.stop()
+ self._periodic_tasks.wait()
+ self._executor.shutdown(wait=True)
+ self._started = False
+
+ def _collect_periodic_tasks(self, obj, args):
+ """Collect periodic tasks from a given object.
- def periodic_tasks(self, context, raise_on_error=False):
- """Periodic tasks are run at pre-specified interval."""
- return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
+ Populates self._periodic_task_callables with tuples
+ (callable, args, kwargs).
+
+ :param obj: object containing periodic tasks as methods
+ :param args: tuple with arguments to pass to every task
+ """
+ for name, member in inspect.getmembers(obj):
+ if periodics.is_periodic(member):
+ LOG.debug('Found periodic task %(owner)s.%(member)s',
+ {'owner': obj.__class__.__name__,
+ 'member': name})
+ self._periodic_task_callables.append((member, args, {}))
+
+ def _on_periodic_tasks_stop(self, fut):
+ try:
+ fut.result()
+ except Exception as exc:
+ LOG.critical(_LC('Periodic tasks worker has failed: %s'), exc)
+ else:
+ LOG.info(_LI('Successfully shut down periodic tasks'))
def iter_nodes(self, fields=None, **kwargs):
"""Iterate over nodes mapped to this conductor.
@@ -217,7 +280,6 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
if self._mapped_to_this_conductor(*result[:2]):
yield result
- @lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-')
def _spawn_worker(self, func, *args, **kwargs):
"""Create a greenthread to run func(*args, **kwargs).
@@ -225,13 +287,13 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
Spawns a greenthread if there are free slots in pool, otherwise raises
exception. Execution control returns immediately to the caller.
- :returns: GreenThread object.
+ :returns: Future object.
:raises: NoFreeConductorWorker if worker pool is currently full.
"""
- if self._worker_pool.free():
- return self._worker_pool.spawn(func, *args, **kwargs)
- else:
+ try:
+ return self._executor.submit(func, *args, **kwargs)
+ except futurist.RejectedSubmission:
raise exception.NoFreeConductorWorker()
def _conductor_service_record_keepalive(self):
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 5e0027be2..fe4c49d67 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -46,10 +46,10 @@ import datetime
import tempfile
import eventlet
+from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
-from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import uuidutils
@@ -1200,8 +1200,7 @@ class ConductorManager(base_manager.BaseConductorManager):
action=action, node=node.uuid,
state=node.provision_state)
- @periodic_task.periodic_task(
- spacing=CONF.conductor.sync_power_state_interval)
+ @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval)
def _sync_power_states(self, context):
"""Periodic task to sync power states for the nodes.
@@ -1269,8 +1268,7 @@ class ConductorManager(base_manager.BaseConductorManager):
# Yield on every iteration
eventlet.sleep(0)
- @periodic_task.periodic_task(
- spacing=CONF.conductor.check_provision_state_interval)
+ @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
def _check_deploy_timeouts(self, context):
"""Periodically checks whether a deploy RPC call has timed out.
@@ -1292,8 +1290,7 @@ class ConductorManager(base_manager.BaseConductorManager):
self._fail_if_in_state(context, filters, states.DEPLOYWAIT,
sort_key, callback_method, err_handler)
- @periodic_task.periodic_task(
- spacing=CONF.conductor.check_provision_state_interval)
+ @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
def _check_deploying_status(self, context):
"""Periodically checks the status of nodes in DEPLOYING state.
@@ -1376,8 +1373,7 @@ class ConductorManager(base_manager.BaseConductorManager):
task.node.conductor_affinity = self.conductor.id
task.node.save()
- @periodic_task.periodic_task(
- spacing=CONF.conductor.check_provision_state_interval)
+ @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
def _check_cleanwait_timeouts(self, context):
"""Periodically checks for nodes being cleaned.
@@ -1402,8 +1398,7 @@ class ConductorManager(base_manager.BaseConductorManager):
last_error=last_error,
keep_target_state=True)
- @periodic_task.periodic_task(
- spacing=CONF.conductor.sync_local_state_interval)
+ @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval)
def _sync_local_state(self, context):
"""Perform any actions necessary to sync local state.
@@ -1826,8 +1821,7 @@ class ConductorManager(base_manager.BaseConductorManager):
driver = self._get_driver(driver_name)
return driver.get_properties()
- @periodic_task.periodic_task(
- spacing=CONF.conductor.send_sensor_data_interval)
+ @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval)
def _send_sensor_data(self, context):
"""Periodically sends sensor data to Ceilometer."""
# do nothing if send_sensor_data option is False
@@ -2061,8 +2055,7 @@ class ConductorManager(base_manager.BaseConductorManager):
action='inspect', node=task.node.uuid,
state=task.node.provision_state)
- @periodic_task.periodic_task(
- spacing=CONF.conductor.check_provision_state_interval)
+ @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
def _check_inspect_timeouts(self, context):
"""Periodically checks inspect_timeout and fails upon reaching it.
diff --git a/ironic/conductor/task_manager.py b/ironic/conductor/task_manager.py
index aac3ca12c..5a0c3da17 100644
--- a/ironic/conductor/task_manager.py
+++ b/ironic/conductor/task_manager.py
@@ -383,15 +383,15 @@ class TaskManager(object):
# for some reason, this is true.
# All of the above are asserted in tests such that we'll
# catch if eventlet ever changes this behavior.
- thread = None
+ fut = None
try:
- thread = self._spawn_method(*self._spawn_args,
- **self._spawn_kwargs)
+ fut = self._spawn_method(*self._spawn_args,
+ **self._spawn_kwargs)
# NOTE(comstud): Trying to use a lambda here causes
# the callback to not occur for some reason. This
# also makes it easier to test.
- thread.link(self._thread_release_resources)
+ fut.add_done_callback(self._thread_release_resources)
# Don't unlock! The unlock will occur when the
# thread finshes.
return
@@ -408,9 +408,9 @@ class TaskManager(object):
{'method': self._on_error_method.__name__,
'node': self.node.uuid})
- if thread is not None:
- # This means the link() failed for some
+ if fut is not None:
+ # This means the add_done_callback() failed for some
# reason. Nuke the thread.
- thread.cancel()
+ fut.cancel()
self.release_resources()
self.release_resources()
diff --git a/ironic/drivers/base.py b/ironic/drivers/base.py
index f71d3931a..7b4bd60bf 100644
--- a/ironic/drivers/base.py
+++ b/ironic/drivers/base.py
@@ -24,9 +24,9 @@ import inspect
import json
import os
-import eventlet
+from futurist import periodics
+from oslo_config import cfg
from oslo_log import log as logging
-from oslo_service import periodic_task
from oslo_utils import excutils
import six
@@ -40,6 +40,10 @@ RAID_CONFIG_SCHEMA = os.path.join(os.path.dirname(__file__),
'raid_config_schema.json')
+CONF = cfg.CONF
+CONF.import_opt('periodic_interval', 'ironic.common.service')
+
+
@six.add_metaclass(abc.ABCMeta)
class BaseDriver(object):
"""Base class for all drivers.
@@ -1116,45 +1120,36 @@ def clean_step(priority, abortable=False, argsinfo=None):
return decorator
-def driver_periodic_task(parallel=True, **other):
+def driver_periodic_task(**kwargs):
"""Decorator for a driver-specific periodic task.
+ Deprecated, please use futurist directly.
Example::
+ from futurist import periodics
+
class MyDriver(base.BaseDriver):
- @base.driver_periodic_task(spacing=42)
+ @periodics.periodic(spacing=42)
def task(self, manager, context):
# do some job
- :param parallel: If True (default), this task is run in a separate thread.
- If False, this task will be run in the conductor's periodic task
- loop, rather than a separate greenthread. This parameter is
- deprecated and will be ignored starting with Mitaka cycle.
- :param other: arguments to pass to @periodic_task.periodic_task
+ :param kwargs: arguments to pass to @periodics.periodic
"""
- # TODO(dtantsur): drop all this magic once
- # https://review.openstack.org/#/c/134303/ lands
- semaphore = eventlet.semaphore.BoundedSemaphore()
-
- def decorator2(func):
- @six.wraps(func)
- def wrapper(*args, **kwargs):
- if parallel:
- def _internal():
- with semaphore:
- func(*args, **kwargs)
-
- eventlet.greenthread.spawn_n(_internal)
- else:
- LOG.warning(_LW(
- 'Using periodic tasks with parallel=False is deprecated, '
- '"parallel" argument will be ignored starting with '
- 'the Mitaka release'))
- func(*args, **kwargs)
-
- # NOTE(dtantsur): name should be unique
- other.setdefault('name', '%s.%s' % (func.__module__, func.__name__))
- decorator = periodic_task.periodic_task(**other)
- return decorator(wrapper)
-
- return decorator2
+ LOG.warning(_LW('driver_periodic_task decorator is deprecated, please '
+ 'use futurist.periodics.periodic directly'))
+ # Previously we accepted more arguments, make a backward compatibility
+ # layer for out-of-tree drivers.
+ new_kwargs = {}
+ for arg in ('spacing', 'enabled', 'run_immediately'):
+ try:
+ new_kwargs[arg] = kwargs.pop(arg)
+ except KeyError:
+ pass
+ new_kwargs.setdefault('spacing', CONF.periodic_interval)
+
+ if kwargs:
+ LOG.warning(_LW('The following arguments are not supported by '
+ 'futurist.periodics.periodic and are ignored: %s'),
+ ', '.join(kwargs))
+
+ return periodics.periodic(**new_kwargs)
diff --git a/ironic/drivers/modules/inspector.py b/ironic/drivers/modules/inspector.py
index 44dd525d3..e89083aad 100644
--- a/ironic/drivers/modules/inspector.py
+++ b/ironic/drivers/modules/inspector.py
@@ -16,6 +16,7 @@ Modules required to work with ironic_inspector:
"""
import eventlet
+from futurist import periodics
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
@@ -121,8 +122,8 @@ class Inspector(base.InspectInterface):
eventlet.spawn_n(_start_inspection, task.node.uuid, task.context)
return states.INSPECTING
- @base.driver_periodic_task(spacing=CONF.inspector.status_check_period,
- enabled=CONF.inspector.enabled)
+ @periodics.periodic(spacing=CONF.inspector.status_check_period,
+ enabled=CONF.inspector.enabled)
def _periodic_check_result(self, manager, context):
"""Periodic task checking results of inspection."""
filters = {'provision_state': states.INSPECTING}
diff --git a/ironic/tests/unit/conductor/mgr_utils.py b/ironic/tests/unit/conductor/mgr_utils.py
index 4403efa59..6564d3217 100644
--- a/ironic/tests/unit/conductor/mgr_utils.py
+++ b/ironic/tests/unit/conductor/mgr_utils.py
@@ -17,6 +17,7 @@
"""Test utils for Ironic Managers."""
+from futurist import periodics
import mock
from oslo_utils import strutils
from oslo_utils import uuidutils
@@ -175,8 +176,12 @@ class ServiceSetUpMixin(object):
return
self.service.del_host()
- def _start_service(self):
- self.service.init_host()
+ def _start_service(self, start_periodic_tasks=False):
+ if start_periodic_tasks:
+ self.service.init_host()
+ else:
+ with mock.patch.object(periodics, 'PeriodicWorker', autospec=True):
+ self.service.init_host()
self.addCleanup(self._stop_service)
diff --git a/ironic/tests/unit/conductor/test_base_manager.py b/ironic/tests/unit/conductor/test_base_manager.py
index 5ad86a505..ce4543453 100644
--- a/ironic/tests/unit/conductor/test_base_manager.py
+++ b/ironic/tests/unit/conductor/test_base_manager.py
@@ -13,6 +13,8 @@
"""Test class for Ironic BaseConductorManager."""
import eventlet
+import futurist
+from futurist import periodics
import mock
from oslo_config import cfg
from oslo_db import exception as db_exception
@@ -23,6 +25,7 @@ from ironic.conductor import base_manager
from ironic.conductor import manager
from ironic.drivers import base as drivers_base
from ironic import objects
+from ironic.tests import base as tests_base
from ironic.tests.unit.conductor import mgr_utils
from ironic.tests.unit.db import base as tests_db_base
from ironic.tests.unit.objects import utils as obj_utils
@@ -86,6 +89,7 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
res = objects.Conductor.get_by_hostname(self.context,
self.hostname)
self.assertEqual(init_names, res['drivers'])
+ self._stop_service()
# verify that restart registers new driver names
self.config(enabled_drivers=restart_names)
@@ -98,12 +102,10 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
@mock.patch.object(driver_factory.DriverFactory, '__getitem__')
def test_start_registers_driver_specific_tasks(self, get_mock):
init_names = ['fake1']
- expected_name = 'ironic.tests.unit.conductor.test_base_manager.task'
- expected_name2 = 'ironic.tests.unit.conductor.test_base_manager.iface'
self.config(enabled_drivers=init_names)
class TestInterface(object):
- @drivers_base.driver_periodic_task(spacing=100500)
+ @periodics.periodic(spacing=100500)
def iface(self):
pass
@@ -113,28 +115,27 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
iface = TestInterface()
- @drivers_base.driver_periodic_task(spacing=42)
+ @periodics.periodic(spacing=42)
def task(self, context):
pass
+ @drivers_base.driver_periodic_task()
+ def deprecated_task(self, context):
+ pass
+
obj = Driver()
- self.assertTrue(obj.task._periodic_enabled)
get_mock.return_value = mock.Mock(obj=obj)
with mock.patch.object(
driver_factory.DriverFactory()._extension_manager,
'names') as mock_names:
mock_names.return_value = init_names
- self._start_service()
- tasks = dict(self.service._periodic_tasks)
- self.assertEqual(obj.task, tasks[expected_name])
- self.assertEqual(obj.iface.iface, tasks[expected_name2])
- self.assertEqual(42,
- self.service._periodic_spacing[expected_name])
- self.assertEqual(100500,
- self.service._periodic_spacing[expected_name2])
- self.assertIn(expected_name, self.service._periodic_last_run)
- self.assertIn(expected_name2, self.service._periodic_last_run)
+ self._start_service(start_periodic_tasks=True)
+
+ tasks = {c[0] for c in self.service._periodic_task_callables}
+ for t in (obj.task, obj.iface.iface, obj.deprecated_task):
+ self.assertTrue(periodics.is_periodic(t))
+ self.assertIn(t, tasks)
@mock.patch.object(driver_factory.DriverFactory, '__init__')
def test_start_fails_on_missing_driver(self, mock_df):
@@ -154,6 +155,17 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
self.service.init_host)
self.assertTrue(log_mock.error.called)
+ def test_prevent_double_start(self):
+ self._start_service()
+ self.assertRaisesRegexp(RuntimeError, 'already running',
+ self.service.init_host)
+
+ @mock.patch.object(base_manager, 'LOG')
+ def test_warning_on_low_workers_pool(self, log_mock):
+ CONF.set_override('workers_pool_size', 3, 'conductor')
+ self._start_service()
+ self.assertTrue(log_mock.warning.called)
+
@mock.patch.object(eventlet.greenpool.GreenPool, 'waitall')
def test_del_host_waits_on_workerpool(self, wait_mock):
self._start_service()
@@ -185,3 +197,23 @@ class KeepAliveTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
mock_is_set.side_effect = [False, False, False, True]
self.service._conductor_service_record_keepalive()
self.assertEqual(3, mock_touch.call_count)
+
+
+class ManagerSpawnWorkerTestCase(tests_base.TestCase):
+ def setUp(self):
+ super(ManagerSpawnWorkerTestCase, self).setUp()
+ self.service = manager.ConductorManager('hostname', 'test-topic')
+ self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
+ self.service._executor = self.executor
+
+ def test__spawn_worker(self):
+ self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
+
+ self.executor.submit.assert_called_once_with(
+ 'fake', 1, 2, foo='bar', cat='meow')
+
+ def test__spawn_worker_none_free(self):
+ self.executor.submit.side_effect = futurist.RejectedSubmission()
+
+ self.assertRaises(exception.NoFreeConductorWorker,
+ self.service._spawn_worker, 'fake')
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 3c6f27b5a..4d0c3e829 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -70,7 +70,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin,
self.service.change_node_power_state(self.context,
node.uuid,
states.POWER_ON)
- self.service._worker_pool.waitall()
+ self._stop_service()
get_power_mock.assert_called_once_with(mock.ANY)
node.refresh()
@@ -103,7 +103,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin,
# In this test worker should not be spawned, but waiting to make sure
# the below perform_mock assertion is valid.
- self.service._worker_pool.waitall()
+ self._stop_service()
self.assertFalse(pwr_act_mock.called, 'node_power_action has been '
'unexpectedly called.')
# Verify existing reservation wasn't broken.
@@ -162,7 +162,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin,
self.service.change_node_power_state(self.context,
node.uuid,
new_state)
- self.service._worker_pool.waitall()
+ self._stop_service()
get_power_mock.assert_called_once_with(mock.ANY)
set_power_mock.assert_called_once_with(mock.ANY, new_state)
@@ -298,7 +298,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin,
'first_method', 'POST',
info)
# Waiting to make sure the below assertions are valid.
- self.service._worker_pool.waitall()
+ self._stop_service()
# Assert spawn_after was called
self.assertTrue(mock_spawn.called)
@@ -320,7 +320,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin,
'third_method_sync',
'POST', info)
# Waiting to make sure the below assertions are valid.
- self.service._worker_pool.waitall()
+ self._stop_service()
# Assert no workers were used
self.assertFalse(mock_spawn.called)
@@ -438,7 +438,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin,
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
# Waiting to make sure the below assertions are valid.
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertIsNone(node.last_error)
@@ -715,7 +715,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
provision_state=states.AVAILABLE)
self.service.do_node_deploy(self.context, node.uuid)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.DEPLOYING, node.provision_state)
self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -745,7 +745,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
driver_internal_info={'is_whole_disk_image': False})
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.DEPLOYING, node.provision_state)
self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -774,7 +774,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
instance_info={'image_source': uuidutils.generate_uuid()})
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.DEPLOYWAIT, node.provision_state)
self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -798,7 +798,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
target_provision_state=states.NOSTATE)
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.ACTIVE, node.provision_state)
self.assertEqual(states.NOSTATE, node.target_provision_state)
@@ -822,7 +822,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
target_provision_state=states.NOSTATE)
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.ACTIVE, node.provision_state)
self.assertEqual(states.NOSTATE, node.target_provision_state)
@@ -845,7 +845,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
target_provision_state=states.NOSTATE)
self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.ACTIVE, node.provision_state)
self.assertEqual(states.NOSTATE, node.target_provision_state)
@@ -893,7 +893,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid)
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Make sure things were rolled back
self.assertEqual(prv_state, node.provision_state)
@@ -1049,7 +1049,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0))
self.service._check_deploy_timeouts(self.context)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.DEPLOYFAIL, node.provision_state)
self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -1067,7 +1067,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0))
self.service._check_cleanwait_timeouts(self.context)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.CLEANFAIL, node.provision_state)
self.assertEqual(tgt_prov_state, node.target_provision_state)
@@ -1162,8 +1162,9 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
target_provision_state=states.AVAILABLE,
driver_internal_info={'is_whole_disk_image': False})
+ self._start_service()
self.service.do_node_tear_down(self.context, node.uuid)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Node will be moved to AVAILABLE after cleaning, not tested here
self.assertEqual(states.CLEANING, node.provision_state)
@@ -1176,7 +1177,6 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
def test__do_node_tear_down_from_valid_states(self):
valid_states = [states.ACTIVE, states.DEPLOYWAIT, states.DEPLOYFAIL,
states.ERROR]
- self._start_service()
for state in valid_states:
self._test_do_node_tear_down_from_state(state)
@@ -1207,7 +1207,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid)
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Assert instance_info/driver_internal_info was not touched
self.assertEqual(fake_instance_info, node.instance_info)
@@ -1236,7 +1236,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, 'provide')
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Make sure things were rolled back
self.assertEqual(prv_state, node.provision_state)
@@ -1463,7 +1463,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, clean_steps)
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
- self.service._worker_pool.waitall()
+ self._stop_service()
mock_validate.assert_called_once_with(mock.ANY)
mock_spawn.assert_called_with(self.service._do_node_clean, mock.ANY,
clean_steps)
@@ -1492,9 +1492,6 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.service.continue_node_clean,
self.context, node.uuid)
- self.service._worker_pool.waitall()
- node.refresh()
-
@mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
def test_continue_node_clean_wrong_state(self, mock_spawn):
# Test the appropriate exception is raised if node isn't already
@@ -1511,7 +1508,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.service.continue_node_clean,
self.context, node.uuid)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Make sure things were rolled back
self.assertEqual(prv_state, node.provision_state)
@@ -1533,7 +1530,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
clean_step=self.clean_steps[0])
self._start_service()
self.service.continue_node_clean(self.context, node.uuid)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.CLEANING, node.provision_state)
self.assertEqual(tgt_prv_state, node.target_provision_state)
@@ -1561,7 +1558,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
driver_internal_info=driver_info, clean_step=self.clean_steps[0])
self._start_service()
self.service.continue_node_clean(self.context, node.uuid)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
if skip:
expected_step_index = 1
@@ -1591,7 +1588,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self._start_service()
self.service.continue_node_clean(self.context, node.uuid)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.CLEANFAIL, node.provision_state)
self.assertEqual(tgt_prov_state, node.target_provision_state)
@@ -1619,7 +1616,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self._start_service()
self.service.continue_node_clean(self.context, node.uuid)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(tgt_prov_state, node.provision_state)
self.assertIsNone(node.target_provision_state)
@@ -1667,7 +1664,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
with task_manager.acquire(
self.context, node.uuid, shared=False) as task:
self.service._do_node_clean(task)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Assert that the node was moved to available without cleaning
@@ -1779,7 +1776,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_node_clean(task, clean_steps=clean_steps)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
mock_validate.assert_called_once_with(task)
@@ -1827,7 +1824,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_next_clean_step(task, 0)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.CLEANWAIT, node.provision_state)
@@ -1868,7 +1865,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_next_clean_step(task, self.next_clean_step_index)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.CLEANWAIT, node.provision_state)
@@ -1907,7 +1904,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_next_clean_step(task, None)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Cleaning should be complete without calling additional steps
@@ -1947,7 +1944,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_next_clean_step(task, 0)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Cleaning should be complete
@@ -1992,7 +1989,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.service._do_next_clean_step(task, 0)
tear_mock.assert_called_once_with(task.driver.deploy, task)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Make sure we go to CLEANFAIL, clear clean_steps
@@ -2034,7 +2031,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_next_clean_step(task, 0)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Make sure we go to CLEANFAIL, clear clean_steps
@@ -2075,7 +2072,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_next_clean_step(task, None)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Cleaning should be complete without calling additional steps
@@ -2114,7 +2111,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid, shared=False) as task:
self.service._do_next_clean_step(task, 0)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Make sure we go to CLEANFAIL, clear clean_steps
@@ -2232,7 +2229,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node['id'], shared=False) as task:
self.service._do_node_verify(task)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
mock_validate.assert_called_once_with(task)
@@ -2261,7 +2258,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node['id'], shared=False) as task:
self.service._do_node_verify(task)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
mock_validate.assert_called_once_with(task)
@@ -2289,7 +2286,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin,
self.context, node['id'], shared=False) as task:
self.service._do_node_verify(task)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
mock_get_power_state.assert_called_once_with(task)
@@ -2394,14 +2391,14 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
self.context, node.uuid, True)
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
- self.service._worker_pool.waitall()
+ self._stop_service()
spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
def test_set_console_mode_enabled(self):
node = obj_utils.create_test_node(self.context, driver='fake')
self._start_service()
self.service.set_console_mode(self.context, node.uuid, True)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertTrue(node.console_enabled)
@@ -2409,7 +2406,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
node = obj_utils.create_test_node(self.context, driver='fake')
self._start_service()
self.service.set_console_mode(self.context, node.uuid, False)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertFalse(node.console_enabled)
@@ -2425,7 +2422,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.UnsupportedDriverExtension,
exc.exc_info[0])
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
def test_set_console_mode_validation_fail(self):
@@ -2449,7 +2446,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
'start_console') as mock_sc:
mock_sc.side_effect = exception.IronicException('test-error')
self.service.set_console_mode(self.context, node.uuid, True)
- self.service._worker_pool.waitall()
+ self._stop_service()
mock_sc.assert_called_once_with(mock.ANY)
node.refresh()
self.assertIsNotNone(node.last_error)
@@ -2463,7 +2460,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
'stop_console') as mock_sc:
mock_sc.side_effect = exception.IronicException('test-error')
self.service.set_console_mode(self.context, node.uuid, False)
- self.service._worker_pool.waitall()
+ self._stop_service()
mock_sc.assert_called_once_with(mock.ANY)
node.refresh()
self.assertIsNotNone(node.last_error)
@@ -2475,7 +2472,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
with mock.patch.object(self.driver.console,
'start_console') as mock_sc:
self.service.set_console_mode(self.context, node.uuid, True)
- self.service._worker_pool.waitall()
+ self._stop_service()
self.assertFalse(mock_sc.called)
def test_disable_console_already_disabled(self):
@@ -2485,7 +2482,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
with mock.patch.object(self.driver.console,
'stop_console') as mock_sc:
self.service.set_console_mode(self.context, node.uuid, False)
- self.service._worker_pool.waitall()
+ self._stop_service()
self.assertFalse(mock_sc.called)
def test_get_console(self):
@@ -3065,32 +3062,6 @@ class RaidTestCases(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
self.assertEqual(exception.InvalidParameterValue, exc.exc_info[0])
-class ManagerSpawnWorkerTestCase(tests_base.TestCase):
- def setUp(self):
- super(ManagerSpawnWorkerTestCase, self).setUp()
- self.service = manager.ConductorManager('hostname', 'test-topic')
-
- def test__spawn_worker(self):
- worker_pool = mock.Mock(spec_set=['free', 'spawn'])
- worker_pool.free.return_value = True
- self.service._worker_pool = worker_pool
-
- self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
-
- worker_pool.spawn.assert_called_once_with(
- 'fake', 1, 2, foo='bar', cat='meow')
-
- def test__spawn_worker_none_free(self):
- worker_pool = mock.Mock(spec_set=['free', 'spawn'])
- worker_pool.free.return_value = False
- self.service._worker_pool = worker_pool
-
- self.assertRaises(exception.NoFreeConductorWorker,
- self.service._spawn_worker, 'fake')
-
- self.assertFalse(worker_pool.spawn.called)
-
-
@mock.patch.object(conductor_utils, 'node_power_action')
class ManagerDoSyncPowerStateTestCase(tests_db_base.DbTestCase):
def setUp(self):
@@ -4184,7 +4155,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin,
inspection_started_at=datetime.datetime(2000, 1, 1, 0, 0))
self.service._check_inspect_timeouts(self.context)
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
self.assertEqual(states.INSPECTFAIL, node.provision_state)
self.assertEqual(states.MANAGEABLE, node.target_provision_state)
@@ -4207,7 +4178,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid)
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
- self.service._worker_pool.waitall()
+ self._stop_service()
node.refresh()
# Make sure things were rolled back
self.assertEqual(prv_state, node.provision_state)
diff --git a/ironic/tests/unit/conductor/test_task_manager.py b/ironic/tests/unit/conductor/test_task_manager.py
index 093bdecdf..7aadc7bcb 100644
--- a/ironic/tests/unit/conductor/test_task_manager.py
+++ b/ironic/tests/unit/conductor/test_task_manager.py
@@ -17,8 +17,6 @@
"""Tests for :class:`ironic.conductor.task_manager`."""
-import eventlet
-from eventlet import greenpool
import mock
from oslo_utils import uuidutils
@@ -47,6 +45,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
self.config(node_locked_retry_attempts=1, group='conductor')
self.config(node_locked_retry_interval=0, group='conductor')
self.node = obj_utils.create_test_node(self.context)
+ self.future_mock = mock.Mock(spec=['cancel', 'add_done_callback'])
def test_excl_lock(self, get_portgroups_mock, get_ports_mock,
get_driver_mock, reserve_mock, release_mock,
@@ -389,8 +388,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
def test_spawn_after(
self, get_portgroups_mock, get_ports_mock, get_driver_mock,
reserve_mock, release_mock, node_get_mock):
- thread_mock = mock.Mock(spec_set=['link', 'cancel'])
- spawn_mock = mock.Mock(return_value=thread_mock)
+ spawn_mock = mock.Mock(return_value=self.future_mock)
task_release_mock = mock.Mock()
reserve_mock.return_value = self.node
@@ -399,9 +397,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
task.release_resources = task_release_mock
spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
- thread_mock.link.assert_called_once_with(
+ self.future_mock.add_done_callback.assert_called_once_with(
task._thread_release_resources)
- self.assertFalse(thread_mock.cancel.called)
+ self.assertFalse(self.future_mock.cancel.called)
# Since we mocked link(), we're testing that __exit__ didn't
# release resources pending the finishing of the background
# thread
@@ -444,9 +442,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
def test_spawn_after_link_fails(
self, get_portgroups_mock, get_ports_mock, get_driver_mock,
reserve_mock, release_mock, node_get_mock):
- thread_mock = mock.Mock(spec_set=['link', 'cancel'])
- thread_mock.link.side_effect = exception.IronicException('foo')
- spawn_mock = mock.Mock(return_value=thread_mock)
+ self.future_mock.add_done_callback.side_effect = (
+ exception.IronicException('foo'))
+ spawn_mock = mock.Mock(return_value=self.future_mock)
task_release_mock = mock.Mock()
thr_release_mock = mock.Mock(spec_set=[])
reserve_mock.return_value = self.node
@@ -459,8 +457,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
self.assertRaises(exception.IronicException, _test_it)
spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
- thread_mock.link.assert_called_once_with(thr_release_mock)
- thread_mock.cancel.assert_called_once_with()
+ self.future_mock.add_done_callback.assert_called_once_with(
+ thr_release_mock)
+ self.future_mock.cancel.assert_called_once_with()
task_release_mock.assert_called_once_with()
def test_spawn_after_on_error_hook(
@@ -659,75 +658,3 @@ class ExclusiveLockDecoratorTestCase(tests_base.TestCase):
_req_excl_lock_method,
*self.args_task_second,
**self.kwargs)
-
-
-class TaskManagerGreenThreadTestCase(tests_base.TestCase):
- """Class to assert our assumptions about greenthread behavior."""
- def test_gt_link_callback_added_during_execution(self):
- pool = greenpool.GreenPool()
- q1 = eventlet.Queue()
- q2 = eventlet.Queue()
-
- def func():
- q1.put(None)
- q2.get()
-
- link_callback = mock.Mock()
-
- thread = pool.spawn(func)
- q1.get()
- thread.link(link_callback)
- q2.put(None)
- pool.waitall()
- link_callback.assert_called_once_with(thread)
-
- def test_gt_link_callback_added_after_execution(self):
- pool = greenpool.GreenPool()
- link_callback = mock.Mock()
-
- thread = pool.spawn(lambda: None)
- pool.waitall()
- thread.link(link_callback)
- link_callback.assert_called_once_with(thread)
-
- def test_gt_link_callback_exception_inside_thread(self):
- pool = greenpool.GreenPool()
- q1 = eventlet.Queue()
- q2 = eventlet.Queue()
-
- def func():
- q1.put(None)
- q2.get()
- raise Exception()
-
- link_callback = mock.Mock()
-
- thread = pool.spawn(func)
- q1.get()
- thread.link(link_callback)
- q2.put(None)
- pool.waitall()
- link_callback.assert_called_once_with(thread)
-
- def test_gt_link_callback_added_after_exception_inside_thread(self):
- pool = greenpool.GreenPool()
-
- def func():
- raise Exception()
-
- link_callback = mock.Mock()
-
- thread = pool.spawn(func)
- pool.waitall()
- thread.link(link_callback)
-
- link_callback.assert_called_once_with(thread)
-
- def test_gt_cancel_doesnt_run_thread(self):
- pool = greenpool.GreenPool()
- func = mock.Mock()
- thread = pool.spawn(func)
- thread.link(lambda t: None)
- thread.cancel()
- pool.waitall()
- self.assertFalse(func.called)
diff --git a/ironic/tests/unit/drivers/test_base.py b/ironic/tests/unit/drivers/test_base.py
index 4f38b9321..55d647ed0 100644
--- a/ironic/tests/unit/drivers/test_base.py
+++ b/ironic/tests/unit/drivers/test_base.py
@@ -15,7 +15,7 @@
import json
-import eventlet
+from futurist import periodics
import mock
from ironic.common import exception
@@ -85,36 +85,21 @@ class PassthruDecoratorTestCase(base.TestCase):
inst2.driver_routes['driver_noexception']['func'])
-@mock.patch.object(eventlet.greenthread, 'spawn_n', autospec=True,
- side_effect=lambda func, *args, **kw: func(*args, **kw))
class DriverPeriodicTaskTestCase(base.TestCase):
- def test(self, spawn_mock):
+ def test(self):
method_mock = mock.MagicMock(spec_set=[])
- function_mock = mock.MagicMock(spec_set=[])
class TestClass(object):
@driver_base.driver_periodic_task(spacing=42)
def method(self, foo, bar=None):
method_mock(foo, bar=bar)
- @driver_base.driver_periodic_task(spacing=100, parallel=False)
- def function():
- function_mock()
-
obj = TestClass()
self.assertEqual(42, obj.method._periodic_spacing)
- self.assertTrue(obj.method._periodic_task)
- self.assertEqual('ironic.tests.unit.drivers.test_base.method',
- obj.method._periodic_name)
- self.assertEqual('ironic.tests.unit.drivers.test_base.function',
- function._periodic_name)
+ self.assertTrue(periodics.is_periodic(obj.method))
obj.method(1, bar=2)
method_mock.assert_called_once_with(1, bar=2)
- self.assertEqual(1, spawn_mock.call_count)
- function()
- function_mock.assert_called_once_with()
- self.assertEqual(1, spawn_mock.call_count)
class CleanStepDecoratorTestCase(base.TestCase):
diff --git a/releasenotes/notes/futurist-e9c55699f479f97a.yaml b/releasenotes/notes/futurist-e9c55699f479f97a.yaml
new file mode 100644
index 000000000..4423af5c3
--- /dev/null
+++ b/releasenotes/notes/futurist-e9c55699f479f97a.yaml
@@ -0,0 +1,16 @@
+---
+prelude: >
+ This release features switch to Oslo Futurist library for asynchronous
+ thread execution and periodic tasks. Main benefit is that periodic tasks
+ are now executed truly in parallel, and not sequentially in one
+ green thread.
+upgrade:
+ - Configuration option "workers_pool_size" can no longer be less or equal
+ to 2. Please set it to greater value (the default is 100) before update.
+deprecations:
+ - Configuration option "periodic_interval" is deprecated.
+ - Using "driver_periodic_task" decorator is deprecated. Please update your
+ out-of-tree drivers to use "periodics.periodic" decorator from Futurist
+ library.
+fixes:
+ - Periodic tasks are no longer executed all in one thread.
diff --git a/requirements.txt b/requirements.txt
index 15b4fc2d6..a4ecf6475 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -43,3 +43,4 @@ retrying!=1.3.0,>=1.2.3 # Apache-2.0
oslo.versionedobjects>=1.5.0 # Apache-2.0
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
psutil<2.0.0,>=1.1.1 # BSD
+futurist>=0.11.0 # Apache-2.0