/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/platform/hash_namespace.h" #include "mongo/stdx/functional.h" #include "mongo/util/time_support.h" namespace mongo { class BSONObjBuilder; class OperationContext; namespace executor { struct ConnectionPoolStats; /** * Executor with notions of events and callbacks. * * Callbacks represent work to be performed by the executor. * They may be scheduled by client threads or by other callbacks. Methods that * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the * appropriate work queue. Every CallbackHandle represents an invocation of a function that * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules * the specified callback to run with a flag indicating that it is "canceled," but it will run. * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle). * * Events are level-triggered and may only be signaled one time. Client threads and callbacks * may schedule callbacks to be run by the executor after the event is signaled, and client * threads may ask the executor to block them until after the event is signaled. * * If an event is unsignaled when shutdown is called, the executor will ensure that any threads * blocked in waitForEvent() eventually return. */ class TaskExecutor { MONGO_DISALLOW_COPYING(TaskExecutor); public: struct CallbackArgs; struct RemoteCommandCallbackArgs; class CallbackState; class CallbackHandle; class EventState; class EventHandle; using ResponseStatus = RemoteCommandResponse; /** * Type of a regular callback function. * * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if * the callback was canceled for any reason (including shutdown). Otherwise, it should have * Status::OK(). */ using CallbackFn = stdx::function; /** * Type of a callback from a request to run a command on a remote MongoDB node. * * The StatusWith will have ErrorCodes::CallbackCanceled if the callback was * canceled. Otherwise, its status will represent any failure to execute the command. * If the command executed and a response came back, then the status object will contain * the BSONObj returned by the command, with the "ok" field indicating the success of the * command in the usual way. */ using RemoteCommandCallbackFn = stdx::function; /** * Destroys the task executor. Implicitly performs the equivalent of shutdown() and join() * before returning, if necessary. */ virtual ~TaskExecutor(); /** * Causes the executor to initialize its internal state (start threads if appropriate, create * network sockets, etc). This method may be called at most once for the lifetime of an * executor. */ virtual void startup() = 0; /** * Signals to the executor that it should shut down. This method may be called from within a * callback. As such, this method must not block. After shutdown returns, attempts to schedule * more tasks on the executor will return errors. * * It is legal to call this method multiple times. If the task executor goes out of scope * before this method is called, the destructor performs this activity. */ virtual void shutdown() = 0; /** * Waits for the shutdown sequence initiated by a call to shutdown() to complete. Must not be * called from within a callback. * * Unlike stdx::thread::join, this method may be called from any thread that wishes to wait for * shutdown to complete. */ virtual void join() = 0; /** * Writes diagnostic information into "b". */ virtual void appendDiagnosticBSON(BSONObjBuilder* b) const = 0; /** * Gets the current time. Callbacks should use this method to read the system clock. */ virtual Date_t now() = 0; /** * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if * makeEvent() fails because the executor is shutting down. * * May be called by client threads or callbacks running in the executor. */ virtual StatusWith makeEvent() = 0; /** * Signals the event, making waiting client threads and callbacks runnable. * * May be called up to one time per event. * * May be called by client threads or callbacks running in the executor. */ virtual void signalEvent(const EventHandle& event) = 0; /** * Schedules a callback, "work", to run after "event" is signaled. If "event" * has already been signaled, marks "work" as immediately runnable. * * If "event" has yet to be signaled when "shutdown()" is called, "work" will * be scheduled with a status of ErrorCodes::CallbackCanceled. * * May be called by client threads or callbacks running in the executor. */ virtual StatusWith onEvent(const EventHandle& event, const CallbackFn& work) = 0; /** * Blocks the calling thread until after "event" is signaled. Also returns * if the event is never signaled but shutdown() is called on the executor. * * NOTE: Do not call from a callback running in the executor. * * TODO(schwerin): Change return type so that the caller can know which of the two reasons * led to this method returning. */ virtual void waitForEvent(const EventHandle& event) = 0; /** * Schedules "work" to be run by the executor ASAP. * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. * * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ virtual StatusWith scheduleWork(const CallbackFn& work) = 0; /** * Schedules "work" to be run by the executor no sooner than "when". * * If "when" is <= now(), then it schedules the "work" to be run ASAP. * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. * * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ virtual StatusWith scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; /** * Schedules "cb" to be run by the executor with the result of executing the remote command * described by "request". * * Returns a handle for waiting on or canceling the callback, or * ErrorCodes::ShutdownInProgress. * * May be called by client threads or callbacks running in the executor. * * Contract: Implementations should guarantee that callback should be called *after* doing any * processing related to the callback. */ virtual StatusWith scheduleRemoteCommand(const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) = 0; /** * If the callback referenced by "cbHandle" hasn't already executed, marks it as * canceled and runnable. * * May be called by client threads or callbacks running in the executor. */ virtual void cancel(const CallbackHandle& cbHandle) = 0; /** * Blocks until the executor finishes running the callback referenced by "cbHandle". * * Because callbacks all run during shutdown if they weren't run beforehand, there is no need * to indicate the reason for returning from wait(CallbackHandle). It is always that the * callback ran. * * NOTE: Do not call from a callback running in the executor. */ virtual void wait(const CallbackHandle& cbHandle) = 0; /** * Appends information about the underlying network interface's connections to the given * builder. */ virtual void appendConnectionStats(ConnectionPoolStats* stats) const = 0; protected: // Retrieves the Callback from a given CallbackHandle static CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle); // Retrieves the Event from a given EventHandle static EventState* getEventFromHandle(const EventHandle& eventHandle); // Sets the given CallbackHandle to point to the given callback. static void setCallbackForHandle(CallbackHandle* cbHandle, std::shared_ptr callback); // Sets the given EventHandle to point to the given event. static void setEventForHandle(EventHandle* eventHandle, std::shared_ptr event); TaskExecutor(); }; /** * Class representing a scheduled callback and providing methods for interacting with it. */ class TaskExecutor::CallbackState { MONGO_DISALLOW_COPYING(CallbackState); public: virtual ~CallbackState(); virtual void cancel() = 0; virtual void waitForCompletion() = 0; virtual bool isCanceled() const = 0; protected: CallbackState(); }; /** * Handle to a CallbackState. */ class TaskExecutor::CallbackHandle { friend class TaskExecutor; public: CallbackHandle(); // Exposed solely for testing. explicit CallbackHandle(std::shared_ptr cbData); bool operator==(const CallbackHandle& other) const { return _callback == other._callback; } bool operator!=(const CallbackHandle& other) const { return !(*this == other); } bool isValid() const { return _callback.get(); } /** * True if this handle is valid. */ explicit operator bool() const { return isValid(); } std::size_t hash() const { return std::hash()(_callback); } bool isCanceled() const { return getCallback()->isCanceled(); } private: void setCallback(std::shared_ptr callback) { _callback = callback; } CallbackState* getCallback() const { return _callback.get(); } std::shared_ptr _callback; }; /** * Class representing a scheduled event and providing methods for interacting with it. */ class TaskExecutor::EventState { MONGO_DISALLOW_COPYING(EventState); public: virtual ~EventState(); virtual void signal() = 0; virtual void waitUntilSignaled() = 0; virtual bool isSignaled() = 0; protected: EventState(); }; /** * Handle to an EventState. */ class TaskExecutor::EventHandle { friend class TaskExecutor; public: EventHandle(); explicit EventHandle(std::shared_ptr event); bool operator==(const EventHandle& other) const { return _event == other._event; } bool operator!=(const EventHandle& other) const { return !(*this == other); } bool isValid() const { return _event.get(); } /** * True if this event handle is valid. */ explicit operator bool() const { return isValid(); } private: void setEvent(std::shared_ptr event) { _event = event; } EventState* getEvent() const { return _event.get(); } std::shared_ptr _event; }; /** * Argument passed to all callbacks scheduled via a TaskExecutor. */ struct TaskExecutor::CallbackArgs { CallbackArgs(TaskExecutor* theExecutor, CallbackHandle theHandle, Status theStatus, OperationContext* opCtx = NULL); TaskExecutor* executor; CallbackHandle myHandle; Status status; OperationContext* opCtx; }; /** * Argument passed to all remote command callbacks scheduled via a TaskExecutor. */ struct TaskExecutor::RemoteCommandCallbackArgs { RemoteCommandCallbackArgs(TaskExecutor* theExecutor, const CallbackHandle& theHandle, const RemoteCommandRequest& theRequest, const ResponseStatus& theResponse); TaskExecutor* executor; CallbackHandle myHandle; RemoteCommandRequest request; ResponseStatus response; }; } // namespace executor } // namespace mongo // Provide a specialization for hash so it can easily be stored in unordered_set. MONGO_HASH_NAMESPACE_START template <> struct hash<::mongo::executor::TaskExecutor::CallbackHandle> { size_t operator()(const ::mongo::executor::TaskExecutor::CallbackHandle& x) const { return x.hash(); } }; MONGO_HASH_NAMESPACE_END