diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2018-12-13 21:59:58 -0500 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2019-01-15 13:37:00 -0500 |
commit | 764396a48f5a31b548eab9092967cac610c24b73 (patch) | |
tree | 9719682ea8b2c85c9768fc38048529ef9acafe5e /src | |
parent | 2fc070e8fdf659aed92e05092860cf9e5e4acf5b (diff) | |
download | mongo-764396a48f5a31b548eab9092967cac610c24b73.tar.gz |
SERVER-38649 Add generic AlarmScheduler and AlarmRunner
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/util/SConscript | 23 | ||||
-rw-r--r-- | src/mongo/util/alarm.cpp | 170 | ||||
-rw-r--r-- | src/mongo/util/alarm.h | 195 | ||||
-rw-r--r-- | src/mongo/util/alarm_runner_background_thread.cpp | 118 | ||||
-rw-r--r-- | src/mongo/util/alarm_runner_background_thread.h | 75 | ||||
-rw-r--r-- | src/mongo/util/alarm_test.cpp | 168 |
7 files changed, 750 insertions, 0 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index ca74eb7a13b..9f70c55617d 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -274,6 +274,7 @@ error_code("ProducerConsumerQueueProducerQueueDepthExceeded", 273) error_code("ProducerConsumerQueueConsumed", 274) error_code("ExchangePassthrough", 275) # For exchange execution in aggregation. Do not reuse. error_code("IndexBuildAborted", 276) +error_code("AlarmAlreadyFulfilled", 277) # Error codes 4000-8999 are reserved. # Non-sequential error codes (for compatibility only) diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index e52886dbd82..11bfa6313de 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -201,6 +201,29 @@ env.CppUnitTest( ], ) +env.Library( + target='alarm', + source=[ + 'alarm.cpp', + 'alarm_runner_background_thread.cpp', + ], + LIBDEPS=[ + 'clock_sources', + '$BUILD_DIR/mongo/base', + ], +) + +env.CppUnitTest( + target='alarm_test', + source=[ + 'alarm_test.cpp', + ], + LIBDEPS=[ + 'alarm', + 'clock_source_mock', + ] +) + env.CppUnitTest( target='tick_source_test', source=[ diff --git a/src/mongo/util/alarm.cpp b/src/mongo/util/alarm.cpp new file mode 100644 index 00000000000..b3236a9ef5b --- /dev/null +++ b/src/mongo/util/alarm.cpp @@ -0,0 +1,170 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/util/alarm.h" + +namespace mongo { + +class AlarmSchedulerPrecise::HandleImpl final + : public AlarmScheduler::Handle, + public std::enable_shared_from_this<AlarmSchedulerPrecise::HandleImpl> { +public: + HandleImpl(std::weak_ptr<AlarmSchedulerPrecise> service, AlarmSchedulerPrecise::AlarmMapIt it) + : _service(std::move(service)), _myIt(std::move(it)) {} + + struct MakeEmptyHandle {}; + explicit HandleImpl(MakeEmptyHandle) + : _service(std::shared_ptr<AlarmSchedulerPrecise>(nullptr)), _myIt(), _done(true) {} + + Status cancel() override { + auto service = _service.lock(); + if (!service) { + return {ErrorCodes::ShutdownInProgress, "The alarm scheduler was shutdown"}; + } + + stdx::unique_lock<stdx::mutex> lk(service->_mutex); + if (_done) { + return {ErrorCodes::AlarmAlreadyFulfilled, "The alarm has already been canceled"}; + } + + auto state = std::move(_myIt->second); + service->_alarms.erase(_myIt); + lk.unlock(); + + std::move(state.promise) + .setError({ErrorCodes::CallbackCanceled, + "The alarm was canceled before it expired or could be processed"}); + return Status::OK(); + } + + void setDone() { + _done = true; + } + +private: + std::weak_ptr<AlarmSchedulerPrecise> const _service; + AlarmSchedulerPrecise::AlarmMapIt _myIt; + bool _done = false; +}; + +AlarmSchedulerPrecise::~AlarmSchedulerPrecise() { + clearAllAlarms(); +} + +AlarmScheduler::Alarm AlarmSchedulerPrecise::alarmAt(Date_t date) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_shutdown) { + Alarm ret; + ret.future = Future<void>::makeReady( + Status(ErrorCodes::ShutdownInProgress, "Alarm scheduler has been shut down.")); + ret.handle = std::make_shared<HandleImpl>(HandleImpl::MakeEmptyHandle{}); + return ret; + } + + auto pf = makePromiseFuture<void>(); + auto it = _alarms.emplace(date, AlarmData(std::move(pf.promise))); + auto nextAlarm = _alarms.begin()->first; + auto ret = std::make_shared<HandleImpl>(shared_from_this(), it); + it->second.handle = ret; + lk.unlock(); + + callRegisterHook(nextAlarm, shared_from_this()); + return {std::move(pf.future), std::move(ret)}; +} + +void AlarmSchedulerPrecise::processExpiredAlarms( + boost::optional<AlarmScheduler::AlarmExpireHook> hook) { + AlarmCount processed = 0; + auto now = clockSource()->now(); + std::vector<Promise<void>> toExpire; + AlarmMapIt it; + + stdx::unique_lock<stdx::mutex> lk(_mutex); + for (it = _alarms.begin(); it != _alarms.end();) { + if (hook && !(*hook)(processed + 1)) { + break; + } + + if (it->first > now) { + break; + } + + processed++; + toExpire.push_back(std::move(it->second.promise)); + auto handle = it->second.handle.lock(); + if (handle) { + handle->setDone(); + } + + it = _alarms.erase(it); + } + + lk.unlock(); + + for (auto& promise : toExpire) { + promise.emplaceValue(); + } +} + +Date_t AlarmSchedulerPrecise::nextAlarm() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return (_alarms.empty()) ? Date_t::max() : _alarms.begin()->first; +} + +void AlarmSchedulerPrecise::clearAllAlarms() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _clearAllAlarmsImpl(lk); +} + +void AlarmSchedulerPrecise::clearAllAlarmsAndShutdown() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _shutdown = true; + _clearAllAlarmsImpl(lk); +} + +void AlarmSchedulerPrecise::_clearAllAlarmsImpl(stdx::unique_lock<stdx::mutex>& lk) { + std::vector<Promise<void>> toExpire; + for (AlarmMapIt it = _alarms.begin(); it != _alarms.end();) { + toExpire.push_back(std::move(it->second.promise)); + auto handle = it->second.handle.lock(); + if (handle) { + handle->setDone(); + } + it = _alarms.erase(it); + } + + lk.unlock(); + for (auto& alarm : toExpire) { + alarm.setError({ErrorCodes::CallbackCanceled, "Alarm scheduler was cleared"}); + } +} + +} // namespace mongo diff --git a/src/mongo/util/alarm.h b/src/mongo/util/alarm.h new file mode 100644 index 00000000000..aa46c01f924 --- /dev/null +++ b/src/mongo/util/alarm.h @@ -0,0 +1,195 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include <map> +#include <memory> + +#include "mongo/base/status.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/functional.h" +#include "mongo/util/future.h" + +namespace mongo { + +/* + * An alarm scheduler will fill a Future<void> at some time in the future and allow the caller to + * cancel specific alarms by opaque ID. + * + * All alarm service implementations take a ClockSource and all Date_t/Durations used to schedule + * alarms must be in relation to that ClockSource's now() and epoch. + * + * A scheduler won't actually process alarms. Some executor must be calling processExpiredAlarms() + * in order for alarms to fire. + */ +class AlarmScheduler { +public: + /* + * This type gets returned when an alarm is scheduled and allows you to cancel the alarm. + * + * Once this handle is destroyed, the alarm will still be processed if it is still outstanding + * and there will be no way to cancel it. + */ + class Handle { + public: + virtual ~Handle() = default; + /* + * Cancels the alarm if it has not already been fulfilled. If the alarm has already been + * fulfilled this returns an ErrorCodes::AlarmAlreadyFulfilled error + */ + virtual Status cancel() = 0; + }; + + using SharedHandle = std::shared_ptr<Handle>; + using AlarmCount = uint64_t; + + explicit AlarmScheduler(ClockSource* clockSource) : _clockSource(clockSource) {} + + virtual ~AlarmScheduler() = default; + + /* + * Fulfills all outstanding alarms with CallbackCanceled. Memory associated with the scheduler + * may not be freed until the last outstanding Handle is destroyed, however there should be + * no broken promises. + * + * This will be called implicitly by the destructor. + */ + virtual void clearAllAlarms() = 0; + + /* + * Clears all alarms as above, and prevents any new alarms from being scheduled. Calls to + * alarmAt will return a ready Future with a ShutdownInProgress error code. + */ + virtual void clearAllAlarmsAndShutdown() = 0; + + struct Alarm { + Future<void> future; + SharedHandle handle; + }; + /* + * Schedules an alarm some milliseconds from now(). + */ + Alarm alarmFromNow(Milliseconds time) { + return alarmAt(_clockSource->now() + time); + }; + + /* + * Schedules an alarm at a specific time on the service's clock source. + */ + virtual Alarm alarmAt(Date_t time) = 0; + + /* + * Registers a callback that will be called when a new alarm has been registered. + * + * The hook will be called with a Date_t representing the next time an alarm will expire after + * all internal locks have been released. This can be used to unblock between calling + * processExpiredAlarms() with a new amount of time to block for. + */ + using AlarmRegisterHook = unique_function<void(Date_t, const std::shared_ptr<AlarmScheduler>&)>; + void setAlarmRegisterHook(AlarmRegisterHook onAlarmHook) { + _registerHook = std::move(onAlarmHook); + } + + /* + * Processes all alarms that have expired as of now(). If maxAlarms is not boost::none, then + * this will only expire that many alarms before returning. + * + * Returns the number of alarms that were expired by this call. + */ + using AlarmExpireHook = unique_function<bool(AlarmCount)>; + virtual void processExpiredAlarms(boost::optional<AlarmExpireHook> hook = boost::none) = 0; + + /* + * Returns the Date_t of the next scheduled alarm. + */ + virtual Date_t nextAlarm() = 0; + + virtual ClockSource* clockSource() const { + return _clockSource; + } + +protected: + void callRegisterHook(Date_t nextAlarm, const std::shared_ptr<AlarmScheduler>& which) { + if (_registerHook) { + _registerHook(nextAlarm, which); + } + } + +private: + AlarmRegisterHook _registerHook; + ClockSource* const _clockSource; +}; + +/* + * Implements a basic alarm scheduler based on a multimap of date_t to promise. + * + * Scheduling an alarm takes O(log(n)) where n is the number of outstanding alarms. + * Canceling an alarm is done in constant time. + * Processing alarms is done in constant time. + */ +class AlarmSchedulerPrecise : public AlarmScheduler, + public std::enable_shared_from_this<AlarmSchedulerPrecise> { +public: + explicit AlarmSchedulerPrecise(ClockSource* clockSource) : AlarmScheduler(clockSource) {} + + ~AlarmSchedulerPrecise(); + + void clearAllAlarms() override; + + void clearAllAlarmsAndShutdown() override; + + Alarm alarmAt(Date_t time) override; + + void processExpiredAlarms(boost::optional<AlarmExpireHook> hook) override; + + Date_t nextAlarm() override; + +private: + class HandleImpl; + + enum AlarmState { kOutstanding, kFulfilled, kAbandoned }; + struct AlarmData { + explicit AlarmData(Promise<void> promise_) : promise(std::move(promise_)) {} + + std::weak_ptr<HandleImpl> handle; + Promise<void> promise; + }; + + using AlarmMap = std::multimap<Date_t, AlarmData>; + using AlarmMapIt = AlarmMap::iterator; + + void _clearAllAlarmsImpl(stdx::unique_lock<stdx::mutex>& lk); + + stdx::mutex _mutex; + bool _shutdown = false; + AlarmMap _alarms; +}; + +} // namespace diff --git a/src/mongo/util/alarm_runner_background_thread.cpp b/src/mongo/util/alarm_runner_background_thread.cpp new file mode 100644 index 00000000000..4d22f84e87d --- /dev/null +++ b/src/mongo/util/alarm_runner_background_thread.cpp @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/util/alarm_runner_background_thread.h" + +namespace mongo { + +void AlarmRunnerBackgroundThread::start() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _running = true; + _thread = stdx::thread(&AlarmRunnerBackgroundThread::_threadRoutine, this); +} + +void AlarmRunnerBackgroundThread::shutdown() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _running = false; + lk.unlock(); + _condVar.notify_one(); + _thread.join(); + + for (const auto& scheduler : _schedulers) { + scheduler->clearAllAlarmsAndShutdown(); + } +} + +std::vector<AlarmRunnerBackgroundThread::AlarmSchedulerHandle> +AlarmRunnerBackgroundThread::_initializeSchedulers(std::vector<AlarmSchedulerHandle> schedulers) { + invariant(!schedulers.empty()); + + const auto registerHook = [this](Date_t next, const std::shared_ptr<AlarmScheduler>& which) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (next >= _nextAlarm) { + return; + } + + _nextAlarm = next; + _condVar.notify_one(); + }; + + const auto clockSource = schedulers.front()->clockSource(); + for (auto& scheduler : schedulers) { + scheduler->setAlarmRegisterHook(registerHook); + auto nextAlarm = scheduler->nextAlarm(); + if (nextAlarm < _nextAlarm) { + _nextAlarm = nextAlarm; + } + // The thread routine uses the clock source of the first registered scheduler to wait + // on its condvar, so all registered schedulers must use the same clock source. + fassert(51046, scheduler->clockSource() == clockSource); + } + + return schedulers; +} + +void AlarmRunnerBackgroundThread::_threadRoutine() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (_running) { + const auto clockSource = _schedulers.front()->clockSource(); + const auto now = clockSource->now(); + + // Schedulers may try to alter our _nextAlarm/_newAlarm state as they process expired + // alarms that then reschedule themselves, so to eliminate any lock contention, just + // unlock the mutex while we process expired alarms. + lk.unlock(); + for (const auto& scheduler : _schedulers) { + if (scheduler->nextAlarm() > now) { + continue; + } + scheduler->processExpiredAlarms(); + } + + // Lock the mutex while the sleep duration is computed. This will block schedulers from + // scheduling new alarms. + lk.lock(); + auto sleepUntil = Date_t::max(); + for (const auto& scheduler : _schedulers) { + sleepUntil = std::min(sleepUntil, scheduler->nextAlarm()); + } + + // Update _nextAlarm so that any schedulers that have pending new alarms see the actual + // time we're about to sleep for, and set _newAlarm to false before going to sleep. + _nextAlarm = sleepUntil; + + clockSource->waitForConditionUntil(_condVar, lk, _nextAlarm, [&] { + return (_nextAlarm != sleepUntil || _running == false); + }); + } +} + +} // namespace mongo diff --git a/src/mongo/util/alarm_runner_background_thread.h b/src/mongo/util/alarm_runner_background_thread.h new file mode 100644 index 00000000000..179f6350480 --- /dev/null +++ b/src/mongo/util/alarm_runner_background_thread.h @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/alarm.h" +#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/* + * This is a runner for alarm schedulers that waits for and processes alarms in a single + * background thread. + */ +class AlarmRunnerBackgroundThread { +public: + using AlarmSchedulerHandle = std::shared_ptr<AlarmScheduler>; + // Construct an alarm runner from a vector of shared_ptr<AlarmScheduler>'s. + explicit AlarmRunnerBackgroundThread(std::vector<AlarmSchedulerHandle> container) + : _schedulers(_initializeSchedulers(std::move(container))) {} + + /* + * Starts a background thread that will process alarms from all registered schedulers. + */ + void start(); + + /* + * Clears all outstanding timers from all registered schedulers and shuts down the background + * thread. + */ + void shutdown(); + +private: + std::vector<AlarmSchedulerHandle> _initializeSchedulers( + std::vector<AlarmSchedulerHandle> container); + + void _threadRoutine(); + + stdx::mutex _mutex; + stdx::condition_variable _condVar; + bool _running = false; + Date_t _nextAlarm = Date_t::max(); + std::vector<AlarmSchedulerHandle> _schedulers; + stdx::thread _thread; +}; + +} // namespace mongo diff --git a/src/mongo/util/alarm_test.cpp b/src/mongo/util/alarm_test.cpp new file mode 100644 index 00000000000..1fc29b1b5f9 --- /dev/null +++ b/src/mongo/util/alarm_test.cpp @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/stdx/chrono.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/alarm.h" +#include "mongo/util/alarm_runner_background_thread.h" +#include "mongo/util/clock_source_mock.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { +TEST(AlarmScheduler, BasicSingleThread) { + auto clockSource = std::make_unique<ClockSourceMock>(); + + std::shared_ptr<AlarmScheduler> scheduler = + std::make_shared<AlarmSchedulerPrecise>(clockSource.get()); + + auto testStart = clockSource->now(); + auto alarm = scheduler->alarmAt(testStart + Milliseconds(10)); + bool firstTimerExpired = false; + std::move(alarm.future).getAsync([&](Status status) { + log() << "Timer expired: " << status; + firstTimerExpired = true; + }); + + alarm = scheduler->alarmAt(testStart + Milliseconds(500)); + bool secondTimerExpired = false; + std::move(alarm.future).getAsync([&](Status status) { + log() << "Second timer expired: " << status; + secondTimerExpired = true; + }); + + alarm = scheduler->alarmAt(testStart + Milliseconds(515)); + bool thirdTimerExpired = false; + std::move(alarm.future).getAsync([&](Status status) { + log() << "third timer expired: " << status; + thirdTimerExpired = true; + }); + auto missingEvent = alarm.handle; + + alarm = scheduler->alarmAt(testStart + Milliseconds(250)); + + clockSource->advance(Milliseconds(11)); + scheduler->processExpiredAlarms(); + ASSERT_TRUE(firstTimerExpired); + ASSERT_FALSE(secondTimerExpired); + + ASSERT_OK(alarm.handle->cancel()); + auto cancelledStatus = std::move(alarm.future).getNoThrow(); + ASSERT_EQ(cancelledStatus.code(), ErrorCodes::CallbackCanceled); + + + clockSource->advance(Milliseconds(501)); + scheduler->processExpiredAlarms(); + ASSERT_TRUE(secondTimerExpired); + ASSERT_FALSE(thirdTimerExpired); + + clockSource->advance(Milliseconds(64)); + scheduler->processExpiredAlarms(); + ASSERT_TRUE(thirdTimerExpired); + + cancelledStatus = missingEvent->cancel(); + ASSERT_EQ(cancelledStatus.code(), ErrorCodes::AlarmAlreadyFulfilled); + alarm = scheduler->alarmAt(testStart + Hours(5)); + scheduler->clearAllAlarmsAndShutdown(); + cancelledStatus = std::move(alarm.future).getNoThrow(); + ASSERT_EQ(cancelledStatus.code(), ErrorCodes::CallbackCanceled); + + alarm = scheduler->alarmFromNow(Milliseconds{50}); + auto shutdownStatus = alarm.future.getNoThrow(); + ASSERT_EQ(shutdownStatus.code(), ErrorCodes::ShutdownInProgress); +} + +TEST(AlarmRunner, BasicTest) { + auto clockSource = std::make_unique<ClockSourceMock>(); + auto scheduler = std::make_shared<AlarmSchedulerPrecise>(clockSource.get()); + AlarmRunnerBackgroundThread runner({scheduler}); + runner.start(); + + auto alarm1 = scheduler->alarmFromNow(Milliseconds(10)); + auto alarm2 = scheduler->alarmFromNow(Milliseconds(20)); + + AtomicWord<bool> future2Filled{false}; + auto pf = makePromiseFuture<void>(); + std::move(alarm2.future).getAsync([&future2Filled, + promise = std::move(pf.promise) ](Status status) mutable { + ASSERT_OK(status); + future2Filled.store(true); + promise.emplaceValue(); + }); + + clockSource->advance(Milliseconds(11)); + + ASSERT_OK(alarm1.future.getNoThrow()); + ASSERT_FALSE(future2Filled.load()); + + clockSource->advance(Milliseconds(21)); + ASSERT_OK(pf.future.getNoThrow()); + + auto alarm3 = scheduler->alarmFromNow(Milliseconds(10)); + + ASSERT_OK(alarm3.handle->cancel()); + + ASSERT_EQ(alarm3.future.getNoThrow().code(), ErrorCodes::CallbackCanceled); + + auto alarm4 = scheduler->alarmFromNow(Milliseconds(10)); + runner.shutdown(); + + ASSERT_EQ(alarm4.future.getNoThrow().code(), ErrorCodes::CallbackCanceled); + + auto alarm5 = scheduler->alarmFromNow(Milliseconds(50)); + ASSERT_EQ(alarm5.future.getNoThrow().code(), ErrorCodes::ShutdownInProgress); +} + +TEST(AlarmRunner, SeveralSchedulers) { + auto clockSource = std::make_unique<ClockSourceMock>(); + auto scheduler1 = std::make_shared<AlarmSchedulerPrecise>(clockSource.get()); + auto scheduler2 = std::make_shared<AlarmSchedulerPrecise>(clockSource.get()); + + AlarmRunnerBackgroundThread runner({scheduler1, scheduler2}); + runner.start(); + + scheduler1->alarmAt(Date_t::max()); + // Schedule two alarms, the first is just to wake up the runner so that it can try to decide + // which scheduler to wait for next. The second is the one we actually want to wait for. + auto alarm1 = scheduler2->alarmFromNow(Milliseconds(1)); + auto alarm2 = scheduler2->alarmFromNow(Milliseconds(20)); + clockSource->advance(Milliseconds(2)); + ASSERT_OK(alarm1.future.getNoThrow()); + + // If everything goes well then we should be able to wait on this future without blocking. + clockSource->advance(Milliseconds(20)); + ASSERT_OK(alarm2.future.getNoThrow()); + + runner.shutdown(); +} +} // namespace +} // namespace mongo |