summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-10-05 11:34:42 -0400
committerBenety Goh <benety@mongodb.com>2016-10-05 11:34:42 -0400
commitceac8df2ac2fac51c12ba0c576aa953349da6418 (patch)
tree2100c71c9f7c3b708a003d9adb8df6e2ea267d19
parent15ff9c78a8dfccd7ce2d2c861a646170e0cd41cc (diff)
downloadmongo-ceac8df2ac2fac51c12ba0c576aa953349da6418.tar.gz
Revert "SERVER-25919 rewrite asio stress test"
This reverts commit 15bdad6f352cbc23cef3338fd57d61ac00cc6149.
-rw-r--r--src/mongo/executor/network_interface_asio_integration_test.cpp149
-rw-r--r--src/mongo/executor/network_interface_asio_test_utils.h26
2 files changed, 85 insertions, 90 deletions
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
index f2af23d7a97..cb85f273764 100644
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ b/src/mongo/executor/network_interface_asio_integration_test.cpp
@@ -32,7 +32,6 @@
#include <algorithm>
#include <exception>
-#include <vector>
#include "mongo/client/connection_string.h"
#include "mongo/executor/async_stream_factory.h"
@@ -56,8 +55,6 @@ namespace mongo {
namespace executor {
namespace {
-using StartCommandCB = stdx::function<void(const StatusWith<RemoteCommandResponse>&)>;
-
class NetworkInterfaceASIOIntegrationTest : public mongo::unittest::Test {
public:
void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()) {
@@ -90,12 +87,6 @@ public:
return _rng;
}
- void startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request,
- StartCommandCB onFinish) {
- net().startCommand(cbHandle, request, onFinish);
- }
-
Deferred<StatusWith<RemoteCommandResponse>> runCommand(
const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) {
Deferred<StatusWith<RemoteCommandResponse>> deferred;
@@ -179,65 +170,81 @@ public:
using Fixture = NetworkInterfaceASIOIntegrationTest;
using Pool = ThreadPoolInterface;
- void run(Fixture* fixture,
- StartCommandCB onFinish,
- Milliseconds timeout = RemoteCommandRequest::kNoTimeout) {
+ Deferred<Status> run(Fixture* fixture, Pool* pool, Milliseconds timeout = Milliseconds(5000)) {
auto cb = makeCallbackHandle();
-
- RemoteCommandRequest request{
- unittest::getFixtureConnectionString().getServers()[0], "admin", _command, timeout};
-
- fixture->startCommand(cb, request, onFinish);
-
+ auto self = *this;
+ auto out =
+ fixture->runCommand(cb,
+ {unittest::getFixtureConnectionString().getServers()[0],
+ "admin",
+ _command,
+ timeout})
+ .then(pool,
+ [self](StatusWith<RemoteCommandResponse> resp) -> Status {
+ auto status = resp.isOK()
+ ? getStatusFromCommandResult(resp.getValue().data)
+ : resp.getStatus();
+
+ return status == self._expected
+ ? Status::OK()
+ : Status{ErrorCodes::BadValue,
+ str::stream() << "Expected "
+ << ErrorCodes::errorString(self._expected)
+ << " but got " << status.toString()};
+ });
if (_cancel) {
invariant(fixture->randomNumberGenerator());
sleepmillis(fixture->randomNumberGenerator()->nextInt32(10));
fixture->net().cancelCommand(cb);
}
+ return out;
}
- static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) {
+ static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs" << 1),
- false).run(fixture, onFinish, Milliseconds(100));
+ ErrorCodes::ExceededTimeLimit,
+ false).run(fixture, pool, Milliseconds(100));
}
- static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) {
+ static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "millis" << 100),
- false).run(fixture, onFinish);
+ ErrorCodes::OK,
+ false).run(fixture, pool);
}
- static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) {
+ static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs" << 10),
- true).run(fixture, onFinish);
+ ErrorCodes::CallbackCanceled,
+ true).run(fixture, pool);
}
- static void runLongOp(Fixture* fixture, StartCommandCB onFinish) {
+ static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
<< "secs" << 30),
- false).run(fixture, onFinish);
+ ErrorCodes::OK,
+ false).run(fixture, pool, RemoteCommandRequest::kNoTimeout);
}
private:
- StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {}
+ StressTestOp(const BSONObj& command, ErrorCodes::Error expected, bool cancel)
+ : _command(command), _expected(expected), _cancel(cancel) {}
BSONObj _command;
+ ErrorCodes::Error _expected;
bool _cancel;
};
TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
- constexpr std::size_t numOps = 10000;
- std::vector<Status> testResults(numOps, {ErrorCodes::InternalError, "uninitialized"});
- ErrorCodes::Error expectedResults[numOps];
- CountdownLatch cl(numOps);
-
startNet();
+ const std::size_t numOps = 10000;
+ std::vector<Deferred<Status>> ops;
std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()};
auto seed = seedSource->nextInt64();
@@ -247,39 +254,51 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
randomNumberGenerator(&rng);
log() << "Starting stress test...";
- for (std::size_t i = 0; i < numOps; ++i) {
- // stagger operations slightly to mitigate connection pool contention
- sleepmillis(rng.nextInt32(10));
-
- auto r = rng.nextCanonicalDouble();
-
- auto cb = [&testResults, &cl, i](const StatusWith<RemoteCommandResponse>& resp) {
- testResults[i] =
- resp.isOK() ? getStatusFromCommandResult(resp.getValue().data) : resp.getStatus();
- cl.countDown();
- };
-
- if (r < .3) {
- expectedResults[i] = ErrorCodes::CallbackCanceled;
- StressTestOp::runCancelOp(this, cb);
- } else if (r < .7) {
- expectedResults[i] = ErrorCodes::OK;
- StressTestOp::runCompleteOp(this, cb);
- } else if (r < .99) {
- expectedResults[i] = ErrorCodes::ExceededTimeLimit;
- StressTestOp::runTimeoutOp(this, cb);
- } else {
- // Just a sprinkling of long ops, to mitigate connection pool contention
- expectedResults[i] = ErrorCodes::OK;
- StressTestOp::runLongOp(this, cb);
- }
- };
-
- cl.await();
-
- for (std::size_t i = 0; i < numOps; ++i) {
- ASSERT_EQ(testResults[i], expectedResults[i]);
- }
+ ThreadPool::Options threadPoolOpts;
+ threadPoolOpts.poolName = "StressTestPool";
+ threadPoolOpts.maxThreads = 8;
+ ThreadPool pool(threadPoolOpts);
+ pool.startup();
+
+ auto poolGuard = MakeGuard([&pool] {
+ pool.schedule([&pool] { pool.shutdown(); });
+ pool.join();
+ });
+
+ std::generate_n(std::back_inserter(ops),
+ numOps,
+ [&rng, &pool, this] {
+
+ // stagger operations slightly to mitigate connection pool contention
+ sleepmillis(rng.nextInt32(10));
+
+ auto i = rng.nextCanonicalDouble();
+
+ if (i < .3) {
+ return StressTestOp::runCancelOp(this, &pool);
+ } else if (i < .7) {
+ return StressTestOp::runCompleteOp(this, &pool);
+ } else if (i < .99) {
+ return StressTestOp::runTimeoutOp(this, &pool);
+ } else {
+ // Just a sprinkling of long ops, to mitigate connection pool contention
+ return StressTestOp::runLongOp(this, &pool);
+ }
+ });
+
+ log() << "running ops";
+ auto res = helpers::collect(ops, &pool)
+ .then(&pool,
+ [](std::vector<Status> opResults) -> Status {
+ for (const auto& opResult : opResults) {
+ if (!opResult.isOK()) {
+ return opResult;
+ }
+ }
+ return Status::OK();
+ })
+ .get();
+ ASSERT_OK(res);
}
// Hook that intentionally never finishes
diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h
index 55754050047..bfd489514b0 100644
--- a/src/mongo/executor/network_interface_asio_test_utils.h
+++ b/src/mongo/executor/network_interface_asio_test_utils.h
@@ -33,7 +33,6 @@
#include <vector>
#include "mongo/executor/task_executor.h"
-#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/mutex.h"
@@ -114,31 +113,8 @@ private:
return *state->thing;
}
- std::shared_ptr<State> _state = std::make_shared<State>();
-};
-
-class CountdownLatch {
-public:
- CountdownLatch(uint32_t count) : _count(count) {}
-
- void countDown() {
- if (_count.load() == 0) {
- return;
- }
- if (_count.subtractAndFetch(1) == 0) {
- _cv.notify_all();
- }
- }
-
- void await() {
- stdx::unique_lock<stdx::mutex> lk(_mtx);
- _cv.wait(lk, [&] { return _count.load() == 0; });
- }
-
private:
- stdx::condition_variable _cv;
- stdx::mutex _mtx;
- AtomicUInt32 _count;
+ std::shared_ptr<State> _state = std::make_shared<State>();
};
namespace helpers {