summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2015-06-03 14:26:24 -0400
committerSpencer T Brody <spencer@mongodb.com>2015-06-05 11:15:19 -0400
commit99dff01e969a8f8bd06e0f064ad42ca06f2c0a2c (patch)
tree7c81b716b995829770717d21fcfa63bacfa69d7b
parente8bff54a2234cc292d9faefeaa6fedc2d593611f (diff)
downloadmongo-99dff01e969a8f8bd06e0f064ad42ca06f2c0a2c.tar.gz
SERVER-18623 TaskExecutor interface
-rw-r--r--src/mongo/executor/task_executor.cpp76
-rw-r--r--src/mongo/executor/task_executor.h343
2 files changed, 419 insertions, 0 deletions
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp
new file mode 100644
index 00000000000..cef6b704dd6
--- /dev/null
+++ b/src/mongo/executor/task_executor.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2014-2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/task_executor.h"
+
+namespace mongo {
+namespace executor {
+
+ TaskExecutor::TaskExecutor() = default;
+ TaskExecutor::~TaskExecutor() = default;
+
+ TaskExecutor::CallbackState::CallbackState() = default;
+ TaskExecutor::CallbackState::~CallbackState() = default;
+
+ TaskExecutor::CallbackHandle::CallbackHandle(std::shared_ptr<CallbackState> callback) :
+ _callback(std::move(callback)) {}
+ TaskExecutor::CallbackHandle::~CallbackHandle() = default;
+
+ TaskExecutor::EventState::EventState() = default;
+ TaskExecutor::EventState::~EventState() = default;
+
+ TaskExecutor::EventHandle::EventHandle(std::shared_ptr<EventState> event) :
+ _event(std::move(event)) {}
+ TaskExecutor::EventHandle::~EventHandle() = default;
+
+ TaskExecutor::CallbackArgs::CallbackArgs(TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const Status& theStatus,
+ OperationContext* theTxn) :
+ executor(theExecutor),
+ myHandle(theHandle),
+ status(theStatus),
+ txn(theTxn) {
+ }
+
+
+ TaskExecutor::RemoteCommandCallbackArgs::RemoteCommandCallbackArgs(
+ TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const RemoteCommandRequest& theRequest,
+ const ResponseStatus& theResponse) :
+ executor(theExecutor),
+ myHandle(theHandle),
+ request(theRequest),
+ response(theResponse) {
+ }
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
new file mode 100644
index 00000000000..c7c09c44de9
--- /dev/null
+++ b/src/mongo/executor/task_executor.h
@@ -0,0 +1,343 @@
+/**
+ * Copyright (C) 2014-2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/base/string_data.h"
+#include "mongo/client/remote_command_runner.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+namespace executor {
+
+ /**
+ * Generic event loop with notions of events and callbacks.
+ *
+ * Callbacks are function objects representing work to be performed by the executor.
+ * They may be scheduled by client threads or by other callbacks. Methods that
+ * schedule callbacks return a CallbackHandle if they are able to enqueue the callback in the
+ * appropriate work queue. Every CallbackHandle represents an invocation of a function that
+ * will happen before the executor goes out of scope. Calling cancel(CallbackHandle) schedules
+ * the specified callback to run with a flag indicating that it is "canceled," but it will run.
+ * Client threads may block waiting for a callback to execute by calling wait(CallbackHandle).
+ *
+ * Events are level-triggered and may only be signaled one time. Client threads and callbacks
+ * may schedule callbacks to be run by the executor after the event is signaled, and client
+ * threads may ask the executor to block them until after the event is signaled.
+ *
+ * If an event is unsignaled when shutdown is called, the executor will ensure that any threads
+ * blocked in waitForEvent() eventually return.
+ *
+ * Logically, Callbacks and Events exist for the life of the executor. That means that while
+ * the executor is in scope, no CallbackHandle or EventHandle is stale.
+ */
+ class TaskExecutor {
+ MONGO_DISALLOW_COPYING(TaskExecutor);
+ public:
+ struct CallbackArgs;
+ struct RemoteCommandCallbackArgs;
+ class CallbackState;
+ class CallbackHandle;
+ class EventState;
+ class EventHandle;
+
+ using ResponseStatus = StatusWith<RemoteCommandResponse>;
+
+ /**
+ * Type of a regular callback function.
+ *
+ * The status argument passed at invocation will have code ErrorCodes::CallbackCanceled if
+ * the callback was canceled for any reason (including shutdown). Otherwise, it should have
+ * Status::OK().
+ */
+ using CallbackFn = stdx::function<void (const CallbackArgs&)>;
+
+ /**
+ * Type of a callback from a request to run a command on a remote MongoDB node.
+ *
+ * The StatusWith<const BSONObj> will have ErrorCodes::CallbackCanceled if the callback was
+ * canceled. Otherwise, its status will represent any failure to execute the command.
+ * If the command executed and a response came back, then the status object will contain
+ * the BSONObj returned by the command, with the "ok" field indicating the success of the
+ * command in the usual way.
+ */
+ using RemoteCommandCallbackFn = stdx::function<void (const RemoteCommandCallbackArgs&)>;
+
+ virtual ~TaskExecutor();
+
+ /**
+ * Returns diagnostic information.
+ */
+ virtual std::string getDiagnosticString() = 0;
+
+ /**
+ * Gets the current time. Callbacks should use this method to read the system clock.
+ */
+ virtual Date_t now() = 0;
+
+ /**
+ * Creates a new event. Returns a handle to the event, or ErrorCodes::ShutdownInProgress if
+ * makeEvent() fails because the executor is shutting down.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<EventHandle> makeEvent() = 0;
+
+ /**
+ * Signals the event, making waiting client threads and callbacks runnable.
+ *
+ * May be called up to one time per event.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ void signalEvent(const EventHandle& event) { eventHandle.getEventState()->signal(); };
+
+ /**
+ * Schedules a callback, "work", to run after "event" is signaled. If "event"
+ * has already been signaled, marks "work" as immediately runnable.
+ *
+ * If "event" has yet to be signaled when "shutdown()" is called, "work" will
+ * be scheduled with a status of ErrorCodes::CallbackCanceled.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
+ const CallbackFn& work) = 0;
+
+ /**
+ * Blocks the calling thread until after "event" is signaled. Also returns
+ * if the event is never signaled but shutdown() is called on the executor.
+ *
+ * NOTE: Do not call from a callback running in the executor.
+ *
+ * TODO(schwerin): Change return type so that the caller can know which of the two reasons
+ * led to this method returning.
+ */
+ void waitForEvent(const EventHandle& event) {
+ event.getEventState()->waitUntilSignaled();
+ };
+
+ /**
+ * Schedules "work" to be run by the executor ASAP.
+ *
+ * Returns a handle for waiting on or canceling the callback, or
+ * ErrorCodes::ShutdownInProgress.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) = 0;
+
+ /**
+ * Schedules "work" to be run by the executor no sooner than "when".
+ *
+ * Returns a handle for waiting on or canceling the callback, or
+ * ErrorCodes::ShutdownInProgress.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) = 0;
+
+ /**
+ * Schedules "cb" to be run by the executor with the result of executing the remote command
+ * described by "request".
+ *
+ * Returns a handle for waiting on or canceling the callback, or
+ * ErrorCodes::ShutdownInProgress.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) = 0;
+
+ /**
+ * If the callback referenced by "cbHandle" hasn't already executed, marks it as
+ * canceled and runnable.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ */
+ void cancel(const CallbackHandle& cbHandle) { cbHandle.getCallbackState()->cancel(); };
+
+ /**
+ * Blocks until the executor finishes running the callback referenced by "cbHandle".
+ *
+ * Because callbacks all run during shutdown if they weren't run beforehand, there is no need
+ * to indicate the reason for returning from wait(CallbackHandle). It is always that the
+ * callback ran.
+ *
+ * NOTE: Do not call from a callback running in the executor.
+ */
+ void wait(const CallbackHandle& cbHandle) {
+ cbHandle.getCallbackState()->waitForCompletion();
+ };
+
+ protected:
+
+ TaskExecutor();
+ CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle) {
+ return cbHandle.getCallbackState();
+ }
+
+ EventState* getEventFromHandle(const EventHandle& eventHandle) {
+ return eventHandle.getEventState();
+ }
+ };
+
+ /**
+ * Argument passed to all callbacks scheduled via a TaskExecutor.
+ */
+ struct TaskExecutor::CallbackArgs {
+ CallbackArgs(TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const Status& theStatus,
+ OperationContext* txn = NULL);
+
+ TaskExecutor* executor;
+ CallbackHandle myHandle;
+ Status status;
+ OperationContext* txn;
+ };
+
+ /**
+ * Argument passed to all remote command callbacks scheduled via a TaskExecutor.
+ */
+ struct TaskExecutor::RemoteCommandCallbackArgs {
+ RemoteCommandCallbackArgs(TaskExecutor* theExecutor,
+ const CallbackHandle& theHandle,
+ const RemoteCommandRequest& theRequest,
+ const StatusWith<RemoteCommandResponse>& theResponse);
+
+ TaskExecutor* executor;
+ CallbackHandle myHandle;
+ RemoteCommandRequest request;
+ StatusWith<RemoteCommandResponse> response;
+ };
+
+ /**
+ * Class representing a scheduled callback and providing methods for interacting with it.
+ */
+ class TaskExecutor::CallbackState {
+ MONGO_DISALLOW_COPYING(CallbackState);
+ public:
+
+ virtual ~CallbackState();
+
+ virtual void cancel() = 0;
+ virtual void waitForCompletion() = 0;
+
+ protected:
+
+ CallbackState();
+
+ };
+
+ /**
+ * Handle to a CallbackState.
+ */
+ class TaskExecutor::CallbackHandle {
+ friend class TaskExecutor;
+
+ public:
+
+ explicit CallbackHandle(std::shared_ptr<CallbackState> cbData);
+
+ bool operator==(const CallbackHandle &other) const {
+ return _callback == other._callback;
+ }
+
+ bool operator!=(const CallbackHandle &other) const {
+ return !(*this == other);
+ }
+
+ private:
+
+ CallbackState* getCallbackState() {
+ return _callback.get();
+ }
+
+ std::shared_ptr<CallbackState> _callback;
+ };
+
+ /**
+ * Class representing a scheduled event and providing methods for interacting with it.
+ */
+ class TaskExecutor::EventState {
+ MONGO_DISALLOW_COPYING(EventState);
+ public:
+
+ virtual ~EventState();
+
+ virtual void signal() = 0;
+ virtual void waitUntilSignaled() = 0;
+ virtual bool isSignaled() = 0;
+
+ protected:
+
+ EventState();
+ };
+
+ /**
+ * Handle to an EventState.
+ */
+ class TaskExecutor::EventHandle {
+ friend class TaskExecutor;
+
+ public:
+
+ explicit EventHandle(std::shared_ptr<EventState> event);
+
+ bool operator==(const EventHandle &other) const {
+ return _event == other._event;
+ }
+
+ bool operator!=(const EventHandle &other) const {
+ return !(*this == other);
+ }
+
+ private:
+
+ EventState* getEventState() {
+ return _event.get();
+ }
+
+ std::shared_ptr<EventState> _event;
+ };
+
+} // namespace executor
+} // namespace mongo
+