summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2021-10-13 19:38:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-13 20:19:56 +0000
commitb429d5dda98bbe18ab0851ffd1729d3b57fc8a4e (patch)
tree9bfc2b28acfa5ae9e33863ccb2e1922f3faea038 /src/mongo/transport
parentd302f66d4ccd9bf2478518cacce5863dcc2f0c12 (diff)
downloadmongo-b429d5dda98bbe18ab0851ffd1729d3b57fc8a4e.tar.gz
SERVER-58503 Kill open cursors for a connection when a load balanced connection on mongos is closed
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/service_entry_point.h8
-rw-r--r--src/mongo/transport/service_state_machine.cpp3
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp26
3 files changed, 36 insertions, 1 deletions
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index 3e8d279b7b9..44786921474 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -32,6 +32,7 @@
#include <limits>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
#include "mongo/util/future.h"
@@ -102,6 +103,13 @@ public:
*/
virtual void onEndSession(const transport::SessionHandle&) {}
+ /**
+ * Optional handler which is invoked after a client disconnect. A client disconnect occurs when
+ * the connection between the mongo process and client is closed for any reason, and is defined
+ * by the destruction and cleanup of the ServiceStateMachine that manages the client.
+ */
+ virtual void onClientDisconnect(Client* client) {}
+
protected:
ServiceEntryPoint() = default;
};
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index e1c97e4511c..ce330e8b060 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -549,9 +549,10 @@ void ServiceStateMachine::Impl::cleanupSession(const Status& status) {
LOGV2_DEBUG(5127900, 2, "Ending session", "error"_attr = status);
cleanupExhaustResources();
+ auto client = _clientStrand->getClientPointer();
+ _sep->onClientDisconnect(client);
{
- auto client = _clientStrand->getClientPointer();
stdx::lock_guard lk(*client);
transport::ServiceExecutorContext::reset(client);
}
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index 3f218bf8c9b..8de9f16d587 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -258,6 +258,10 @@ public:
return _data->isConnected.load();
}
+ int onClientDisconnectCalledTimes() const {
+ return _onClientDisconnectCalled;
+ }
+
friend constexpr StringData toString(FailureCondition fail) {
switch (fail) {
case FailureCondition::kNone:
@@ -452,6 +456,10 @@ private:
_stateQueue.push(IngressState::kEnd);
}
+ void _onClientDisconnect() {
+ ++_onClientDisconnectCalled;
+ }
+
const boost::optional<ServiceExecutor::ThreadingModel> _threadingModel;
boost::optional<ServiceExecutor::ThreadingModel> _originalThreadingModel;
@@ -463,6 +471,8 @@ private:
std::shared_ptr<ServiceStateMachineTest::Session> _session;
SingleProducerSingleConsumerQueue<IngressState> _stateQueue;
+
+ int _onClientDisconnectCalled{0};
};
/**
@@ -552,6 +562,11 @@ public:
_fixture->_cleanup(session);
}
+ void onClientDisconnect(Client* client) override {
+ invariant(client);
+ _fixture->_onClientDisconnect();
+ }
+
private:
ServiceStateMachineTest* const _fixture;
};
@@ -794,6 +809,17 @@ TEST_F(ServiceStateMachineTest, EndBeforeStartSession) {
startSession();
}
+TEST_F(ServiceStateMachineTest, OnClientDisconnectCalledOnCleanup) {
+ initNewSession();
+ startSession();
+ ASSERT_EQ(popIngressState(), IngressState::kSource);
+ ASSERT_EQ(onClientDisconnectCalledTimes(), 0);
+ endSession();
+ ASSERT_EQ(popIngressState(), IngressState::kEnd);
+ joinSession();
+ ASSERT_EQ(onClientDisconnectCalledTimes(), 1);
+}
+
TEST_F(ServiceStateMachineWithDedicatedThreadsTest, DefaultLoop) {
auto runner = StepRunner(this);