diff options
author | Alex Li <alex.li@mongodb.com> | 2021-08-16 20:27:51 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-08-16 20:54:43 +0000 |
commit | 0028db3e9c096e2196e66b1181f5e3c33cc435a3 (patch) | |
tree | 908a894f41927710c97250177a38601d1aee5b63 /src/mongo/executor | |
parent | 8dc5961aa06a9a8f6e98aefe228f2c837a846786 (diff) | |
download | mongo-0028db3e9c096e2196e66b1181f5e3c33cc435a3.tar.gz |
SERVER-58209 NetworkInterfaceIntegrationFixture waits for in progress requests
Diffstat (limited to 'src/mongo/executor')
3 files changed, 120 insertions, 26 deletions
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp index c3401173cc1..cc40fb415d8 100644 --- a/src/mongo/executor/network_interface_integration_fixture.cpp +++ b/src/mongo/executor/network_interface_integration_fixture.cpp @@ -48,9 +48,10 @@ namespace mongo { namespace executor { +MONGO_FAIL_POINT_DEFINE(networkInterfaceFixtureHangOnCompletion); + void NetworkInterfaceIntegrationFixture::createNet( std::unique_ptr<NetworkConnectionHook> connectHook, ConnectionPool::Options options) { - options.minConnections = 0u; #ifdef _WIN32 @@ -66,7 +67,6 @@ void NetworkInterfaceIntegrationFixture::createNet( void NetworkInterfaceIntegrationFixture::startNet( std::unique_ptr<NetworkConnectionHook> connectHook) { - createNet(std::move(connectHook)); net().startup(); } @@ -74,6 +74,10 @@ void NetworkInterfaceIntegrationFixture::startNet( void NetworkInterfaceIntegrationFixture::tearDown() { // Network interface will only shutdown once because of an internal shutdown guard _net->shutdown(); + + auto lk = stdx::unique_lock(_mutex); + auto checkIdle = [&]() { return _workInProgress == 0; }; + _fixtureIsIdle.wait(lk, checkIdle); } NetworkInterface& NetworkInterfaceIntegrationFixture::net() { @@ -113,35 +117,57 @@ Future<RemoteCommandResponse> NetworkInterfaceIntegrationFixture::runCommand( const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest request) { RemoteCommandRequestOnAny rcroa{request}; - return net().startCommand(cbHandle, rcroa).then([](TaskExecutor::ResponseOnAnyStatus roa) { - auto res = RemoteCommandResponse(roa); - if (res.isOK()) { - LOGV2(4820500, - "Got command result: {response}", - "Got command result", - "response"_attr = res.toString()); - } else { - LOGV2(4820501, "Command failed: {error}", "Command failed", "error"_attr = res.status); - } - return res; - }); + _onSchedulingCommand(); + + return net() + .startCommand(cbHandle, rcroa) + .then([](TaskExecutor::ResponseOnAnyStatus roa) { + auto res = RemoteCommandResponse(roa); + if (res.isOK()) { + LOGV2(4820500, + "Got command result: {response}", + "Got command result", + "response"_attr = res.toString()); + } else { + LOGV2(4820501, + "Command failed: {error}", + "Command failed", + "error"_attr = res.status); + } + return res; + }) + .onCompletion([this](StatusOrStatusWith<RemoteCommandResponse> status) { + _onCompletingCommand(); + return status; + }); } Future<RemoteCommandOnAnyResponse> NetworkInterfaceIntegrationFixture::runCommandOnAny( const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequestOnAny request) { RemoteCommandRequestOnAny rcroa{request}; - return net().startCommand(cbHandle, rcroa).then([](TaskExecutor::ResponseOnAnyStatus roa) { - if (roa.isOK()) { - LOGV2(4820502, - "Got command result: {response}", - "Got command result", - "response"_attr = roa.toString()); - } else { - LOGV2(4820503, "Command failed: {error}", "Command failed", "error"_attr = roa.status); - } - return roa; - }); + _onSchedulingCommand(); + + return net() + .startCommand(cbHandle, rcroa) + .then([](TaskExecutor::ResponseOnAnyStatus roa) { + if (roa.isOK()) { + LOGV2(4820502, + "Got command result: {response}", + "Got command result", + "response"_attr = roa.toString()); + } else { + LOGV2(4820503, + "Command failed: {error}", + "Command failed", + "error"_attr = roa.status); + } + return roa; + }) + .onCompletion([this](StatusOrStatusWith<TaskExecutor::ResponseOnAnyStatus> status) { + _onCompletingCommand(); + return status; + }); } Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand( @@ -235,5 +261,18 @@ void NetworkInterfaceIntegrationFixture::assertWriteError(StringData db, ASSERT_EQ(reason, writeErrorStatus); } +void NetworkInterfaceIntegrationFixture::_onSchedulingCommand() { + auto lk = stdx::lock_guard(_mutex); + _workInProgress++; +} + +void NetworkInterfaceIntegrationFixture::_onCompletingCommand() { + networkInterfaceFixtureHangOnCompletion.pauseWhileSet(); + auto lk = stdx::lock_guard(_mutex); + if (--_workInProgress == 0) { + _fixtureIsIdle.notify_all(); + } +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h index 3fd1e58413d..30c4fd4132a 100644 --- a/src/mongo/executor/network_interface_integration_fixture.h +++ b/src/mongo/executor/network_interface_integration_fixture.h @@ -35,6 +35,7 @@ #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/util/future.h" namespace mongo { @@ -116,9 +117,22 @@ public: ErrorCodes::Error reason, Milliseconds timeoutMillis = Minutes(5)); + size_t getInProgress() { + auto lk = stdx::unique_lock(_mutex); + return _workInProgress; + } + private: + void _onSchedulingCommand(); + void _onCompletingCommand(); + std::unique_ptr<NetworkInterface> _net; PseudoRandom* _rng = nullptr; + + size_t _workInProgress = 0; + stdx::condition_variable _fixtureIsIdle; + mutable Mutex _mutex = MONGO_MAKE_LATCH("NetworkInterfaceIntegrationFixture::_mutex"); }; + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index c5b23cd8a15..d3944e24cb8 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -164,7 +164,10 @@ public: // NetworkInterfaceIntegrationFixture::tearDown() shuts down the NetworkInterface. We always // need to do it even if we have additional tearDown tasks. - using NetworkInterfaceIntegrationFixture::tearDown; + void tearDown() override { + NetworkInterfaceIntegrationFixture::tearDown(); + ASSERT_EQ(getInProgress(), 0); + } RemoteCommandRequest makeTestCommand( Milliseconds timeout, @@ -895,6 +898,44 @@ TEST_F(NetworkInterfaceTest, StartExhaustCommandShouldStopOnFailure) { } } +TEST_F(NetworkInterfaceTest, TearDownWaitsForInProgress) { + boost::optional<stdx::thread> tearDownThread; + auto tearDownPF = makePromiseFuture<void>(); + + auto deferred = [&] { + // Enable failpoint to make sure tearDown is blocked + FailPointEnableBlock fpb("networkInterfaceFixtureHangOnCompletion"); + + auto future = runCommand(makeCallbackHandle(), makeTestCommand(kMaxWait, makeEchoCmdObj())); + + // Wait for the completion of the command + fpb->waitForTimesEntered(fpb.initialTimesEntered() + 1); + + tearDownThread.emplace([this, promise = std::move(tearDownPF.promise)]() mutable { + tearDown(); + promise.setWith([] {}); + }); + + // Arbitrary delay between spawning the tearDown thread and checking futures + // to increase the chance of failures if tearDown doesn't wait for + // in-progress commands. + sleepFor(Milliseconds(50)); + + ASSERT_EQ(getInProgress(), 1); + ASSERT_FALSE(future.isReady()) << "Expected the command to be blocked"; + ASSERT_FALSE(tearDownPF.future.isReady()) + << "Expected tearDown to wait for blocked command"; + + return future; + }(); + + tearDownThread->join(); + + ASSERT(deferred.isReady()); + ASSERT(tearDownPF.future.isReady()); + ASSERT_EQ(getInProgress(), 0); +} + } // namespace } // namespace executor } // namespace mongo |