summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2018-12-13 21:59:58 -0500
committerJonathan Reams <jbreams@mongodb.com>2019-01-15 13:37:00 -0500
commit764396a48f5a31b548eab9092967cac610c24b73 (patch)
tree9719682ea8b2c85c9768fc38048529ef9acafe5e
parent2fc070e8fdf659aed92e05092860cf9e5e4acf5b (diff)
downloadmongo-764396a48f5a31b548eab9092967cac610c24b73.tar.gz
SERVER-38649 Add generic AlarmScheduler and AlarmRunner
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/util/SConscript23
-rw-r--r--src/mongo/util/alarm.cpp170
-rw-r--r--src/mongo/util/alarm.h195
-rw-r--r--src/mongo/util/alarm_runner_background_thread.cpp118
-rw-r--r--src/mongo/util/alarm_runner_background_thread.h75
-rw-r--r--src/mongo/util/alarm_test.cpp168
7 files changed, 750 insertions, 0 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index ca74eb7a13b..9f70c55617d 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -274,6 +274,7 @@ error_code("ProducerConsumerQueueProducerQueueDepthExceeded", 273)
error_code("ProducerConsumerQueueConsumed", 274)
error_code("ExchangePassthrough", 275) # For exchange execution in aggregation. Do not reuse.
error_code("IndexBuildAborted", 276)
+error_code("AlarmAlreadyFulfilled", 277)
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index e52886dbd82..11bfa6313de 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -201,6 +201,29 @@ env.CppUnitTest(
],
)
+env.Library(
+ target='alarm',
+ source=[
+ 'alarm.cpp',
+ 'alarm_runner_background_thread.cpp',
+ ],
+ LIBDEPS=[
+ 'clock_sources',
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.CppUnitTest(
+ target='alarm_test',
+ source=[
+ 'alarm_test.cpp',
+ ],
+ LIBDEPS=[
+ 'alarm',
+ 'clock_source_mock',
+ ]
+)
+
env.CppUnitTest(
target='tick_source_test',
source=[
diff --git a/src/mongo/util/alarm.cpp b/src/mongo/util/alarm.cpp
new file mode 100644
index 00000000000..b3236a9ef5b
--- /dev/null
+++ b/src/mongo/util/alarm.cpp
@@ -0,0 +1,170 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/alarm.h"
+
+namespace mongo {
+
+class AlarmSchedulerPrecise::HandleImpl final
+ : public AlarmScheduler::Handle,
+ public std::enable_shared_from_this<AlarmSchedulerPrecise::HandleImpl> {
+public:
+ HandleImpl(std::weak_ptr<AlarmSchedulerPrecise> service, AlarmSchedulerPrecise::AlarmMapIt it)
+ : _service(std::move(service)), _myIt(std::move(it)) {}
+
+ struct MakeEmptyHandle {};
+ explicit HandleImpl(MakeEmptyHandle)
+ : _service(std::shared_ptr<AlarmSchedulerPrecise>(nullptr)), _myIt(), _done(true) {}
+
+ Status cancel() override {
+ auto service = _service.lock();
+ if (!service) {
+ return {ErrorCodes::ShutdownInProgress, "The alarm scheduler was shutdown"};
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(service->_mutex);
+ if (_done) {
+ return {ErrorCodes::AlarmAlreadyFulfilled, "The alarm has already been canceled"};
+ }
+
+ auto state = std::move(_myIt->second);
+ service->_alarms.erase(_myIt);
+ lk.unlock();
+
+ std::move(state.promise)
+ .setError({ErrorCodes::CallbackCanceled,
+ "The alarm was canceled before it expired or could be processed"});
+ return Status::OK();
+ }
+
+ void setDone() {
+ _done = true;
+ }
+
+private:
+ std::weak_ptr<AlarmSchedulerPrecise> const _service;
+ AlarmSchedulerPrecise::AlarmMapIt _myIt;
+ bool _done = false;
+};
+
+AlarmSchedulerPrecise::~AlarmSchedulerPrecise() {
+ clearAllAlarms();
+}
+
+AlarmScheduler::Alarm AlarmSchedulerPrecise::alarmAt(Date_t date) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_shutdown) {
+ Alarm ret;
+ ret.future = Future<void>::makeReady(
+ Status(ErrorCodes::ShutdownInProgress, "Alarm scheduler has been shut down."));
+ ret.handle = std::make_shared<HandleImpl>(HandleImpl::MakeEmptyHandle{});
+ return ret;
+ }
+
+ auto pf = makePromiseFuture<void>();
+ auto it = _alarms.emplace(date, AlarmData(std::move(pf.promise)));
+ auto nextAlarm = _alarms.begin()->first;
+ auto ret = std::make_shared<HandleImpl>(shared_from_this(), it);
+ it->second.handle = ret;
+ lk.unlock();
+
+ callRegisterHook(nextAlarm, shared_from_this());
+ return {std::move(pf.future), std::move(ret)};
+}
+
+void AlarmSchedulerPrecise::processExpiredAlarms(
+ boost::optional<AlarmScheduler::AlarmExpireHook> hook) {
+ AlarmCount processed = 0;
+ auto now = clockSource()->now();
+ std::vector<Promise<void>> toExpire;
+ AlarmMapIt it;
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ for (it = _alarms.begin(); it != _alarms.end();) {
+ if (hook && !(*hook)(processed + 1)) {
+ break;
+ }
+
+ if (it->first > now) {
+ break;
+ }
+
+ processed++;
+ toExpire.push_back(std::move(it->second.promise));
+ auto handle = it->second.handle.lock();
+ if (handle) {
+ handle->setDone();
+ }
+
+ it = _alarms.erase(it);
+ }
+
+ lk.unlock();
+
+ for (auto& promise : toExpire) {
+ promise.emplaceValue();
+ }
+}
+
+Date_t AlarmSchedulerPrecise::nextAlarm() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return (_alarms.empty()) ? Date_t::max() : _alarms.begin()->first;
+}
+
+void AlarmSchedulerPrecise::clearAllAlarms() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _clearAllAlarmsImpl(lk);
+}
+
+void AlarmSchedulerPrecise::clearAllAlarmsAndShutdown() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _shutdown = true;
+ _clearAllAlarmsImpl(lk);
+}
+
+void AlarmSchedulerPrecise::_clearAllAlarmsImpl(stdx::unique_lock<stdx::mutex>& lk) {
+ std::vector<Promise<void>> toExpire;
+ for (AlarmMapIt it = _alarms.begin(); it != _alarms.end();) {
+ toExpire.push_back(std::move(it->second.promise));
+ auto handle = it->second.handle.lock();
+ if (handle) {
+ handle->setDone();
+ }
+ it = _alarms.erase(it);
+ }
+
+ lk.unlock();
+ for (auto& alarm : toExpire) {
+ alarm.setError({ErrorCodes::CallbackCanceled, "Alarm scheduler was cleared"});
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/alarm.h b/src/mongo/util/alarm.h
new file mode 100644
index 00000000000..aa46c01f924
--- /dev/null
+++ b/src/mongo/util/alarm.h
@@ -0,0 +1,195 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include <map>
+#include <memory>
+
+#include "mongo/base/status.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/functional.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+/*
+ * An alarm scheduler will fill a Future<void> at some time in the future and allow the caller to
+ * cancel specific alarms by opaque ID.
+ *
+ * All alarm service implementations take a ClockSource and all Date_t/Durations used to schedule
+ * alarms must be in relation to that ClockSource's now() and epoch.
+ *
+ * A scheduler won't actually process alarms. Some executor must be calling processExpiredAlarms()
+ * in order for alarms to fire.
+ */
+class AlarmScheduler {
+public:
+ /*
+ * This type gets returned when an alarm is scheduled and allows you to cancel the alarm.
+ *
+ * Once this handle is destroyed, the alarm will still be processed if it is still outstanding
+ * and there will be no way to cancel it.
+ */
+ class Handle {
+ public:
+ virtual ~Handle() = default;
+ /*
+ * Cancels the alarm if it has not already been fulfilled. If the alarm has already been
+ * fulfilled this returns an ErrorCodes::AlarmAlreadyFulfilled error
+ */
+ virtual Status cancel() = 0;
+ };
+
+ using SharedHandle = std::shared_ptr<Handle>;
+ using AlarmCount = uint64_t;
+
+ explicit AlarmScheduler(ClockSource* clockSource) : _clockSource(clockSource) {}
+
+ virtual ~AlarmScheduler() = default;
+
+ /*
+ * Fulfills all outstanding alarms with CallbackCanceled. Memory associated with the scheduler
+ * may not be freed until the last outstanding Handle is destroyed, however there should be
+ * no broken promises.
+ *
+ * This will be called implicitly by the destructor.
+ */
+ virtual void clearAllAlarms() = 0;
+
+ /*
+ * Clears all alarms as above, and prevents any new alarms from being scheduled. Calls to
+ * alarmAt will return a ready Future with a ShutdownInProgress error code.
+ */
+ virtual void clearAllAlarmsAndShutdown() = 0;
+
+ struct Alarm {
+ Future<void> future;
+ SharedHandle handle;
+ };
+ /*
+ * Schedules an alarm some milliseconds from now().
+ */
+ Alarm alarmFromNow(Milliseconds time) {
+ return alarmAt(_clockSource->now() + time);
+ };
+
+ /*
+ * Schedules an alarm at a specific time on the service's clock source.
+ */
+ virtual Alarm alarmAt(Date_t time) = 0;
+
+ /*
+ * Registers a callback that will be called when a new alarm has been registered.
+ *
+ * The hook will be called with a Date_t representing the next time an alarm will expire after
+ * all internal locks have been released. This can be used to unblock between calling
+ * processExpiredAlarms() with a new amount of time to block for.
+ */
+ using AlarmRegisterHook = unique_function<void(Date_t, const std::shared_ptr<AlarmScheduler>&)>;
+ void setAlarmRegisterHook(AlarmRegisterHook onAlarmHook) {
+ _registerHook = std::move(onAlarmHook);
+ }
+
+ /*
+ * Processes all alarms that have expired as of now(). If maxAlarms is not boost::none, then
+ * this will only expire that many alarms before returning.
+ *
+ * Returns the number of alarms that were expired by this call.
+ */
+ using AlarmExpireHook = unique_function<bool(AlarmCount)>;
+ virtual void processExpiredAlarms(boost::optional<AlarmExpireHook> hook = boost::none) = 0;
+
+ /*
+ * Returns the Date_t of the next scheduled alarm.
+ */
+ virtual Date_t nextAlarm() = 0;
+
+ virtual ClockSource* clockSource() const {
+ return _clockSource;
+ }
+
+protected:
+ void callRegisterHook(Date_t nextAlarm, const std::shared_ptr<AlarmScheduler>& which) {
+ if (_registerHook) {
+ _registerHook(nextAlarm, which);
+ }
+ }
+
+private:
+ AlarmRegisterHook _registerHook;
+ ClockSource* const _clockSource;
+};
+
+/*
+ * Implements a basic alarm scheduler based on a multimap of date_t to promise.
+ *
+ * Scheduling an alarm takes O(log(n)) where n is the number of outstanding alarms.
+ * Canceling an alarm is done in constant time.
+ * Processing alarms is done in constant time.
+ */
+class AlarmSchedulerPrecise : public AlarmScheduler,
+ public std::enable_shared_from_this<AlarmSchedulerPrecise> {
+public:
+ explicit AlarmSchedulerPrecise(ClockSource* clockSource) : AlarmScheduler(clockSource) {}
+
+ ~AlarmSchedulerPrecise();
+
+ void clearAllAlarms() override;
+
+ void clearAllAlarmsAndShutdown() override;
+
+ Alarm alarmAt(Date_t time) override;
+
+ void processExpiredAlarms(boost::optional<AlarmExpireHook> hook) override;
+
+ Date_t nextAlarm() override;
+
+private:
+ class HandleImpl;
+
+ enum AlarmState { kOutstanding, kFulfilled, kAbandoned };
+ struct AlarmData {
+ explicit AlarmData(Promise<void> promise_) : promise(std::move(promise_)) {}
+
+ std::weak_ptr<HandleImpl> handle;
+ Promise<void> promise;
+ };
+
+ using AlarmMap = std::multimap<Date_t, AlarmData>;
+ using AlarmMapIt = AlarmMap::iterator;
+
+ void _clearAllAlarmsImpl(stdx::unique_lock<stdx::mutex>& lk);
+
+ stdx::mutex _mutex;
+ bool _shutdown = false;
+ AlarmMap _alarms;
+};
+
+} // namespace
diff --git a/src/mongo/util/alarm_runner_background_thread.cpp b/src/mongo/util/alarm_runner_background_thread.cpp
new file mode 100644
index 00000000000..4d22f84e87d
--- /dev/null
+++ b/src/mongo/util/alarm_runner_background_thread.cpp
@@ -0,0 +1,118 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/alarm_runner_background_thread.h"
+
+namespace mongo {
+
+void AlarmRunnerBackgroundThread::start() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _running = true;
+ _thread = stdx::thread(&AlarmRunnerBackgroundThread::_threadRoutine, this);
+}
+
+void AlarmRunnerBackgroundThread::shutdown() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _running = false;
+ lk.unlock();
+ _condVar.notify_one();
+ _thread.join();
+
+ for (const auto& scheduler : _schedulers) {
+ scheduler->clearAllAlarmsAndShutdown();
+ }
+}
+
+std::vector<AlarmRunnerBackgroundThread::AlarmSchedulerHandle>
+AlarmRunnerBackgroundThread::_initializeSchedulers(std::vector<AlarmSchedulerHandle> schedulers) {
+ invariant(!schedulers.empty());
+
+ const auto registerHook = [this](Date_t next, const std::shared_ptr<AlarmScheduler>& which) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (next >= _nextAlarm) {
+ return;
+ }
+
+ _nextAlarm = next;
+ _condVar.notify_one();
+ };
+
+ const auto clockSource = schedulers.front()->clockSource();
+ for (auto& scheduler : schedulers) {
+ scheduler->setAlarmRegisterHook(registerHook);
+ auto nextAlarm = scheduler->nextAlarm();
+ if (nextAlarm < _nextAlarm) {
+ _nextAlarm = nextAlarm;
+ }
+ // The thread routine uses the clock source of the first registered scheduler to wait
+ // on its condvar, so all registered schedulers must use the same clock source.
+ fassert(51046, scheduler->clockSource() == clockSource);
+ }
+
+ return schedulers;
+}
+
+void AlarmRunnerBackgroundThread::_threadRoutine() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_running) {
+ const auto clockSource = _schedulers.front()->clockSource();
+ const auto now = clockSource->now();
+
+ // Schedulers may try to alter our _nextAlarm/_newAlarm state as they process expired
+ // alarms that then reschedule themselves, so to eliminate any lock contention, just
+ // unlock the mutex while we process expired alarms.
+ lk.unlock();
+ for (const auto& scheduler : _schedulers) {
+ if (scheduler->nextAlarm() > now) {
+ continue;
+ }
+ scheduler->processExpiredAlarms();
+ }
+
+ // Lock the mutex while the sleep duration is computed. This will block schedulers from
+ // scheduling new alarms.
+ lk.lock();
+ auto sleepUntil = Date_t::max();
+ for (const auto& scheduler : _schedulers) {
+ sleepUntil = std::min(sleepUntil, scheduler->nextAlarm());
+ }
+
+ // Update _nextAlarm so that any schedulers that have pending new alarms see the actual
+ // time we're about to sleep for, and set _newAlarm to false before going to sleep.
+ _nextAlarm = sleepUntil;
+
+ clockSource->waitForConditionUntil(_condVar, lk, _nextAlarm, [&] {
+ return (_nextAlarm != sleepUntil || _running == false);
+ });
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/alarm_runner_background_thread.h b/src/mongo/util/alarm_runner_background_thread.h
new file mode 100644
index 00000000000..179f6350480
--- /dev/null
+++ b/src/mongo/util/alarm_runner_background_thread.h
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/alarm.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/*
+ * This is a runner for alarm schedulers that waits for and processes alarms in a single
+ * background thread.
+ */
+class AlarmRunnerBackgroundThread {
+public:
+ using AlarmSchedulerHandle = std::shared_ptr<AlarmScheduler>;
+ // Construct an alarm runner from a vector of shared_ptr<AlarmScheduler>'s.
+ explicit AlarmRunnerBackgroundThread(std::vector<AlarmSchedulerHandle> container)
+ : _schedulers(_initializeSchedulers(std::move(container))) {}
+
+ /*
+ * Starts a background thread that will process alarms from all registered schedulers.
+ */
+ void start();
+
+ /*
+ * Clears all outstanding timers from all registered schedulers and shuts down the background
+ * thread.
+ */
+ void shutdown();
+
+private:
+ std::vector<AlarmSchedulerHandle> _initializeSchedulers(
+ std::vector<AlarmSchedulerHandle> container);
+
+ void _threadRoutine();
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _condVar;
+ bool _running = false;
+ Date_t _nextAlarm = Date_t::max();
+ std::vector<AlarmSchedulerHandle> _schedulers;
+ stdx::thread _thread;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/alarm_test.cpp b/src/mongo/util/alarm_test.cpp
new file mode 100644
index 00000000000..1fc29b1b5f9
--- /dev/null
+++ b/src/mongo/util/alarm_test.cpp
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/stdx/chrono.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/alarm.h"
+#include "mongo/util/alarm_runner_background_thread.h"
+#include "mongo/util/clock_source_mock.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+TEST(AlarmScheduler, BasicSingleThread) {
+ auto clockSource = std::make_unique<ClockSourceMock>();
+
+ std::shared_ptr<AlarmScheduler> scheduler =
+ std::make_shared<AlarmSchedulerPrecise>(clockSource.get());
+
+ auto testStart = clockSource->now();
+ auto alarm = scheduler->alarmAt(testStart + Milliseconds(10));
+ bool firstTimerExpired = false;
+ std::move(alarm.future).getAsync([&](Status status) {
+ log() << "Timer expired: " << status;
+ firstTimerExpired = true;
+ });
+
+ alarm = scheduler->alarmAt(testStart + Milliseconds(500));
+ bool secondTimerExpired = false;
+ std::move(alarm.future).getAsync([&](Status status) {
+ log() << "Second timer expired: " << status;
+ secondTimerExpired = true;
+ });
+
+ alarm = scheduler->alarmAt(testStart + Milliseconds(515));
+ bool thirdTimerExpired = false;
+ std::move(alarm.future).getAsync([&](Status status) {
+ log() << "third timer expired: " << status;
+ thirdTimerExpired = true;
+ });
+ auto missingEvent = alarm.handle;
+
+ alarm = scheduler->alarmAt(testStart + Milliseconds(250));
+
+ clockSource->advance(Milliseconds(11));
+ scheduler->processExpiredAlarms();
+ ASSERT_TRUE(firstTimerExpired);
+ ASSERT_FALSE(secondTimerExpired);
+
+ ASSERT_OK(alarm.handle->cancel());
+ auto cancelledStatus = std::move(alarm.future).getNoThrow();
+ ASSERT_EQ(cancelledStatus.code(), ErrorCodes::CallbackCanceled);
+
+
+ clockSource->advance(Milliseconds(501));
+ scheduler->processExpiredAlarms();
+ ASSERT_TRUE(secondTimerExpired);
+ ASSERT_FALSE(thirdTimerExpired);
+
+ clockSource->advance(Milliseconds(64));
+ scheduler->processExpiredAlarms();
+ ASSERT_TRUE(thirdTimerExpired);
+
+ cancelledStatus = missingEvent->cancel();
+ ASSERT_EQ(cancelledStatus.code(), ErrorCodes::AlarmAlreadyFulfilled);
+ alarm = scheduler->alarmAt(testStart + Hours(5));
+ scheduler->clearAllAlarmsAndShutdown();
+ cancelledStatus = std::move(alarm.future).getNoThrow();
+ ASSERT_EQ(cancelledStatus.code(), ErrorCodes::CallbackCanceled);
+
+ alarm = scheduler->alarmFromNow(Milliseconds{50});
+ auto shutdownStatus = alarm.future.getNoThrow();
+ ASSERT_EQ(shutdownStatus.code(), ErrorCodes::ShutdownInProgress);
+}
+
+TEST(AlarmRunner, BasicTest) {
+ auto clockSource = std::make_unique<ClockSourceMock>();
+ auto scheduler = std::make_shared<AlarmSchedulerPrecise>(clockSource.get());
+ AlarmRunnerBackgroundThread runner({scheduler});
+ runner.start();
+
+ auto alarm1 = scheduler->alarmFromNow(Milliseconds(10));
+ auto alarm2 = scheduler->alarmFromNow(Milliseconds(20));
+
+ AtomicWord<bool> future2Filled{false};
+ auto pf = makePromiseFuture<void>();
+ std::move(alarm2.future).getAsync([&future2Filled,
+ promise = std::move(pf.promise) ](Status status) mutable {
+ ASSERT_OK(status);
+ future2Filled.store(true);
+ promise.emplaceValue();
+ });
+
+ clockSource->advance(Milliseconds(11));
+
+ ASSERT_OK(alarm1.future.getNoThrow());
+ ASSERT_FALSE(future2Filled.load());
+
+ clockSource->advance(Milliseconds(21));
+ ASSERT_OK(pf.future.getNoThrow());
+
+ auto alarm3 = scheduler->alarmFromNow(Milliseconds(10));
+
+ ASSERT_OK(alarm3.handle->cancel());
+
+ ASSERT_EQ(alarm3.future.getNoThrow().code(), ErrorCodes::CallbackCanceled);
+
+ auto alarm4 = scheduler->alarmFromNow(Milliseconds(10));
+ runner.shutdown();
+
+ ASSERT_EQ(alarm4.future.getNoThrow().code(), ErrorCodes::CallbackCanceled);
+
+ auto alarm5 = scheduler->alarmFromNow(Milliseconds(50));
+ ASSERT_EQ(alarm5.future.getNoThrow().code(), ErrorCodes::ShutdownInProgress);
+}
+
+TEST(AlarmRunner, SeveralSchedulers) {
+ auto clockSource = std::make_unique<ClockSourceMock>();
+ auto scheduler1 = std::make_shared<AlarmSchedulerPrecise>(clockSource.get());
+ auto scheduler2 = std::make_shared<AlarmSchedulerPrecise>(clockSource.get());
+
+ AlarmRunnerBackgroundThread runner({scheduler1, scheduler2});
+ runner.start();
+
+ scheduler1->alarmAt(Date_t::max());
+ // Schedule two alarms, the first is just to wake up the runner so that it can try to decide
+ // which scheduler to wait for next. The second is the one we actually want to wait for.
+ auto alarm1 = scheduler2->alarmFromNow(Milliseconds(1));
+ auto alarm2 = scheduler2->alarmFromNow(Milliseconds(20));
+ clockSource->advance(Milliseconds(2));
+ ASSERT_OK(alarm1.future.getNoThrow());
+
+ // If everything goes well then we should be able to wait on this future without blocking.
+ clockSource->advance(Milliseconds(20));
+ ASSERT_OK(alarm2.future.getNoThrow());
+
+ runner.shutdown();
+}
+} // namespace
+} // namespace mongo