diff options
Diffstat (limited to 'src/mongo/db/repl/replication_executor.h')
-rw-r--r-- | src/mongo/db/repl/replication_executor.h | 638 |
1 files changed, 315 insertions, 323 deletions
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index a6749bb26a4..0dfa97f9cf0 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -51,357 +51,349 @@ namespace mongo { - class NamespaceString; - class OperationContext; +class NamespaceString; +class OperationContext; -namespace executor{ - class NetworkInterface; -} // namespace executor +namespace executor { +class NetworkInterface; +} // namespace executor namespace repl { - class StorageInterface; +class StorageInterface; + +/** + * Implementation of the TaskExecutor interface for providing an event loop for driving state + * machines in replication. + * + * Usage: Instantiate an executor, schedule a work item, call run(). + * + * Implementation details: + * + * The executor is composed of several WorkQueues, which are queues of WorkItems. WorkItems + * describe units of work -- a callback and state needed to track its lifecycle. The iterators + * pointing to WorkItems are spliced between the WorkQueues, rather than copying WorkItems + * themselves. Further, those WorkQueue::iterators are never invalidated during the life of an + * executor. They may be recycled to represent new work items, but when that happens, a counter + * on the WorkItem is incremented, to disambiguate. + * + * All work executed by the run() method of the executor is popped off the front of the + * _readyQueue. Remote commands blocked on the network can be found in the + * _networkInProgressQueue. Callbacks waiting for a timer to expire are in the _sleepersQueue. + * When the network returns or the timer expires, items from these two queues are transferred to + * the back of the _readyQueue. + * + * The _exclusiveLockInProgressQueue, which represents work items to execute while holding the + * GlobalWrite lock, is exceptional. WorkItems in that queue execute in unspecified order with + * respect to work in the _readyQueue or other WorkItems in the _exclusiveLockInProgressQueue, + * but they are executed in a single serial order with respect to those other WorkItems. The + * _terribleExLockSyncMutex is used to provide this serialization, until such time as the global + * lock may be passed from one thread to another. + */ +class ReplicationExecutor final : public executor::TaskExecutor { + MONGO_DISALLOW_COPYING(ReplicationExecutor); + +public: + /** + * Constructs a new executor. + * + * Takes ownership of the passed NetworkInterface object. + */ + ReplicationExecutor(executor::NetworkInterface* netInterface, + StorageInterface* storageInterface, + int64_t pnrgSeed); + + /** + * Destroys an executor. + */ + virtual ~ReplicationExecutor(); + + std::string getDiagnosticString() override; + Date_t now() override; + void shutdown() override; + void signalEvent(const EventHandle& event) override; + StatusWith<EventHandle> makeEvent() override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle) override; + + + /** + * Executes the run loop. May be called up to one time. + * + * Returns after the executor has been shutdown and is safe to delete. + */ + void run(); /** - * Implementation of the TaskExecutor interface for providing an event loop for driving state - * machines in replication. + * Schedules DB "work" to be run by the executor.. * - * Usage: Instantiate an executor, schedule a work item, call run(). + * Takes no locks for caller - global, database or collection. * - * Implementation details: + * The "work" will run exclusively with other DB work items. All DB work items + * are run the in order they are scheduled. * - * The executor is composed of several WorkQueues, which are queues of WorkItems. WorkItems - * describe units of work -- a callback and state needed to track its lifecycle. The iterators - * pointing to WorkItems are spliced between the WorkQueues, rather than copying WorkItems - * themselves. Further, those WorkQueue::iterators are never invalidated during the life of an - * executor. They may be recycled to represent new work items, but when that happens, a counter - * on the WorkItem is incremented, to disambiguate. + * The "work" may run concurrently with other non-DB work items, + * but there are no ordering guarantees provided with respect to + * any other work item. * - * All work executed by the run() method of the executor is popped off the front of the - * _readyQueue. Remote commands blocked on the network can be found in the - * _networkInProgressQueue. Callbacks waiting for a timer to expire are in the _sleepersQueue. - * When the network returns or the timer expires, items from these two queues are transferred to - * the back of the _readyQueue. + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. * - * The _exclusiveLockInProgressQueue, which represents work items to execute while holding the - * GlobalWrite lock, is exceptional. WorkItems in that queue execute in unspecified order with - * respect to work in the _readyQueue or other WorkItems in the _exclusiveLockInProgressQueue, - * but they are executed in a single serial order with respect to those other WorkItems. The - * _terribleExLockSyncMutex is used to provide this serialization, until such time as the global - * lock may be passed from one thread to another. + * May be called by client threads or callbacks running in the executor. */ - class ReplicationExecutor final : public executor::TaskExecutor { - MONGO_DISALLOW_COPYING(ReplicationExecutor); - public: - - /** - * Constructs a new executor. - * - * Takes ownership of the passed NetworkInterface object. - */ - ReplicationExecutor(executor::NetworkInterface* netInterface, - StorageInterface* storageInterface, - int64_t pnrgSeed); - - /** - * Destroys an executor. - */ - virtual ~ReplicationExecutor(); - - std::string getDiagnosticString() override; - Date_t now() override; - void shutdown() override; - void signalEvent(const EventHandle& event) override; - StatusWith<EventHandle> makeEvent() override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, - const CallbackFn& work) override; - void waitForEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb) override; - void cancel(const CallbackHandle& cbHandle) override; - void wait(const CallbackHandle& cbHandle) override; - - - /** - * Executes the run loop. May be called up to one time. - * - * Returns after the executor has been shutdown and is safe to delete. - */ - void run(); - - /** - * Schedules DB "work" to be run by the executor.. - * - * Takes no locks for caller - global, database or collection. - * - * The "work" will run exclusively with other DB work items. All DB work items - * are run the in order they are scheduled. - * - * The "work" may run concurrently with other non-DB work items, - * but there are no ordering guarantees provided with respect to - * any other work item. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work); - - /** - * Schedules DB "work" to be run by the executor while holding the collection lock. - * - * Takes collection lock in specified mode (and slightly more permissive lock for the - * database lock) but not the global exclusive lock. - * - * The "work" will run exclusively with other DB work items. All DB work items - * are run the in order they are scheduled. - * - * The "work" may run concurrently with other non-DB work items, - * but there are no ordering guarantees provided with respect to - * any other work item. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work, - const NamespaceString& nss, - LockMode mode); - - /** - * Schedules "work" to be run by the executor while holding the global exclusive lock. - * - * Takes collection lock in specified mode (and slightly more permissive lock for the - * database lock) but not the global exclusive lock. - * - * The "work" will run exclusively, as though it were executed by the main - * run loop, but there are no ordering guarantees provided with respect to - * any other work item. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - StatusWith<CallbackHandle> scheduleWorkWithGlobalExclusiveLock( - const CallbackFn& work); - - /** - * Returns an int64_t generated by the prng with a max value of "limit". - */ - int64_t nextRandomInt64(int64_t limit); - - private: - class Callback; - class Event; - struct WorkItem; - friend class Callback; - friend class Event; - - - /** - * A linked list of WorkItem objects. - * - * WorkItems get moved among lists by splicing iterators of work lists together, - * not by copying underlying WorkItem objects. - */ - typedef stdx::list<WorkItem> WorkQueue; - - /** - * A linked list of EventHandles. - */ - typedef stdx::list<EventHandle> EventList; - - /** - * Returns diagnostic info - */ - std::string _getDiagnosticString_inlock() const; - - /** - * Implementation of makeEvent() for use when _mutex is already held. - */ - StatusWith<EventHandle> makeEvent_inlock(); - - /** - * Implementation of signalEvent() for use when _mutex is already held. - */ - void signalEvent_inlock(const EventHandle&); - - /** - * Gets a single piece of work to execute. - * - * If the "callback" member of the returned WorkItem is falsey, that is a signal - * to the run loop to wait for shutdown. - */ - std::pair<WorkItem, CallbackHandle> getWork(); - - /** - * Marks as runnable any sleepers whose ready date has passed as of "now". - * Returns the date when the next sleeper will be ready, or Date_t(~0ULL) if there are no - * remaining sleepers. - */ - Date_t scheduleReadySleepers_inlock(Date_t now); - - /** - * Enqueues "callback" into "queue". - */ - StatusWith<CallbackHandle> enqueueWork_inlock(WorkQueue* queue, const CallbackFn& callback); - - /** - * Notifies interested parties that shutdown has completed, if it has. - */ - void maybeNotifyShutdownComplete_inlock(); - - /** - * Completes the shutdown process. Called by run(). - */ - void finishShutdown(); - - void _finishRemoteCommand( - const RemoteCommandRequest& request, - const StatusWith<RemoteCommandResponse>& response, - const CallbackHandle& cbHandle, - const uint64_t expectedHandleGeneration, - const RemoteCommandCallbackFn& cb); - - /** - * Executes the callback referenced by "cbHandle", and moves the underlying - * WorkQueue::iterator from "workQueue" into the _freeQueue. - * - * "txn" is a pointer to the OperationContext. - * - * "status" is the callback status from the task runner. Only possible values are - * Status::OK and ErrorCodes::CallbackCanceled (when task runner is canceled). - * - * If "terribleExLockSyncMutex" is not null, serializes execution of "cbHandle" with the - * execution of other callbacks. - */ - void _doOperation(OperationContext* txn, - const Status& taskRunnerStatus, - const CallbackHandle& cbHandle, - WorkQueue* workQueue, - stdx::mutex* terribleExLockSyncMutex); - - /** - * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of - * a generic EventState*. - */ - Event* _getEventFromHandle(const EventHandle& eventHandle); - - /** - * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of - * a generic EventState*. - */ - Callback* _getCallbackFromHandle(const CallbackHandle& callbackHandle); - - // PRNG; seeded at class construction time. - PseudoRandom _random; - - std::unique_ptr<executor::NetworkInterface> _networkInterface; - std::unique_ptr<StorageInterface> _storageInterface; - stdx::mutex _mutex; - stdx::mutex _terribleExLockSyncMutex; - stdx::condition_variable _noMoreWaitingThreads; - WorkQueue _freeQueue; - WorkQueue _readyQueue; - WorkQueue _dbWorkInProgressQueue; - WorkQueue _exclusiveLockInProgressQueue; - WorkQueue _networkInProgressQueue; - WorkQueue _sleepersQueue; - EventList _unsignaledEvents; - int64_t _totalEventWaiters; - bool _inShutdown; - OldThreadPool _dblockWorkers; - TaskRunner _dblockTaskRunner; - TaskRunner _dblockExclusiveLockTaskRunner; - uint64_t _nextId; - }; - - class ReplicationExecutor::Callback : public executor::TaskExecutor::CallbackState { - friend class ReplicationExecutor; - - public: - - Callback(ReplicationExecutor* executor, - const CallbackFn callbackFn, - const WorkQueue::iterator& iter, - const EventHandle& finishedEvent); - virtual ~Callback(); - - void cancel() override; - void waitForCompletion() override; - - private: - - ReplicationExecutor* _executor; - - // All members other than _executor are protected by the executor's _mutex. - CallbackFn _callbackFn; - bool _isCanceled; - WorkQueue::iterator _iter; - EventHandle _finishedEvent; - }; - - typedef ReplicationExecutor::ResponseStatus ResponseStatus; + StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work); /** - * Description of a scheduled but not-yet-run work item. + * Schedules DB "work" to be run by the executor while holding the collection lock. * - * Once created, WorkItem objects remain in scope until the executor is destroyed. - * However, over their lifetime, they may represent many different work items. This - * divorces the lifetime of CallbackHandles from the lifetime of WorkItem objects, but - * requires a unique generation identifier in CallbackHandles and WorkItem objects. + * Takes collection lock in specified mode (and slightly more permissive lock for the + * database lock) but not the global exclusive lock. + * + * The "work" will run exclusively with other DB work items. All DB work items + * are run the in order they are scheduled. + * + * The "work" may run concurrently with other non-DB work items, + * but there are no ordering guarantees provided with respect to + * any other work item. + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work, + const NamespaceString& nss, + LockMode mode); + + /** + * Schedules "work" to be run by the executor while holding the global exclusive lock. + * + * Takes collection lock in specified mode (and slightly more permissive lock for the + * database lock) but not the global exclusive lock. + * + * The "work" will run exclusively, as though it were executed by the main + * run loop, but there are no ordering guarantees provided with respect to + * any other work item. + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + StatusWith<CallbackHandle> scheduleWorkWithGlobalExclusiveLock(const CallbackFn& work); + + /** + * Returns an int64_t generated by the prng with a max value of "limit". + */ + int64_t nextRandomInt64(int64_t limit); + +private: + class Callback; + class Event; + struct WorkItem; + friend class Callback; + friend class Event; + + + /** + * A linked list of WorkItem objects. * - * WorkItem is copyable so that it may be stored in a list. However, in practice they - * should only be copied by getWork() and when allocating new entries into a WorkQueue (not - * when moving entries between work lists). + * WorkItems get moved among lists by splicing iterators of work lists together, + * not by copying underlying WorkItem objects. */ - struct ReplicationExecutor::WorkItem { - WorkItem(); - uint64_t generation; - CallbackHandle callback; - EventHandle finishedEvent; - Date_t readyDate; - bool isNetworkOperation; - }; + typedef stdx::list<WorkItem> WorkQueue; /** - * Description of an event. + * A linked list of EventHandles. + */ + typedef stdx::list<EventHandle> EventList; + + /** + * Returns diagnostic info + */ + std::string _getDiagnosticString_inlock() const; + + /** + * Implementation of makeEvent() for use when _mutex is already held. + */ + StatusWith<EventHandle> makeEvent_inlock(); + + /** + * Implementation of signalEvent() for use when _mutex is already held. + */ + void signalEvent_inlock(const EventHandle&); + + /** + * Gets a single piece of work to execute. * - * Like WorkItem, above, but for events. On signaling, the executor removes the event from the - * "unsignaled" EventList and schedules all work items in the _waiters list. + * If the "callback" member of the returned WorkItem is falsey, that is a signal + * to the run loop to wait for shutdown. */ - class ReplicationExecutor::Event : public executor::TaskExecutor::EventState { - friend class ReplicationExecutor; + std::pair<WorkItem, CallbackHandle> getWork(); - public: + /** + * Marks as runnable any sleepers whose ready date has passed as of "now". + * Returns the date when the next sleeper will be ready, or Date_t(~0ULL) if there are no + * remaining sleepers. + */ + Date_t scheduleReadySleepers_inlock(Date_t now); - Event(ReplicationExecutor* executor, const EventList::iterator& iter); - virtual ~Event(); + /** + * Enqueues "callback" into "queue". + */ + StatusWith<CallbackHandle> enqueueWork_inlock(WorkQueue* queue, const CallbackFn& callback); - void signal() override; - void waitUntilSignaled() override; - bool isSignaled() override; + /** + * Notifies interested parties that shutdown has completed, if it has. + */ + void maybeNotifyShutdownComplete_inlock(); - private: + /** + * Completes the shutdown process. Called by run(). + */ + void finishShutdown(); - // Note that the caller is responsible for removing any references to any EventHandles - // pointing to this event. - void _signal_inlock(); + void _finishRemoteCommand(const RemoteCommandRequest& request, + const StatusWith<RemoteCommandResponse>& response, + const CallbackHandle& cbHandle, + const uint64_t expectedHandleGeneration, + const RemoteCommandCallbackFn& cb); - ReplicationExecutor* _executor; + /** + * Executes the callback referenced by "cbHandle", and moves the underlying + * WorkQueue::iterator from "workQueue" into the _freeQueue. + * + * "txn" is a pointer to the OperationContext. + * + * "status" is the callback status from the task runner. Only possible values are + * Status::OK and ErrorCodes::CallbackCanceled (when task runner is canceled). + * + * If "terribleExLockSyncMutex" is not null, serializes execution of "cbHandle" with the + * execution of other callbacks. + */ + void _doOperation(OperationContext* txn, + const Status& taskRunnerStatus, + const CallbackHandle& cbHandle, + WorkQueue* workQueue, + stdx::mutex* terribleExLockSyncMutex); - // All members other than _executor are protected by the executor's _mutex. - bool _isSignaled; - stdx::condition_variable _isSignaledCondition; - EventList::iterator _iter; - WorkQueue _waiters; - }; + /** + * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of + * a generic EventState*. + */ + Event* _getEventFromHandle(const EventHandle& eventHandle); + + /** + * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of + * a generic EventState*. + */ + Callback* _getCallbackFromHandle(const CallbackHandle& callbackHandle); + + // PRNG; seeded at class construction time. + PseudoRandom _random; + + std::unique_ptr<executor::NetworkInterface> _networkInterface; + std::unique_ptr<StorageInterface> _storageInterface; + stdx::mutex _mutex; + stdx::mutex _terribleExLockSyncMutex; + stdx::condition_variable _noMoreWaitingThreads; + WorkQueue _freeQueue; + WorkQueue _readyQueue; + WorkQueue _dbWorkInProgressQueue; + WorkQueue _exclusiveLockInProgressQueue; + WorkQueue _networkInProgressQueue; + WorkQueue _sleepersQueue; + EventList _unsignaledEvents; + int64_t _totalEventWaiters; + bool _inShutdown; + OldThreadPool _dblockWorkers; + TaskRunner _dblockTaskRunner; + TaskRunner _dblockExclusiveLockTaskRunner; + uint64_t _nextId; +}; + +class ReplicationExecutor::Callback : public executor::TaskExecutor::CallbackState { + friend class ReplicationExecutor; + +public: + Callback(ReplicationExecutor* executor, + const CallbackFn callbackFn, + const WorkQueue::iterator& iter, + const EventHandle& finishedEvent); + virtual ~Callback(); + + void cancel() override; + void waitForCompletion() override; + +private: + ReplicationExecutor* _executor; + + // All members other than _executor are protected by the executor's _mutex. + CallbackFn _callbackFn; + bool _isCanceled; + WorkQueue::iterator _iter; + EventHandle _finishedEvent; +}; + +typedef ReplicationExecutor::ResponseStatus ResponseStatus; + +/** + * Description of a scheduled but not-yet-run work item. + * + * Once created, WorkItem objects remain in scope until the executor is destroyed. + * However, over their lifetime, they may represent many different work items. This + * divorces the lifetime of CallbackHandles from the lifetime of WorkItem objects, but + * requires a unique generation identifier in CallbackHandles and WorkItem objects. + * + * WorkItem is copyable so that it may be stored in a list. However, in practice they + * should only be copied by getWork() and when allocating new entries into a WorkQueue (not + * when moving entries between work lists). + */ +struct ReplicationExecutor::WorkItem { + WorkItem(); + uint64_t generation; + CallbackHandle callback; + EventHandle finishedEvent; + Date_t readyDate; + bool isNetworkOperation; +}; + +/** + * Description of an event. + * + * Like WorkItem, above, but for events. On signaling, the executor removes the event from the + * "unsignaled" EventList and schedules all work items in the _waiters list. + */ +class ReplicationExecutor::Event : public executor::TaskExecutor::EventState { + friend class ReplicationExecutor; + +public: + Event(ReplicationExecutor* executor, const EventList::iterator& iter); + virtual ~Event(); + + void signal() override; + void waitUntilSignaled() override; + bool isSignaled() override; + +private: + // Note that the caller is responsible for removing any references to any EventHandles + // pointing to this event. + void _signal_inlock(); + + ReplicationExecutor* _executor; + + // All members other than _executor are protected by the executor's _mutex. + bool _isSignaled; + stdx::condition_variable _isSignaledCondition; + EventList::iterator _iter; + WorkQueue _waiters; +}; } // namespace repl } // namespace mongo |