/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include #include #include #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/db/jsobj.h" #include "mongo/platform/compiler.h" #include "mongo/platform/random.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/list.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" namespace mongo { class OperationContext; namespace repl { /** * Event loop for driving state machines in replication. * * The event loop has notions of events and callbacks. * * Callbacks are function objects representing work to be performed in some sequential order by * the executor. They may be scheduled by client threads or by other callbacks. Methods that * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the * appropriate work queue. Every CallbackHandle represents an invocation of a function that * will happen before the executor returns from run(). Calling cancel(CallbackHandle) schedules * the specified callback to run with a flag indicating that it is "canceled," but it will run. * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle). * * Events are level-triggered and may only be signaled one time. Client threads and callbacks * may schedule callbacks to be run by the executor after the event is signaled, and client * threads may ask the executor to block them until after the event is signaled. * * If an event is unsignaled when shutdown is called, the executor will ensure that any threads * blocked in waitForEvent() eventually return. * * Logically, Callbacks and Events exist for the life of the executor. That means that while * the executor is in scope, no CallbackHandle or EventHandle is stale. * * Usage: Instantiate an executor, schedule a work item, call run(). * * Implementation details: * * The executor is composed of several WorkQueues, which are queues of WorkItems. WorkItems * describe units of work -- a callback and state needed to track its lifecycle. The iterators * pointing to WorkItems are spliced between the WorkQueues, rather than copying WorkItems * themselves. Further, those WorkQueue::iterators are never invalidated during the life of an * executor. They may be recycled to represent new work items, but when that happens, a counter * on the WorkItem is incremented, to disambiguate. Handles referencing WorkQueue::iterators, * called CallbackHandles, are thus valid for the life of the executor, simplifying lifecycle * management. * * All work executed by the run() method of the executor is popped off the front of the * _readyQueue. Remote commands blocked on the network can be found in the * _networkInProgressQueue. Callbacks waiting for a timer to expire are in the _sleepersQueue. * When the network returns or the timer expires, items from these two queues are transferred to * the back of the _readyQueue. * * The _exclusiveLockInProgressQueue, which represents work items to execute while holding the * GlobalWrite lock, is exceptional. WorkItems in that queue execute in unspecified order with * respect to work in the _readyQueue or other WorkItems in the _exclusiveLockInProgressQueue, * but they are executed in a single serial order with respect to those other WorkItems. The * _terribleExLockSyncMutex is used to provide this serialization, until such time as the global * lock may be passed from one thread to another. * * Events work similiarly to WorkItems, and EventList is akin to WorkQueue. */ class ReplicationExecutor { MONGO_DISALLOW_COPYING(ReplicationExecutor); public: typedef boost::posix_time::milliseconds Milliseconds; struct CallbackData; class CallbackHandle; class EventHandle; class NetworkInterface; struct RemoteCommandCallbackData; struct RemoteCommandRequest; struct RemoteCommandResponse; typedef StatusWith ResponseStatus; static const Milliseconds kNoTimeout; static const Date_t kNoExpirationDate; /** * Type of a regular callback function. * * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if * the callback was canceled for any reason (including shutdown). Otherwise, it should have * Status::OK(). */ typedef stdx::function CallbackFn; /** * Type of a callback from a request to run a command on a remote MongoDB node. * * The StatusWith will have ErrorCodes::CallbackCanceled if the callback was * canceled. Otherwise, its status will represent any failure to execute the command. * If the command executed and a response came back, then the status object will contain * the BSONObj returned by the command, with the "ok" field indicating the success of the * command in the usual way. */ typedef stdx::function RemoteCommandCallbackFn; /** * Constructs a new executor. * * Takes ownership of the passed NetworkInterface object. */ explicit ReplicationExecutor(NetworkInterface* netInterface, int64_t pnrgSeed); /** * Destroys an executor. */ ~ReplicationExecutor(); /** * Returns diagnostic information. */ std::string getDiagnosticString(); /** * Gets the current time as reported by the network interface. */ Date_t now(); /** * Executes the run loop. May be called up to one time. * * Returns after the executor has been shutdown and is safe to delete. */ void run(); /** * Signals to the executor that it should shut down. The only reliable indication * that shutdown has completed is that the run() method returns. * * May be called by client threads or callbacks running in the executor. */ void shutdown(); /** * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if * makeEvent() fails because the executor is shutting down. * * May be called by client threads or callbacks running in the executor. */ StatusWith makeEvent(); /** * Signals the event, making waiting client threads and callbacks runnable. * * May be called up to one time per event. * * May be called by client threads or callbacks running in the executor. */ void signalEvent(const EventHandle&); /** * Schedules a callback, "work", to run after "event" is signaled. If "event" * has already been signaled, marks "work" as immediately runnable. * * If "event" has yet to be signaled when "shutdown()" is called, "work" will * be scheduled with a status of ErrorCodes::CallbackCanceled. * * May be called by client threads or callbacks running in the executor. */ StatusWith onEvent(const EventHandle& event, const CallbackFn& work); /** * Blocks the calling thread until after "event" is signaled. Also returns * if the event is never signaled but shutdown() is called on the executor. * * NOTE: Do not call from a callback running in the executor. * * TODO(schwerin): Change return type so that the caller can know which of the two reasons * led to this method returning. */ void waitForEvent(const EventHandle& event); /** * Schedules "work" to be run by the executor ASAP. * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. */ StatusWith scheduleWork(const CallbackFn& work); /** * Schedules "work" to be run by the executor no sooner than "when". * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. */ StatusWith scheduleWorkAt(Date_t when, const CallbackFn& work); /** * Schedules "work" to be run by the executor while holding the global exclusive lock. * * The "work" will run exclusively, as though it were executed by the main * run loop, but there are no ordering guarantees provided with respect to * any other work item. * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. */ StatusWith scheduleWorkWithGlobalExclusiveLock( const CallbackFn& work); /** * Schedules "cb" to be run by the executor with the result of executing the remote command * described by "request". * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. */ StatusWith scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb); /** * If the callback referenced by "cbHandle" hasn't already executed, marks it as * canceled and runnable. * * May be called by client threads or callbacks running in the executor. */ void cancel(const CallbackHandle& cbHandle); /** * Blocks until the executor finishes running the callback referenced by "cbHandle". * * Becaue callbacks all run during shutdown if they weren't run beforehand, there is no need * to indicate the reason for returning from wait(CallbackHandle). It is always that the * callback ran. * * NOTE: Do not call from a callback running in the executor. */ void wait(const CallbackHandle& cbHandle); /** * Returns an int64_t generated by the prng with a max value of "limit". */ int64_t nextRandomInt64(int64_t limit); private: struct Event; struct WorkItem; /** * A linked list of WorkItem objects. * * WorkItems get moved among lists by splicing iterators of work lists together, * not by copying underlying WorkItem objects. */ typedef stdx::list WorkQueue; /** * A linked list of Event objects, like WorkQueue, above. */ typedef stdx::list EventList; /** * Returns diagnostic info */ std::string _getDiagnosticString_inlock() const; /** * Implementation of makeEvent() for use when _mutex is already held. */ StatusWith makeEvent_inlock(); /** * Gets a single piece of work to execute. * * If the "callback" member of the returned WorkItem is falsey, that is a signal * to the run loop to wait for shutdown. */ std::pair getWork(); /** * Marks as runnable any sleepers whose ready date has passed as of "now". * Returns the date when the next sleeper will be ready, or Date_t(~0ULL) if there are no * remaining sleepers. */ Date_t scheduleReadySleepers_inlock(Date_t now); /** * Enqueues "callback" into "queue". * * Assumes that "queue" is sorted by readyDate, and performs insertion sort, starting * at the back of the "queue" working toward the front. * * Use Date_t(0) for readyDate to mean "ready now". */ StatusWith enqueueWork_inlock(WorkQueue* queue, const CallbackFn& callback); /** * Implementation of signalEvent() that assumes the caller owns _mutex. */ void signalEvent_inlock(const EventHandle& event); /** * Notifies interested parties that shutdown has completed, if it has. */ void maybeNotifyShutdownComplete_inlock(); /** * Completes the shutdown process. Called by run(). */ void finishShutdown(); void _finishRemoteCommand( const RemoteCommandRequest& request, const StatusWith& response, const CallbackHandle& cbHandle, const uint64_t expectedHandleGeneration, const RemoteCommandCallbackFn& cb); /** * Executes the callback referenced by "cbHandle", and moves the underlying * WorkQueue::iterator into the _freeQueue. "txn" is a pointer to the OperationContext * owning the global exclusive lock. * * Serializes execution of "cbHandle" with the execution of other callbacks. */ void doOperationWithGlobalExclusiveLock(OperationContext* txn, const CallbackHandle& cbHandle); // PRNG; seeded at class construction time. PseudoRandom _random; boost::scoped_ptr _networkInterface; boost::mutex _mutex; boost::mutex _terribleExLockSyncMutex; boost::condition_variable _noMoreWaitingThreads; WorkQueue _freeQueue; WorkQueue _readyQueue; WorkQueue _exclusiveLockInProgressQueue; WorkQueue _networkInProgressQueue; WorkQueue _sleepersQueue; EventList _unsignaledEvents; EventList _signaledEvents; int64_t _totalEventWaiters; bool _inShutdown; threadpool::ThreadPool _dblockWorkers; uint64_t _nextId; }; /** * Reference to an event object in the executor. */ class ReplicationExecutor::EventHandle { friend class ReplicationExecutor; public: EventHandle() : _generation(0), _id(0) {} /** * Returns true if the handle is valid, meaning that it identifies */ bool isValid() const { return _id != 0; } bool operator==(const EventHandle &other) const { return (_id == other._id); } bool operator!=(const EventHandle &other) const { return !(*this == other); } private: EventHandle(const EventList::iterator& iter, const uint64_t id); EventList::iterator _iter; uint64_t _generation; uint64_t _id; }; /** * Reference to a scheduled callback. */ class ReplicationExecutor::CallbackHandle { friend class ReplicationExecutor; public: CallbackHandle() : _generation(0) {} bool isValid() const { return _finishedEvent.isValid(); } bool operator==(const CallbackHandle &other) const { return (_finishedEvent == other._finishedEvent); } bool operator!=(const CallbackHandle &other) const { return !(*this == other); } private: explicit CallbackHandle(const WorkQueue::iterator& iter); WorkQueue::iterator _iter; uint64_t _generation; EventHandle _finishedEvent; }; struct ReplicationExecutor::CallbackData { CallbackData(ReplicationExecutor* theExecutor, const CallbackHandle& theHandle, const Status& theStatus, OperationContext* txn = NULL); ReplicationExecutor* executor; CallbackHandle myHandle; Status status; OperationContext* txn; }; /** * Type of object describing a command to execute against a remote MongoDB node. */ struct ReplicationExecutor::RemoteCommandRequest { RemoteCommandRequest(); RemoteCommandRequest(const HostAndPort& theTarget, const std::string& theDbName, const BSONObj& theCmdObj, const Milliseconds timeoutMillis = kNoTimeout); // Returns diagnostic info. std::string getDiagnosticString(); HostAndPort target; std::string dbname; BSONObj cmdObj; Milliseconds timeout; Date_t expirationDate; // Set by scheduleRemoteCommand. }; struct ReplicationExecutor::RemoteCommandResponse { RemoteCommandResponse() : data(), elapsedMillis(Milliseconds(0)) {} RemoteCommandResponse(BSONObj obj, Milliseconds millis) : data(obj), elapsedMillis(millis) {} BSONObj data; Milliseconds elapsedMillis; }; /** * Interface to networking and lock manager. */ class ReplicationExecutor::NetworkInterface { MONGO_DISALLOW_COPYING(NetworkInterface); public: typedef RemoteCommandResponse Response; typedef stdx::function RemoteCommandCompletionFn; virtual ~NetworkInterface(); /** * Returns diagnostic info. */ virtual std::string getDiagnosticString() = 0; /** * Starts up the network interface. * * It is valid to call all methods except shutdown() before this method completes. That is, * implementations may not assume that startup() completes before startCommand() first * executes. * * Called by the owning ReplicationExecutor inside its run() method. */ virtual void startup() = 0; /** * Shuts down the network interface. Must be called before this instance gets deleted, * if startup() is called. * * Called by the owning ReplicationExecutor inside its run() method. */ virtual void shutdown() = 0; /** * Blocks the current thread (presumably the executor thread) until the network interface * knows of work for the executor to perform. */ virtual void waitForWork() = 0; /** * Similar to waitForWork, but only blocks until "when". */ virtual void waitForWorkUntil(Date_t when) = 0; /** * Signals to the network interface that there is new work (such as a signaled event) for * the executor to process. Wakes the executor from waitForWork() and friends. */ virtual void signalWorkAvailable() = 0; /** * Returns the current time. */ virtual Date_t now() = 0; /** * Starts asynchronous execution of the command described by "request". */ virtual void startCommand(const CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) = 0; /** * Requests cancelation of the network activity associated with "cbHandle" if it has not yet * completed. */ virtual void cancelCommand(const CallbackHandle& cbHandle) = 0; /** * Runs the given callback while holding the global exclusive lock. */ virtual void runCallbackWithGlobalExclusiveLock( const stdx::function& callback) = 0; protected: NetworkInterface(); }; typedef ReplicationExecutor::ResponseStatus ResponseStatus; // Must be after NetworkInterface class struct ReplicationExecutor::RemoteCommandCallbackData { RemoteCommandCallbackData(ReplicationExecutor* theExecutor, const CallbackHandle& theHandle, const RemoteCommandRequest& theRequest, const StatusWith& theResponse); ReplicationExecutor* executor; CallbackHandle myHandle; RemoteCommandRequest request; StatusWith response; }; /** * Description of a scheduled but not-yet-run work item. * * Once created, WorkItem objects remain in scope until the executor is destroyed. * However, over their lifetime, they may represent many different work items. This * divorces the lifetime of CallbackHandles from the lifetime of WorkItem objects, but * requires a unique generation identifier in CallbackHandles and WorkItem objects. * * WorkItem is copyable so that it may be stored in a list. However, in practice they * should only be copied by getWork() and when allocating new entries into a WorkQueue (not * when moving entries between work lists). */ struct ReplicationExecutor::WorkItem { WorkItem(); uint64_t generation; CallbackFn callback; EventHandle finishedEvent; Date_t readyDate; bool isNetworkOperation; bool isCanceled; }; /** * Description of an unsignaled event. * * Like WorkItem, above, but for events. On signaling, the executor bumps the * generation, marks all waiters as runnable, and moves the event from the "unsignaled" * EventList to the "signaled" EventList, the latter being a free list of events. */ struct ReplicationExecutor::Event { Event(); uint64_t generation; bool isSignaled; WorkQueue waiters; boost::shared_ptr isSignaledCondition; }; } // namespace repl } // namespace mongo