From 2255f824d44caf48f9c8b0e23ffaf8483f4e0afe Mon Sep 17 00:00:00 2001 From: Amirsaman Memaripour Date: Thu, 28 Apr 2022 15:55:44 +0000 Subject: SERVER-64191 Strictly order cancellation/scheduling of async operations on `Session` --- src/mongo/transport/session_asio.h | 60 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) (limited to 'src/mongo/transport/session_asio.h') diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index aa747202945..5142d60202d 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -34,6 +34,7 @@ #include "mongo/base/system_error.h" #include "mongo/config.h" #include "mongo/db/stats/counters.h" +#include "mongo/stdx/mutex.h" #include "mongo/transport/asio_utils.h" #include "mongo/transport/baton.h" #include "mongo/transport/ssl_connection_context.h" @@ -160,10 +161,57 @@ protected: void ensureAsync(); private: + /** + * Provides the means to track and cancel async I/O operations scheduled through `Session`. + * Any I/O operation goes through the following steps: + * - `start()`: changes the state from `kNotStarted` to `kRunning`. + * - Before scheduling the async operation, checks for cancellation through `isCanceled()`. + * - `complete()`: clears the state, and prepares the session for future operations. + * + * This class is thread-safe. + */ + class AsyncOperationState { + public: + void start() { + const auto prev = _state.swap(State::kRunning); + invariant(prev == State::kNotStarted, "Another operation was in progress"); + } + + bool isCanceled() const { + return _state.load() == State::kCanceled; + } + + void complete() { + const auto prev = _state.swap(State::kNotStarted); + invariant(prev != State::kNotStarted, "No operation was running"); + } + + /** + * Instructs an active operation to cancel (if there is any). Otherwise, it does nothing. + * Cancellation is non-blocking and `cancel()` doesn't block for completion of ongoing + * operations. + */ + void cancel() { + auto expected = State::kRunning; + _state.compareAndSwap(&expected, State::kCanceled); + } + + private: + /** + * State transition diagram: + * -+-> [kNotStarted] --> [kRunning] --> [kCanceled] + * | | | + * +--------------------------+--------------+ + */ + enum class State { kNotStarted, kRunning, kCanceled }; + AtomicWord _state{State::kNotStarted}; + }; + GenericSocket& getSocket(); ExecutorFuture parseProxyProtocolHeader(const ReactorHandle& reactor); Future sourceMessageImpl(const BatonHandle& baton = nullptr); + Future sinkMessageImpl(Message message, const BatonHandle& baton = nullptr); template Future read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr); @@ -257,6 +305,18 @@ private: bool _isFromLoadBalancer = false; boost::optional _proxiedSrcEndpoint; boost::optional _proxiedDstEndpoint; + + AsyncOperationState _asyncOpState; + + /** + * The following mutex strictly orders the start and cancellation of asynchronous operations: + * - Holding the mutex while starting asynchronous operations (e.g., adding the session to the + * networking baton) ensures cancellation either happens before or after scheduling of the + * operation. + * - Holding the mutex while canceling asynchronous operations guarantees no operation can start + * while cancellation is in progress. + */ + stdx::mutex _asyncOpMutex; // NOLINT }; } // namespace mongo::transport -- cgit v1.2.1