diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-09-23 15:09:24 +0000 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2020-10-20 22:06:45 +0000 |
commit | 439730c7f46977844042c20591a368854d010902 (patch) | |
tree | 0ec8fecbac477596a4d365f614dd2c316a7c0cf7 | |
parent | 18f1d000290ef77758455bbbe9c5de174614c3e7 (diff) | |
download | mongo-439730c7f46977844042c20591a368854d010902.tar.gz |
SERVER-51279 Updating runOnDataAvailable
-rw-r--r-- | src/mongo/transport/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/transport/mock_session.h | 6 | ||||
-rw-r--r-- | src/mongo/transport/service_executor.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.cpp | 6 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.cpp | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.cpp | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_utils.cpp | 16 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_utils.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 31 | ||||
-rw-r--r-- | src/mongo/transport/session.h | 5 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.h | 16 |
15 files changed, 59 insertions, 38 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 7932abc662d..f3053f121c4 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -188,6 +188,7 @@ tlEnv.CppUnitTest( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/dbmessage', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/service_context_test_fixture', '$BUILD_DIR/mongo/rpc/protocol', '$BUILD_DIR/mongo/rpc/rpc', '$BUILD_DIR/mongo/unittest/unittest', diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index 72d05aa3e66..b1ed38a9081 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -103,7 +103,11 @@ public: return Future<Message>::makeReady(sourceMessage()); } - Future<void> waitForData() override { + Status waitForData() override { + return asyncWaitForData().getNoThrow(); + } + + Future<void> asyncWaitForData() override { auto fp = makePromiseFuture<void>(); stdx::lock_guard<Latch> lk(_waitForDataMutex); _waitForDataQueue.emplace_back(std::move(fp.promise)); diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h index ea27129ab94..773f9372913 100644 --- a/src/mongo/transport/service_executor.h +++ b/src/mongo/transport/service_executor.h @@ -100,7 +100,7 @@ public: * schedule the callback on current executor. Otherwise, it will invoke the callback with a * non-okay status on the caller thread. */ - virtual void runOnDataAvailable(Session* session, + virtual void runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) = 0; /* diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index f48a9d7a170..1cd8c213afe 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -169,10 +169,12 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) { return Status::OK(); } -void ServiceExecutorFixed::runOnDataAvailable(Session* session, +void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) { invariant(session); - session->waitForData().thenRunOn(shared_from_this()).getAsync(std::move(onCompletionCallback)); + session->asyncWaitForData() + .thenRunOn(shared_from_this()) + .getAsync(std::move(onCompletionCallback)); } void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const { diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index baf3a7be7e3..4e6f3b4f9cc 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -61,7 +61,7 @@ public: Status shutdown(Milliseconds timeout) override; Status scheduleTask(Task task, ScheduleFlags flags) override; - void runOnDataAvailable(Session* session, + void runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) override; Mode transportMode() const override { diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp index d81ad5be200..b4d3709008a 100644 --- a/src/mongo/transport/service_executor_reserved.cpp +++ b/src/mongo/transport/service_executor_reserved.cpp @@ -221,7 +221,7 @@ void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const { << static_cast<int>(_numStartingThreads); } -void ServiceExecutorReserved::runOnDataAvailable(Session* session, +void ServiceExecutorReserved::runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) { scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this); } diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h index 60de4bc2993..f29d91edd23 100644 --- a/src/mongo/transport/service_executor_reserved.h +++ b/src/mongo/transport/service_executor_reserved.h @@ -65,7 +65,7 @@ public: return Mode::kSynchronous; } - void runOnDataAvailable(Session* session, + void runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) override; void appendStats(BSONObjBuilder* bob) const override; diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index c75bdbb0952..b5df5501d82 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -156,7 +156,7 @@ void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); } -void ServiceExecutorSynchronous::runOnDataAvailable(Session* session, +void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) { scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this); } diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h index 840dc702d4c..826cbfdcf1d 100644 --- a/src/mongo/transport/service_executor_synchronous.h +++ b/src/mongo/transport/service_executor_synchronous.h @@ -60,7 +60,7 @@ public: return Mode::kSynchronous; } - void runOnDataAvailable(Session* session, + void runOnDataAvailable(const SessionHandle& session, OutOfLineExecutor::Task onCompletionCallback) override; void appendStats(BSONObjBuilder* bob) const override; diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 608b0a09232..747c18532da 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -357,7 +357,7 @@ TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) { AtomicWord<bool> ranOnDataAvailable{false}; auto barrier = std::make_shared<unittest::Barrier>(2); executorHandle->runOnDataAvailable( - session.get(), [&ranOnDataAvailable, mainThreadId, barrier](Status) mutable -> void { + session, [&ranOnDataAvailable, mainThreadId, barrier](Status) mutable -> void { ranOnDataAvailable.store(true); ASSERT(stdx::this_thread::get_id() != mainThreadId); barrier->countDownAndWait(); diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp index d8165fdd740..0ae26a1130a 100644 --- a/src/mongo/transport/service_executor_utils.cpp +++ b/src/mongo/transport/service_executor_utils.cpp @@ -146,16 +146,18 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept { return Status::OK(); } -void scheduleCallbackOnDataAvailable(transport::Session* session, +void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session, unique_function<void(Status)> callback, transport::ServiceExecutor* executor) noexcept { invariant(session); - try { - session->waitForData().get(); - executor->schedule(std::move(callback)); - } catch (DBException& e) { - callback(e.toStatus()); - } + executor->schedule([session, callback = std::move(callback)](Status status) { + if (!status.isOK()) { + callback(std::move(status)); + return; + } + + callback(session->waitForData()); + }); } } // namespace mongo diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_utils.h index 3dc0c4ab11f..10ab4880dea 100644 --- a/src/mongo/transport/service_executor_utils.h +++ b/src/mongo/transport/service_executor_utils.h @@ -46,7 +46,7 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept; * thread until data is available for reading. On success, it schedules "callback" on "executor". * Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants. */ -void scheduleCallbackOnDataAvailable(transport::Session* session, +void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session, unique_function<void(Status)> callback, transport::ServiceExecutor* executor) noexcept; diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index acc60bc3fcb..9a773cd2a48 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -257,7 +257,7 @@ public: /* * Releases all the resources associated with the session and call the cleanupHook. */ - void cleanupSession(); + void cleanupSession(const Status& status); /* * This is the initial function called at the beginning of a thread's lifecycle in the @@ -523,8 +523,8 @@ void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) { auto cb = [this, anchor = shared_from_this()](Status status) { _clientStrand->run([&] { - if (ErrorCodes::isCancelationError(status)) { - cleanupSession(); + if (ErrorCodes::isCancelationError(status) || ErrorCodes::isNetworkError(status)) { + cleanupSession(status); return; } invariant(status); @@ -532,7 +532,7 @@ void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) { runOnce(); }); }; - executor()->schedule(std::move(cb)); + executor()->runOnDataAvailable(session(), std::move(cb)); } void ServiceStateMachine::Impl::runOnce() { @@ -568,18 +568,16 @@ void ServiceStateMachine::Impl::runOnce() { "Terminating session due to error", "error"_attr = status); terminate(); + cleanupSession(status); - auto cb = [this, anchor = shared_from_this()](Status status) { - _clientStrand->run([&] { cleanupSession(); }); - }; - executor()->schedule(std::move(cb)); return; } auto cb = [this, anchor = shared_from_this()](Status status) { _clientStrand->run([&] { - if (ErrorCodes::isCancelationError(status)) { - cleanupSession(); + if (ErrorCodes::isCancelationError(status) || + ErrorCodes::isNetworkError(status)) { + cleanupSession(status); return; } invariant(status); @@ -587,7 +585,14 @@ void ServiceStateMachine::Impl::runOnce() { runOnce(); }); }; - executor()->schedule(std::move(cb)); + + // Start our loop again with a new stack. + if (_inExhaust) { + // If we're in exhaust, we're not expecting more data. + executor()->schedule(std::move(cb)); + } else { + executor()->runOnDataAvailable(session(), std::move(cb)); + } }); } @@ -652,7 +657,9 @@ void ServiceStateMachine::Impl::setCleanupHook(std::function<void()> hook) { _cleanupHook = std::move(hook); } -void ServiceStateMachine::Impl::cleanupSession() { +void ServiceStateMachine::Impl::cleanupSession(const Status& status) { + LOGV2_INFO(5127900, "Ending session", "error"_attr = status); + // Ensure the delayed destruction of opCtx always happens before doing the cleanup. if (MONGO_likely(_opCtx)) { _opCtx.reset(); diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 39595d8ed37..0ea3ff0db79 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -116,9 +116,10 @@ public: virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0; /** - * Asynchronously waits for the availability of incoming data. + * Waits for the availability of incoming data. */ - virtual Future<void> waitForData() = 0; + virtual Status waitForData() = 0; + virtual Future<void> asyncWaitForData() = 0; /** * Sink (send) a Message to the remote host for this Session. diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 263f76439e4..fa15a00710c 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -167,12 +167,16 @@ public: return sourceMessageImpl(baton); } - Future<void> waitForData() override { -#ifdef MONGO_CONFIG_SSL - if (_sslSocket) - return asio::async_read(*_sslSocket, asio::null_buffers(), UseFuture{}).ignoreValue(); -#endif - return asio::async_read(_socket, asio::null_buffers(), UseFuture{}).ignoreValue(); + Status waitForData() override { + ensureSync(); + asio::error_code ec; + getSocket().wait(asio::ip::tcp::socket::wait_read, ec); + return errorCodeToStatus(ec); + } + + Future<void> asyncWaitForData() override { + ensureAsync(); + return getSocket().async_wait(asio::ip::tcp::socket::wait_read, UseFuture{}); } Status sinkMessage(Message message) override { |