/** * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" #include "mongo/db/repl/noop_writer.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/server_parameters.h" #include "mongo/stdx/functional.h" #include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/log.h" namespace mongo { namespace repl { namespace { MONGO_EXPORT_SERVER_PARAMETER(writePeriodicNoops, bool, true); const auto kMsgObj = BSON("msg" << "periodic noop"); } // namespace /** * Runs the noopWrite argument with waitTime period until its destroyed. */ class NoopWriter::PeriodicNoopRunner { MONGO_DISALLOW_COPYING(PeriodicNoopRunner); using NoopWriteFn = stdx::function; public: PeriodicNoopRunner(Seconds waitTime, NoopWriteFn noopWrite) : _thread([this, noopWrite, waitTime] { run(waitTime, std::move(noopWrite)); }) {} ~PeriodicNoopRunner() { stdx::unique_lock lk(_mutex); _inShutdown = true; _cv.notify_all(); lk.unlock(); _thread.join(); } private: void run(Seconds waitTime, NoopWriteFn noopWrite) { Client::initThread("NoopWriter"); while (true) { const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); OperationContext& opCtx = *opCtxPtr; { stdx::unique_lock lk(_mutex); MONGO_IDLE_THREAD_BLOCK; _cv.wait_for(lk, waitTime.toSystemDuration(), [&] { return _inShutdown; }); if (_inShutdown) return; } noopWrite(&opCtx); } } /** * Indicator that thread is shutting down. */ bool _inShutdown{false}; /** * Mutex for the CV */ stdx::mutex _mutex; /** * CV to wait for. */ stdx::condition_variable _cv; /** * Thread that runs the tasks. Must be last so all other members are initialized before * starting. */ stdx::thread _thread; }; NoopWriter::NoopWriter(Seconds writeInterval) : _writeInterval(writeInterval) { uassert(ErrorCodes::BadValue, "write interval must be positive", writeInterval > Seconds(0)); } NoopWriter::~NoopWriter() { stopWritingPeriodicNoops(); } Status NoopWriter::startWritingPeriodicNoops(OpTime lastKnownOpTime) { stdx::lock_guard lk(_mutex); _lastKnownOpTime = lastKnownOpTime; invariant(!_noopRunner); _noopRunner = stdx::make_unique( _writeInterval, [this](OperationContext* opCtx) { _writeNoop(opCtx); }); return Status::OK(); } void NoopWriter::stopWritingPeriodicNoops() { stdx::lock_guard lk(_mutex); _noopRunner.reset(); } void NoopWriter::_writeNoop(OperationContext* opCtx) { // Use GlobalLock + lockMMAPV1Flush instead of DBLock to allow return when the lock is not // available. It may happen when the primary steps down and a shared global lock is acquired. Lock::GlobalLock lock(opCtx, MODE_IX, 1); if (!lock.isLocked()) { LOG(1) << "Global lock is not available skipping noopWrite"; return; } opCtx->lockState()->lockMMAPV1Flush(); auto replCoord = ReplicationCoordinator::get(opCtx); // Its a proxy for being a primary if (!replCoord->canAcceptWritesForDatabase(opCtx, "admin")) { LOG(1) << "Not a primary, skipping the noop write"; return; } auto lastAppliedOpTime = replCoord->getMyLastAppliedOpTime(); // _lastKnownOpTime is not protected by lock as its used only by one thread. if (lastAppliedOpTime != _lastKnownOpTime) { LOG(1) << "Not scheduling a noop write. Last known OpTime: " << _lastKnownOpTime << " != last primary OpTime: " << lastAppliedOpTime; } else { if (writePeriodicNoops.load()) { const auto logLevel = Command::testCommandsEnabled ? 0 : 1; LOG(logLevel) << "Writing noop to oplog as there has been no writes to this replica set in over " << _writeInterval; writeConflictRetry( opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&opCtx] { WriteUnitOfWork uow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(opCtx, kMsgObj); uow.commit(); }); } } _lastKnownOpTime = replCoord->getMyLastAppliedOpTime(); LOG(1) << "Set last known op time to " << _lastKnownOpTime; } } // namespace repl } // namespace mongo