diff options
author | Matt Cotter <matt.cotter@mongodb.com> | 2016-09-01 10:59:05 -0400 |
---|---|---|
committer | Matt Cotter <matt.cotter@mongodb.com> | 2016-09-02 08:56:50 -0400 |
commit | 9dd6ba84b674356bf9a31ce416a383c8d559fcbd (patch) | |
tree | ee738f5ecfa8f3bd1f404383cd46ebf7f46f105f /src | |
parent | 9dc189dc4be151f5cab1b23c97d2eefbe60b74f6 (diff) | |
download | mongo-9dd6ba84b674356bf9a31ce416a383c8d559fcbd.tar.gz |
SERVER-25919 rewrite asio stress test
The old test introduced a thread pool and deffered objects in the test
itself, which was often the bottle neck and therefore a poor stress test
of the actual system.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/executor/network_interface_asio_integration_test.cpp | 129 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio_test_utils.h | 26 |
2 files changed, 82 insertions, 73 deletions
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp index 548f14e1049..7a39a589673 100644 --- a/src/mongo/executor/network_interface_asio_integration_test.cpp +++ b/src/mongo/executor/network_interface_asio_integration_test.cpp @@ -55,6 +55,8 @@ namespace mongo { namespace executor { namespace { +using StartCommandCB = stdx::function<void(const RemoteCommandResponse&)>; + class NetworkInterfaceASIOIntegrationTest : public mongo::unittest::Test { public: void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()) { @@ -87,6 +89,12 @@ public: return _rng; } + void startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + StartCommandCB onFinish) { + net().startCommand(cbHandle, request, onFinish); + } + Deferred<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) { Deferred<RemoteCommandResponse> deferred; @@ -175,90 +183,76 @@ public: using Fixture = NetworkInterfaceASIOIntegrationTest; using Pool = ThreadPoolInterface; - Deferred<Status> run(Fixture* fixture, - Pool* pool, - Milliseconds timeout = RemoteCommandRequest::kNoTimeout) { + void run(Fixture* fixture, + StartCommandCB onFinish, + Milliseconds timeout = RemoteCommandRequest::kNoTimeout) { auto cb = makeCallbackHandle(); - auto self = *this; + RemoteCommandRequest request{unittest::getFixtureConnectionString().getServers()[0], "admin", _command, nullptr, timeout}; - auto out = fixture->runCommand(cb, request) - .then(pool, [self](RemoteCommandResponse resp) -> Status { - auto status = - resp.isOK() ? getStatusFromCommandResult(resp.data) : resp.status; - - return status == self._expected - ? Status::OK() - : Status{ErrorCodes::BadValue, - str::stream() << "Expected " - << ErrorCodes::errorString(self._expected) - << " but got " - << status.toString()}; - }); + + fixture->startCommand(cb, request, onFinish); + if (_cancel) { invariant(fixture->randomNumberGenerator()); sleepmillis(fixture->randomNumberGenerator()->nextInt32(10)); fixture->net().cancelCommand(cb); } - return out; } - static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) { + static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 1), - ErrorCodes::ExceededTimeLimit, false) - .run(fixture, pool, Milliseconds(100)); + .run(fixture, onFinish, Milliseconds(100)); } - static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) { + static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "millis" << 100), - ErrorCodes::OK, false) - .run(fixture, pool); + .run(fixture, onFinish); } - static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) { + static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 10), - ErrorCodes::CallbackCanceled, true) - .run(fixture, pool); + .run(fixture, onFinish); } - static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) { + static void runLongOp(Fixture* fixture, StartCommandCB onFinish) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 30), - ErrorCodes::OK, false) - .run(fixture, pool, RemoteCommandRequest::kNoTimeout); + .run(fixture, onFinish); } private: - StressTestOp(const BSONObj& command, ErrorCodes::Error expected, bool cancel) - : _command(command), _expected(expected), _cancel(cancel) {} + StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {} BSONObj _command; - ErrorCodes::Error _expected; bool _cancel; }; TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) { + constexpr std::size_t numOps = 10000; + RemoteCommandResponse testResults[numOps]; + ErrorCodes::Error expectedResults[numOps]; + CountdownLatch cl(numOps); + startNet(); - const std::size_t numOps = 10000; - std::vector<Deferred<Status>> ops; std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()}; auto seed = seedSource->nextInt64(); @@ -268,49 +262,40 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) { randomNumberGenerator(&rng); log() << "Starting stress test..."; - ThreadPool::Options threadPoolOpts; - threadPoolOpts.poolName = "StressTestPool"; - threadPoolOpts.maxThreads = 8; - ThreadPool pool(threadPoolOpts); - pool.startup(); - - auto poolGuard = MakeGuard([&pool] { - pool.schedule([&pool] { pool.shutdown(); }); - pool.join(); - }); - - std::generate_n(std::back_inserter(ops), numOps, [&rng, &pool, this] { - + for (std::size_t i = 0; i < numOps; ++i) { // stagger operations slightly to mitigate connection pool contention sleepmillis(rng.nextInt32(10)); - auto i = rng.nextCanonicalDouble(); - - if (i < .3) { - return StressTestOp::runCancelOp(this, &pool); - } else if (i < .7) { - return StressTestOp::runCompleteOp(this, &pool); - } else if (i < .99) { - return StressTestOp::runTimeoutOp(this, &pool); + auto r = rng.nextCanonicalDouble(); + + auto cb = [&testResults, &cl, i](const RemoteCommandResponse& resp) { + testResults[i] = resp; + cl.countDown(); + }; + + if (r < .3) { + expectedResults[i] = ErrorCodes::CallbackCanceled; + StressTestOp::runCancelOp(this, cb); + } else if (r < .7) { + expectedResults[i] = ErrorCodes::OK; + StressTestOp::runCompleteOp(this, cb); + } else if (r < .99) { + expectedResults[i] = ErrorCodes::ExceededTimeLimit; + StressTestOp::runTimeoutOp(this, cb); } else { // Just a sprinkling of long ops, to mitigate connection pool contention - return StressTestOp::runLongOp(this, &pool); + expectedResults[i] = ErrorCodes::OK; + StressTestOp::runLongOp(this, cb); } - }); - - log() << "running ops"; - auto res = helpers::collect(ops, &pool) - .then(&pool, - [](std::vector<Status> opResults) -> Status { - for (const auto& opResult : opResults) { - if (!opResult.isOK()) { - return opResult; - } - } - return Status::OK(); - }) - .get(); - ASSERT_OK(res); + }; + + cl.await(); + + for (std::size_t i = 0; i < numOps; ++i) { + const auto& resp = testResults[i]; + auto ec = resp.isOK() ? getStatusFromCommandResult(resp.data) : resp.status; + ASSERT_EQ(ec, expectedResults[i]); + } } // Hook that intentionally never finishes diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h index 766c52e82ed..b65d7eaa11e 100644 --- a/src/mongo/executor/network_interface_asio_test_utils.h +++ b/src/mongo/executor/network_interface_asio_test_utils.h @@ -33,6 +33,7 @@ #include <vector> #include "mongo/executor/task_executor.h" +#include "mongo/platform/atomic_word.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/future.h" #include "mongo/stdx/mutex.h" @@ -113,10 +114,33 @@ private: return *state->thing; } -private: std::shared_ptr<State> _state = std::make_shared<State>(); }; +class CountdownLatch { +public: + CountdownLatch(uint32_t count) : _count(count) {} + + void countDown() { + if (_count.load() == 0) { + return; + } + if (_count.subtractAndFetch(1) == 0) { + _cv.notify_all(); + } + } + + void await() { + stdx::unique_lock<stdx::mutex> lk(_mtx); + _cv.wait(lk, [&] { return _count.load() == 0; }); + } + +private: + stdx::condition_variable _cv; + stdx::mutex _mtx; + AtomicUInt32 _count; +}; + namespace helpers { template <typename T> |