From 15bdad6f352cbc23cef3338fd57d61ac00cc6149 Mon Sep 17 00:00:00 2001
From: Matt Cotter
Date: Thu, 1 Sep 2016 10:59:05 -0400
Subject: SERVER-25919 rewrite asio stress test

The old test introduced a thread pool and deferred objects in the test
itself, which were often the bottleneck and therefore made for a poor
stress test of the actual system.

(cherry picked from commit 9dd6ba84b674356bf9a31ce416a383c8d559fcbd)
---
 .../network_interface_asio_integration_test.cpp   | 149 +++++++++------------
 .../executor/network_interface_asio_test_utils.h  |  26 +++-
 2 files changed, 90 insertions(+), 85 deletions(-)

diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
index cb85f273764..f2af23d7a97 100644
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ b/src/mongo/executor/network_interface_asio_integration_test.cpp
@@ -32,6 +32,7 @@
 #include
 #include
+#include

 #include "mongo/client/connection_string.h"
 #include "mongo/executor/async_stream_factory.h"
@@ -55,6 +56,8 @@ namespace mongo {
 namespace executor {
 namespace {

+using StartCommandCB = stdx::function<void(const StatusWith<RemoteCommandResponse>&)>;
+
 class NetworkInterfaceASIOIntegrationTest : public mongo::unittest::Test {
 public:
     void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()) {
@@ -87,6 +90,12 @@ public:
         return _rng;
     }

+    void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                      RemoteCommandRequest& request,
+                      StartCommandCB onFinish) {
+        net().startCommand(cbHandle, request, onFinish);
+    }
+
     Deferred<StatusWith<RemoteCommandResponse>> runCommand(
         const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) {
         Deferred<StatusWith<RemoteCommandResponse>> deferred;
@@ -170,81 +179,65 @@ public:
     using Fixture = NetworkInterfaceASIOIntegrationTest;
     using Pool = ThreadPoolInterface;

-    Deferred<Status> run(Fixture* fixture, Pool* pool, Milliseconds timeout = Milliseconds(5000)) {
+    void run(Fixture* fixture,
+             StartCommandCB onFinish,
+             Milliseconds timeout = RemoteCommandRequest::kNoTimeout) {
         auto cb = makeCallbackHandle();
-        auto self = *this;
-        auto out =
-            fixture->runCommand(cb,
-                                {unittest::getFixtureConnectionString().getServers()[0],
-                                 "admin",
-                                 _command,
-                                 timeout})
-                .then(pool,
-                      [self](StatusWith<RemoteCommandResponse> resp) -> Status {
-                          auto status = resp.isOK()
-                              ? getStatusFromCommandResult(resp.getValue().data)
-                              : resp.getStatus();
-
-                          return status == self._expected
-                              ? Status::OK()
-                              : Status{ErrorCodes::BadValue,
-                                       str::stream() << "Expected "
-                                                     << ErrorCodes::errorString(self._expected)
-                                                     << " but got " << status.toString()};
-                      });
+
+        RemoteCommandRequest request{
+            unittest::getFixtureConnectionString().getServers()[0], "admin", _command, timeout};
+
+        fixture->startCommand(cb, request, onFinish);
+
         if (_cancel) {
             invariant(fixture->randomNumberGenerator());
             sleepmillis(fixture->randomNumberGenerator()->nextInt32(10));
             fixture->net().cancelCommand(cb);
         }
-        return out;
     }

-    static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) {
+    static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) {
         return StressTestOp(BSON("sleep" << 1 << "lock"
                                          << "none"
                                          << "secs" << 1),
-                            ErrorCodes::ExceededTimeLimit,
-                            false).run(fixture, pool, Milliseconds(100));
+                            false).run(fixture, onFinish, Milliseconds(100));
     }

-    static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) {
+    static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) {
         return StressTestOp(BSON("sleep" << 1 << "lock"
                                          << "none"
                                          << "millis" << 100),
-                            ErrorCodes::OK,
-                            false).run(fixture, pool);
+                            false).run(fixture, onFinish);
     }

-    static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) {
+    static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) {
         return StressTestOp(BSON("sleep" << 1 << "lock"
                                          << "none"
                                          << "secs" << 10),
-                            ErrorCodes::CallbackCanceled,
-                            true).run(fixture, pool);
+                            true).run(fixture, onFinish);
     }

-    static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) {
+    static void runLongOp(Fixture* fixture, StartCommandCB onFinish) {
         return StressTestOp(BSON("sleep" << 1 << "lock"
                                          << "none"
                                          << "secs" << 30),
-                            ErrorCodes::OK,
-                            false).run(fixture, pool, RemoteCommandRequest::kNoTimeout);
+                            false).run(fixture, onFinish);
     }

 private:
-    StressTestOp(const BSONObj& command, ErrorCodes::Error expected, bool cancel)
-        : _command(command), _expected(expected), _cancel(cancel) {}
+    StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {}

     BSONObj _command;
-    ErrorCodes::Error _expected;
     bool _cancel;
 };

 TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
+    constexpr std::size_t numOps = 10000;
+    std::vector<Status> testResults(numOps, {ErrorCodes::InternalError, "uninitialized"});
+    ErrorCodes::Error expectedResults[numOps];
+    CountdownLatch cl(numOps);
+
     startNet();

-    const std::size_t numOps = 10000;
-    std::vector<Deferred<Status>> ops;

     std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()};
     auto seed = seedSource->nextInt64();

@@ -254,51 +247,39 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
     randomNumberGenerator(&rng);

     log() << "Starting stress test...";

-    ThreadPool::Options threadPoolOpts;
-    threadPoolOpts.poolName = "StressTestPool";
-    threadPoolOpts.maxThreads = 8;
-    ThreadPool pool(threadPoolOpts);
-    pool.startup();
-
-    auto poolGuard = MakeGuard([&pool] {
-        pool.schedule([&pool] { pool.shutdown(); });
-        pool.join();
-    });
-
-    std::generate_n(std::back_inserter(ops),
-                    numOps,
-                    [&rng, &pool, this] {
-
-                        // stagger operations slightly to mitigate connection pool contention
-                        sleepmillis(rng.nextInt32(10));
-
-                        auto i = rng.nextCanonicalDouble();
-
-                        if (i < .3) {
-                            return StressTestOp::runCancelOp(this, &pool);
-                        } else if (i < .7) {
-                            return StressTestOp::runCompleteOp(this, &pool);
-                        } else if (i < .99) {
-                            return StressTestOp::runTimeoutOp(this, &pool);
-                        } else {
-                            // Just a sprinkling of long ops, to mitigate connection pool contention
-                            return StressTestOp::runLongOp(this, &pool);
-                        }
-                    });
-
-    log() << "running ops";
-    auto res = helpers::collect(ops, &pool)
-                   .then(&pool,
-                         [](std::vector<Status> opResults) -> Status {
-                             for (const auto& opResult : opResults) {
-                                 if (!opResult.isOK()) {
-                                     return opResult;
-                                 }
-                             }
-                             return Status::OK();
-                         })
-                   .get();
-    ASSERT_OK(res);
+    for (std::size_t i = 0; i < numOps; ++i) {
+        // stagger operations slightly to mitigate connection pool contention
+        sleepmillis(rng.nextInt32(10));
+
+        auto r = rng.nextCanonicalDouble();
+
+        auto cb = [&testResults, &cl, i](const StatusWith<RemoteCommandResponse>& resp) {
+            testResults[i] =
+                resp.isOK() ? getStatusFromCommandResult(resp.getValue().data) : resp.getStatus();
+            cl.countDown();
+        };
+
+        if (r < .3) {
+            expectedResults[i] = ErrorCodes::CallbackCanceled;
+            StressTestOp::runCancelOp(this, cb);
+        } else if (r < .7) {
+            expectedResults[i] = ErrorCodes::OK;
+            StressTestOp::runCompleteOp(this, cb);
+        } else if (r < .99) {
+            expectedResults[i] = ErrorCodes::ExceededTimeLimit;
+            StressTestOp::runTimeoutOp(this, cb);
+        } else {
+            // Just a sprinkling of long ops, to mitigate connection pool contention
+            expectedResults[i] = ErrorCodes::OK;
+            StressTestOp::runLongOp(this, cb);
+        }
+    };
+
+    cl.await();
+
+    for (std::size_t i = 0; i < numOps; ++i) {
+        ASSERT_EQ(testResults[i], expectedResults[i]);
+    }
 }

 // Hook that intentionally never finishes
diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h
index bfd489514b0..55754050047 100644
--- a/src/mongo/executor/network_interface_asio_test_utils.h
+++ b/src/mongo/executor/network_interface_asio_test_utils.h
@@ -33,6 +33,7 @@
 #include

 #include "mongo/executor/task_executor.h"
+#include "mongo/platform/atomic_word.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/future.h"
 #include "mongo/stdx/mutex.h"
@@ -113,10 +114,33 @@ private:
         return *state->thing;
     }

-private:
     std::shared_ptr<State> _state = std::make_shared<State>();
 };

+class CountdownLatch {
+public:
+    CountdownLatch(uint32_t count) : _count(count) {}
+
+    void countDown() {
+        if (_count.load() == 0) {
+            return;
+        }
+        if (_count.subtractAndFetch(1) == 0) {
+            _cv.notify_all();
+        }
+    }
+
+    void await() {
+        stdx::unique_lock<stdx::mutex> lk(_mtx);
+        _cv.wait(lk, [&] { return _count.load() == 0; });
+    }
+
+private:
+    stdx::condition_variable _cv;
+    stdx::mutex _mtx;
+    AtomicUInt32 _count;
+};
+
 namespace helpers {

 template
-- 
cgit v1.2.1
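
Editor's note: the sketch below is not part of the patch. It is a minimal, standalone illustration of the countdown-latch pattern the rewritten test relies on, using the standard library (std::mutex, std::condition_variable, std::atomic, std::thread) in place of mongo's stdx and AtomicUInt32 wrappers. The lock taken before notify_all and the toy main() driver are assumptions added for the example, not code from the commit.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class CountdownLatch {
public:
    explicit CountdownLatch(uint32_t count) : _count(count) {}

    void countDown() {
        if (_count.load() == 0) {
            return;
        }
        if (_count.fetch_sub(1) == 1) {
            // Last decrement: take the lock before notifying so the wakeup
            // cannot slip in between the waiter's predicate check and its wait.
            std::lock_guard<std::mutex> lk(_mtx);
            _cv.notify_all();
        }
    }

    void await() {
        std::unique_lock<std::mutex> lk(_mtx);
        _cv.wait(lk, [&] { return _count.load() == 0; });
    }

private:
    std::condition_variable _cv;
    std::mutex _mtx;
    std::atomic<uint32_t> _count;
};

int main() {
    constexpr uint32_t numOps = 8;
    CountdownLatch latch(numOps);
    std::vector<std::thread> workers;

    // Each "operation" completes on its own thread and counts the latch down,
    // the way the rewritten stress test's completion callbacks do, so the main
    // thread can simply block in await() instead of owning a thread pool.
    for (uint32_t i = 0; i < numOps; ++i) {
        workers.emplace_back([&latch] { latch.countDown(); });
    }

    latch.await();  // returns once every operation has counted down
    for (auto& w : workers) {
        w.join();
    }
    std::cout << "all " << numOps << " operations completed\n";
    return 0;
}

Counting down from the completion callbacks and blocking a single awaiting thread is what lets the rewritten test drive all 10000 operations without the helper thread pool the old test needed to run its Deferred continuations, which is exactly the bottleneck the commit message calls out.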