diff options
Diffstat (limited to 'src/mongo/executor/task_executor.h')
-rw-r--r-- | src/mongo/executor/task_executor.h | 542 |
1 files changed, 266 insertions, 276 deletions
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 5f42ebeee08..46e195bd7a9 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -41,328 +41,318 @@ namespace mongo { - class OperationContext; +class OperationContext; namespace executor { +/** + * Generic event loop with notions of events and callbacks. + * + * Callbacks represent work to be performed by the executor. + * They may be scheduled by client threads or by other callbacks. Methods that + * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the + * appropriate work queue. Every CallbackHandle represents an invocation of a function that + * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules + * the specified callback to run with a flag indicating that it is "canceled," but it will run. + * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle). + * + * Events are level-triggered and may only be signaled one time. Client threads and callbacks + * may schedule callbacks to be run by the executor after the event is signaled, and client + * threads may ask the executor to block them until after the event is signaled. + * + * If an event is unsignaled when shutdown is called, the executor will ensure that any threads + * blocked in waitForEvent() eventually return. + * + * Logically, Callbacks and Events exist for the life of the executor. That means that while + * the executor is in scope, no CallbackHandle or EventHandle is stale. + */ +class TaskExecutor { + MONGO_DISALLOW_COPYING(TaskExecutor); + +public: + struct CallbackArgs; + struct RemoteCommandCallbackArgs; + class CallbackState; + class CallbackHandle; + class EventState; + class EventHandle; + + using ResponseStatus = StatusWith<RemoteCommandResponse>; + /** - * Generic event loop with notions of events and callbacks. + * Type of a regular callback function. * - * Callbacks represent work to be performed by the executor. - * They may be scheduled by client threads or by other callbacks. Methods that - * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the - * appropriate work queue. Every CallbackHandle represents an invocation of a function that - * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules - * the specified callback to run with a flag indicating that it is "canceled," but it will run. - * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle). + * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if + * the callback was canceled for any reason (including shutdown). Otherwise, it should have + * Status::OK(). + */ + using CallbackFn = stdx::function<void(const CallbackArgs&)>; + + /** + * Type of a callback from a request to run a command on a remote MongoDB node. * - * Events are level-triggered and may only be signaled one time. Client threads and callbacks - * may schedule callbacks to be run by the executor after the event is signaled, and client - * threads may ask the executor to block them until after the event is signaled. + * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was + * canceled. Otherwise, its status will represent any failure to execute the command. + * If the command executed and a response came back, then the status object will contain + * the BSONObj returned by the command, with the "ok" field indicating the success of the + * command in the usual way. + */ + using RemoteCommandCallbackFn = stdx::function<void(const RemoteCommandCallbackArgs&)>; + + virtual ~TaskExecutor(); + + /** + * Signals to the executor that it should shut down. + */ + virtual void shutdown() = 0; + + /** + * Returns diagnostic information. + */ + virtual std::string getDiagnosticString() = 0; + + /** + * Gets the current time. Callbacks should use this method to read the system clock. + */ + virtual Date_t now() = 0; + + /** + * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if + * makeEvent() fails because the executor is shutting down. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<EventHandle> makeEvent() = 0; + + /** + * Signals the event, making waiting client threads and callbacks runnable. * - * If an event is unsignaled when shutdown is called, the executor will ensure that any threads - * blocked in waitForEvent() eventually return. + * May be called up to one time per event. * - * Logically, Callbacks and Events exist for the life of the executor. That means that while - * the executor is in scope, no CallbackHandle or EventHandle is stale. + * May be called by client threads or callbacks running in the executor. */ - class TaskExecutor { - MONGO_DISALLOW_COPYING(TaskExecutor); - public: - struct CallbackArgs; - struct RemoteCommandCallbackArgs; - class CallbackState; - class CallbackHandle; - class EventState; - class EventHandle; - - using ResponseStatus = StatusWith<RemoteCommandResponse>; - - /** - * Type of a regular callback function. - * - * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if - * the callback was canceled for any reason (including shutdown). Otherwise, it should have - * Status::OK(). - */ - using CallbackFn = stdx::function<void (const CallbackArgs&)>; - - /** - * Type of a callback from a request to run a command on a remote MongoDB node. - * - * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was - * canceled. Otherwise, its status will represent any failure to execute the command. - * If the command executed and a response came back, then the status object will contain - * the BSONObj returned by the command, with the "ok" field indicating the success of the - * command in the usual way. - */ - using RemoteCommandCallbackFn = stdx::function<void (const RemoteCommandCallbackArgs&)>; - - virtual ~TaskExecutor(); - - /** - * Signals to the executor that it should shut down. - */ - virtual void shutdown() = 0; - - /** - * Returns diagnostic information. - */ - virtual std::string getDiagnosticString() = 0; - - /** - * Gets the current time. Callbacks should use this method to read the system clock. - */ - virtual Date_t now() = 0; - - /** - * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if - * makeEvent() fails because the executor is shutting down. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<EventHandle> makeEvent() = 0; - - /** - * Signals the event, making waiting client threads and callbacks runnable. - * - * May be called up to one time per event. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual void signalEvent(const EventHandle& event) = 0; - - /** - * Schedules a callback, "work", to run after "event" is signaled. If "event" - * has already been signaled, marks "work" as immediately runnable. - * - * If "event" has yet to be signaled when "shutdown()" is called, "work" will - * be scheduled with a status of ErrorCodes::CallbackCanceled. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, - const CallbackFn& work) = 0; - - /** - * Blocks the calling thread until after "event" is signaled. Also returns - * if the event is never signaled but shutdown() is called on the executor. - * - * NOTE: Do not call from a callback running in the executor. - * - * TODO(schwerin): Change return type so that the caller can know which of the two reasons - * led to this method returning. - */ - virtual void waitForEvent(const EventHandle& event) = 0; - - /** - * Schedules "work" to be run by the executor ASAP. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0; - - /** - * Schedules "work" to be run by the executor no sooner than "when". - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; - - /** - * Schedules "cb" to be run by the executor with the result of executing the remote command - * described by "request". - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual StatusWith<CallbackHandle> scheduleRemoteCommand( - const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb) = 0; - - /** - * If the callback referenced by "cbHandle" hasn't already executed, marks it as - * canceled and runnable. - * - * May be called by client threads or callbacks running in the executor. - */ - virtual void cancel(const CallbackHandle& cbHandle) = 0; - - /** - * Blocks until the executor finishes running the callback referenced by "cbHandle". - * - * Because callbacks all run during shutdown if they weren't run beforehand, there is no need - * to indicate the reason for returning from wait(CallbackHandle). It is always that the - * callback ran. - * - * NOTE: Do not call from a callback running in the executor. - */ - virtual void wait(const CallbackHandle& cbHandle) = 0; - - protected: - - TaskExecutor(); - - // Retrieves the Callback from a given CallbackHandle - CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle); - - // Retrieves the Event from a given EventHandle - EventState* getEventFromHandle(const EventHandle& eventHandle); - - // Sets the given CallbackHandle to point to the given callback. - void setCallbackForHandle(CallbackHandle* cbHandle, - std::shared_ptr<CallbackState> callback); - - // Sets the given EventHandle to point to the given event. - void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event); - }; + virtual void signalEvent(const EventHandle& event) = 0; /** - * Class representing a scheduled callback and providing methods for interacting with it. + * Schedules a callback, "work", to run after "event" is signaled. If "event" + * has already been signaled, marks "work" as immediately runnable. + * + * If "event" has yet to be signaled when "shutdown()" is called, "work" will + * be scheduled with a status of ErrorCodes::CallbackCanceled. + * + * May be called by client threads or callbacks running in the executor. */ - class TaskExecutor::CallbackState { - MONGO_DISALLOW_COPYING(CallbackState); - public: + virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, + const CallbackFn& work) = 0; - virtual ~CallbackState(); + /** + * Blocks the calling thread until after "event" is signaled. Also returns + * if the event is never signaled but shutdown() is called on the executor. + * + * NOTE: Do not call from a callback running in the executor. + * + * TODO(schwerin): Change return type so that the caller can know which of the two reasons + * led to this method returning. + */ + virtual void waitForEvent(const EventHandle& event) = 0; - virtual void cancel() = 0; - virtual void waitForCompletion() = 0; + /** + * Schedules "work" to be run by the executor ASAP. + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0; - protected: + /** + * Schedules "work" to be run by the executor no sooner than "when". + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; - CallbackState(); + /** + * Schedules "cb" to be run by the executor with the result of executing the remote command + * described by "request". + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) = 0; - }; + /** + * If the callback referenced by "cbHandle" hasn't already executed, marks it as + * canceled and runnable. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual void cancel(const CallbackHandle& cbHandle) = 0; /** - * Handle to a CallbackState. + * Blocks until the executor finishes running the callback referenced by "cbHandle". + * + * Because callbacks all run during shutdown if they weren't run beforehand, there is no need + * to indicate the reason for returning from wait(CallbackHandle). It is always that the + * callback ran. + * + * NOTE: Do not call from a callback running in the executor. */ - class TaskExecutor::CallbackHandle { - friend class TaskExecutor; + virtual void wait(const CallbackHandle& cbHandle) = 0; - public: +protected: + TaskExecutor(); - CallbackHandle(); - explicit CallbackHandle(std::shared_ptr<CallbackState> cbData); + // Retrieves the Callback from a given CallbackHandle + CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle); - bool operator==(const CallbackHandle &other) const { - return _callback == other._callback; - } + // Retrieves the Event from a given EventHandle + EventState* getEventFromHandle(const EventHandle& eventHandle); - bool operator!=(const CallbackHandle &other) const { - return !(*this == other); - } + // Sets the given CallbackHandle to point to the given callback. + void setCallbackForHandle(CallbackHandle* cbHandle, std::shared_ptr<CallbackState> callback); - bool isValid() const { - return _callback.get(); - } + // Sets the given EventHandle to point to the given event. + void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event); +}; - private: +/** + * Class representing a scheduled callback and providing methods for interacting with it. + */ +class TaskExecutor::CallbackState { + MONGO_DISALLOW_COPYING(CallbackState); - void setCallback(std::shared_ptr<CallbackState> callback) { - _callback = callback; - } +public: + virtual ~CallbackState(); - CallbackState* getCallback() const { - return _callback.get(); - } + virtual void cancel() = 0; + virtual void waitForCompletion() = 0; - std::shared_ptr<CallbackState> _callback; - }; +protected: + CallbackState(); +}; - /** - * Class representing a scheduled event and providing methods for interacting with it. - */ - class TaskExecutor::EventState { - MONGO_DISALLOW_COPYING(EventState); - public: +/** + * Handle to a CallbackState. + */ +class TaskExecutor::CallbackHandle { + friend class TaskExecutor; - virtual ~EventState(); +public: + CallbackHandle(); + explicit CallbackHandle(std::shared_ptr<CallbackState> cbData); - virtual void signal() = 0; - virtual void waitUntilSignaled() = 0; - virtual bool isSignaled() = 0; + bool operator==(const CallbackHandle& other) const { + return _callback == other._callback; + } - protected: + bool operator!=(const CallbackHandle& other) const { + return !(*this == other); + } - EventState(); - }; + bool isValid() const { + return _callback.get(); + } - /** - * Handle to an EventState. - */ - class TaskExecutor::EventHandle { - friend class TaskExecutor; +private: + void setCallback(std::shared_ptr<CallbackState> callback) { + _callback = callback; + } - public: + CallbackState* getCallback() const { + return _callback.get(); + } - EventHandle(); - explicit EventHandle(std::shared_ptr<EventState> event); + std::shared_ptr<CallbackState> _callback; +}; - bool operator==(const EventHandle &other) const { - return _event == other._event; - } +/** + * Class representing a scheduled event and providing methods for interacting with it. + */ +class TaskExecutor::EventState { + MONGO_DISALLOW_COPYING(EventState); - bool operator!=(const EventHandle &other) const { - return !(*this == other); - } +public: + virtual ~EventState(); - bool isValid() const { - return _event.get(); - } + virtual void signal() = 0; + virtual void waitUntilSignaled() = 0; + virtual bool isSignaled() = 0; - private: +protected: + EventState(); +}; - void setEvent(std::shared_ptr<EventState> event) { - _event = event; - } +/** + * Handle to an EventState. + */ +class TaskExecutor::EventHandle { + friend class TaskExecutor; - EventState* getEvent() const { - return _event.get(); - } +public: + EventHandle(); + explicit EventHandle(std::shared_ptr<EventState> event); - std::shared_ptr<EventState> _event; - }; + bool operator==(const EventHandle& other) const { + return _event == other._event; + } - /** - * Argument passed to all callbacks scheduled via a TaskExecutor. - */ - struct TaskExecutor::CallbackArgs { - CallbackArgs(TaskExecutor* theExecutor, - const CallbackHandle& theHandle, - const Status& theStatus, - OperationContext* txn = NULL); - - TaskExecutor* executor; - CallbackHandle myHandle; - Status status; - OperationContext* txn; - }; + bool operator!=(const EventHandle& other) const { + return !(*this == other); + } - /** - * Argument passed to all remote command callbacks scheduled via a TaskExecutor. - */ - struct TaskExecutor::RemoteCommandCallbackArgs { - RemoteCommandCallbackArgs(TaskExecutor* theExecutor, - const CallbackHandle& theHandle, - const RemoteCommandRequest& theRequest, - const StatusWith<RemoteCommandResponse>& theResponse); + bool isValid() const { + return _event.get(); + } - TaskExecutor* executor; - CallbackHandle myHandle; - RemoteCommandRequest request; - StatusWith<RemoteCommandResponse> response; - }; +private: + void setEvent(std::shared_ptr<EventState> event) { + _event = event; + } -} // namespace executor -} // namespace mongo + EventState* getEvent() const { + return _event.get(); + } + std::shared_ptr<EventState> _event; +}; + +/** + * Argument passed to all callbacks scheduled via a TaskExecutor. + */ +struct TaskExecutor::CallbackArgs { + CallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const Status& theStatus, + OperationContext* txn = NULL); + + TaskExecutor* executor; + CallbackHandle myHandle; + Status status; + OperationContext* txn; +}; + +/** + * Argument passed to all remote command callbacks scheduled via a TaskExecutor. + */ +struct TaskExecutor::RemoteCommandCallbackArgs { + RemoteCommandCallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const RemoteCommandRequest& theRequest, + const StatusWith<RemoteCommandResponse>& theResponse); + + TaskExecutor* executor; + CallbackHandle myHandle; + RemoteCommandRequest request; + StatusWith<RemoteCommandResponse> response; +}; + +} // namespace executor +} // namespace mongo |