summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Cotter <matt.cotter@mongodb.com>2016-09-01 10:59:05 -0400
committerMatt Cotter <matt.cotter@mongodb.com>2016-10-04 16:01:41 -0400
commit15bdad6f352cbc23cef3338fd57d61ac00cc6149 (patch)
tree65a725d1b92aace2c8d570f058cdb9f87a3d8441
parentdbd52147346f303ebba72a20f8fd8baa43094d10 (diff)
downloadmongo-15bdad6f352cbc23cef3338fd57d61ac00cc6149.tar.gz
SERVER-25919 rewrite asio stress test
The old test introduced a thread pool and deffered objects in the test itself, which was often the bottle neck and therefore a poor stress test of the actual system. (cherry picked from commit 9dd6ba84b674356bf9a31ce416a383c8d559fcbd)
-rw-r--r--src/mongo/executor/network_interface_asio_integration_test.cpp149
-rw-r--r--src/mongo/executor/network_interface_asio_test_utils.h26
2 files changed, 90 insertions, 85 deletions
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
index cb85f273764..f2af23d7a97 100644
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ b/src/mongo/executor/network_interface_asio_integration_test.cpp
@@ -32,6 +32,7 @@
#include <algorithm>
#include <exception>
+#include <vector>
#include "mongo/client/connection_string.h"
#include "mongo/executor/async_stream_factory.h"
@@ -55,6 +56,8 @@ namespace mongo {
namespace executor {
namespace {
+using StartCommandCB = stdx::function<void(const StatusWith<RemoteCommandResponse>&)>;
+
class NetworkInterfaceASIOIntegrationTest : public mongo::unittest::Test {
public:
void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()) {
@@ -87,6 +90,12 @@ public:
return _rng;
}
+ void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ StartCommandCB onFinish) {
+ net().startCommand(cbHandle, request, onFinish);
+ }
+
Deferred<StatusWith<RemoteCommandResponse>> runCommand(
const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) {
Deferred<StatusWith<RemoteCommandResponse>> deferred;
@@ -170,81 +179,65 @@ public:
using Fixture = NetworkInterfaceASIOIntegrationTest;
using Pool = ThreadPoolInterface;
- Deferred<Status> run(Fixture* fixture, Pool* pool, Milliseconds timeout = Milliseconds(5000)) {
+ void run(Fixture* fixture,
+ StartCommandCB onFinish,
+ Milliseconds timeout = RemoteCommandRequest::kNoTimeout) {
auto cb = makeCallbackHandle();
- auto self = *this;
- auto out =
- fixture->runCommand(cb,
- {unittest::getFixtureConnectionString().getServers()[0],
- "admin",
- _command,
- timeout})
- .then(pool,
- [self](StatusWith<RemoteCommandResponse> resp) -> Status {
- auto status = resp.isOK()
- ? getStatusFromCommandResult(resp.getValue().data)
- : resp.getStatus();
-
- return status == self._expected
- ? Status::OK()
- : Status{ErrorCodes::BadValue,
- str::stream() << "Expected "
- << ErrorCodes::errorString(self._expected)
- << " but got " << status.toString()};
- });
+
+ RemoteCommandRequest request{
+ unittest::getFixtureConnectionString().getServers()[0], "admin", _command, timeout};
+
+ fixture->startCommand(cb, request, onFinish);
+
if (_cancel) {
invariant(fixture->randomNumberGenerator());
sleepmillis(fixture->randomNumberGenerator()->nextInt32(10));
fixture->net().cancelCommand(cb);
}
- return out;
}
- static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) {
+ static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs" << 1),
- ErrorCodes::ExceededTimeLimit,
- false).run(fixture, pool, Milliseconds(100));
+ false).run(fixture, onFinish, Milliseconds(100));
}
- static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) {
+ static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "millis" << 100),
- ErrorCodes::OK,
- false).run(fixture, pool);
+ false).run(fixture, onFinish);
}
- static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) {
+ static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs" << 10),
- ErrorCodes::CallbackCanceled,
- true).run(fixture, pool);
+ true).run(fixture, onFinish);
}
- static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) {
+ static void runLongOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs" << 30),
- ErrorCodes::OK,
- false).run(fixture, pool, RemoteCommandRequest::kNoTimeout);
+ false).run(fixture, onFinish);
}
private:
- StressTestOp(const BSONObj& command, ErrorCodes::Error expected, bool cancel)
- : _command(command), _expected(expected), _cancel(cancel) {}
+ StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {}
BSONObj _command;
- ErrorCodes::Error _expected;
bool _cancel;
};
TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
+ constexpr std::size_t numOps = 10000;
+ std::vector<Status> testResults(numOps, {ErrorCodes::InternalError, "uninitialized"});
+ ErrorCodes::Error expectedResults[numOps];
+ CountdownLatch cl(numOps);
+
startNet();
- const std::size_t numOps = 10000;
- std::vector<Deferred<Status>> ops;
std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()};
auto seed = seedSource->nextInt64();
@@ -254,51 +247,39 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
randomNumberGenerator(&rng);
log() << "Starting stress test...";
- ThreadPool::Options threadPoolOpts;
- threadPoolOpts.poolName = "StressTestPool";
- threadPoolOpts.maxThreads = 8;
- ThreadPool pool(threadPoolOpts);
- pool.startup();
-
- auto poolGuard = MakeGuard([&pool] {
- pool.schedule([&pool] { pool.shutdown(); });
- pool.join();
- });
-
- std::generate_n(std::back_inserter(ops),
- numOps,
- [&rng, &pool, this] {
-
- // stagger operations slightly to mitigate connection pool contention
- sleepmillis(rng.nextInt32(10));
-
- auto i = rng.nextCanonicalDouble();
-
- if (i < .3) {
- return StressTestOp::runCancelOp(this, &pool);
- } else if (i < .7) {
- return StressTestOp::runCompleteOp(this, &pool);
- } else if (i < .99) {
- return StressTestOp::runTimeoutOp(this, &pool);
- } else {
- // Just a sprinkling of long ops, to mitigate connection pool contention
- return StressTestOp::runLongOp(this, &pool);
- }
- });
-
- log() << "running ops";
- auto res = helpers::collect(ops, &pool)
- .then(&pool,
- [](std::vector<Status> opResults) -> Status {
- for (const auto& opResult : opResults) {
- if (!opResult.isOK()) {
- return opResult;
- }
- }
- return Status::OK();
- })
- .get();
- ASSERT_OK(res);
+ for (std::size_t i = 0; i < numOps; ++i) {
+ // stagger operations slightly to mitigate connection pool contention
+ sleepmillis(rng.nextInt32(10));
+
+ auto r = rng.nextCanonicalDouble();
+
+ auto cb = [&testResults, &cl, i](const StatusWith<RemoteCommandResponse>& resp) {
+ testResults[i] =
+ resp.isOK() ? getStatusFromCommandResult(resp.getValue().data) : resp.getStatus();
+ cl.countDown();
+ };
+
+ if (r < .3) {
+ expectedResults[i] = ErrorCodes::CallbackCanceled;
+ StressTestOp::runCancelOp(this, cb);
+ } else if (r < .7) {
+ expectedResults[i] = ErrorCodes::OK;
+ StressTestOp::runCompleteOp(this, cb);
+ } else if (r < .99) {
+ expectedResults[i] = ErrorCodes::ExceededTimeLimit;
+ StressTestOp::runTimeoutOp(this, cb);
+ } else {
+ // Just a sprinkling of long ops, to mitigate connection pool contention
+ expectedResults[i] = ErrorCodes::OK;
+ StressTestOp::runLongOp(this, cb);
+ }
+ };
+
+ cl.await();
+
+ for (std::size_t i = 0; i < numOps; ++i) {
+ ASSERT_EQ(testResults[i], expectedResults[i]);
+ }
}
// Hook that intentionally never finishes
diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h
index bfd489514b0..55754050047 100644
--- a/src/mongo/executor/network_interface_asio_test_utils.h
+++ b/src/mongo/executor/network_interface_asio_test_utils.h
@@ -33,6 +33,7 @@
#include <vector>
#include "mongo/executor/task_executor.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/mutex.h"
@@ -113,10 +114,33 @@ private:
return *state->thing;
}
-private:
std::shared_ptr<State> _state = std::make_shared<State>();
};
+class CountdownLatch {
+public:
+ CountdownLatch(uint32_t count) : _count(count) {}
+
+ void countDown() {
+ if (_count.load() == 0) {
+ return;
+ }
+ if (_count.subtractAndFetch(1) == 0) {
+ _cv.notify_all();
+ }
+ }
+
+ void await() {
+ stdx::unique_lock<stdx::mutex> lk(_mtx);
+ _cv.wait(lk, [&] { return _count.load() == 0; });
+ }
+
+private:
+ stdx::condition_variable _cv;
+ stdx::mutex _mtx;
+ AtomicUInt32 _count;
+};
+
namespace helpers {
template <typename T>