diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-04-10 15:49:59 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-04-10 16:16:50 +0300 |
commit | 80807a90a24defa909ee55a39ee1b4b0ad6510ba (patch) | |
tree | 04e32883d651c26c4671d09440f3523c68dd3db0 /tests/test_schedulers.py | |
parent | 46e4d66bb5ef896d41bdadea5cf60da90ce08ee4 (diff) | |
download | apscheduler-80807a90a24defa909ee55a39ee1b4b0ad6510ba.tar.gz |
Added the ability to pause and resume job processing in the scheduler
Closes #21
Diffstat (limited to 'tests/test_schedulers.py')
-rw-r--r-- | tests/test_schedulers.py | 399 |
1 files changed, 216 insertions, 183 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 66e0d7c..15a7b3a 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -13,12 +13,13 @@ from apscheduler.job import Job from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError -from apscheduler.schedulers.base import BaseScheduler +from apscheduler.schedulers.base import BaseScheduler, SchedulerState from apscheduler.events import ( - EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED, + EVENT_SCHEDULER_STARTED, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_ALL_JOBS_REMOVED, EVENT_EXECUTOR_ADDED, - EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, SchedulerEvent, - EVENT_JOB_ADDED, EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES) + EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, EVENT_JOB_ADDED, + EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, SchedulerEvent, + EVENT_SCHEDULER_PAUSED, EVENT_SCHEDULER_RESUMED) from apscheduler.triggers.base import BaseTrigger from apscheduler.util import undefined @@ -35,8 +36,9 @@ except ImportError: class DummyScheduler(BaseScheduler): - def start(self): - super(DummyScheduler, self).start() + def __init__(self, *args, **kwargs): + super(DummyScheduler, self).__init__(*args, **kwargs) + self.wakeup = MagicMock() def shutdown(self, wait=True): super(DummyScheduler, self).shutdown(wait) @@ -57,6 +59,9 @@ class DummyExecutor(BaseExecutor): def __init__(self, **args): super(DummyExecutor, self).__init__() self.args = args + self.start = MagicMock() + self.shutdown = MagicMock() + self.submit_job = MagicMock() def _do_submit_job(self, job, run_times): pass @@ -66,6 +71,8 @@ class DummyJobStore(BaseJobStore): def __init__(self, **args): super(DummyJobStore, self).__init__() self.args = args + self.start = MagicMock() + self.shutdown = MagicMock() def get_due_jobs(self, now): pass @@ -92,14 +99,18 @@ class DummyJobStore(BaseJobStore): pass -@pytest.fixture -def scheduler(monkeypatch, timezone): - monkeypatch.setattr('apscheduler.schedulers.base.get_localzone', - MagicMock(return_value=timezone)) - return DummyScheduler() +class TestBaseScheduler(object): + @pytest.fixture + def scheduler(self, timezone): + return DummyScheduler() + @pytest.fixture + def scheduler_events(self, request, scheduler): + events = [] + mask = getattr(request, 'param', EVENT_ALL ^ EVENT_SCHEDULER_STARTED) + scheduler.add_listener(events.append, mask) + return events -class TestBaseScheduler(object): def test_constructor(self): with patch('%s.DummyScheduler.configure' % __name__) as configure: gconfig = {'apscheduler.foo': 'bar', 'apscheduler.x': 'y'} @@ -163,9 +174,31 @@ class TestBaseScheduler(object): } }) - def test_configure_already_running(self, scheduler): - scheduler._stopped = False - pytest.raises(SchedulerAlreadyRunningError, scheduler.configure, {}) + @pytest.mark.parametrize('method', [ + BaseScheduler.configure, + BaseScheduler.start + ]) + def test_scheduler_already_running(self, method, scheduler): + """ + Test that SchedulerAlreadyRunningError is raised when certain methods are called before + the scheduler has been started. + + """ + scheduler.start(paused=True) + pytest.raises(SchedulerAlreadyRunningError, method, scheduler) + + @pytest.mark.parametrize('method', [ + BaseScheduler.pause, + BaseScheduler.resume, + BaseScheduler.shutdown + ], ids=['pause', 'resume', 'shutdown']) + def test_scheduler_not_running(self, scheduler, method): + """ + Test that the SchedulerNotRunningError is raised when certain methods are called before + the scheduler has been started. + + """ + pytest.raises(SchedulerNotRunningError, method, scheduler) def test_start(self, scheduler, create_job): scheduler._executors = {'exec1': MagicMock(BaseExecutor), 'exec2': MagicMock(BaseExecutor)} @@ -186,7 +219,7 @@ class TestBaseScheduler(object): assert 'default' in scheduler._executors assert 'default' in scheduler._jobstores - scheduler._real_add_job.assert_called_once_with(job, 'store1', False, False) + scheduler._real_add_job.assert_called_once_with(job, 'store1', False) assert scheduler._pending_jobs == [] assert scheduler._dispatch_event.call_count == 3 @@ -197,144 +230,139 @@ class TestBaseScheduler(object): assert event.code == EVENT_JOBSTORE_ADDED assert event.alias == 'default' event = scheduler._dispatch_event.call_args_list[2][0][0] - assert event.code == EVENT_SCHEDULER_START - - assert not scheduler._stopped + assert event.code == EVENT_SCHEDULER_STARTED - def test_start_already_running(self, scheduler): - scheduler._stopped = False - pytest.raises(SchedulerAlreadyRunningError, scheduler.start) + assert scheduler.state is SchedulerState.running @pytest.mark.parametrize('wait', [True, False], ids=['wait', 'nowait']) - def test_shutdown(self, scheduler, wait): - scheduler._executors = {'exec1': MagicMock(BaseExecutor), 'exec2': MagicMock(BaseExecutor)} - scheduler._jobstores = {'store1': MagicMock(BaseJobStore), - 'store2': MagicMock(BaseJobStore)} - scheduler._stopped = False - scheduler._dispatch_event = MagicMock() + def test_shutdown(self, scheduler, scheduler_events, wait): + executor = DummyExecutor() + jobstore = DummyJobStore() + scheduler.add_executor(executor) + scheduler.add_jobstore(jobstore) + scheduler.start(paused=True) + del scheduler_events[:] scheduler.shutdown(wait) - assert scheduler._stopped is True - assert scheduler._dispatch_event.call_count == 1 - event = scheduler._dispatch_event.call_args[0][0] - assert event.code == EVENT_SCHEDULER_SHUTDOWN - - for executor in scheduler._executors.values(): - executor.shutdown.assert_called_once_with(wait) - for jobstore in scheduler._jobstores.values(): - jobstore.shutdown.assert_called_once_with() - - def test_shutdown_not_running(self, scheduler): - pytest.raises(SchedulerNotRunningError, scheduler.shutdown) - - @pytest.mark.parametrize('stopped', [True, False], ids=['stopped=True', 'stopped=False']) - def test_running(self, scheduler, stopped): - scheduler._stopped = stopped - assert scheduler.running is not stopped - - @pytest.mark.parametrize('stopped', [True, False], ids=['stopped=True', 'stopped=False']) - def test_add_executor(self, scheduler, stopped): - scheduler._stopped = stopped - executor = DebugExecutor() - executor.start = MagicMock() - scheduler.add_executor(executor) + assert scheduler.state is SchedulerState.stopped + assert len(scheduler_events) == 1 + assert scheduler_events[0].code == EVENT_SCHEDULER_SHUTDOWN + + executor.shutdown.assert_called_once_with(wait) + jobstore.shutdown.assert_called_once_with() + + def test_pause_resume(self, scheduler, scheduler_events): + scheduler.start() + del scheduler_events[:] + scheduler.wakeup.reset_mock() + + scheduler.pause() + + assert len(scheduler_events) == 1 + assert scheduler_events[0].code == EVENT_SCHEDULER_PAUSED + assert not scheduler.wakeup.called - assert scheduler._executors == {'default': executor} - if not stopped: - executor.start.assert_called_once_with(scheduler, 'default') + scheduler.resume() + assert len(scheduler_events) == 2 + assert scheduler_events[1].code == EVENT_SCHEDULER_RESUMED + assert scheduler.wakeup.called + + @pytest.mark.parametrize('start_scheduler', [True, False]) + def test_running(self, scheduler, start_scheduler): + if start_scheduler: + scheduler.start() + + assert scheduler.running is start_scheduler + + @pytest.mark.parametrize('start_scheduler', [True, False]) + def test_add_remove_executor(self, scheduler, scheduler_events, start_scheduler): + if start_scheduler: + scheduler.start(paused=True) + + del scheduler_events[:] + executor = DummyExecutor() + scheduler.add_executor(executor, 'exec1') + + assert len(scheduler_events) == 1 + assert scheduler_events[0].code == EVENT_EXECUTOR_ADDED + assert scheduler_events[0].alias == 'exec1' + if start_scheduler: + executor.start.assert_called_once_with(scheduler, 'exec1') else: - assert executor.start.call_count == 0 + assert not executor.start.called + + scheduler.remove_executor('exec1') + assert len(scheduler_events) == 2 + assert scheduler_events[1].code == EVENT_EXECUTOR_REMOVED + assert scheduler_events[1].alias == 'exec1' + assert executor.shutdown.called def test_add_executor_already_exists(self, scheduler): - executor = DebugExecutor() + executor = DummyExecutor() scheduler.add_executor(executor) exc = pytest.raises(ValueError, scheduler.add_executor, executor) assert str(exc.value) == 'This scheduler already has an executor by the alias of "default"' - def test_remove_executor(self, scheduler): - scheduler.add_executor(DebugExecutor(), 'foo') - scheduler._dispatch_event = MagicMock() - scheduler.remove_executor('foo') - - assert scheduler._executors == {} - assert scheduler._dispatch_event.call_count == 1 - event = scheduler._dispatch_event.call_args[0][0] - assert event.code == EVENT_EXECUTOR_REMOVED - assert event.alias == 'foo' - def test_remove_executor_nonexistent(self, scheduler): pytest.raises(KeyError, scheduler.remove_executor, 'foo') - @pytest.mark.parametrize('stopped', [True, False], ids=['stopped=True', 'stopped=False']) - def test_add_jobstore(self, scheduler, stopped): - scheduler._stopped = stopped - jobstore = MemoryJobStore() - jobstore.start = MagicMock() - scheduler._real_add_job = MagicMock() - scheduler._dispatch_event = MagicMock() - scheduler.wakeup = MagicMock() - scheduler.add_jobstore(jobstore) + @pytest.mark.parametrize('start_scheduler', [True, False]) + def test_add_jobstore(self, scheduler, scheduler_events, start_scheduler): + """ + Test that the proper event is dispatched when a job store is added and the scheduler's + wake() method is called if the scheduler is running. - assert scheduler._dispatch_event.call_count == 1 - event = scheduler._dispatch_event.call_args[0][0] - assert event.code == EVENT_JOBSTORE_ADDED - assert event.alias == 'default' - if stopped: - assert jobstore.start.call_count == 0 - assert scheduler.wakeup.call_count == 0 + """ + if start_scheduler: + scheduler.start() + + del scheduler_events[:] + jobstore = DummyJobStore() + scheduler.add_jobstore(jobstore, 'store1') + + assert len(scheduler_events) == 1 + assert scheduler_events[0].code == EVENT_JOBSTORE_ADDED + assert scheduler_events[0].alias == 'store1' + + if start_scheduler: + assert scheduler.wakeup.called + jobstore.start.assert_called_once_with(scheduler, 'store1') else: - scheduler.wakeup.assert_called_once_with() - jobstore.start.assert_called_once_with(scheduler, 'default') + assert not jobstore.start.called def test_add_jobstore_already_exists(self, scheduler): + """ + Test that ValueError is raised when a job store is added with an alias that already exists. + + """ jobstore = MemoryJobStore() scheduler.add_jobstore(jobstore) exc = pytest.raises(ValueError, scheduler.add_jobstore, jobstore) assert str(exc.value) == 'This scheduler already has a job store by the alias of "default"' - def test_remove_jobstore(self, scheduler): + def test_remove_jobstore(self, scheduler, scheduler_events): scheduler.add_jobstore(MemoryJobStore(), 'foo') - scheduler._dispatch_event = MagicMock() scheduler.remove_jobstore('foo') - assert scheduler._jobstores == {} - assert scheduler._dispatch_event.call_count == 1 - event = scheduler._dispatch_event.call_args[0][0] - assert event.code == EVENT_JOBSTORE_REMOVED - assert event.alias == 'foo' + assert len(scheduler_events) == 2 + assert scheduler_events[1].code == EVENT_JOBSTORE_REMOVED + assert scheduler_events[1].alias == 'foo' def test_remove_jobstore_nonexistent(self, scheduler): pytest.raises(KeyError, scheduler.remove_jobstore, 'foo') - @pytest.mark.parametrize('event', [EVENT_ALL, EVENT_JOBSTORE_ADDED], - ids=['default', 'explicit value']) - def test_add_listener(self, scheduler, event): - def listener(event): - pass - - args = (event,) if event != EVENT_ALL else () - scheduler.add_listener(listener, *args) - assert scheduler._listeners == [(listener, event)] - - def test_remove_listener(self, scheduler): - def func(): - pass - - def func2(): - pass - - scheduler._listeners = [(func, EVENT_ALL), (func2, EVENT_JOBSTORE_ADDED)] - scheduler.remove_listener(func) - assert scheduler._listeners == [(func2, EVENT_JOBSTORE_ADDED)] - - @pytest.mark.parametrize('stopped', [True, False], ids=['stopped=True', 'stopped=False']) - def test_add_job(self, scheduler, stopped, timezone): - def func(x, y): - pass + def test_add_remove_listener(self, scheduler): + """Test that event dispatch works but removed listeners aren't called.""" + events = [] + scheduler.add_listener(events.append, EVENT_EXECUTOR_ADDED) + scheduler.add_executor(DummyExecutor(), 'exec1') + scheduler.remove_listener(events.append) + scheduler.add_executor(DummyExecutor(), 'exec2') + assert len(events) == 1 - scheduler._stopped = stopped - scheduler._real_add_job = MagicMock() - job = scheduler.add_job(func, 'date', [1], {'y': 2}, 'my-id', 'dummy', + def test_add_job_return_value(self, scheduler, timezone): + """Test that when a job is added to a stopped scheduler, a Job instance is returned.""" + job = scheduler.add_job(lambda x, y: None, 'date', [1], {'y': 2}, 'my-id', 'dummy', next_run_time=datetime(2014, 5, 23, 10), run_date='2014-06-01 08:41:00') @@ -344,16 +372,57 @@ class TestBaseScheduler(object): assert not hasattr(job, 'coalesce') assert not hasattr(job, 'max_instances') assert job.next_run_time.tzinfo.zone == timezone.zone - assert len(scheduler._pending_jobs) == (1 if stopped else 0) - assert scheduler._real_add_job.call_count == (0 if stopped else 1) + + def test_add_job_pending(self, scheduler, scheduler_events): + """ + Test that when a job is added to a stopped scheduler, it is not added to a job store until + the scheduler is started and that the event is dispatched when that happens. + + """ + scheduler.configure(job_defaults={ + 'misfire_grace_time': 3, 'coalesce': False, 'max_instances': 6 + }) + job = scheduler.add_job(lambda: None, 'interval', hours=1) + assert not scheduler_events + + scheduler.start(paused=True) + + assert len(scheduler_events) == 3 + assert scheduler_events[2].code == EVENT_JOB_ADDED + assert scheduler_events[2].job_id is job.id + + # Check that the undefined values were replaced with scheduler's job defaults + assert job.misfire_grace_time == 3 + assert not job.coalesce + assert job.max_instances == 6 + + def test_add_job_id_conflict(self, scheduler): + """ + Test that if a job is added with an already existing id, ConflictingIdError is raised. + + """ + scheduler.start(paused=True) + scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1) + pytest.raises(ConflictingIdError, scheduler.add_job, lambda: None, 'interval', + id='testjob', seconds=1) + + def test_add_job_replace(self, scheduler): + """Test that with replace_existing=True, a new job replaces another with the same id.""" + scheduler.start(paused=True) + scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1) + scheduler.add_job(lambda: None, 'cron', id='testjob', name='replacement', + replace_existing=True) + jobs = scheduler.get_jobs() + assert len(jobs) == 1 + assert jobs[0].name == 'replacement' def test_scheduled_job(self, scheduler): def func(x, y): pass scheduler.add_job = MagicMock() - decorator = scheduler.scheduled_job('date', [1], {'y': 2}, 'my-id', 'dummy', - run_date='2014-06-01 08:41:00') + decorator = scheduler.scheduled_job('date', [1], {'y': 2}, 'my-id', + 'dummy', run_date='2014-06-01 08:41:00') decorator(func) scheduler.add_job.assert_called_once_with( @@ -671,50 +740,6 @@ Jobstore baz: assert not scheduler._listeners[0][0].called scheduler._listeners[1][0].assert_called_once_with(event) - @pytest.mark.parametrize('job_exists', [True, False], ids=['job exists', 'new job']) - @pytest.mark.parametrize('replace_existing', [True, False], ids=['replace', 'no replace']) - @pytest.mark.parametrize('wakeup', [True, False], ids=['wakeup', 'no wakeup']) - def test_real_add_job(self, scheduler, job_exists, replace_existing, wakeup): - job = Job(scheduler, id='foo', func=lambda: None, args=(), kwargs={}, next_run_time=None) - jobstore = MagicMock( - BaseJobStore, _alias='bar', - add_job=MagicMock(side_effect=ConflictingIdError('foo') if job_exists else None)) - scheduler.wakeup = MagicMock() - scheduler._job_defaults = {'misfire_grace_time': 3, 'coalesce': False, 'max_instances': 6} - scheduler._dispatch_event = MagicMock() - scheduler._jobstores = {'bar': jobstore} - - # Expect and exception if the job already exists and we're not trying to replace it - if job_exists and not replace_existing: - pytest.raises(ConflictingIdError, scheduler._real_add_job, job, 'bar', - replace_existing, wakeup) - return - - scheduler._real_add_job(job, 'bar', replace_existing, wakeup) - - # Check that the undefined values were replaced with scheduler defaults - assert job.misfire_grace_time == 3 - assert job.coalesce is False - assert job.max_instances == 6 - assert job.next_run_time is None - - if job_exists: - jobstore.update_job.assert_called_once_with(job) - else: - assert not jobstore.update_job.called - - if wakeup: - scheduler.wakeup.assert_called_once_with() - else: - assert not scheduler.wakeup.called - - assert job._jobstore_alias == 'bar' - - assert scheduler._dispatch_event.call_count == 1 - event = scheduler._dispatch_event.call_args[0][0] - assert event.code == EVENT_JOB_ADDED - assert event.job_id == 'foo' - @pytest.mark.parametrize('load_plugin', [True, False], ids=['load plugin', 'plugin loaded']) def test_create_trigger(self, scheduler, load_plugin): """Tests that creating a trigger with an already loaded plugin works.""" @@ -777,20 +802,21 @@ Jobstore baz: assert len(events) == 1 assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] - def test_job_max_instances_event(self, scheduler, freeze_time): - class MaxedOutExecutor(DummyExecutor): + @pytest.mark.parametrize('scheduler_events', [EVENT_JOB_MAX_INSTANCES], + indirect=['scheduler_events']) + def test_job_max_instances_event(self, scheduler, scheduler_events, freeze_time): + class MaxedOutExecutor(DebugExecutor): def submit_job(self, job, run_times): raise MaxInstancesReachedError(job) - events = [] - scheduler.add_executor(MaxedOutExecutor(), 'maxed') + executor = MaxedOutExecutor() + scheduler.add_executor(executor, 'maxed') scheduler.add_job(lambda: None, run_date=freeze_time.get(), executor='maxed') - scheduler.add_listener(events.append, EVENT_JOB_MAX_INSTANCES) scheduler.start() scheduler._process_jobs() - assert len(events) == 1 - assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] + assert len(scheduler_events) == 1 + assert scheduler_events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] class TestProcessJobs(object): @@ -802,6 +828,12 @@ class TestProcessJobs(object): return job @pytest.fixture + def scheduler(self): + scheduler = DummyScheduler() + scheduler.start() + return scheduler + + @pytest.fixture def jobstore(self, scheduler, job): jobstore = MagicMock(BaseJobStore, get_due_jobs=MagicMock(return_value=[job]), get_next_run_time=MagicMock(return_value=None)) @@ -816,11 +848,12 @@ class TestProcessJobs(object): def test_nonexistent_executor(self, scheduler, jobstore, caplog): """ - Tests that an error is logged and the job is removed from its job store if its executor is + Test that an error is logged and the job is removed from its job store if its executor is not found. """ caplog.set_level(logging.ERROR) + scheduler.remove_executor('default') assert scheduler._process_jobs() is None jobstore.remove_job.assert_called_once_with(999) assert len(caplog.records) == 1 @@ -838,7 +871,7 @@ class TestProcessJobs(object): assert caplog.records[0].message == \ 'Execution of job "job 999" skipped: maximum number of running instances reached (1)' - def test_executor_error(self, scheduler, job, jobstore, executor, caplog): + def test_executor_error(self, scheduler, jobstore, executor, caplog): """Tests that if any exception is raised in executor.submit(), it is logged.""" caplog.set_level(logging.ERROR) executor.submit_job = MagicMock(side_effect=Exception('test message')) @@ -848,7 +881,7 @@ class TestProcessJobs(object): assert 'test message' in caplog.records[0].exc_text assert 'Error submitting job "job 999" to executor "default"' in caplog.records[0].message - def test_job_update(self, scheduler, job, jobstore, executor, freeze_time): + def test_job_update(self, scheduler, job, jobstore, freeze_time): """ Tests that the job is updated in its job store with the next run time from the trigger. @@ -859,7 +892,7 @@ class TestProcessJobs(object): job._modify.assert_called_once_with(next_run_time=next_run_time) jobstore.update_job.assert_called_once_with(job) - def test_wait_time(self, scheduler, job, executor, freeze_time): + def test_wait_time(self, scheduler, freeze_time): """ Tests that the earliest next run time from all job stores is returned (ignoring Nones). @@ -906,7 +939,7 @@ class SchedulerImplementationTestBase(object): assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED - assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START + assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED event = self.wait_event(eventqueue) assert event.code == EVENT_JOB_EXECUTED assert event.retval == 3 @@ -917,7 +950,7 @@ class SchedulerImplementationTestBase(object): freeze_time.set_increment(timedelta(seconds=0.2)) start_scheduler() assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED - assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START + assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2], run_date=freeze_time.next() + freeze_time.increment * 2) @@ -931,7 +964,7 @@ class SchedulerImplementationTestBase(object): """Tests that shutting down the scheduler emits the proper event.""" start_scheduler() assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED - assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START + assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED scheduler.shutdown() assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_SHUTDOWN |