diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2019-05-13 21:16:52 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2019-05-13 21:16:52 -0400 |
commit | 39f8c6af5ef1f637bdb2120b1cfb8b507368a7f8 (patch) | |
tree | 9f2587793cb215b9a36eb519e0b8d02bd05da317 /src/mongo/watchdog | |
parent | 9ab10de2762ba48532d9bc6717a434672eee8475 (diff) | |
download | mongo-39f8c6af5ef1f637bdb2120b1cfb8b507368a7f8.tar.gz |
SERVER-41023 Move Storage Node Watchdog to community
Diffstat (limited to 'src/mongo/watchdog')
-rw-r--r-- | src/mongo/watchdog/SConscript | 51 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog.cpp | 537 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog.h | 385 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog_mongod.cpp | 201 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog_mongod.h | 47 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog_mongod.idl | 43 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog_register.cpp | 50 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog_register.h | 48 | ||||
-rw-r--r-- | src/mongo/watchdog/watchdog_test.cpp | 480 |
9 files changed, 1842 insertions, 0 deletions
diff --git a/src/mongo/watchdog/SConscript b/src/mongo/watchdog/SConscript new file mode 100644 index 00000000000..a1cedf2a327 --- /dev/null +++ b/src/mongo/watchdog/SConscript @@ -0,0 +1,51 @@ +# -*- mode: python -*- + +Import('env') + +env.Library( + target='watchdog', + source=[ + 'watchdog.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/storage/storage_options', + ] +) + +env.Library( + target='watchdog_register', + source=[ + 'watchdog_register.cpp', + ], +) + +env.Library( + target='watchdog_mongod', + source=[ + 'watchdog_mongod.cpp', + env.Idlc('watchdog_mongod.idl')[0], + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/commands/server_status', + 'watchdog', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/server_options_core', + '$BUILD_DIR/mongo/idl/server_parameter', + 'watchdog_register', + ], +) + +env.CppUnitTest( + target='watchdog_test', + source=[ + 'watchdog_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/service_context_test_fixture', + '$BUILD_DIR/mongo/util/clock_source_mock', + 'watchdog', + ], +) diff --git a/src/mongo/watchdog/watchdog.cpp b/src/mongo/watchdog/watchdog.cpp new file mode 100644 index 00000000000..ea1d53b2219 --- /dev/null +++ b/src/mongo/watchdog/watchdog.cpp @@ -0,0 +1,537 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/watchdog/watchdog.h" + +#include <boost/filesystem.hpp> + +#ifndef _WIN32 +#include <fcntl.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#endif + +#include "mongo/base/static_assert.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/platform/process_id.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/hex.h" +#include "mongo/util/log.h" +#include "mongo/util/timer.h" + + +namespace mongo { + +WatchdogPeriodicThread::WatchdogPeriodicThread(Milliseconds period, StringData threadName) + : _period(period), _enabled(true), _threadName(threadName.toString()) {} + +void WatchdogPeriodicThread::start() { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + invariant(_state == State::kNotStarted); + _state = State::kStarted; + + // Start the thread. + _thread = stdx::thread([this] { this->doLoop(); }); + } +} + +void WatchdogPeriodicThread::shutdown() { + + stdx::thread thread; + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + bool started = (_state == State::kStarted); + + invariant(_state == State::kNotStarted || _state == State::kStarted); + + if (!started) { + _state = State::kDone; + return; + } + + _state = State::kShutdownRequested; + + std::swap(thread, _thread); + + // Wake up the thread if sleeping so that it will check if we are done. + _condvar.notify_one(); + } + + thread.join(); + + _state = State::kDone; +} + +void WatchdogPeriodicThread::setPeriod(Milliseconds period) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + bool wasEnabled = _enabled; + + if (period < Milliseconds::zero()) { + _enabled = false; + + // Leave the thread running but very slowly. If we set this value too high, it would + // overflow Duration. + _period = Hours(1); + } else { + _period = period; + _enabled = true; + } + + if (!wasEnabled && _enabled) { + resetState(); + } + + _condvar.notify_one(); +} + +void WatchdogPeriodicThread::doLoop() { + Client::initThread(_threadName); + Client* client = &cc(); + + auto preciseClockSource = client->getServiceContext()->getPreciseClockSource(); + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + // Ensure state is starting from a clean slate. + resetState(); + } + + while (true) { + // Wait for the next run or signal to shutdown. + + auto opCtx = client->makeOperationContext(); + + Date_t startTime = preciseClockSource->now(); + + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + MONGO_IDLE_THREAD_BLOCK; + + + // Check if the period is different? + // We are signalled on period changes at which point we may be done waiting or need to + // wait longer. + while (startTime + _period > preciseClockSource->now() && + _state != State::kShutdownRequested) { + auto s = opCtx->waitForConditionOrInterruptNoAssertUntil( + _condvar, lock, startTime + _period); + + if (!s.isOK()) { + // The only bad status is when we are in shutdown + if (!opCtx->getServiceContext()->getKillAllOperations()) { + error() << "Watchdog was interrupted, shuting down:, reason: " + << s.getStatus(); + } + + return; + } + } + + // Are we done running? + if (_state == State::kShutdownRequested) { + return; + } + + // Check if the watchdog checks have been disabled + if (!_enabled) { + continue; + } + } + + run(opCtx.get()); + } +} + + +WatchdogCheckThread::WatchdogCheckThread(std::vector<std::unique_ptr<WatchdogCheck>> checks, + Milliseconds period) + : WatchdogPeriodicThread(period, "watchdogCheck"), _checks(std::move(checks)) {} + +std::int64_t WatchdogCheckThread::getGeneration() { + return _checkGeneration.load(); +} + +void WatchdogCheckThread::resetState() {} + +void WatchdogCheckThread::run(OperationContext* opCtx) { + for (auto& check : _checks) { + Timer timer(opCtx->getServiceContext()->getTickSource()); + + check->run(opCtx); + Microseconds micros = timer.elapsed(); + + LOG(1) << "Watchdog test '" << check->getDescriptionForLogging() << "' took " + << duration_cast<Milliseconds>(micros); + + // We completed a check, bump the generation counter. + _checkGeneration.fetchAndAdd(1); + } +} + + +WatchdogMonitorThread::WatchdogMonitorThread(WatchdogCheckThread* checkThread, + WatchdogDeathCallback callback, + Milliseconds interval) + : WatchdogPeriodicThread(interval, "watchdogMonitor"), + _callback(callback), + _checkThread(checkThread) {} + +std::int64_t WatchdogMonitorThread::getGeneration() { + return _monitorGeneration.load(); +} + +void WatchdogMonitorThread::resetState() { + // Reset the generation so that if the monitor thread is run before the check thread + // after being enabled, it does not. + _lastSeenGeneration = -1; +} + +void WatchdogMonitorThread::run(OperationContext* opCtx) { + auto currentGeneration = _checkThread->getGeneration(); + + if (currentGeneration != _lastSeenGeneration) { + _lastSeenGeneration = currentGeneration; + } else { + _callback(); + } +} + + +WatchdogMonitor::WatchdogMonitor(std::vector<std::unique_ptr<WatchdogCheck>> checks, + Milliseconds checkPeriod, + Milliseconds monitorPeriod, + WatchdogDeathCallback callback) + : _checkPeriod(checkPeriod), + _watchdogCheckThread(std::move(checks), checkPeriod), + _watchdogMonitorThread(&_watchdogCheckThread, callback, monitorPeriod) { + invariant(checkPeriod < monitorPeriod); +} + +void WatchdogMonitor::start() { + log() << "Starting Watchdog Monitor"; + + // Start the threads. + _watchdogCheckThread.start(); + + _watchdogMonitorThread.start(); + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + invariant(_state == State::kNotStarted); + _state = State::kStarted; + } +} + +void WatchdogMonitor::setPeriod(Milliseconds duration) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + if (duration > Milliseconds(0)) { + dassert(duration >= Milliseconds(1)); + + // Make sure that we monitor runs more frequently then checks + // 2 feels like an arbitrary good minimum. + invariant(duration >= 2 * _checkPeriod); + + _watchdogCheckThread.setPeriod(_checkPeriod); + _watchdogMonitorThread.setPeriod(duration); + + log() << "WatchdogMonitor period changed to " << duration_cast<Seconds>(duration); + } else { + _watchdogMonitorThread.setPeriod(duration); + _watchdogCheckThread.setPeriod(duration); + + log() << "WatchdogMonitor disabled"; + } + } +} + +void WatchdogMonitor::shutdown() { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + bool started = (_state == State::kStarted); + + invariant(_state == State::kNotStarted || _state == State::kStarted); + + if (!started) { + _state = State::kDone; + return; + } + + _state = State::kShutdownRequested; + } + + _watchdogMonitorThread.shutdown(); + + _watchdogCheckThread.shutdown(); + + _state = State::kDone; +} + +std::int64_t WatchdogMonitor::getCheckGeneration() { + return _watchdogCheckThread.getGeneration(); +} + +std::int64_t WatchdogMonitor::getMonitorGeneration() { + return _watchdogMonitorThread.getGeneration(); +} + +#ifdef _WIN32 +/** + * Check a directory is ok + * 1. Open up a direct_io to a new file + * 2. Write to the file + * 3. Seek to the beginning + * 4. Read from the file + * 5. Close file + */ +void checkFile(OperationContext* opCtx, const boost::filesystem::path& file) { + Date_t now = opCtx->getServiceContext()->getPreciseClockSource()->now(); + std::string nowStr = now.toString(); + + HANDLE hFile = CreateFileW(file.generic_wstring().c_str(), + GENERIC_READ | GENERIC_WRITE, + 0, // No Sharing + NULL, + CREATE_ALWAYS, + FILE_ATTRIBUTE_NORMAL, + NULL); + if (hFile == INVALID_HANDLE_VALUE) { + std::uint32_t gle = ::GetLastError(); + severe() << "CreateFile failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(gle); + fassertNoTrace(4074, gle == 0); + } + + DWORD bytesWrittenTotal; + if (!WriteFile(hFile, nowStr.c_str(), nowStr.size(), &bytesWrittenTotal, NULL)) { + std::uint32_t gle = ::GetLastError(); + severe() << "WriteFile failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(gle); + fassertNoTrace(4075, gle == 0); + } + + if (bytesWrittenTotal != nowStr.size()) { + warning() << "partial write for '" << file.generic_string() << "' expected " + << nowStr.size() << " bytes but wrote " << bytesWrittenTotal << " bytes"; + } else { + + if (!FlushFileBuffers(hFile)) { + std::uint32_t gle = ::GetLastError(); + severe() << "FlushFileBuffers failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(gle); + fassertNoTrace(4076, gle == 0); + } + + DWORD newOffset = SetFilePointer(hFile, 0, 0, FILE_BEGIN); + if (newOffset != 0) { + std::uint32_t gle = ::GetLastError(); + severe() << "SetFilePointer failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(gle); + fassertNoTrace(4077, gle == 0); + } + + DWORD bytesRead; + auto readBuffer = stdx::make_unique<char[]>(nowStr.size()); + if (!ReadFile(hFile, readBuffer.get(), nowStr.size(), &bytesRead, NULL)) { + std::uint32_t gle = ::GetLastError(); + severe() << "ReadFile failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(gle); + fassertNoTrace(4078, gle == 0); + } + + if (bytesRead != bytesWrittenTotal) { + severe() << "Read wrong number of bytes for '" << file.generic_string() << "' expected " + << bytesWrittenTotal << " bytes but read " << bytesRead << " bytes"; + fassertNoTrace(50724, false); + } + + if (memcmp(nowStr.c_str(), readBuffer.get(), nowStr.size()) != 0) { + severe() << "Read wrong string from file '" << file.generic_string() << nowStr.size() + << " bytes (in hex) '" << toHexLower(nowStr.c_str(), nowStr.size()) + << "' but read bytes '" << toHexLower(readBuffer.get(), bytesRead) << "'"; + fassertNoTrace(50717, false); + } + } + + if (!CloseHandle(hFile)) { + std::uint32_t gle = ::GetLastError(); + severe() << "CloseHandle failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(gle); + fassertNoTrace(4079, gle == 0); + } +} + +void watchdogTerminate() { + ::TerminateProcess(::GetCurrentProcess(), ExitCode::EXIT_WATCHDOG); +} + +#else + +/** + * Check a directory is ok + * 1. Open up a direct_io to a new file + * 2. Write to the file + * 3. Read from the file + * 4. Close file + */ +void checkFile(OperationContext* opCtx, const boost::filesystem::path& file) { + Date_t now = opCtx->getServiceContext()->getPreciseClockSource()->now(); + std::string nowStr = now.toString(); + + int fd = open(file.generic_string().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd == -1) { + auto err = errno; + severe() << "open failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(err); + fassertNoTrace(4080, err == 0); + } + + size_t bytesWrittenTotal = 0; + while (bytesWrittenTotal < nowStr.size()) { + ssize_t bytesWrittenInWrite = + write(fd, nowStr.c_str() + bytesWrittenTotal, nowStr.size() - bytesWrittenTotal); + if (bytesWrittenInWrite == -1) { + auto err = errno; + if (err == EINTR) { + continue; + } + + severe() << "write failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(err); + fassertNoTrace(4081, err == 0); + } + + // Warn if the write was incomplete + if (bytesWrittenTotal == 0 && static_cast<size_t>(bytesWrittenInWrite) != nowStr.size()) { + warning() << "parital write for '" << file.generic_string() << "' expected " + << nowStr.size() << " bytes but wrote " << bytesWrittenInWrite << " bytes"; + } + + bytesWrittenTotal += bytesWrittenInWrite; + } + + if (fsync(fd)) { + auto err = errno; + severe() << "fsync failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(err); + fassertNoTrace(4082, err == 0); + } + + auto readBuffer = stdx::make_unique<char[]>(nowStr.size()); + size_t bytesReadTotal = 0; + while (bytesReadTotal < nowStr.size()) { + ssize_t bytesReadInRead = pread( + fd, readBuffer.get() + bytesReadTotal, nowStr.size() - bytesReadTotal, bytesReadTotal); + if (bytesReadInRead == -1) { + auto err = errno; + if (err == EINTR) { + continue; + } + + severe() << "read failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(err); + fassertNoTrace(4083, err == 0); + } else if (bytesReadInRead == 0) { + severe() << "read failed for '" << file.generic_string() + << "' with unexpected end of file"; + fassertNoTrace(50719, false); + } + + // Warn if the read was incomplete + if (bytesReadTotal == 0 && static_cast<size_t>(bytesReadInRead) != nowStr.size()) { + warning() << "partial read for '" << file.generic_string() << "' expected " + << nowStr.size() << " bytes but read " << bytesReadInRead << " bytes"; + } + + bytesReadTotal += bytesReadInRead; + } + + if (memcmp(nowStr.c_str(), readBuffer.get(), nowStr.size()) != 0) { + severe() << "Read wrong string from file '" << file.generic_string() << "' expected " + << nowStr.size() << " bytes (in hex) '" + << toHexLower(nowStr.c_str(), nowStr.size()) << "' but read bytes '" + << toHexLower(readBuffer.get(), bytesReadTotal) << "'"; + fassertNoTrace(50718, false); + } + + if (close(fd)) { + auto err = errno; + severe() << "close failed for '" << file.generic_string() + << "' with error: " << errnoWithDescription(err); + fassertNoTrace(4084, err == 0); + } +} + +void watchdogTerminate() { + // This calls the exit_group syscall on Linux + ::_exit(ExitCode::EXIT_WATCHDOG); +} +#endif + +constexpr StringData DirectoryCheck::kProbeFileName; +constexpr StringData DirectoryCheck::kProbeFileNameExt; + +void DirectoryCheck::run(OperationContext* opCtx) { + // Ensure we have unique file names if multiple processes share the same logging directory + boost::filesystem::path file = _directory; + file /= kProbeFileName.toString(); + file += ProcessId::getCurrent().toString(); + file += kProbeFileNameExt.toString(); + + checkFile(opCtx, file); + + // Try to delete the file so it is not leaked on restart, but ignore errors + boost::system::error_code ec; + boost::filesystem::remove(file, ec); + if (ec) { + warning() << "Failed to delete file '" << file.generic_string() + << "', error: " << ec.message(); + } +} + +std::string DirectoryCheck::getDescriptionForLogging() { + return str::stream() << "checked directory '" << _directory.generic_string() << "'"; +} + +} // namespace mongo diff --git a/src/mongo/watchdog/watchdog.h b/src/mongo/watchdog/watchdog.h new file mode 100644 index 00000000000..fe0060e3534 --- /dev/null +++ b/src/mongo/watchdog/watchdog.h @@ -0,0 +1,385 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/filesystem/path.hpp> +#include <string> +#include <vector> + +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/duration.h" + +namespace mongo { + +class OperationContext; + +/** + * WatchdogDeathCallback is used by the watchdog component to terminate the process. It is expected + * to bypass MongoDB's normal shutdown process. It should not make any syscalls other then to + * exit/terminate the process. + * + * It is pluggable for testing purposes. + */ +using WatchdogDeathCallback = stdx::function<void(void)>; + +/** + * The OS specific implementation of WatchdogDeathCallback that kills the process. + */ +void watchdogTerminate(); + +/** + * WatchdogCheck represents a health check that the watchdog will run periodically to ensure the + * machine, and process are healthy. + * + * It is pluggable for testing purposes. + */ +class WatchdogCheck { +public: + virtual ~WatchdogCheck() = default; + + /** + * Runs a health check against the local machine. + * + * Note: It should throw exceptions on unexpected errors. Exceptions will result in a call to + * WatchdogDeathCallback. + */ + virtual void run(OperationContext* opCtx) = 0; + + /** + * Returns a description for the watchdog check to log to the log file. + */ + virtual std::string getDescriptionForLogging() = 0; +}; + +/** + * Do a health check for a given directory. This health check is done by reading, and writing to a + * file with direct I/O. + */ +class DirectoryCheck : public WatchdogCheck { +public: + static constexpr StringData kProbeFileName = "watchdog_probe_"_sd; + static constexpr StringData kProbeFileNameExt = ".txt"_sd; + +public: + DirectoryCheck(const boost::filesystem::path& directory) : _directory(directory) {} + + void run(OperationContext* opCtx) final; + + std::string getDescriptionForLogging() final; + +private: + boost::filesystem::path _directory; +}; + +/** + * Runs a callback on a periodic basis. The specified time period is the time delay between + * invocations. + * + * Example: + * - callback + * - sleep(period) + * - callback + */ +class WatchdogPeriodicThread { +public: + WatchdogPeriodicThread(Milliseconds period, StringData threadName); + virtual ~WatchdogPeriodicThread() = default; + + /** + * Starts the periodic thread. + */ + void start(); + + /** + * Updates the period the thread runs its task. + * + * Period changes take affect immediately. + */ + void setPeriod(Milliseconds period); + + /** + * Shutdown the periodic thread. After it is shutdown, it cannot be started. + */ + void shutdown(); + +protected: + /** + * Do one iteration of work. + */ + virtual void run(OperationContext* opCtx) = 0; + + /** + * Provides an opportunity for derived classes to initialize state. + * + * This method is called at two different times: + * 1. First time a thread is started. + * 2. When a thread goes from disabled to enabled. Specifically, a user calls setPeriod(-1) + * followed by setPeriod(> 0). + * + */ + virtual void resetState() = 0; + +private: + /** + * Main thread loop + */ + void doLoop(); + +private: + /** + * Private enum to track state. + * + * +----------------------------------------------------------------+ + * | v + * +-------------+ +----------+ +--------------------+ +-------+ + * | kNotStarted | --> | kStarted | --> | kShutdownRequested | --> | kDone | + * +-------------+ +----------+ +--------------------+ +-------+ + */ + enum class State { + /** + * Initial state. Either start() or shutdown() can be called next. + */ + kNotStarted, + + /** + * start() has been called. shutdown() should be called next. + */ + kStarted, + + /** + * shutdown() has been called, and the thread is in progress of shutting down. + */ + kShutdownRequested, + + /** + * PeriodicThread has been shutdown. + */ + kDone, + }; + + // State of PeriodicThread + State _state{State::kNotStarted}; + + // Thread period + Milliseconds _period; + + // if true, then call run() otherwise just let the thread idle, + bool _enabled; + + // Name of thread for logging purposes + std::string _threadName; + + // The thread + stdx::thread _thread; + + // Lock to protect _state and control _thread + stdx::mutex _mutex; + stdx::condition_variable _condvar; +}; + +/** + * Periodic background thread to run watchdog checks. + */ +class WatchdogCheckThread : public WatchdogPeriodicThread { +public: + WatchdogCheckThread(std::vector<std::unique_ptr<WatchdogCheck>> checks, Milliseconds period); + + /** + * Returns the current generation number of the checks. + * + * Incremented after each check is run. + */ + std::int64_t getGeneration(); + +private: + void run(OperationContext* opCtx) final; + void resetState() final; + +private: + // Vector of checks to run + std::vector<std::unique_ptr<WatchdogCheck>> _checks; + + // A counter that is incremented for each watchdog check completed, and monitored to ensure it + // does not remain at the same value for too long. + AtomicWord<long long> _checkGeneration{0}; +}; + +/** + * Periodic background thread to ensure watchdog checks run periodically. + */ +class WatchdogMonitorThread : public WatchdogPeriodicThread { +public: + WatchdogMonitorThread(WatchdogCheckThread* checkThread, + WatchdogDeathCallback callback, + Milliseconds period); + + /** + * Returns the current generation number of the monitor. + * + * Incremented after each round of monitoring is run. + */ + std::int64_t getGeneration(); + +private: + void run(OperationContext* opCtx) final; + void resetState() final; + +private: + // Callback function to call when watchdog gets stuck + const WatchdogDeathCallback _callback; + + // Watchdog check thread to query + WatchdogCheckThread* _checkThread; + + // A counter that is incremented for each watchdog monitor run is completed. + AtomicWord<long long> _monitorGeneration{0}; + + // The last seen _checkGeneration value + std::int64_t _lastSeenGeneration{-1}; +}; + + +/** + * WatchdogMonitor + * + * The Watchdog is a pair of dedicated threads that try to figure out if a process is hung + * and terminate if it is. The worst case scenario in a distributed system is a process that appears + * to work but does not actually work. + * + * The watchdog is not designed to detect all the different ways the process is hung. It's goal is + * to detect if the storage system is stuck, and to terminate the process if it is stuck. + * + * Threads: + * WatchdogCheck - runs file system checks + * WatchdogMonitor - verifies that WatchdogCheck continue to make timely progress. If WatchdogCheck + * fails to make process, WatchdogMonitor calls a callback. The callback is not + * expected to do any I/O and minimize the system calls it makes. + */ +class WatchdogMonitor { +public: + /** + * Create the watchdog with specified period. + * + * checkPeriod - how often to run the checks + * monitorPeriod - how often to run the monitor, must be >= checkPeriod + */ + WatchdogMonitor(std::vector<std::unique_ptr<WatchdogCheck>> checks, + Milliseconds checkPeriod, + Milliseconds monitorPeriod, + WatchdogDeathCallback callback); + + /** + * Starts the watchdog threads. + */ + void start(); + + /** + * Updates the watchdog monitor period. The goal is to detect a failure in the time of the + * period. + * + * Does nothing if watchdog is not started. If watchdog was started, it changes the monitor + * period, but not the check period. + * + * Accepts Milliseconds for testing purposes while the setParameter only works with seconds. + */ + void setPeriod(Milliseconds duration); + + /** + * Shutdown the watchdog. + */ + void shutdown(); + + /** + * Returns the current generation number of the checks. + * + * Incremented after each round of checks is run. + */ + std::int64_t getCheckGeneration(); + + /** + * Returns the current generation number of the checks. + * + * Incremented after each round of checks is run. + */ + std::int64_t getMonitorGeneration(); + +private: + /** + * Private enum to track state. + * + * +----------------------------------------------------------------+ + * | v + * +-------------+ +----------+ +--------------------+ +-------+ + * | kNotStarted | --> | kStarted | --> | kShutdownRequested | --> | kDone | + * +-------------+ +----------+ +--------------------+ +-------+ + */ + enum class State { + /** + * Initial state. Either start() or shutdown() can be called next. + */ + kNotStarted, + + /** + * start() has been called. shutdown() should be called next. + */ + kStarted, + + /** + * shutdown() has been called, and the background threads are in progress of shutting down. + */ + kShutdownRequested, + + /** + * Watchdog has been shutdown. + */ + kDone, + }; + + // Lock to protect _state and control _thread + stdx::mutex _mutex; + + // State of watchdog + State _state{State::kNotStarted}; + + // Fixed period for running the checks. + Milliseconds _checkPeriod; + + // WatchdogCheck Thread - runs checks + WatchdogCheckThread _watchdogCheckThread; + + // WatchdogMonitor Thread - watches _watchdogCheckThread + WatchdogMonitorThread _watchdogMonitorThread; +}; + +} // namespace mongo diff --git a/src/mongo/watchdog/watchdog_mongod.cpp b/src/mongo/watchdog/watchdog_mongod.cpp new file mode 100644 index 00000000000..c223e9fc24d --- /dev/null +++ b/src/mongo/watchdog/watchdog_mongod.cpp @@ -0,0 +1,201 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/watchdog/watchdog_mongod.h" + +#include <boost/filesystem.hpp> + +#include "mongo/base/init.h" +#include "mongo/config.h" +#include "mongo/db/client.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/db/server_options.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage/storage_options.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/clock_source_mock.h" +#include "mongo/util/log.h" +#include "mongo/util/tick_source_mock.h" +#include "mongo/watchdog/watchdog.h" +#include "mongo/watchdog/watchdog_mongod_gen.h" +#include "mongo/watchdog/watchdog_register.h" + +namespace mongo { + +// Run the watchdog checks at a fixed interval regardless of user choice for monitoring period. +constexpr Seconds watchdogCheckPeriod = Seconds{10}; + +namespace { + +const auto getWatchdogMonitor = + ServiceContext::declareDecoration<std::unique_ptr<WatchdogMonitor>>(); + +// A boolean variable to track whether the watchdog was enabled at startup. +// Defaults to true because set parameters are handled before we start the watchdog if needed. +bool watchdogEnabled{true}; + +WatchdogMonitor* getGlobalWatchdogMonitor() { + if (!hasGlobalServiceContext()) { + return nullptr; + } + + return getWatchdogMonitor(getGlobalServiceContext()).get(); +} + +} // namespace + +Status validateWatchdogPeriodSeconds(const int& value) { + if (value < 60 && value != -1) { + + return {ErrorCodes::BadValue, "watchdogPeriodSeconds must be greater than or equal to 60s"}; + } + + // If the watchdog was not enabled at startup, disallow changes the period. + if (!watchdogEnabled) { + return {ErrorCodes::BadValue, + "watchdogPeriodSeconds cannot be changed at runtime if it was not set at startup"}; + } + + return Status::OK(); +} + +Status onUpdateWatchdogPeriodSeconds(const int& value) { + auto monitor = getGlobalWatchdogMonitor(); + if (monitor) { + monitor->setPeriod(Seconds(value)); + } + + return Status::OK(); +} + +/** + * Server status section for the Watchdog. + * + * Sample format: + * + * watchdog: { + * generation: int, + * } + */ +class WatchdogServerStatusSection : public ServerStatusSection { +public: + WatchdogServerStatusSection() : ServerStatusSection("watchdog") {} + bool includeByDefault() const { + // Only include this by default if the watchdog is on + return watchdogEnabled; + } + + BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const { + BSONObjBuilder result; + + WatchdogMonitor* watchdog = getWatchdogMonitor(opCtx->getServiceContext()).get(); + + result.append("checkGeneration", watchdog->getCheckGeneration()); + result.append("monitorGeneration", watchdog->getMonitorGeneration()); + result.append("monitorPeriod", gWatchdogPeriodSeconds.load()); + + return result.obj(); + } +} watchdogServerStatusSection; + +void startWatchdog() { + // Check three paths if set + // 1. storage directory - optional for inmemory? + // 2. log path - optional + // 3. audit path - optional + + Seconds period{gWatchdogPeriodSeconds.load()}; + if (period < Seconds::zero()) { + // Skip starting the watchdog if the user has not asked for it. + watchdogEnabled = false; + return; + } + + watchdogEnabled = true; + + std::vector<std::unique_ptr<WatchdogCheck>> checks; + + auto dataCheck = + stdx::make_unique<DirectoryCheck>(boost::filesystem::path(storageGlobalParams.dbpath)); + + checks.push_back(std::move(dataCheck)); + + // Add a check for the journal if it is not disabled + if (storageGlobalParams.dur) { + auto journalDirectory = boost::filesystem::path(storageGlobalParams.dbpath); + journalDirectory /= "journal"; + + if (boost::filesystem::exists(journalDirectory)) { + auto journalCheck = stdx::make_unique<DirectoryCheck>(journalDirectory); + + checks.push_back(std::move(journalCheck)); + } else { + warning() + << "Watchdog is skipping check for journal directory since it does not exist: '" + << journalDirectory.generic_string() << "'"; + } + } + + // If the user specified a log path, also monitor that directory. + // This may be redudant with the dbpath check but there is not easy way to confirm they are + // duplicate. + if (!serverGlobalParams.logpath.empty()) { + boost::filesystem::path logFile(serverGlobalParams.logpath); + auto logPath = logFile.parent_path(); + + auto logCheck = stdx::make_unique<DirectoryCheck>(logPath); + checks.push_back(std::move(logCheck)); + } + + // If the user specified an audit path, also monitor that directory. + // This may be redudant with the dbpath check but there is not easy way to confirm they are + // duplicate. + for (auto&& path : getWatchdogPaths()) { + auto auditCheck = stdx::make_unique<DirectoryCheck>(path); + checks.push_back(std::move(auditCheck)); + } + + auto monitor = stdx::make_unique<WatchdogMonitor>( + std::move(checks), watchdogCheckPeriod, period, watchdogTerminate); + + // Install the new WatchdogMonitor + auto& staticMonitor = getWatchdogMonitor(getGlobalServiceContext()); + + staticMonitor = std::move(monitor); + + staticMonitor->start(); +} + +} // namespace mongo diff --git a/src/mongo/watchdog/watchdog_mongod.h b/src/mongo/watchdog/watchdog_mongod.h new file mode 100644 index 00000000000..186e21e4a47 --- /dev/null +++ b/src/mongo/watchdog/watchdog_mongod.h @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" + +namespace mongo { + +/** +* Start the watchdog. +*/ +void startWatchdog(); + +/** + * Callbacks used by the 'watchdogPeriodSeconds' set parameter. + */ +Status validateWatchdogPeriodSeconds(const int& value); +Status onUpdateWatchdogPeriodSeconds(const int& value); + +} // namespace mongo diff --git a/src/mongo/watchdog/watchdog_mongod.idl b/src/mongo/watchdog/watchdog_mongod.idl new file mode 100644 index 00000000000..61e6fd605b0 --- /dev/null +++ b/src/mongo/watchdog/watchdog_mongod.idl @@ -0,0 +1,43 @@ +# Copyright (C) 2019-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/watchdog/watchdog_mongod.h" + +server_parameters: + "watchdogPeriodSeconds": + description: 'Watchdog Period (seconds)' + set_at: [ startup, runtime ] + cpp_vartype: 'AtomicWord<int>' + cpp_varname: gWatchdogPeriodSeconds + default: -1 + validator: + callback: validateWatchdogPeriodSeconds + on_update: onUpdateWatchdogPeriodSeconds diff --git a/src/mongo/watchdog/watchdog_register.cpp b/src/mongo/watchdog/watchdog_register.cpp new file mode 100644 index 00000000000..d1976a4c74c --- /dev/null +++ b/src/mongo/watchdog/watchdog_register.cpp @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/watchdog/watchdog_register.h" + +namespace mongo { + +namespace { + +std::vector<std::string> watchdogPaths; + +} // namespace + +void registerWatchdogPath(StringData path) { + if (!path.empty()) { + watchdogPaths.push_back(path.toString()); + } +} + +std::vector<std::string>& getWatchdogPaths() { + return watchdogPaths; +} + +} // namespace mongo diff --git a/src/mongo/watchdog/watchdog_register.h b/src/mongo/watchdog/watchdog_register.h new file mode 100644 index 00000000000..822d5d24c6e --- /dev/null +++ b/src/mongo/watchdog/watchdog_register.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/string_data.h" + +namespace mongo { + +/** + * Allow components a way to tell the watchdog what to watch. + */ +void registerWatchdogPath(StringData path); + +/** + * Get list of registered watchdog paths. + */ +std::vector<std::string>& getWatchdogPaths(); + +} // namespace mongo diff --git a/src/mongo/watchdog/watchdog_test.cpp b/src/mongo/watchdog/watchdog_test.cpp new file mode 100644 index 00000000000..fd3b34602c9 --- /dev/null +++ b/src/mongo/watchdog/watchdog_test.cpp @@ -0,0 +1,480 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/watchdog/watchdog.h" + +#include "mongo/db/client.h" +#include "mongo/db/service_context.h" +#include "mongo/db/service_context_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/clock_source_mock.h" +#include "mongo/util/log.h" +#include "mongo/util/tick_source_mock.h" + +namespace mongo { + +class TestPeriodicThread : public WatchdogPeriodicThread { +public: + TestPeriodicThread(Milliseconds period) : WatchdogPeriodicThread(period, "testPeriodic") {} + + void run(OperationContext* opCtx) final { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + ++_counter; + } + + if (_counter == _wait) { + _condvar.notify_all(); + } + } + + void setSignalOnCount(int c) { + _wait = c; + } + + void waitForCount() { + invariant(_wait != 0); + + stdx::unique_lock<stdx::mutex> lock(_mutex); + while (_counter < _wait) { + _condvar.wait(lock); + } + } + + void resetState() final {} + + std::uint32_t getCounter() { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _counter; + } + } + +private: + std::uint32_t _counter{0}; + + stdx::mutex _mutex; + stdx::condition_variable _condvar; + std::uint32_t _wait{0}; +}; + +class PeriodicThreadTest : public ServiceContextTest {}; + +// Tests: +// 1. Make sure it runs at least N times +// 2. Make sure it responds to stop after being paused +// 3. Make sure it can be resumed +// 4. Make sure the period can be changed like from 1 minute -> 1 milli + +// Positive: Make sure periodic thread runs at least N times and stops correctly +TEST_F(PeriodicThreadTest, Basic) { + + TestPeriodicThread testThread(Milliseconds(5)); + + testThread.setSignalOnCount(5); + + testThread.start(); + + testThread.waitForCount(); + + testThread.shutdown(); + + // Check the counter after it is shutdown and make sure it does not change. + std::uint32_t lastCounter = testThread.getCounter(); + + // This is racey but it should only produce false negatives + sleepmillis(100); + + ASSERT_EQ(lastCounter, testThread.getCounter()); +} + +// Positive: Make sure it stops after being paused +TEST_F(PeriodicThreadTest, PauseAndStop) { + + TestPeriodicThread testThread(Milliseconds(5)); + testThread.setSignalOnCount(5); + + testThread.start(); + + testThread.waitForCount(); + + // Stop the thread by setting a -1 duration + testThread.setPeriod(Milliseconds(-1)); + + // Check the counter after it is shutdown and make sure it does not change. + std::uint32_t pauseCounter = testThread.getCounter(); + + // This is racey but it should only produce false negatives + sleepmillis(100); + + // We could have had one more run of the loop as we paused - allow for that case + // but no other runs of the thread. + ASSERT_GTE(pauseCounter + 1, testThread.getCounter()); + + testThread.shutdown(); + + // Check the counter after it is shutdown and make sure it does not change. + std::uint32_t stopCounter = testThread.getCounter(); + + // This is racey but it should only produce false negatives + sleepmillis(100); + + ASSERT_EQ(stopCounter, testThread.getCounter()); +} + +// Positive: Make sure it can be paused and resumed +TEST_F(PeriodicThreadTest, PauseAndResume) { + + TestPeriodicThread testThread(Milliseconds(5)); + testThread.setSignalOnCount(5); + + testThread.start(); + + testThread.waitForCount(); + + // Stop the thread by setting a -1 duration + testThread.setPeriod(Milliseconds(-1)); + + // Check the counter after it is shutdown and make sure it does not change. + std::uint32_t pauseCounter = testThread.getCounter(); + + // This is racey but it should only produce false negatives + sleepmillis(100); + + // We could have had one more run of the loop as we paused - allow for that case + // but no other runs of the thread. + ASSERT_GTE(pauseCounter + 1, testThread.getCounter()); + + // Make sure we can resume the thread again + std::uint32_t baseCounter = testThread.getCounter(); + testThread.setSignalOnCount(baseCounter + 5); + + testThread.setPeriod(Milliseconds(7)); + + testThread.waitForCount(); + + testThread.shutdown(); +} + +/** + * Simple class to ensure we run checks. + */ +class TestCounterCheck : public WatchdogCheck { +public: + void run(OperationContext* opCtx) final { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + ++_counter; + } + + if (_counter == _wait) { + _condvar.notify_all(); + } + } + + std::string getDescriptionForLogging() final { + return "test"; + } + + void setSignalOnCount(int c) { + _wait = c; + } + + void waitForCount() { + invariant(_wait != 0); + + stdx::unique_lock<stdx::mutex> lock(_mutex); + while (_counter < _wait) { + _condvar.wait(lock); + } + } + + std::uint32_t getCounter() { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _counter; + } + } + +private: + std::uint32_t _counter{0}; + + stdx::mutex _mutex; + stdx::condition_variable _condvar; + std::uint32_t _wait{0}; +}; + +class WatchdogCheckThreadTest : public ServiceContextTest {}; + +// Positive: Make sure check thread runs at least N times and stops correctly +TEST_F(WatchdogCheckThreadTest, Basic) { + auto counterCheck = stdx::make_unique<TestCounterCheck>(); + auto counterCheckPtr = counterCheck.get(); + + std::vector<std::unique_ptr<WatchdogCheck>> checks; + checks.push_back(std::move(counterCheck)); + + WatchdogCheckThread testThread(std::move(checks), Milliseconds(5)); + + counterCheckPtr->setSignalOnCount(5); + + testThread.start(); + + counterCheckPtr->waitForCount(); + + testThread.shutdown(); + + // Check the counter after it is shutdown and make sure it does not change. + std::uint32_t lastCounter = counterCheckPtr->getCounter(); + + // This is racey but it should only produce false negatives + sleepmillis(100); + + ASSERT_EQ(lastCounter, counterCheckPtr->getCounter()); +} + +/** + * A class that models the behavior of Windows' manual reset Event object. + */ +class ManualResetEvent { +public: + void set() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + _set = true; + _condvar.notify_one(); + } + + void wait() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + + _condvar.wait(lock, [this]() { return _set; }); + } + +private: + bool _set{false}; + + stdx::mutex _mutex; + stdx::condition_variable _condvar; +}; + + +class WatchdogMonitorThreadTest : public ServiceContextTest {}; + +// Positive: Make sure monitor thread signals death if the check thread never starts +TEST_F(WatchdogMonitorThreadTest, Basic) { + ManualResetEvent deathEvent; + WatchdogDeathCallback deathCallback = [&deathEvent]() { + log() << "Death signalled"; + deathEvent.set(); + }; + + auto counterCheck = stdx::make_unique<TestCounterCheck>(); + + std::vector<std::unique_ptr<WatchdogCheck>> checks; + checks.push_back(std::move(counterCheck)); + + WatchdogCheckThread checkThread(std::move(checks), Milliseconds(5)); + + WatchdogMonitorThread monitorThread(&checkThread, deathCallback, Milliseconds(5)); + + monitorThread.start(); + + deathEvent.wait(); + + monitorThread.shutdown(); +} + +/** + * Sleep after doing a few checks to replicate a hung check. + */ +class SleepyCheck : public WatchdogCheck { +public: + void run(OperationContext* opCtx) final { + ++_counter; + + if (_counter >= 6) { + sleepFor(Seconds(5)); + } + } + + std::string getDescriptionForLogging() final { + return "test"; + } + +private: + std::uint32_t _counter{0}; +}; + +// Positive: Make sure monitor thread signals death if the thread does not make progress +TEST_F(WatchdogMonitorThreadTest, SleepyHungCheck) { + ManualResetEvent deathEvent; + WatchdogDeathCallback deathCallback = [&deathEvent]() { + log() << "Death signalled"; + deathEvent.set(); + }; + + auto sleepyCheck = stdx::make_unique<SleepyCheck>(); + + std::vector<std::unique_ptr<WatchdogCheck>> checks; + checks.push_back(std::move(sleepyCheck)); + + WatchdogCheckThread checkThread(std::move(checks), Milliseconds(1)); + + WatchdogMonitorThread monitorThread(&checkThread, deathCallback, Milliseconds(100)); + + checkThread.start(); + + monitorThread.start(); + + deathEvent.wait(); + + // Make sure we actually did some checks + ASSERT_GTE(checkThread.getGeneration(), 2); + + monitorThread.shutdown(); + + checkThread.shutdown(); +} + +class WatchdogMonitorTest : public ServiceContextTest {}; + +// Positive: Make sure watchdog monitor signals death if a check is unresponsive +TEST_F(WatchdogMonitorTest, SleepyHungCheck) { + ManualResetEvent deathEvent; + WatchdogDeathCallback deathCallback = [&deathEvent]() { + log() << "Death signalled"; + deathEvent.set(); + }; + + auto sleepyCheck = stdx::make_unique<SleepyCheck>(); + + std::vector<std::unique_ptr<WatchdogCheck>> checks; + checks.push_back(std::move(sleepyCheck)); + + WatchdogMonitor monitor(std::move(checks), Milliseconds(1), Milliseconds(5), deathCallback); + + monitor.start(); + + deathEvent.wait(); + + monitor.shutdown(); +} + +// Positive: Make sure watchdog monitor terminates the process if a check is unresponsive +DEATH_TEST(WatchdogMonitorTest, Death, "") { + auto sleepyCheck = stdx::make_unique<SleepyCheck>(); + + std::vector<std::unique_ptr<WatchdogCheck>> checks; + checks.push_back(std::move(sleepyCheck)); + + WatchdogMonitor monitor( + std::move(checks), Milliseconds(1), Milliseconds(100), watchdogTerminate); + + monitor.start(); + + sleepmillis(1000); +} + +// Positive: Make sure the monitor can be paused and resumed, and it does not trigger death +TEST_F(WatchdogMonitorTest, PauseAndResume) { + + WatchdogDeathCallback deathCallback = []() { + log() << "Death signalled, it should not have been"; + invariant(false); + }; + + auto counterCheck = stdx::make_unique<TestCounterCheck>(); + auto counterCheckPtr = counterCheck.get(); + + std::vector<std::unique_ptr<WatchdogCheck>> checks; + checks.push_back(std::move(counterCheck)); + + WatchdogMonitor monitor(std::move(checks), Milliseconds(1), Milliseconds(1001), deathCallback); + + counterCheckPtr->setSignalOnCount(5); + + monitor.start(); + + counterCheckPtr->waitForCount(); + + // Pause the monitor + monitor.setPeriod(Milliseconds(-1)); + + // Check the counter after it is shutdown and make sure it does not change. + std::uint32_t pauseCounter = counterCheckPtr->getCounter(); + + // This is racey but it should only produce false negatives + sleepmillis(100); + + // We could have had one more run of the loop as we paused - allow for that case + // but no other runs of the thread. + ASSERT_GTE(pauseCounter + 1, counterCheckPtr->getCounter()); + + // Resume the monitor + std::uint32_t baseCounter = counterCheckPtr->getCounter(); + counterCheckPtr->setSignalOnCount(baseCounter + 5); + + // Restart the monitor with a different interval. + monitor.setPeriod(Milliseconds(1007)); + + counterCheckPtr->waitForCount(); + + monitor.shutdown(); + + // Check the counter after it is shutdown and make sure it does not change. + std::uint32_t lastCounter = counterCheckPtr->getCounter(); + + // This is racey but it should only produce false negatives + sleepmillis(100); + + ASSERT_EQ(lastCounter, counterCheckPtr->getCounter()); +} + +class DirectoryCheckTest : public ServiceContextTest {}; + +// Positive: Do a sanity check that directory check passes +TEST_F(DirectoryCheckTest, Basic) { + unittest::TempDir tempdir("watchdog_testpath"); + + DirectoryCheck check(tempdir.path()); + + auto opCtx = makeOperationContext(); + check.run(opCtx.get()); +} + +} // namespace mongo |