author     Matt Cotter <matt.cotter@mongodb.com>    2016-09-01 10:59:05 -0400
committer  Matt Cotter <matt.cotter@mongodb.com>    2016-09-02 08:56:50 -0400
commit     9dd6ba84b674356bf9a31ce416a383c8d559fcbd (patch)
tree       ee738f5ecfa8f3bd1f404383cd46ebf7f46f105f /src
parent     9dc189dc4be151f5cab1b23c97d2eefbe60b74f6 (diff)
SERVER-25919 rewrite asio stress test
The old test introduced a thread pool and deferred objects in the test itself, which were often the bottleneck and therefore made for a poor stress test of the actual system.
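
For illustration (not part of the commit), here is a minimal self-contained sketch of the callback-plus-latch pattern the rewrite adopts: each operation reports its result through a plain callback that fills a per-operation slot and counts down a latch, and the test thread simply blocks on the latch before validating. fakeStartCommand is a hypothetical stand-in for net().startCommand(), and this latch takes the simpler mutex-only route, whereas the committed CountdownLatch uses an atomic counter for a cheaper fast path.

    // Sketch of the test pattern introduced by this commit; standard C++ only.
    #include <condition_variable>
    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <vector>

    class CountdownLatch {
    public:
        explicit CountdownLatch(uint32_t count) : _count(count) {}

        void countDown() {
            // Decrement under the mutex so the notify is ordered with await()'s
            // predicate check; the committed version uses an atomic counter instead.
            std::lock_guard<std::mutex> lk(_mtx);
            if (_count > 0 && --_count == 0)
                _cv.notify_all();
        }

        void await() {
            std::unique_lock<std::mutex> lk(_mtx);
            _cv.wait(lk, [this] { return _count == 0; });
        }

    private:
        std::condition_variable _cv;
        std::mutex _mtx;
        uint32_t _count;
    };

    // Hypothetical async API: completes onFinish from another thread, the way
    // NetworkInterfaceASIO::startCommand completes its callback off-thread.
    void fakeStartCommand(std::function<void(const std::string&)> onFinish) {
        std::thread([onFinish] { onFinish("ok"); }).detach();
    }

    int main() {
        constexpr std::size_t numOps = 100;
        std::vector<std::string> results(numOps);
        CountdownLatch latch(numOps);

        for (std::size_t i = 0; i < numOps; ++i) {
            // Each callback writes only its own slot, so no synchronization is
            // needed beyond the latch itself; the test thread stays out of the
            // hot path, unlike the old thread-pool/Deferred plumbing.
            fakeStartCommand([&results, &latch, i](const std::string& resp) {
                results[i] = resp;
                latch.countDown();
            });
        }

        latch.await();  // block until every callback has run
        for (const auto& r : results)
            if (r != "ok")
                return 1;
        return 0;
    }
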
Diffstat (limited to 'src')
 -rw-r--r-- src/mongo/executor/network_interface_asio_integration_test.cpp | 129
 -rw-r--r-- src/mongo/executor/network_interface_asio_test_utils.h         |  26
 2 files changed, 82 insertions(+), 73 deletions(-)
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
index 548f14e1049..7a39a589673 100644
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ b/src/mongo/executor/network_interface_asio_integration_test.cpp
@@ -55,6 +55,8 @@ namespace mongo {
namespace executor {
namespace {
+using StartCommandCB = stdx::function<void(const RemoteCommandResponse&)>;
+
class NetworkInterfaceASIOIntegrationTest : public mongo::unittest::Test {
public:
void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()) {
@@ -87,6 +89,12 @@ public:
return _rng;
}
+ void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ StartCommandCB onFinish) {
+ net().startCommand(cbHandle, request, onFinish);
+ }
+
Deferred<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request) {
Deferred<RemoteCommandResponse> deferred;
@@ -175,90 +183,76 @@ public:
using Fixture = NetworkInterfaceASIOIntegrationTest;
using Pool = ThreadPoolInterface;
- Deferred<Status> run(Fixture* fixture,
- Pool* pool,
- Milliseconds timeout = RemoteCommandRequest::kNoTimeout) {
+ void run(Fixture* fixture,
+ StartCommandCB onFinish,
+ Milliseconds timeout = RemoteCommandRequest::kNoTimeout) {
auto cb = makeCallbackHandle();
- auto self = *this;
+
RemoteCommandRequest request{unittest::getFixtureConnectionString().getServers()[0],
"admin",
_command,
nullptr,
timeout};
- auto out = fixture->runCommand(cb, request)
- .then(pool, [self](RemoteCommandResponse resp) -> Status {
- auto status =
- resp.isOK() ? getStatusFromCommandResult(resp.data) : resp.status;
-
- return status == self._expected
- ? Status::OK()
- : Status{ErrorCodes::BadValue,
- str::stream() << "Expected "
- << ErrorCodes::errorString(self._expected)
- << " but got "
- << status.toString()};
- });
+
+ fixture->startCommand(cb, request, onFinish);
+
if (_cancel) {
invariant(fixture->randomNumberGenerator());
sleepmillis(fixture->randomNumberGenerator()->nextInt32(10));
fixture->net().cancelCommand(cb);
}
- return out;
}
- static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) {
+ static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs"
<< 1),
- ErrorCodes::ExceededTimeLimit,
false)
- .run(fixture, pool, Milliseconds(100));
+ .run(fixture, onFinish, Milliseconds(100));
}
- static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) {
+ static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "millis"
<< 100),
- ErrorCodes::OK,
false)
- .run(fixture, pool);
+ .run(fixture, onFinish);
}
- static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) {
+ static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs"
<< 10),
- ErrorCodes::CallbackCanceled,
true)
- .run(fixture, pool);
+ .run(fixture, onFinish);
}
- static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) {
+ static void runLongOp(Fixture* fixture, StartCommandCB onFinish) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs"
<< 30),
- ErrorCodes::OK,
false)
- .run(fixture, pool, RemoteCommandRequest::kNoTimeout);
+ .run(fixture, onFinish);
}
private:
- StressTestOp(const BSONObj& command, ErrorCodes::Error expected, bool cancel)
- : _command(command), _expected(expected), _cancel(cancel) {}
+ StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {}
BSONObj _command;
- ErrorCodes::Error _expected;
bool _cancel;
};
TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
+ constexpr std::size_t numOps = 10000;
+ RemoteCommandResponse testResults[numOps];
+ ErrorCodes::Error expectedResults[numOps];
+ CountdownLatch cl(numOps);
+
startNet();
- const std::size_t numOps = 10000;
- std::vector<Deferred<Status>> ops;
std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()};
auto seed = seedSource->nextInt64();
@@ -268,49 +262,40 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
randomNumberGenerator(&rng);
log() << "Starting stress test...";
- ThreadPool::Options threadPoolOpts;
- threadPoolOpts.poolName = "StressTestPool";
- threadPoolOpts.maxThreads = 8;
- ThreadPool pool(threadPoolOpts);
- pool.startup();
-
- auto poolGuard = MakeGuard([&pool] {
- pool.schedule([&pool] { pool.shutdown(); });
- pool.join();
- });
-
- std::generate_n(std::back_inserter(ops), numOps, [&rng, &pool, this] {
-
+ for (std::size_t i = 0; i < numOps; ++i) {
// stagger operations slightly to mitigate connection pool contention
sleepmillis(rng.nextInt32(10));
- auto i = rng.nextCanonicalDouble();
-
- if (i < .3) {
- return StressTestOp::runCancelOp(this, &pool);
- } else if (i < .7) {
- return StressTestOp::runCompleteOp(this, &pool);
- } else if (i < .99) {
- return StressTestOp::runTimeoutOp(this, &pool);
+ auto r = rng.nextCanonicalDouble();
+
+ auto cb = [&testResults, &cl, i](const RemoteCommandResponse& resp) {
+ testResults[i] = resp;
+ cl.countDown();
+ };
+
+ if (r < .3) {
+ expectedResults[i] = ErrorCodes::CallbackCanceled;
+ StressTestOp::runCancelOp(this, cb);
+ } else if (r < .7) {
+ expectedResults[i] = ErrorCodes::OK;
+ StressTestOp::runCompleteOp(this, cb);
+ } else if (r < .99) {
+ expectedResults[i] = ErrorCodes::ExceededTimeLimit;
+ StressTestOp::runTimeoutOp(this, cb);
} else {
// Just a sprinkling of long ops, to mitigate connection pool contention
- return StressTestOp::runLongOp(this, &pool);
+ expectedResults[i] = ErrorCodes::OK;
+ StressTestOp::runLongOp(this, cb);
}
- });
-
- log() << "running ops";
- auto res = helpers::collect(ops, &pool)
- .then(&pool,
- [](std::vector<Status> opResults) -> Status {
- for (const auto& opResult : opResults) {
- if (!opResult.isOK()) {
- return opResult;
- }
- }
- return Status::OK();
- })
- .get();
- ASSERT_OK(res);
+ }
+
+ cl.await();
+
+ for (std::size_t i = 0; i < numOps; ++i) {
+ const auto& resp = testResults[i];
+ auto ec = resp.isOK() ? getStatusFromCommandResult(resp.data) : resp.status;
+ ASSERT_EQ(ec, expectedResults[i]);
+ }
}
// Hook that intentionally never finishes
diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h
index 766c52e82ed..b65d7eaa11e 100644
--- a/src/mongo/executor/network_interface_asio_test_utils.h
+++ b/src/mongo/executor/network_interface_asio_test_utils.h
@@ -33,6 +33,7 @@
#include <vector>
#include "mongo/executor/task_executor.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/mutex.h"
@@ -113,10 +114,33 @@ private:
return *state->thing;
}
-private:
std::shared_ptr<State> _state = std::make_shared<State>();
};
+class CountdownLatch {
+public:
+ CountdownLatch(uint32_t count) : _count(count) {}
+
+ void countDown() {
+ if (_count.load() == 0) {
+ return;
+ }
+ if (_count.subtractAndFetch(1) == 0) {
+ _cv.notify_all();
+ }
+ }
+
+ void await() {
+ stdx::unique_lock<stdx::mutex> lk(_mtx);
+ _cv.wait(lk, [&] { return _count.load() == 0; });
+ }
+
+private:
+ stdx::condition_variable _cv;
+ stdx::mutex _mtx;
+ AtomicUInt32 _count;
+};
+
namespace helpers {
template <typename T>