diff options
Diffstat (limited to 'src/mongo/executor/network_interface_asio_integration_test.cpp')
-rw-r--r-- | src/mongo/executor/network_interface_asio_integration_test.cpp | 149 |
1 files changed, 65 insertions, 84 deletions
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp index cb85f273764..f2af23d7a97 100644 --- a/src/mongo/executor/network_interface_asio_integration_test.cpp +++ b/src/mongo/executor/network_interface_asio_integration_test.cpp @@ -32,6 +32,7 @@ #include <algorithm> #include <exception> +#include <vector> #include "mongo/client/connection_string.h" #include "mongo/executor/async_stream_factory.h" @@ -55,6 +56,8 @@ namespace mongo { namespace executor { namespace { +using StartCommandCB = stdx::function<void(const StatusWith<RemoteCommandResponse>&)>; + class NetworkInterfaceASIOIntegrationTest : public mongo::unittest::Test { public: void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()) { @@ -87,6 +90,12 @@ public: return _rng; } + void startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + StartCommandCB onFinish) { + net().startCommand(cbHandle, request, onFinish); + } + Deferred<StatusWith<RemoteCommandResponse>> runCommand( const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) { Deferred<StatusWith<RemoteCommandResponse>> deferred; @@ -170,81 +179,65 @@ public: using Fixture = NetworkInterfaceASIOIntegrationTest; using Pool = ThreadPoolInterface; - Deferred<Status> run(Fixture* fixture, Pool* pool, Milliseconds timeout = Milliseconds(5000)) { + void run(Fixture* fixture, + StartCommandCB onFinish, + Milliseconds timeout = RemoteCommandRequest::kNoTimeout) { auto cb = makeCallbackHandle(); - auto self = *this; - auto out = - fixture->runCommand(cb, - {unittest::getFixtureConnectionString().getServers()[0], - "admin", - _command, - timeout}) - .then(pool, - [self](StatusWith<RemoteCommandResponse> resp) -> Status { - auto status = resp.isOK() - ? getStatusFromCommandResult(resp.getValue().data) - : resp.getStatus(); - - return status == self._expected - ? Status::OK() - : Status{ErrorCodes::BadValue, - str::stream() << "Expected " - << ErrorCodes::errorString(self._expected) - << " but got " << status.toString()}; - }); + + RemoteCommandRequest request{ + unittest::getFixtureConnectionString().getServers()[0], "admin", _command, timeout}; + + fixture->startCommand(cb, request, onFinish); + if (_cancel) { invariant(fixture->randomNumberGenerator()); sleepmillis(fixture->randomNumberGenerator()->nextInt32(10)); fixture->net().cancelCommand(cb); } - return out; } - static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) { + static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 1), - ErrorCodes::ExceededTimeLimit, - false).run(fixture, pool, Milliseconds(100)); + false).run(fixture, onFinish, Milliseconds(100)); } - static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) { + static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "millis" << 100), - ErrorCodes::OK, - false).run(fixture, pool); + false).run(fixture, onFinish); } - static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) { + static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 10), - ErrorCodes::CallbackCanceled, - true).run(fixture, pool); + true).run(fixture, onFinish); } - static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) { + static void runLongOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 30), - ErrorCodes::OK, - false).run(fixture, pool, RemoteCommandRequest::kNoTimeout); + false).run(fixture, onFinish); } private: - StressTestOp(const BSONObj& command, ErrorCodes::Error expected, bool cancel) - : _command(command), _expected(expected), _cancel(cancel) {} + StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {} BSONObj _command; - ErrorCodes::Error _expected; bool _cancel; }; TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) { + constexpr std::size_t numOps = 10000; + std::vector<Status> testResults(numOps, {ErrorCodes::InternalError, "uninitialized"}); + ErrorCodes::Error expectedResults[numOps]; + CountdownLatch cl(numOps); + startNet(); - const std::size_t numOps = 10000; - std::vector<Deferred<Status>> ops; std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()}; auto seed = seedSource->nextInt64(); @@ -254,51 +247,39 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) { randomNumberGenerator(&rng); log() << "Starting stress test..."; - ThreadPool::Options threadPoolOpts; - threadPoolOpts.poolName = "StressTestPool"; - threadPoolOpts.maxThreads = 8; - ThreadPool pool(threadPoolOpts); - pool.startup(); - - auto poolGuard = MakeGuard([&pool] { - pool.schedule([&pool] { pool.shutdown(); }); - pool.join(); - }); - - std::generate_n(std::back_inserter(ops), - numOps, - [&rng, &pool, this] { - - // stagger operations slightly to mitigate connection pool contention - sleepmillis(rng.nextInt32(10)); - - auto i = rng.nextCanonicalDouble(); - - if (i < .3) { - return StressTestOp::runCancelOp(this, &pool); - } else if (i < .7) { - return StressTestOp::runCompleteOp(this, &pool); - } else if (i < .99) { - return StressTestOp::runTimeoutOp(this, &pool); - } else { - // Just a sprinkling of long ops, to mitigate connection pool contention - return StressTestOp::runLongOp(this, &pool); - } - }); - - log() << "running ops"; - auto res = helpers::collect(ops, &pool) - .then(&pool, - [](std::vector<Status> opResults) -> Status { - for (const auto& opResult : opResults) { - if (!opResult.isOK()) { - return opResult; - } - } - return Status::OK(); - }) - .get(); - ASSERT_OK(res); + for (std::size_t i = 0; i < numOps; ++i) { + // stagger operations slightly to mitigate connection pool contention + sleepmillis(rng.nextInt32(10)); + + auto r = rng.nextCanonicalDouble(); + + auto cb = [&testResults, &cl, i](const StatusWith<RemoteCommandResponse>& resp) { + testResults[i] = + resp.isOK() ? getStatusFromCommandResult(resp.getValue().data) : resp.getStatus(); + cl.countDown(); + }; + + if (r < .3) { + expectedResults[i] = ErrorCodes::CallbackCanceled; + StressTestOp::runCancelOp(this, cb); + } else if (r < .7) { + expectedResults[i] = ErrorCodes::OK; + StressTestOp::runCompleteOp(this, cb); + } else if (r < .99) { + expectedResults[i] = ErrorCodes::ExceededTimeLimit; + StressTestOp::runTimeoutOp(this, cb); + } else { + // Just a sprinkling of long ops, to mitigate connection pool contention + expectedResults[i] = ErrorCodes::OK; + StressTestOp::runLongOp(this, cb); + } + }; + + cl.await(); + + for (std::size_t i = 0; i < numOps; ++i) { + ASSERT_EQ(testResults[i], expectedResults[i]); + } } // Hook that intentionally never finishes |