/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/client/remote_command_runner.h" #include "mongo/db/concurrency/lock_manager_defs.h" #include "mongo/db/repl/task_runner.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/compiler.h" #include "mongo/platform/random.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/list.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" namespace mongo { class NamespaceString; class OperationContext; namespace executor{ class NetworkInterface; } // namespace executor namespace repl { class StorageInterface; /** * Implementation of the TaskExecutor interface for providing an event loop for driving state * machines in replication. * * Usage: Instantiate an executor, schedule a work item, call run(). * * Implementation details: * * The executor is composed of several WorkQueues, which are queues of WorkItems. WorkItems * describe units of work -- a callback and state needed to track its lifecycle. The iterators * pointing to WorkItems are spliced between the WorkQueues, rather than copying WorkItems * themselves. Further, those WorkQueue::iterators are never invalidated during the life of an * executor. They may be recycled to represent new work items, but when that happens, a counter * on the WorkItem is incremented, to disambiguate. * * All work executed by the run() method of the executor is popped off the front of the * _readyQueue. Remote commands blocked on the network can be found in the * _networkInProgressQueue. Callbacks waiting for a timer to expire are in the _sleepersQueue. * When the network returns or the timer expires, items from these two queues are transferred to * the back of the _readyQueue. * * The _exclusiveLockInProgressQueue, which represents work items to execute while holding the * GlobalWrite lock, is exceptional. WorkItems in that queue execute in unspecified order with * respect to work in the _readyQueue or other WorkItems in the _exclusiveLockInProgressQueue, * but they are executed in a single serial order with respect to those other WorkItems. The * _terribleExLockSyncMutex is used to provide this serialization, until such time as the global * lock may be passed from one thread to another. */ class ReplicationExecutor final : public executor::TaskExecutor { MONGO_DISALLOW_COPYING(ReplicationExecutor); public: /** * Constructs a new executor. * * Takes ownership of the passed NetworkInterface object. */ ReplicationExecutor(executor::NetworkInterface* netInterface, StorageInterface* storageInterface, int64_t pnrgSeed); /** * Destroys an executor. */ virtual ~ReplicationExecutor(); std::string getDiagnosticString() override; Date_t now() override; void shutdown() override; void signalEvent(const EventHandle& event) override; StatusWith makeEvent() override; StatusWith onEvent(const EventHandle& event, const CallbackFn& work) override; void waitForEvent(const EventHandle& event) override; StatusWith scheduleWork(const CallbackFn& work) override; StatusWith scheduleWorkAt(Date_t when, const CallbackFn& work) override; StatusWith scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle) override; /** * Executes the run loop. May be called up to one time. * * Returns after the executor has been shutdown and is safe to delete. */ void run(); /** * Schedules DB "work" to be run by the executor.. * * Takes no locks for caller - global, database or collection. * * The "work" will run exclusively with other DB work items. All DB work items * are run the in order they are scheduled. * * The "work" may run concurrently with other non-DB work items, * but there are no ordering guarantees provided with respect to * any other work item. * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. */ StatusWith scheduleDBWork(const CallbackFn& work); /** * Schedules DB "work" to be run by the executor while holding the collection lock. * * Takes collection lock in specified mode (and slightly more permissive lock for the * database lock) but not the global exclusive lock. * * The "work" will run exclusively with other DB work items. All DB work items * are run the in order they are scheduled. * * The "work" may run concurrently with other non-DB work items, * but there are no ordering guarantees provided with respect to * any other work item. * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. */ StatusWith scheduleDBWork(const CallbackFn& work, const NamespaceString& nss, LockMode mode); /** * Schedules "work" to be run by the executor while holding the global exclusive lock. * * Takes collection lock in specified mode (and slightly more permissive lock for the * database lock) but not the global exclusive lock. * * The "work" will run exclusively, as though it were executed by the main * run loop, but there are no ordering guarantees provided with respect to * any other work item. * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. */ StatusWith scheduleWorkWithGlobalExclusiveLock( const CallbackFn& work); /** * Returns an int64_t generated by the prng with a max value of "limit". */ int64_t nextRandomInt64(int64_t limit); private: class Callback; class Event; struct WorkItem; friend class Callback; friend class Event; /** * A linked list of WorkItem objects. * * WorkItems get moved among lists by splicing iterators of work lists together, * not by copying underlying WorkItem objects. */ typedef stdx::list WorkQueue; /** * A linked list of EventHandles. */ typedef stdx::list EventList; /** * Returns diagnostic info */ std::string _getDiagnosticString_inlock() const; /** * Implementation of makeEvent() for use when _mutex is already held. */ StatusWith makeEvent_inlock(); /** * Implementation of signalEvent() for use when _mutex is already held. */ void signalEvent_inlock(const EventHandle&); /** * Gets a single piece of work to execute. * * If the "callback" member of the returned WorkItem is falsey, that is a signal * to the run loop to wait for shutdown. */ std::pair getWork(); /** * Marks as runnable any sleepers whose ready date has passed as of "now". * Returns the date when the next sleeper will be ready, or Date_t(~0ULL) if there are no * remaining sleepers. */ Date_t scheduleReadySleepers_inlock(Date_t now); /** * Enqueues "callback" into "queue". */ StatusWith enqueueWork_inlock(WorkQueue* queue, const CallbackFn& callback); /** * Notifies interested parties that shutdown has completed, if it has. */ void maybeNotifyShutdownComplete_inlock(); /** * Completes the shutdown process. Called by run(). */ void finishShutdown(); void _finishRemoteCommand( const RemoteCommandRequest& request, const StatusWith& response, const CallbackHandle& cbHandle, const uint64_t expectedHandleGeneration, const RemoteCommandCallbackFn& cb); /** * Executes the callback referenced by "cbHandle", and moves the underlying * WorkQueue::iterator from "workQueue" into the _freeQueue. * * "txn" is a pointer to the OperationContext. * * "status" is the callback status from the task runner. Only possible values are * Status::OK and ErrorCodes::CallbackCanceled (when task runner is canceled). * * If "terribleExLockSyncMutex" is not null, serializes execution of "cbHandle" with the * execution of other callbacks. */ void _doOperation(OperationContext* txn, const Status& taskRunnerStatus, const CallbackHandle& cbHandle, WorkQueue* workQueue, stdx::mutex* terribleExLockSyncMutex); /** * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of * a generic EventState*. */ Event* _getEventFromHandle(const EventHandle& eventHandle); /** * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of * a generic EventState*. */ Callback* _getCallbackFromHandle(const CallbackHandle& callbackHandle); // PRNG; seeded at class construction time. PseudoRandom _random; std::unique_ptr _networkInterface; std::unique_ptr _storageInterface; stdx::mutex _mutex; stdx::mutex _terribleExLockSyncMutex; stdx::condition_variable _noMoreWaitingThreads; WorkQueue _freeQueue; WorkQueue _readyQueue; WorkQueue _dbWorkInProgressQueue; WorkQueue _exclusiveLockInProgressQueue; WorkQueue _networkInProgressQueue; WorkQueue _sleepersQueue; EventList _unsignaledEvents; int64_t _totalEventWaiters; bool _inShutdown; OldThreadPool _dblockWorkers; TaskRunner _dblockTaskRunner; TaskRunner _dblockExclusiveLockTaskRunner; uint64_t _nextId; }; class ReplicationExecutor::Callback : public executor::TaskExecutor::CallbackState { friend class ReplicationExecutor; public: Callback(ReplicationExecutor* executor, const CallbackFn callbackFn, const WorkQueue::iterator& iter, const EventHandle& finishedEvent); virtual ~Callback(); void cancel() override; void waitForCompletion() override; private: ReplicationExecutor* _executor; // All members other than _executor are protected by the executor's _mutex. CallbackFn _callbackFn; bool _isCanceled; WorkQueue::iterator _iter; EventHandle _finishedEvent; }; typedef ReplicationExecutor::ResponseStatus ResponseStatus; /** * Description of a scheduled but not-yet-run work item. * * Once created, WorkItem objects remain in scope until the executor is destroyed. * However, over their lifetime, they may represent many different work items. This * divorces the lifetime of CallbackHandles from the lifetime of WorkItem objects, but * requires a unique generation identifier in CallbackHandles and WorkItem objects. * * WorkItem is copyable so that it may be stored in a list. However, in practice they * should only be copied by getWork() and when allocating new entries into a WorkQueue (not * when moving entries between work lists). */ struct ReplicationExecutor::WorkItem { WorkItem(); uint64_t generation; CallbackHandle callback; EventHandle finishedEvent; Date_t readyDate; bool isNetworkOperation; }; /** * Description of an event. * * Like WorkItem, above, but for events. On signaling, the executor removes the event from the * "unsignaled" EventList and schedules all work items in the _waiters list. */ class ReplicationExecutor::Event : public executor::TaskExecutor::EventState { friend class ReplicationExecutor; public: Event(ReplicationExecutor* executor, const EventList::iterator& iter); virtual ~Event(); void signal() override; void waitUntilSignaled() override; bool isSignaled() override; private: // Note that the caller is responsible for removing any references to any EventHandles // pointing to this event. void _signal_inlock(); ReplicationExecutor* _executor; // All members other than _executor are protected by the executor's _mutex. bool _isSignaled; stdx::condition_variable _isSignaledCondition; EventList::iterator _iter; WorkQueue _waiters; }; } // namespace repl } // namespace mongo