summaryrefslogtreecommitdiff
path: root/src/mongo/transport/session_asio.h
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2022-04-28 15:55:44 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-28 16:37:31 +0000
commit2255f824d44caf48f9c8b0e23ffaf8483f4e0afe (patch)
tree97e265bc0b5c71ee71270f0b5c309bb4d27776df /src/mongo/transport/session_asio.h
parentdbbf2c06895528cbb96d1098ef00e8bca08e9784 (diff)
downloadmongo-2255f824d44caf48f9c8b0e23ffaf8483f4e0afe.tar.gz
SERVER-64191 Strictly order cancellation/scheduling of async operations on `Session`
Diffstat (limited to 'src/mongo/transport/session_asio.h')
-rw-r--r--src/mongo/transport/session_asio.h60
1 files changed, 60 insertions, 0 deletions
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index aa747202945..5142d60202d 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -34,6 +34,7 @@
#include "mongo/base/system_error.h"
#include "mongo/config.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/transport/asio_utils.h"
#include "mongo/transport/baton.h"
#include "mongo/transport/ssl_connection_context.h"
@@ -160,10 +161,57 @@ protected:
void ensureAsync();
private:
+ /**
+ * Provides the means to track and cancel async I/O operations scheduled through `Session`.
+ * Any I/O operation goes through the following steps:
+ * - `start()`: changes the state from `kNotStarted` to `kRunning`.
+ * - Before scheduling the async operation, checks for cancellation through `isCanceled()`.
+ * - `complete()`: clears the state, and prepares the session for future operations.
+ *
+ * This class is thread-safe.
+ */
+ class AsyncOperationState {
+ public:
+ void start() {
+ const auto prev = _state.swap(State::kRunning);
+ invariant(prev == State::kNotStarted, "Another operation was in progress");
+ }
+
+ bool isCanceled() const {
+ return _state.load() == State::kCanceled;
+ }
+
+ void complete() {
+ const auto prev = _state.swap(State::kNotStarted);
+ invariant(prev != State::kNotStarted, "No operation was running");
+ }
+
+ /**
+ * Instructs an active operation to cancel (if there is any). Otherwise, it does nothing.
+ * Cancellation is non-blocking and `cancel()` doesn't block for completion of ongoing
+ * operations.
+ */
+ void cancel() {
+ auto expected = State::kRunning;
+ _state.compareAndSwap(&expected, State::kCanceled);
+ }
+
+ private:
+ /**
+ * State transition diagram:
+ * -+-> [kNotStarted] --> [kRunning] --> [kCanceled]
+ * | | |
+ * +--------------------------+--------------+
+ */
+ enum class State { kNotStarted, kRunning, kCanceled };
+ AtomicWord<State> _state{State::kNotStarted};
+ };
+
GenericSocket& getSocket();
ExecutorFuture<void> parseProxyProtocolHeader(const ReactorHandle& reactor);
Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr);
+ Future<void> sinkMessageImpl(Message message, const BatonHandle& baton = nullptr);
template <typename MutableBufferSequence>
Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr);
@@ -257,6 +305,18 @@ private:
bool _isFromLoadBalancer = false;
boost::optional<SockAddr> _proxiedSrcEndpoint;
boost::optional<SockAddr> _proxiedDstEndpoint;
+
+ AsyncOperationState _asyncOpState;
+
+ /**
+ * The following mutex strictly orders the start and cancellation of asynchronous operations:
+ * - Holding the mutex while starting asynchronous operations (e.g., adding the session to the
+ * networking baton) ensures cancellation either happens before or after scheduling of the
+ * operation.
+ * - Holding the mutex while canceling asynchronous operations guarantees no operation can start
+ * while cancellation is in progress.
+ */
+ stdx::mutex _asyncOpMutex; // NOLINT
};
} // namespace mongo::transport