summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-09-23 15:09:24 +0000
committerBen Caimano <ben.caimano@10gen.com>2020-10-20 22:06:45 +0000
commit439730c7f46977844042c20591a368854d010902 (patch)
tree0ec8fecbac477596a4d365f614dd2c316a7c0cf7
parent18f1d000290ef77758455bbbe9c5de174614c3e7 (diff)
downloadmongo-439730c7f46977844042c20591a368854d010902.tar.gz
SERVER-51279 Updating runOnDataAvailable
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/mock_session.h6
-rw-r--r--src/mongo/transport/service_executor.h2
-rw-r--r--src/mongo/transport/service_executor_fixed.cpp6
-rw-r--r--src/mongo/transport/service_executor_fixed.h2
-rw-r--r--src/mongo/transport/service_executor_reserved.cpp2
-rw-r--r--src/mongo/transport/service_executor_reserved.h2
-rw-r--r--src/mongo/transport/service_executor_synchronous.cpp2
-rw-r--r--src/mongo/transport/service_executor_synchronous.h2
-rw-r--r--src/mongo/transport/service_executor_test.cpp2
-rw-r--r--src/mongo/transport/service_executor_utils.cpp16
-rw-r--r--src/mongo/transport/service_executor_utils.h2
-rw-r--r--src/mongo/transport/service_state_machine.cpp31
-rw-r--r--src/mongo/transport/session.h5
-rw-r--r--src/mongo/transport/session_asio.h16
15 files changed, 59 insertions, 38 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 7932abc662d..f3053f121c4 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -188,6 +188,7 @@ tlEnv.CppUnitTest(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/dbmessage',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/rpc/protocol',
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/mongo/unittest/unittest',
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index 72d05aa3e66..b1ed38a9081 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -103,7 +103,11 @@ public:
return Future<Message>::makeReady(sourceMessage());
}
- Future<void> waitForData() override {
+ Status waitForData() override {
+ return asyncWaitForData().getNoThrow();
+ }
+
+ Future<void> asyncWaitForData() override {
auto fp = makePromiseFuture<void>();
stdx::lock_guard<Latch> lk(_waitForDataMutex);
_waitForDataQueue.emplace_back(std::move(fp.promise));
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index ea27129ab94..773f9372913 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -100,7 +100,7 @@ public:
* schedule the callback on current executor. Otherwise, it will invoke the callback with a
* non-okay status on the caller thread.
*/
- virtual void runOnDataAvailable(Session* session,
+ virtual void runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) = 0;
/*
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index f48a9d7a170..1cd8c213afe 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -169,10 +169,12 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
return Status::OK();
}
-void ServiceExecutorFixed::runOnDataAvailable(Session* session,
+void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) {
invariant(session);
- session->waitForData().thenRunOn(shared_from_this()).getAsync(std::move(onCompletionCallback));
+ session->asyncWaitForData()
+ .thenRunOn(shared_from_this())
+ .getAsync(std::move(onCompletionCallback));
}
void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index baf3a7be7e3..4e6f3b4f9cc 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -61,7 +61,7 @@ public:
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
- void runOnDataAvailable(Session* session,
+ void runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) override;
Mode transportMode() const override {
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index d81ad5be200..b4d3709008a 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -221,7 +221,7 @@ void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
<< static_cast<int>(_numStartingThreads);
}
-void ServiceExecutorReserved::runOnDataAvailable(Session* session,
+void ServiceExecutorReserved::runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) {
scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this);
}
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index 60de4bc2993..f29d91edd23 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -65,7 +65,7 @@ public:
return Mode::kSynchronous;
}
- void runOnDataAvailable(Session* session,
+ void runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) override;
void appendStats(BSONObjBuilder* bob) const override;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index c75bdbb0952..b5df5501d82 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -156,7 +156,7 @@ void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
<< static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
}
-void ServiceExecutorSynchronous::runOnDataAvailable(Session* session,
+void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) {
scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this);
}
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 840dc702d4c..826cbfdcf1d 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -60,7 +60,7 @@ public:
return Mode::kSynchronous;
}
- void runOnDataAvailable(Session* session,
+ void runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) override;
void appendStats(BSONObjBuilder* bob) const override;
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 608b0a09232..747c18532da 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -357,7 +357,7 @@ TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) {
AtomicWord<bool> ranOnDataAvailable{false};
auto barrier = std::make_shared<unittest::Barrier>(2);
executorHandle->runOnDataAvailable(
- session.get(), [&ranOnDataAvailable, mainThreadId, barrier](Status) mutable -> void {
+ session, [&ranOnDataAvailable, mainThreadId, barrier](Status) mutable -> void {
ranOnDataAvailable.store(true);
ASSERT(stdx::this_thread::get_id() != mainThreadId);
barrier->countDownAndWait();
diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp
index d8165fdd740..0ae26a1130a 100644
--- a/src/mongo/transport/service_executor_utils.cpp
+++ b/src/mongo/transport/service_executor_utils.cpp
@@ -146,16 +146,18 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept {
return Status::OK();
}
-void scheduleCallbackOnDataAvailable(transport::Session* session,
+void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session,
unique_function<void(Status)> callback,
transport::ServiceExecutor* executor) noexcept {
invariant(session);
- try {
- session->waitForData().get();
- executor->schedule(std::move(callback));
- } catch (DBException& e) {
- callback(e.toStatus());
- }
+ executor->schedule([session, callback = std::move(callback)](Status status) {
+ if (!status.isOK()) {
+ callback(std::move(status));
+ return;
+ }
+
+ callback(session->waitForData());
+ });
}
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_utils.h
index 3dc0c4ab11f..10ab4880dea 100644
--- a/src/mongo/transport/service_executor_utils.h
+++ b/src/mongo/transport/service_executor_utils.h
@@ -46,7 +46,7 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept;
* thread until data is available for reading. On success, it schedules "callback" on "executor".
* Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants.
*/
-void scheduleCallbackOnDataAvailable(transport::Session* session,
+void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session,
unique_function<void(Status)> callback,
transport::ServiceExecutor* executor) noexcept;
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index acc60bc3fcb..9a773cd2a48 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -257,7 +257,7 @@ public:
/*
* Releases all the resources associated with the session and call the cleanupHook.
*/
- void cleanupSession();
+ void cleanupSession(const Status& status);
/*
* This is the initial function called at the beginning of a thread's lifecycle in the
@@ -523,8 +523,8 @@ void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) {
auto cb = [this, anchor = shared_from_this()](Status status) {
_clientStrand->run([&] {
- if (ErrorCodes::isCancelationError(status)) {
- cleanupSession();
+ if (ErrorCodes::isCancelationError(status) || ErrorCodes::isNetworkError(status)) {
+ cleanupSession(status);
return;
}
invariant(status);
@@ -532,7 +532,7 @@ void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) {
runOnce();
});
};
- executor()->schedule(std::move(cb));
+ executor()->runOnDataAvailable(session(), std::move(cb));
}
void ServiceStateMachine::Impl::runOnce() {
@@ -568,18 +568,16 @@ void ServiceStateMachine::Impl::runOnce() {
"Terminating session due to error",
"error"_attr = status);
terminate();
+ cleanupSession(status);
- auto cb = [this, anchor = shared_from_this()](Status status) {
- _clientStrand->run([&] { cleanupSession(); });
- };
- executor()->schedule(std::move(cb));
return;
}
auto cb = [this, anchor = shared_from_this()](Status status) {
_clientStrand->run([&] {
- if (ErrorCodes::isCancelationError(status)) {
- cleanupSession();
+ if (ErrorCodes::isCancelationError(status) ||
+ ErrorCodes::isNetworkError(status)) {
+ cleanupSession(status);
return;
}
invariant(status);
@@ -587,7 +585,14 @@ void ServiceStateMachine::Impl::runOnce() {
runOnce();
});
};
- executor()->schedule(std::move(cb));
+
+ // Start our loop again with a new stack.
+ if (_inExhaust) {
+ // If we're in exhaust, we're not expecting more data.
+ executor()->schedule(std::move(cb));
+ } else {
+ executor()->runOnDataAvailable(session(), std::move(cb));
+ }
});
}
@@ -652,7 +657,9 @@ void ServiceStateMachine::Impl::setCleanupHook(std::function<void()> hook) {
_cleanupHook = std::move(hook);
}
-void ServiceStateMachine::Impl::cleanupSession() {
+void ServiceStateMachine::Impl::cleanupSession(const Status& status) {
+ LOGV2_INFO(5127900, "Ending session", "error"_attr = status);
+
// Ensure the delayed destruction of opCtx always happens before doing the cleanup.
if (MONGO_likely(_opCtx)) {
_opCtx.reset();
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 39595d8ed37..0ea3ff0db79 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -116,9 +116,10 @@ public:
virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0;
/**
- * Asynchronously waits for the availability of incoming data.
+ * Waits for the availability of incoming data.
*/
- virtual Future<void> waitForData() = 0;
+ virtual Status waitForData() = 0;
+ virtual Future<void> asyncWaitForData() = 0;
/**
* Sink (send) a Message to the remote host for this Session.
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 263f76439e4..fa15a00710c 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -167,12 +167,16 @@ public:
return sourceMessageImpl(baton);
}
- Future<void> waitForData() override {
-#ifdef MONGO_CONFIG_SSL
- if (_sslSocket)
- return asio::async_read(*_sslSocket, asio::null_buffers(), UseFuture{}).ignoreValue();
-#endif
- return asio::async_read(_socket, asio::null_buffers(), UseFuture{}).ignoreValue();
+ Status waitForData() override {
+ ensureSync();
+ asio::error_code ec;
+ getSocket().wait(asio::ip::tcp::socket::wait_read, ec);
+ return errorCodeToStatus(ec);
+ }
+
+ Future<void> asyncWaitForData() override {
+ ensureAsync();
+ return getSocket().async_wait(asio::ip::tcp::socket::wait_read, UseFuture{});
}
Status sinkMessage(Message message) override {