summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorAlex Li <alex.li@mongodb.com>2021-08-16 20:27:51 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-08-16 20:54:43 +0000
commit0028db3e9c096e2196e66b1181f5e3c33cc435a3 (patch)
tree908a894f41927710c97250177a38601d1aee5b63 /src/mongo/executor
parent8dc5961aa06a9a8f6e98aefe228f2c837a846786 (diff)
downloadmongo-0028db3e9c096e2196e66b1181f5e3c33cc435a3.tar.gz
SERVER-58209 NetworkInterfaceIntegrationFixture waits for in progress requests
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.cpp89
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.h14
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp43
3 files changed, 120 insertions, 26 deletions
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index c3401173cc1..cc40fb415d8 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -48,9 +48,10 @@
namespace mongo {
namespace executor {
+MONGO_FAIL_POINT_DEFINE(networkInterfaceFixtureHangOnCompletion);
+
void NetworkInterfaceIntegrationFixture::createNet(
std::unique_ptr<NetworkConnectionHook> connectHook, ConnectionPool::Options options) {
-
options.minConnections = 0u;
#ifdef _WIN32
@@ -66,7 +67,6 @@ void NetworkInterfaceIntegrationFixture::createNet(
void NetworkInterfaceIntegrationFixture::startNet(
std::unique_ptr<NetworkConnectionHook> connectHook) {
-
createNet(std::move(connectHook));
net().startup();
}
@@ -74,6 +74,10 @@ void NetworkInterfaceIntegrationFixture::startNet(
void NetworkInterfaceIntegrationFixture::tearDown() {
// Network interface will only shutdown once because of an internal shutdown guard
_net->shutdown();
+
+ auto lk = stdx::unique_lock(_mutex);
+ auto checkIdle = [&]() { return _workInProgress == 0; };
+ _fixtureIsIdle.wait(lk, checkIdle);
}
NetworkInterface& NetworkInterfaceIntegrationFixture::net() {
@@ -113,35 +117,57 @@ Future<RemoteCommandResponse> NetworkInterfaceIntegrationFixture::runCommand(
const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest request) {
RemoteCommandRequestOnAny rcroa{request};
- return net().startCommand(cbHandle, rcroa).then([](TaskExecutor::ResponseOnAnyStatus roa) {
- auto res = RemoteCommandResponse(roa);
- if (res.isOK()) {
- LOGV2(4820500,
- "Got command result: {response}",
- "Got command result",
- "response"_attr = res.toString());
- } else {
- LOGV2(4820501, "Command failed: {error}", "Command failed", "error"_attr = res.status);
- }
- return res;
- });
+ _onSchedulingCommand();
+
+ return net()
+ .startCommand(cbHandle, rcroa)
+ .then([](TaskExecutor::ResponseOnAnyStatus roa) {
+ auto res = RemoteCommandResponse(roa);
+ if (res.isOK()) {
+ LOGV2(4820500,
+ "Got command result: {response}",
+ "Got command result",
+ "response"_attr = res.toString());
+ } else {
+ LOGV2(4820501,
+ "Command failed: {error}",
+ "Command failed",
+ "error"_attr = res.status);
+ }
+ return res;
+ })
+ .onCompletion([this](StatusOrStatusWith<RemoteCommandResponse> status) {
+ _onCompletingCommand();
+ return status;
+ });
}
Future<RemoteCommandOnAnyResponse> NetworkInterfaceIntegrationFixture::runCommandOnAny(
const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequestOnAny request) {
RemoteCommandRequestOnAny rcroa{request};
- return net().startCommand(cbHandle, rcroa).then([](TaskExecutor::ResponseOnAnyStatus roa) {
- if (roa.isOK()) {
- LOGV2(4820502,
- "Got command result: {response}",
- "Got command result",
- "response"_attr = roa.toString());
- } else {
- LOGV2(4820503, "Command failed: {error}", "Command failed", "error"_attr = roa.status);
- }
- return roa;
- });
+ _onSchedulingCommand();
+
+ return net()
+ .startCommand(cbHandle, rcroa)
+ .then([](TaskExecutor::ResponseOnAnyStatus roa) {
+ if (roa.isOK()) {
+ LOGV2(4820502,
+ "Got command result: {response}",
+ "Got command result",
+ "response"_attr = roa.toString());
+ } else {
+ LOGV2(4820503,
+ "Command failed: {error}",
+ "Command failed",
+ "error"_attr = roa.status);
+ }
+ return roa;
+ })
+ .onCompletion([this](StatusOrStatusWith<TaskExecutor::ResponseOnAnyStatus> status) {
+ _onCompletingCommand();
+ return status;
+ });
}
Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand(
@@ -235,5 +261,18 @@ void NetworkInterfaceIntegrationFixture::assertWriteError(StringData db,
ASSERT_EQ(reason, writeErrorStatus);
}
+void NetworkInterfaceIntegrationFixture::_onSchedulingCommand() {
+ auto lk = stdx::lock_guard(_mutex);
+ _workInProgress++;
+}
+
+void NetworkInterfaceIntegrationFixture::_onCompletingCommand() {
+ networkInterfaceFixtureHangOnCompletion.pauseWhileSet();
+ auto lk = stdx::lock_guard(_mutex);
+ if (--_workInProgress == 0) {
+ _fixtureIsIdle.notify_all();
+ }
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h
index 3fd1e58413d..30c4fd4132a 100644
--- a/src/mongo/executor/network_interface_integration_fixture.h
+++ b/src/mongo/executor/network_interface_integration_fixture.h
@@ -35,6 +35,7 @@
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -116,9 +117,22 @@ public:
ErrorCodes::Error reason,
Milliseconds timeoutMillis = Minutes(5));
+ size_t getInProgress() {
+ auto lk = stdx::unique_lock(_mutex);
+ return _workInProgress;
+ }
+
private:
+ void _onSchedulingCommand();
+ void _onCompletingCommand();
+
std::unique_ptr<NetworkInterface> _net;
PseudoRandom* _rng = nullptr;
+
+ size_t _workInProgress = 0;
+ stdx::condition_variable _fixtureIsIdle;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceIntegrationFixture::_mutex");
};
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index c5b23cd8a15..d3944e24cb8 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -164,7 +164,10 @@ public:
// NetworkInterfaceIntegrationFixture::tearDown() shuts down the NetworkInterface. We always
// need to do it even if we have additional tearDown tasks.
- using NetworkInterfaceIntegrationFixture::tearDown;
+ void tearDown() override {
+ NetworkInterfaceIntegrationFixture::tearDown();
+ ASSERT_EQ(getInProgress(), 0);
+ }
RemoteCommandRequest makeTestCommand(
Milliseconds timeout,
@@ -895,6 +898,44 @@ TEST_F(NetworkInterfaceTest, StartExhaustCommandShouldStopOnFailure) {
}
}
+TEST_F(NetworkInterfaceTest, TearDownWaitsForInProgress) {
+ boost::optional<stdx::thread> tearDownThread;
+ auto tearDownPF = makePromiseFuture<void>();
+
+ auto deferred = [&] {
+ // Enable failpoint to make sure tearDown is blocked
+ FailPointEnableBlock fpb("networkInterfaceFixtureHangOnCompletion");
+
+ auto future = runCommand(makeCallbackHandle(), makeTestCommand(kMaxWait, makeEchoCmdObj()));
+
+ // Wait for the completion of the command
+ fpb->waitForTimesEntered(fpb.initialTimesEntered() + 1);
+
+ tearDownThread.emplace([this, promise = std::move(tearDownPF.promise)]() mutable {
+ tearDown();
+ promise.setWith([] {});
+ });
+
+ // Arbitrary delay between spawning the tearDown thread and checking futures
+ // to increase the chance of failures if tearDown doesn't wait for
+ // in-progress commands.
+ sleepFor(Milliseconds(50));
+
+ ASSERT_EQ(getInProgress(), 1);
+ ASSERT_FALSE(future.isReady()) << "Expected the command to be blocked";
+ ASSERT_FALSE(tearDownPF.future.isReady())
+ << "Expected tearDown to wait for blocked command";
+
+ return future;
+ }();
+
+ tearDownThread->join();
+
+ ASSERT(deferred.isReady());
+ ASSERT(tearDownPF.future.isReady());
+ ASSERT_EQ(getInProgress(), 0);
+}
+
} // namespace
} // namespace executor
} // namespace mongo