summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-02-08 14:15:17 -0500
committerBen Caimano <ben.caimano@10gen.com>2019-02-12 11:14:44 -0500
commit83bcdbd102e48afb5d10f264dd7c38aa75f40ece (patch)
tree19d747e3e422a4cfcb382a7ef31d70c1f335d825
parent77dccdfdc54eab46031f465a2e46570cc5c3c52f (diff)
downloadmongo-83bcdbd102e48afb5d10f264dd7c38aa75f40ece.tar.gz
SERVER-39466 Make transport::Reactor inherit from OutOfLineExecutor
-rw-r--r--src/mongo/executor/network_interface_tl.cpp4
-rw-r--r--src/mongo/executor/network_interface_tl.h3
-rw-r--r--src/mongo/transport/service_executor_adaptive.cpp4
-rw-r--r--src/mongo/transport/service_executor_test.cpp12
-rw-r--r--src/mongo/transport/transport_layer.h19
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp12
-rw-r--r--src/mongo/util/out_of_line_executor.h5
7 files changed, 25 insertions, 34 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 5ca24991e8c..a2bf8f8122e 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -456,7 +456,7 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, unique_function<void()> action)
}
if (when <= now()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ _reactor->schedule(std::move(action));
return Status::OK();
}
@@ -491,7 +491,7 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, unique_function<void()> action)
}
if (status.isOK()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ _reactor->schedule(std::move(action));
} else if (status != ErrorCodes::CallbackCanceled) {
warning() << "setAlarm() received an error: " << status;
}
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 5cd5dd6115b..f2aa47dc6f6 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -97,8 +97,7 @@ private:
transport::ReactorHandle reactor;
void operator()(ConnectionPool::ConnectionInterface* ptr) const {
- reactor->schedule(transport::Reactor::kDispatch,
- [ ret = returner, ptr ] { ret(ptr); });
+ reactor->dispatch([ ret = returner, ptr ] { ret(ptr); });
}
};
using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index b3787002382..d2631b17b25 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -248,9 +248,9 @@ Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task,
// can be called immediately and recursively.
if ((flags & kMayRecurse) &&
(_localThreadState->recursionDepth + 1 < _config->recursionLimit())) {
- _reactorHandle->schedule(Reactor::kDispatch, std::move(wrappedTask));
+ _reactorHandle->dispatch(std::move(wrappedTask));
} else {
- _reactorHandle->schedule(Reactor::kPost, std::move(wrappedTask));
+ _reactorHandle->schedule(std::move(wrappedTask));
}
_lastScheduleTimer.reset();
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 9d2d28d12c1..0dc13e5e3c2 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -128,12 +128,12 @@ public:
MONGO_UNREACHABLE;
}
- void schedule(ScheduleMode mode, Task task) final {
- if (mode == kDispatch) {
- asio::dispatch(_ioContext, std::move(task));
- } else {
- asio::post(_ioContext, std::move(task));
- }
+ void schedule(Task task) final {
+ asio::post(_ioContext, std::move(task));
+ }
+
+ void dispatch(Task task) final {
+ asio::dispatch(_ioContext, std::move(task));
}
bool onReactorThread() const final {
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index a96fbeb3d5c..d911525c019 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -38,6 +38,7 @@
#include "mongo/transport/session.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
+#include "mongo/util/out_of_line_executor.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -153,7 +154,7 @@ private:
const size_t _id;
};
-class Reactor {
+class Reactor : public OutOfLineExecutor {
public:
Reactor(const Reactor&) = delete;
Reactor& operator=(const Reactor&) = delete;
@@ -168,20 +169,8 @@ public:
virtual void stop() = 0;
virtual void drain() = 0;
- using Task = unique_function<void()>;
-
- enum ScheduleMode { kDispatch, kPost };
- virtual void schedule(ScheduleMode mode, Task task) = 0;
-
- template <typename Callback>
- Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
- auto pf = makePromiseFuture<FutureContinuationResult<Callback>>();
- schedule(kPost, [ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable {
- p.setWith(cb);
- });
-
- return std::move(pf.future);
- }
+ virtual void schedule(Task task) = 0;
+ virtual void dispatch(Task task) = 0;
virtual bool onReactorThread() const = 0;
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 6f0c02d17fb..2eba5063755 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -183,12 +183,12 @@ public:
return Date_t(asio::system_timer::clock_type::now());
}
- void schedule(ScheduleMode mode, Task task) override {
- if (mode == kDispatch) {
- asio::dispatch(_ioContext, std::move(task));
- } else {
- asio::post(_ioContext, std::move(task));
- }
+ void schedule(Task task) override {
+ asio::post(_ioContext, std::move(task));
+ }
+
+ void dispatch(Task task) override {
+ asio::dispatch(_ioContext, std::move(task));
}
bool onReactorThread() const override {
diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h
index 310b26cdd21..4aa23ad411c 100644
--- a/src/mongo/util/out_of_line_executor.h
+++ b/src/mongo/util/out_of_line_executor.h
@@ -50,6 +50,9 @@ namespace mongo {
*/
class OutOfLineExecutor {
public:
+ using Task = unique_function<void()>;
+
+public:
/**
* Invokes the callback on the executor, as in schedule(), returning a future with its result.
* That future may be ready by the time the caller returns, which means that continuations
@@ -69,7 +72,7 @@ public:
/**
* Invokes the callback on the executor. This never happens immediately on the caller's stack.
*/
- virtual void schedule(unique_function<void()> func) = 0;
+ virtual void schedule(Task func) = 0;
protected:
~OutOfLineExecutor() noexcept {}