diff options
-rw-r--r-- | src/mongo/executor/network_interface_asio_integration_test.cpp | 149 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio_test_utils.h | 26 |
2 files changed, 85 insertions, 90 deletions
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp index f2af23d7a97..cb85f273764 100644 --- a/src/mongo/executor/network_interface_asio_integration_test.cpp +++ b/src/mongo/executor/network_interface_asio_integration_test.cpp @@ -32,7 +32,6 @@ #include <algorithm> #include <exception> -#include <vector> #include "mongo/client/connection_string.h" #include "mongo/executor/async_stream_factory.h" @@ -56,8 +55,6 @@ namespace mongo { namespace executor { namespace { -using StartCommandCB = stdx::function<void(const StatusWith<RemoteCommandResponse>&)>; - class NetworkInterfaceASIOIntegrationTest : public mongo::unittest::Test { public: void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()) { @@ -90,12 +87,6 @@ public: return _rng; } - void startCommand(const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - StartCommandCB onFinish) { - net().startCommand(cbHandle, request, onFinish); - } - Deferred<StatusWith<RemoteCommandResponse>> runCommand( const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) { Deferred<StatusWith<RemoteCommandResponse>> deferred; @@ -179,65 +170,81 @@ public: using Fixture = NetworkInterfaceASIOIntegrationTest; using Pool = ThreadPoolInterface; - void run(Fixture* fixture, - StartCommandCB onFinish, - Milliseconds timeout = RemoteCommandRequest::kNoTimeout) { + Deferred<Status> run(Fixture* fixture, Pool* pool, Milliseconds timeout = Milliseconds(5000)) { auto cb = makeCallbackHandle(); - - RemoteCommandRequest request{ - unittest::getFixtureConnectionString().getServers()[0], "admin", _command, timeout}; - - fixture->startCommand(cb, request, onFinish); - + auto self = *this; + auto out = + fixture->runCommand(cb, + {unittest::getFixtureConnectionString().getServers()[0], + "admin", + _command, + timeout}) + .then(pool, + [self](StatusWith<RemoteCommandResponse> resp) -> Status { + auto status = resp.isOK() + ? getStatusFromCommandResult(resp.getValue().data) + : resp.getStatus(); + + return status == self._expected + ? Status::OK() + : Status{ErrorCodes::BadValue, + str::stream() << "Expected " + << ErrorCodes::errorString(self._expected) + << " but got " << status.toString()}; + }); if (_cancel) { invariant(fixture->randomNumberGenerator()); sleepmillis(fixture->randomNumberGenerator()->nextInt32(10)); fixture->net().cancelCommand(cb); } + return out; } - static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) { + static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 1), - false).run(fixture, onFinish, Milliseconds(100)); + ErrorCodes::ExceededTimeLimit, + false).run(fixture, pool, Milliseconds(100)); } - static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) { + static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "millis" << 100), - false).run(fixture, onFinish); + ErrorCodes::OK, + false).run(fixture, pool); } - static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) { + static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 10), - true).run(fixture, onFinish); + ErrorCodes::CallbackCanceled, + true).run(fixture, pool); } - static void runLongOp(Fixture* fixture, StartCommandCB onFinish) { + static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" << "secs" << 30), - false).run(fixture, onFinish); + ErrorCodes::OK, + false).run(fixture, pool, RemoteCommandRequest::kNoTimeout); } private: - StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {} + StressTestOp(const BSONObj& command, ErrorCodes::Error expected, bool cancel) + : _command(command), _expected(expected), _cancel(cancel) {} BSONObj _command; + ErrorCodes::Error _expected; bool _cancel; }; TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) { - constexpr std::size_t numOps = 10000; - std::vector<Status> testResults(numOps, {ErrorCodes::InternalError, "uninitialized"}); - ErrorCodes::Error expectedResults[numOps]; - CountdownLatch cl(numOps); - startNet(); + const std::size_t numOps = 10000; + std::vector<Deferred<Status>> ops; std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()}; auto seed = seedSource->nextInt64(); @@ -247,39 +254,51 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) { randomNumberGenerator(&rng); log() << "Starting stress test..."; - for (std::size_t i = 0; i < numOps; ++i) { - // stagger operations slightly to mitigate connection pool contention - sleepmillis(rng.nextInt32(10)); - - auto r = rng.nextCanonicalDouble(); - - auto cb = [&testResults, &cl, i](const StatusWith<RemoteCommandResponse>& resp) { - testResults[i] = - resp.isOK() ? getStatusFromCommandResult(resp.getValue().data) : resp.getStatus(); - cl.countDown(); - }; - - if (r < .3) { - expectedResults[i] = ErrorCodes::CallbackCanceled; - StressTestOp::runCancelOp(this, cb); - } else if (r < .7) { - expectedResults[i] = ErrorCodes::OK; - StressTestOp::runCompleteOp(this, cb); - } else if (r < .99) { - expectedResults[i] = ErrorCodes::ExceededTimeLimit; - StressTestOp::runTimeoutOp(this, cb); - } else { - // Just a sprinkling of long ops, to mitigate connection pool contention - expectedResults[i] = ErrorCodes::OK; - StressTestOp::runLongOp(this, cb); - } - }; - - cl.await(); - - for (std::size_t i = 0; i < numOps; ++i) { - ASSERT_EQ(testResults[i], expectedResults[i]); - } + ThreadPool::Options threadPoolOpts; + threadPoolOpts.poolName = "StressTestPool"; + threadPoolOpts.maxThreads = 8; + ThreadPool pool(threadPoolOpts); + pool.startup(); + + auto poolGuard = MakeGuard([&pool] { + pool.schedule([&pool] { pool.shutdown(); }); + pool.join(); + }); + + std::generate_n(std::back_inserter(ops), + numOps, + [&rng, &pool, this] { + + // stagger operations slightly to mitigate connection pool contention + sleepmillis(rng.nextInt32(10)); + + auto i = rng.nextCanonicalDouble(); + + if (i < .3) { + return StressTestOp::runCancelOp(this, &pool); + } else if (i < .7) { + return StressTestOp::runCompleteOp(this, &pool); + } else if (i < .99) { + return StressTestOp::runTimeoutOp(this, &pool); + } else { + // Just a sprinkling of long ops, to mitigate connection pool contention + return StressTestOp::runLongOp(this, &pool); + } + }); + + log() << "running ops"; + auto res = helpers::collect(ops, &pool) + .then(&pool, + [](std::vector<Status> opResults) -> Status { + for (const auto& opResult : opResults) { + if (!opResult.isOK()) { + return opResult; + } + } + return Status::OK(); + }) + .get(); + ASSERT_OK(res); } // Hook that intentionally never finishes diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h index 55754050047..bfd489514b0 100644 --- a/src/mongo/executor/network_interface_asio_test_utils.h +++ b/src/mongo/executor/network_interface_asio_test_utils.h @@ -33,7 +33,6 @@ #include <vector> #include "mongo/executor/task_executor.h" -#include "mongo/platform/atomic_word.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/future.h" #include "mongo/stdx/mutex.h" @@ -114,31 +113,8 @@ private: return *state->thing; } - std::shared_ptr<State> _state = std::make_shared<State>(); -}; - -class CountdownLatch { -public: - CountdownLatch(uint32_t count) : _count(count) {} - - void countDown() { - if (_count.load() == 0) { - return; - } - if (_count.subtractAndFetch(1) == 0) { - _cv.notify_all(); - } - } - - void await() { - stdx::unique_lock<stdx::mutex> lk(_mtx); - _cv.wait(lk, [&] { return _count.load() == 0; }); - } - private: - stdx::condition_variable _cv; - stdx::mutex _mtx; - AtomicUInt32 _count; + std::shared_ptr<State> _state = std::make_shared<State>(); }; namespace helpers { |