diff options
author | Spencer T Brody <spencer@mongodb.com> | 2015-06-03 14:26:24 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2015-06-05 11:15:19 -0400 |
commit | 99dff01e969a8f8bd06e0f064ad42ca06f2c0a2c (patch) | |
tree | 7c81b716b995829770717d21fcfa63bacfa69d7b | |
parent | e8bff54a2234cc292d9faefeaa6fedc2d593611f (diff) | |
download | mongo-99dff01e969a8f8bd06e0f064ad42ca06f2c0a2c.tar.gz |
SERVER-18623 TaskExecutor interface
-rw-r--r-- | src/mongo/executor/task_executor.cpp | 76 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 343 |
2 files changed, 419 insertions, 0 deletions
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp new file mode 100644 index 00000000000..cef6b704dd6 --- /dev/null +++ b/src/mongo/executor/task_executor.cpp @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2014-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/executor/task_executor.h" + +namespace mongo { +namespace executor { + + TaskExecutor::TaskExecutor() = default; + TaskExecutor::~TaskExecutor() = default; + + TaskExecutor::CallbackState::CallbackState() = default; + TaskExecutor::CallbackState::~CallbackState() = default; + + TaskExecutor::CallbackHandle::CallbackHandle(std::shared_ptr<CallbackState> callback) : + _callback(std::move(callback)) {} + TaskExecutor::CallbackHandle::~CallbackHandle() = default; + + TaskExecutor::EventState::EventState() = default; + TaskExecutor::EventState::~EventState() = default; + + TaskExecutor::EventHandle::EventHandle(std::shared_ptr<EventState> event) : + _event(std::move(event)) {} + TaskExecutor::EventHandle::~EventHandle() = default; + + TaskExecutor::CallbackArgs::CallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const Status& theStatus, + OperationContext* theTxn) : + executor(theExecutor), + myHandle(theHandle), + status(theStatus), + txn(theTxn) { + } + + + TaskExecutor::RemoteCommandCallbackArgs::RemoteCommandCallbackArgs( + TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const RemoteCommandRequest& theRequest, + const ResponseStatus& theResponse) : + executor(theExecutor), + myHandle(theHandle), + request(theRequest), + response(theResponse) { + } + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h new file mode 100644 index 00000000000..c7c09c44de9 --- /dev/null +++ b/src/mongo/executor/task_executor.h @@ -0,0 +1,343 @@ +/** + * Copyright (C) 2014-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> +#include <memory> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/base/string_data.h" +#include "mongo/client/remote_command_runner.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/time_support.h" + +namespace mongo { + + class OperationContext; + +namespace executor { + + /** + * Generic event loop with notions of events and callbacks. + * + * Callbacks are function objects representing work to be performed by the executor. + * They may be scheduled by client threads or by other callbacks. Methods that + * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the + * appropriate work queue. Every CallbackHandle represents an invocation of a function that + * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules + * the specified callback to run with a flag indicating that it is "canceled," but it will run. + * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle). + * + * Events are level-triggered and may only be signaled one time. Client threads and callbacks + * may schedule callbacks to be run by the executor after the event is signaled, and client + * threads may ask the executor to block them until after the event is signaled. + * + * If an event is unsignaled when shutdown is called, the executor will ensure that any threads + * blocked in waitForEvent() eventually return. + * + * Logically, Callbacks and Events exist for the life of the executor. That means that while + * the executor is in scope, no CallbackHandle or EventHandle is stale. + */ + class TaskExecutor { + MONGO_DISALLOW_COPYING(TaskExecutor); + public: + struct CallbackArgs; + struct RemoteCommandCallbackArgs; + class CallbackState; + class CallbackHandle; + class EventState; + class EventHandle; + + using ResponseStatus = StatusWith<RemoteCommandResponse>; + + /** + * Type of a regular callback function. + * + * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if + * the callback was canceled for any reason (including shutdown). Otherwise, it should have + * Status::OK(). + */ + using CallbackFn = stdx::function<void (const CallbackArgs&)>; + + /** + * Type of a callback from a request to run a command on a remote MongoDB node. + * + * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was + * canceled. Otherwise, its status will represent any failure to execute the command. + * If the command executed and a response came back, then the status object will contain + * the BSONObj returned by the command, with the "ok" field indicating the success of the + * command in the usual way. + */ + using RemoteCommandCallbackFn = stdx::function<void (const RemoteCommandCallbackArgs&)>; + + virtual ~TaskExecutor(); + + /** + * Returns diagnostic information. + */ + virtual std::string getDiagnosticString() = 0; + + /** + * Gets the current time. Callbacks should use this method to read the system clock. + */ + virtual Date_t now() = 0; + + /** + * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if + * makeEvent() fails because the executor is shutting down. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<EventHandle> makeEvent() = 0; + + /** + * Signals the event, making waiting client threads and callbacks runnable. + * + * May be called up to one time per event. + * + * May be called by client threads or callbacks running in the executor. + */ + void signalEvent(const EventHandle& event) { eventHandle.getEventState()->signal(); }; + + /** + * Schedules a callback, "work", to run after "event" is signaled. If "event" + * has already been signaled, marks "work" as immediately runnable. + * + * If "event" has yet to be signaled when "shutdown()" is called, "work" will + * be scheduled with a status of ErrorCodes::CallbackCanceled. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, + const CallbackFn& work) = 0; + + /** + * Blocks the calling thread until after "event" is signaled. Also returns + * if the event is never signaled but shutdown() is called on the executor. + * + * NOTE: Do not call from a callback running in the executor. + * + * TODO(schwerin): Change return type so that the caller can know which of the two reasons + * led to this method returning. + */ + void waitForEvent(const EventHandle& event) { + event.getEventState()->waitUntilSignaled(); + }; + + /** + * Schedules "work" to be run by the executor ASAP. + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0; + + /** + * Schedules "work" to be run by the executor no sooner than "when". + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0; + + /** + * Schedules "cb" to be run by the executor with the result of executing the remote command + * described by "request". + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + */ + virtual StatusWith<CallbackHandle> scheduleRemoteCommand( + const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) = 0; + + /** + * If the callback referenced by "cbHandle" hasn't already executed, marks it as + * canceled and runnable. + * + * May be called by client threads or callbacks running in the executor. + */ + void cancel(const CallbackHandle& cbHandle) { cbHandle.getCallbackState()->cancel(); }; + + /** + * Blocks until the executor finishes running the callback referenced by "cbHandle". + * + * Because callbacks all run during shutdown if they weren't run beforehand, there is no need + * to indicate the reason for returning from wait(CallbackHandle). It is always that the + * callback ran. + * + * NOTE: Do not call from a callback running in the executor. + */ + void wait(const CallbackHandle& cbHandle) { + cbHandle.getCallbackState()->waitForCompletion(); + }; + + protected: + + TaskExecutor(); + CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle) { + return cbHandle.getCallbackState(); + } + + EventState* getEventFromHandle(const EventHandle& eventHandle) { + return eventHandle.getEventState(); + } + }; + + /** + * Argument passed to all callbacks scheduled via a TaskExecutor. + */ + struct TaskExecutor::CallbackArgs { + CallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const Status& theStatus, + OperationContext* txn = NULL); + + TaskExecutor* executor; + CallbackHandle myHandle; + Status status; + OperationContext* txn; + }; + + /** + * Argument passed to all remote command callbacks scheduled via a TaskExecutor. + */ + struct TaskExecutor::RemoteCommandCallbackArgs { + RemoteCommandCallbackArgs(TaskExecutor* theExecutor, + const CallbackHandle& theHandle, + const RemoteCommandRequest& theRequest, + const StatusWith<RemoteCommandResponse>& theResponse); + + TaskExecutor* executor; + CallbackHandle myHandle; + RemoteCommandRequest request; + StatusWith<RemoteCommandResponse> response; + }; + + /** + * Class representing a scheduled callback and providing methods for interacting with it. + */ + class TaskExecutor::CallbackState { + MONGO_DISALLOW_COPYING(CallbackState); + public: + + virtual ~CallbackState(); + + virtual void cancel() = 0; + virtual void waitForCompletion() = 0; + + protected: + + CallbackState(); + + }; + + /** + * Handle to a CallbackState. + */ + class TaskExecutor::CallbackHandle { + friend class TaskExecutor; + + public: + + explicit CallbackHandle(std::shared_ptr<CallbackState> cbData); + + bool operator==(const CallbackHandle &other) const { + return _callback == other._callback; + } + + bool operator!=(const CallbackHandle &other) const { + return !(*this == other); + } + + private: + + CallbackState* getCallbackState() { + return _callback.get(); + } + + std::shared_ptr<CallbackState> _callback; + }; + + /** + * Class representing a scheduled event and providing methods for interacting with it. + */ + class TaskExecutor::EventState { + MONGO_DISALLOW_COPYING(EventState); + public: + + virtual ~EventState(); + + virtual void signal() = 0; + virtual void waitUntilSignaled() = 0; + virtual bool isSignaled() = 0; + + protected: + + EventState(); + }; + + /** + * Handle to an EventState. + */ + class TaskExecutor::EventHandle { + friend class TaskExecutor; + + public: + + explicit EventHandle(std::shared_ptr<EventState> event); + + bool operator==(const EventHandle &other) const { + return _event == other._event; + } + + bool operator!=(const EventHandle &other) const { + return !(*this == other); + } + + private: + + EventState* getEventState() { + return _event.get(); + } + + std::shared_ptr<EventState> _event; + }; + +} // namespace executor +} // namespace mongo + |