summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-19 17:20:04 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-27 11:07:42 -0400
commit6bc284630b40fd90f6a8f1521d2d8c881e153a23 (patch)
tree6e19cdbad339db428caa6fab32bf907707656987
parent0a0a8bf93d88ce91629f8ee460e57d8bd56246e6 (diff)
downloadmongo-6bc284630b40fd90f6a8f1521d2d8c881e153a23.tar.gz
SERVER-31170 Add support for session and transaction number to benchRun
-rw-r--r--src/mongo/shell/bench.cpp339
-rw-r--r--src/mongo/shell/bench.h75
2 files changed, 256 insertions, 158 deletions
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp
index eb5f83b1f68..19529f4ba04 100644
--- a/src/mongo/shell/bench.cpp
+++ b/src/mongo/shell/bench.cpp
@@ -51,33 +51,38 @@
namespace mongo {
namespace {
-const std::map<OpType, std::string> opTypeName{{OpType::NONE, "none"},
- {OpType::NOP, "nop"},
- {OpType::FINDONE, "findOne"},
- {OpType::COMMAND, "command"},
- {OpType::FIND, "find"},
- {OpType::UPDATE, "update"},
- {OpType::INSERT, "insert"},
- {OpType::REMOVE, "remove"},
- {OpType::CREATEINDEX, "createIndex"},
- {OpType::DROPINDEX, "dropIndex"},
- {OpType::LET, "let"},
- {OpType::CPULOAD, "cpuload"}};
+const std::map<OpType, std::string> kOpTypeNames{{OpType::NONE, "none"},
+ {OpType::NOP, "nop"},
+ {OpType::FINDONE, "findOne"},
+ {OpType::COMMAND, "command"},
+ {OpType::FIND, "find"},
+ {OpType::UPDATE, "update"},
+ {OpType::INSERT, "insert"},
+ {OpType::REMOVE, "remove"},
+ {OpType::CREATEINDEX, "createIndex"},
+ {OpType::DROPINDEX, "dropIndex"},
+ {OpType::LET, "let"},
+ {OpType::CPULOAD, "cpuload"}};
+
+// When specified to the connection's 'runCommand' call indicates that the command should be
+// executed with no query options. This is only meaningful if a command is run via OP_QUERY against
+// '$cmd'.
+const int kNoOptions = 0;
class BenchRunWorkerStateGuard {
MONGO_DISALLOW_COPYING(BenchRunWorkerStateGuard);
public:
- explicit BenchRunWorkerStateGuard(BenchRunState* brState) : _brState(brState) {
- _brState->onWorkerStarted();
+ explicit BenchRunWorkerStateGuard(BenchRunState& brState) : _brState(brState) {
+ _brState.onWorkerStarted();
}
~BenchRunWorkerStateGuard() {
- _brState->onWorkerFinished();
+ _brState.onWorkerFinished();
}
private:
- BenchRunState* _brState;
+ BenchRunState& _brState;
};
pcrecpp::RE_Options flags2options(const char* flags) {
@@ -95,14 +100,6 @@ pcrecpp::RE_Options flags2options(const char* flags) {
return options;
}
-void appendAverageMicrosIfAvailable(BSONObjBuilder& buf,
- const std::string& name,
- const BenchRunEventCounter& counter) {
- if (counter.getNumEvents() > 0)
- buf.append(name,
- static_cast<double>(counter.getTotalTimeMicros()) / counter.getNumEvents());
-}
-
bool hasSpecial(const BSONObj& obj) {
BSONObjIterator i(obj);
while (i.more()) {
@@ -128,6 +125,54 @@ BSONObj fixQuery(const BSONObj& obj, BsonTemplateEvaluator& btl) {
return b.obj();
}
+bool runCommandWithSession(DBClientBase* conn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ int options,
+ const boost::optional<LogicalSessionIdToClient>& lsid,
+ boost::optional<TxnNumber> txnNumber,
+ BSONObj* result) {
+ if (!lsid) {
+ invariant(!txnNumber);
+ return conn->runCommand(dbname, cmdObj, *result);
+ }
+
+ BSONObjBuilder cmdObjWithLsidBuilder;
+
+ for (const auto& cmdArg : cmdObj) {
+ uassert(ErrorCodes::IllegalOperation,
+ "Command cannot contain session id",
+ cmdArg.fieldName() != OperationSessionInfo::kSessionIdFieldName);
+ uassert(ErrorCodes::IllegalOperation,
+ "Command cannot contain transaction id",
+ cmdArg.fieldName() != OperationSessionInfo::kTxnNumberFieldName);
+
+ cmdObjWithLsidBuilder.append(cmdArg);
+ }
+
+ {
+ BSONObjBuilder lsidBuilder(
+ cmdObjWithLsidBuilder.subobjStart(OperationSessionInfo::kSessionIdFieldName));
+ lsid->serialize(&lsidBuilder);
+ lsidBuilder.doneFast();
+ }
+
+ if (txnNumber) {
+ cmdObjWithLsidBuilder.append(OperationSessionInfo::kTxnNumberFieldName, *txnNumber);
+ }
+
+ return conn->runCommand(dbname, cmdObjWithLsidBuilder.done(), *result);
+}
+
+bool runCommandWithSession(DBClientBase* conn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ int options,
+ const boost::optional<LogicalSessionIdToClient>& lsid,
+ BSONObj* result) {
+ return runCommandWithSession(conn, dbname, cmdObj, options, lsid, boost::none, result);
+}
+
/**
* Issues the query 'qr' against 'conn' using read commands. Returns the size of the result set
* returned by the query.
@@ -140,14 +185,16 @@ BSONObj fixQuery(const BSONObj& obj, BsonTemplateEvaluator& btl) {
* On error, throws a AssertionException.
*/
int runQueryWithReadCommands(DBClientBase* conn,
+ const boost::optional<LogicalSessionIdToClient>& lsid,
std::unique_ptr<QueryRequest> qr,
- BSONObj* objOut = nullptr) {
- std::string dbName = qr->nss().db().toString();
+ BSONObj* objOut) {
+ const auto dbName = qr->nss().db().toString();
+
BSONObj findCommandResult;
- bool res = conn->runCommand(dbName, qr->asFindCommand(), findCommandResult);
uassert(ErrorCodes::CommandFailed,
str::stream() << "find command failed; reply was: " << findCommandResult,
- res);
+ runCommandWithSession(
+ conn, dbName, qr->asFindCommand(), kNoOptions, lsid, &findCommandResult));
CursorResponse cursorResponse =
uassertStatusOK(CursorResponse::parseFromBSON(findCommandResult));
@@ -169,10 +216,12 @@ int runQueryWithReadCommands(DBClientBase* conn,
boost::none, // term
boost::none); // lastKnownCommittedOpTime
BSONObj getMoreCommandResult;
- res = conn->runCommand(dbName, getMoreRequest.toBSON(), getMoreCommandResult);
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "getMore command failed; reply was: " << getMoreCommandResult,
- res);
+ uassert(
+ ErrorCodes::CommandFailed,
+ str::stream() << "getMore command failed; reply was: " << getMoreCommandResult,
+ runCommandWithSession(
+ conn, dbName, getMoreRequest.toBSON(), kNoOptions, lsid, &getMoreCommandResult));
+
cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(getMoreCommandResult));
count += cursorResponse.getBatch().size();
}
@@ -184,41 +233,16 @@ void doNothing(const BSONObj&) {}
} // namespace
-BenchRunEventCounter::BenchRunEventCounter() {
- reset();
-}
-
-void BenchRunEventCounter::reset() {
- _numEvents = 0;
- _totalTimeMicros = 0;
-}
+BenchRunEventCounter::BenchRunEventCounter() = default;
void BenchRunEventCounter::updateFrom(const BenchRunEventCounter& other) {
_numEvents += other._numEvents;
_totalTimeMicros += other._totalTimeMicros;
}
-BenchRunStats::BenchRunStats() {
- reset();
-}
-
-void BenchRunStats::reset() {
- error = false;
- errCount = 0;
- opCount = 0;
-
- findOneCounter.reset();
- updateCounter.reset();
- insertCounter.reset();
- deleteCounter.reset();
- queryCounter.reset();
- commandCounter.reset();
- trappedErrors.clear();
-}
-
void BenchRunStats::updateFrom(const BenchRunStats& other) {
- if (other.error)
- error = true;
+ error = other.error;
+
errCount += other.errCount;
opCount += other.opCount;
@@ -229,8 +253,9 @@ void BenchRunStats::updateFrom(const BenchRunStats& other) {
queryCounter.updateFrom(other.queryCounter);
commandCounter.updateFrom(other.commandCounter);
- for (size_t i = 0; i < other.trappedErrors.size(); ++i)
- trappedErrors.push_back(other.trappedErrors[i]);
+ for (const auto& trappedError : other.trappedErrors) {
+ trappedErrors.push_back(trappedError);
+ }
}
BenchRunConfig::BenchRunConfig() {
@@ -533,6 +558,18 @@ void BenchRunConfig::initializeFromBson(const BSONObj& args) {
<< typeName(arg.type()),
arg.isNumber());
seconds = arg.number();
+ } else if (name == "useSessions") {
+ uassert(40641,
+ str::stream() << "Field '" << name << "' should be a boolean. . Type is "
+ << typeName(arg.type()),
+ arg.isBoolean());
+ useSessions = arg.boolean();
+ } else if (name == "useIdempotentWrites") {
+ uassert(40642,
+ str::stream() << "Field '" << name << "' should be a boolean. . Type is "
+ << typeName(arg.type()),
+ arg.isBoolean());
+ useIdempotentWrites = arg.boolean();
} else if (name == "hideResults") {
hideResults = arg.trueValue();
} else if (name == "handleErrors") {
@@ -628,16 +665,16 @@ void BenchRunState::tellWorkersToCollectStats() {
_isCollectingStats.store(1);
}
-void BenchRunState::assertFinished() {
+void BenchRunState::assertFinished() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
verify(0 == _numUnstartedWorkers + _numActiveWorkers);
}
-bool BenchRunState::shouldWorkerFinish() {
+bool BenchRunState::shouldWorkerFinish() const {
return (_isShuttingDown.loadRelaxed() == 1);
}
-bool BenchRunState::shouldWorkerCollectStats() {
+bool BenchRunState::shouldWorkerCollectStats() const {
return (_isCollectingStats.loadRelaxed() == 1);
}
@@ -662,22 +699,22 @@ void BenchRunState::onWorkerFinished() {
BenchRunWorker::BenchRunWorker(size_t id,
const BenchRunConfig* config,
- BenchRunState* brState,
+ BenchRunState& brState,
int64_t randomSeed)
: _id(id), _config(config), _brState(brState), _randomSeed(randomSeed) {}
-BenchRunWorker::~BenchRunWorker() {}
+BenchRunWorker::~BenchRunWorker() = default;
void BenchRunWorker::start() {
stdx::thread(stdx::bind(&BenchRunWorker::run, this)).detach();
}
bool BenchRunWorker::shouldStop() const {
- return _brState->shouldWorkerFinish();
+ return _brState.shouldWorkerFinish();
}
bool BenchRunWorker::shouldCollectStats() const {
- return _brState->shouldWorkerCollectStats();
+ return _brState.shouldWorkerCollectStats();
}
void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
@@ -695,6 +732,22 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
conn->auth("admin", _config->username, _config->password, errmsg));
}
+ boost::optional<LogicalSessionIdToClient> lsid;
+ if (_config->useSessions) {
+ BSONObj result;
+ uassert(40640,
+ str::stream() << "Unable to create session due to error " << result,
+ conn->runCommand("admin", BSON("startSession" << 1), result));
+
+ lsid.emplace(
+ LogicalSessionIdToClient::parse(IDLParserErrorContext("lsid"), result["id"].Obj()));
+ }
+
+ boost::optional<TxnNumber> txnNumberForWriteCommands;
+ if (_config->useIdempotentWrites) {
+ txnNumberForWriteCommands = 0;
+ }
+
std::unique_ptr<Scope> scope{getGlobalScriptEngine()->newScopeForCurrentThread()};
verify(scope.get());
@@ -750,7 +803,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
invariantOK(qr->validate());
BenchRunEventTrace _bret(&stats.findOneCounter);
- runQueryWithReadCommands(conn, std::move(qr), &result);
+ runQueryWithReadCommands(conn, lsid, std::move(qr), &result);
} else {
BenchRunEventTrace _bret(&stats.findOneCounter);
result = conn->findOne(op.ns,
@@ -779,10 +832,12 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
BSONObj result;
{
BenchRunEventTrace _bret(&stats.commandCounter);
- ok = conn->runCommand(op.ns,
- fixQuery(op.command, bsonTemplateEvaluator),
- result,
- op.options);
+ ok = runCommandWithSession(conn,
+ op.ns,
+ fixQuery(op.command, bsonTemplateEvaluator),
+ op.options,
+ lsid,
+ &result);
}
if (!ok) {
stats.errCount++;
@@ -802,12 +857,15 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
boost::none, // term
boost::none); // lastKnownCommittedOpTime
BSONObj getMoreCommandResult;
- ok = conn->runCommand(
- op.ns, getMoreRequest.toBSON(), getMoreCommandResult);
uassert(ErrorCodes::CommandFailed,
str::stream() << "getMore command failed; reply was: "
<< getMoreCommandResult,
- ok);
+ runCommandWithSession(conn,
+ op.ns,
+ getMoreRequest.toBSON(),
+ kNoOptions,
+ lsid,
+ &getMoreCommandResult));
cursorResponse = uassertStatusOK(
CursorResponse::parseFromBSON(getMoreCommandResult));
count += cursorResponse.getBatch().size();
@@ -861,7 +919,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
invariantOK(qr->validate());
BenchRunEventTrace _bret(&stats.queryCounter);
- count = runQueryWithReadCommands(conn, std::move(qr));
+ count = runQueryWithReadCommands(conn, lsid, std::move(qr), nullptr);
} else {
// Use special query function for exhaust query option.
if (op.options & QueryOption_Exhaust) {
@@ -917,7 +975,6 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
BSONObj update = fixQuery(op.update, bsonTemplateEvaluator);
if (op.useWriteCmd) {
- // TODO: Replace after SERVER-11774.
BSONObjBuilder builder;
builder.append("update", nsToCollectionSubstring(op.ns));
BSONArrayBuilder docBuilder(builder.subarrayStart("updates"));
@@ -926,9 +983,16 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
<< op.upsert));
docBuilder.done();
builder.append("writeConcern", op.writeConcern);
- conn->runCommand(nsToDatabaseSubstring(op.ns).toString(),
- builder.done(),
- result);
+
+ if (txnNumberForWriteCommands)
+ ++(*txnNumberForWriteCommands);
+ runCommandWithSession(conn,
+ nsToDatabaseSubstring(op.ns).toString(),
+ builder.done(),
+ kNoOptions,
+ lsid,
+ txnNumberForWriteCommands,
+ &result);
} else {
auto toSend =
makeUpdateMessage(op.ns,
@@ -967,13 +1031,11 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
} break;
case OpType::INSERT: {
BSONObj result;
-
{
BenchRunEventTrace _bret(&stats.insertCounter);
BSONObj insertDoc;
if (op.useWriteCmd) {
- // TODO: Replace after SERVER-11774.
BSONObjBuilder builder;
builder.append("insert", nsToCollectionSubstring(op.ns));
BSONArrayBuilder docBuilder(builder.subarrayStart("documents"));
@@ -988,9 +1050,16 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
}
docBuilder.done();
builder.append("writeConcern", op.writeConcern);
- conn->runCommand(nsToDatabaseSubstring(op.ns).toString(),
- builder.done(),
- result);
+
+ if (txnNumberForWriteCommands)
+ ++(*txnNumberForWriteCommands);
+ runCommandWithSession(conn,
+ nsToDatabaseSubstring(op.ns).toString(),
+ builder.done(),
+ kNoOptions,
+ lsid,
+ txnNumberForWriteCommands,
+ &result);
} else {
std::vector<BSONObj> insertArray;
if (op.isDocAnArray) {
@@ -1040,7 +1109,6 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
BenchRunEventTrace _bret(&stats.deleteCounter);
BSONObj predicate = fixQuery(op.query, bsonTemplateEvaluator);
if (op.useWriteCmd) {
- // TODO: Replace after SERVER-11774.
BSONObjBuilder builder;
builder.append("delete", nsToCollectionSubstring(op.ns));
BSONArrayBuilder docBuilder(builder.subarrayStart("deletes"));
@@ -1048,9 +1116,16 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
docBuilder.append(BSON("q" << predicate << "limit" << limit));
docBuilder.done();
builder.append("writeConcern", op.writeConcern);
- conn->runCommand(nsToDatabaseSubstring(op.ns).toString(),
- builder.done(),
- result);
+
+ if (txnNumberForWriteCommands)
+ ++(*txnNumberForWriteCommands);
+ runCommandWithSession(conn,
+ nsToDatabaseSubstring(op.ns).toString(),
+ builder.done(),
+ kNoOptions,
+ lsid,
+ txnNumberForWriteCommands,
+ &result);
} else {
auto toSend = makeRemoveMessage(
op.ns, predicate, op.multi ? 0 : RemoveOption_JustOne);
@@ -1098,9 +1173,10 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
default:
uassert(34397, "In benchRun loop and got unknown op type", false);
}
+
// Count 1 for total ops. Successfully got through the try phrase
stats.opCount++;
- } catch (DBException& ex) {
+ } catch (const DBException& ex) {
if (!_config->hideErrors || op.showError) {
bool yesWatch =
(_config->watchPattern && _config->watchPattern->FullMatch(ex.what()));
@@ -1113,7 +1189,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
yesWatch) || // If we're just watching things
(_config->watchPattern && _config->noWatchPattern && yesWatch && !noWatch))
log() << "Error in benchRun thread for op "
- << opTypeName.find(op.op)->second << causedBy(ex);
+ << kOpTypeNames.find(op.op)->second << causedBy(ex);
}
bool yesTrap = (_config->trapPattern && _config->trapPattern->FullMatch(ex.what()));
@@ -1124,10 +1200,10 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
(!_config->noTrapPattern && _config->trapPattern && yesTrap) ||
(_config->trapPattern && _config->noTrapPattern && yesTrap && !noTrap)) {
{
- stats.trappedErrors.push_back(BSON("error" << ex.what() << "op"
- << opTypeName.find(op.op)->second
- << "count"
- << count));
+ stats.trappedErrors.push_back(
+ BSON("error" << ex.what() << "op" << kOpTypeNames.find(op.op)->second
+ << "count"
+ << count));
}
if (_config->breakOnTrap)
return;
@@ -1139,7 +1215,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
} catch (...) {
if (!_config->hideErrors || op.showError)
log() << "Error in benchRun thread caused by unknown error for op "
- << opTypeName.find(op.op)->second;
+ << kOpTypeNames.find(op.op)->second;
if (!_config->handleErrors && !op.handleError)
return;
@@ -1169,7 +1245,7 @@ void BenchRunWorker::run() {
}
}
- BenchRunWorkerStateGuard _workerStateGuard(_brState);
+ BenchRunWorkerStateGuard workerStateGuard(_brState);
generateLoadOnConnection(conn.get());
} catch (const DBException& e) {
error() << "DBException not handled in benchRun thread" << causedBy(e);
@@ -1182,14 +1258,12 @@ void BenchRunWorker::run() {
BenchRunner::BenchRunner(BenchRunConfig* config) : _brState(config->parallel), _config(config) {
_oid.init();
+
stdx::lock_guard<stdx::mutex> lk(_staticMutex);
_activeRuns[_oid] = this;
}
-BenchRunner::~BenchRunner() {
- for (size_t i = 0; i < _workers.size(); ++i)
- delete _workers[i];
-}
+BenchRunner::~BenchRunner() = default;
void BenchRunner::start() {
{
@@ -1209,17 +1283,19 @@ void BenchRunner::start() {
// Start threads
for (int64_t i = 0; i < _config->parallel; i++) {
// Make a unique random seed for the worker.
- int64_t seed = _config->randomSeed + i;
- BenchRunWorker* worker = new BenchRunWorker(i, _config.get(), &_brState, seed);
+ const int64_t seed = _config->randomSeed + i;
+
+ auto worker = stdx::make_unique<BenchRunWorker>(i, _config.get(), _brState, seed);
worker->start();
- _workers.push_back(worker);
+
+ _workers.push_back(std::move(worker));
}
_brState.waitForState(BenchRunState::BRS_RUNNING);
// initial stats
_brState.tellWorkersToCollectStats();
- _brTimer = new Timer();
+ _brTimer.emplace();
}
}
@@ -1227,7 +1303,7 @@ void BenchRunner::stop() {
_brState.tellWorkersToFinish();
_brState.waitForState(BenchRunState::BRS_FINISHED);
_microsElapsed = _brTimer->micros();
- delete _brTimer;
+ _brTimer.reset();
{
std::unique_ptr<DBClientBase> conn(_config->createConnection());
@@ -1259,39 +1335,52 @@ BenchRunner* BenchRunner::get(OID oid) {
stdx::lock_guard<stdx::mutex> lk(_staticMutex);
return _activeRuns[oid];
}
-void BenchRunner::populateStats(BenchRunStats* stats) {
+
+BenchRunStats BenchRunner::gatherStats() const {
_brState.assertFinished();
- stats->reset();
- for (size_t i = 0; i < _workers.size(); ++i)
- stats->updateFrom(_workers[i]->stats());
+
+ BenchRunStats stats;
+
+ for (size_t i = 0; i < _workers.size(); ++i) {
+ stats.updateFrom(_workers[i]->stats());
+ }
+
+ return stats;
}
BSONObj BenchRunner::finish(BenchRunner* runner) {
runner->stop();
- BenchRunStats stats;
- runner->populateStats(&stats);
-
- // vector<BSONOBj> errors = runner->config.errors;
- bool error = stats.error;
+ const auto stats(runner->gatherStats());
- if (error)
+ const bool error = stats.error;
+ if (error) {
return BSON("err" << 1);
+ }
BSONObjBuilder buf;
buf.append("note", "values per second");
buf.append("errCount", static_cast<long long>(stats.errCount));
buf.append("trapped", "error: not implemented");
- appendAverageMicrosIfAvailable(buf, "findOneLatencyAverageMicros", stats.findOneCounter);
- appendAverageMicrosIfAvailable(buf, "insertLatencyAverageMicros", stats.insertCounter);
- appendAverageMicrosIfAvailable(buf, "deleteLatencyAverageMicros", stats.deleteCounter);
- appendAverageMicrosIfAvailable(buf, "updateLatencyAverageMicros", stats.updateCounter);
- appendAverageMicrosIfAvailable(buf, "queryLatencyAverageMicros", stats.queryCounter);
- appendAverageMicrosIfAvailable(buf, "commandsLatencyAverageMicros", stats.commandCounter);
+
+ const auto appendAverageMicrosIfAvailable = [&buf](StringData name,
+ const BenchRunEventCounter& counter) {
+ if (counter.getNumEvents() > 0) {
+ buf.append(name,
+ static_cast<double>(counter.getTotalTimeMicros()) / counter.getNumEvents());
+ }
+ };
+
+ appendAverageMicrosIfAvailable("findOneLatencyAverageMicros", stats.findOneCounter);
+ appendAverageMicrosIfAvailable("insertLatencyAverageMicros", stats.insertCounter);
+ appendAverageMicrosIfAvailable("deleteLatencyAverageMicros", stats.deleteCounter);
+ appendAverageMicrosIfAvailable("updateLatencyAverageMicros", stats.updateCounter);
+ appendAverageMicrosIfAvailable("queryLatencyAverageMicros", stats.queryCounter);
+ appendAverageMicrosIfAvailable("commandsLatencyAverageMicros", stats.commandCounter);
buf.append("totalOps", static_cast<long long>(stats.opCount));
- auto appendPerSec = [&buf, runner](StringData name, double total) {
+ const auto appendPerSec = [&buf, runner](StringData name, double total) {
buf.append(name, total / (runner->_microsElapsed / 1000000.0));
};
diff --git a/src/mongo/shell/bench.h b/src/mongo/shell/bench.h
index 41a23512d28..107415174e5 100644
--- a/src/mongo/shell/bench.h
+++ b/src/mongo/shell/bench.h
@@ -28,6 +28,7 @@
#pragma once
+#include <boost/optional.hpp>
#include <string>
#include "mongo/client/dbclientinterface.h"
@@ -155,6 +156,17 @@ public:
*/
double seconds;
+ /**
+ * Whether the individual benchRun thread connections should be creating and using sessions.
+ */
+ bool useSessions{false};
+
+ /**
+ * Whether write commands should be sent with a txnNumber to ensure they are idempotent. This
+ * setting doesn't actually cause the workload generator to perform any retries.
+ */
+ bool useIdempotentWrites{false};
+
/// Base random seed for threads
int64_t randomSeed;
@@ -192,18 +204,10 @@ private:
* Not thread safe. Expected use is one instance per thread during parallel execution.
*/
class BenchRunEventCounter {
- MONGO_DISALLOW_COPYING(BenchRunEventCounter);
-
public:
- /// Constructs a zeroed out counter.
BenchRunEventCounter();
/**
- * Zero out the counter.
- */
- void reset();
-
- /**
* Conceptually the equivalent of "+=". Adds "other" into this.
*/
void updateFrom(const BenchRunEventCounter& other);
@@ -231,8 +235,8 @@ public:
}
private:
- unsigned long long _numEvents;
- long long _totalTimeMicros;
+ long long _totalTimeMicros{0};
+ unsigned long long _numEvents{0};
};
/**
@@ -291,19 +295,13 @@ private:
/**
* Statistics object representing the result of a bench run activity.
*/
-class BenchRunStats {
- MONGO_DISALLOW_COPYING(BenchRunStats);
-
-public:
- BenchRunStats();
-
- void reset();
-
+struct BenchRunStats {
void updateFrom(const BenchRunStats& other);
- bool error;
- unsigned long long errCount;
- unsigned long long opCount;
+ bool error{false};
+
+ unsigned long long errCount{0};
+ unsigned long long opCount{0};
BenchRunEventCounter findOneCounter;
BenchRunEventCounter updateCounter;
@@ -352,8 +350,10 @@ public:
*/
void tellWorkersToCollectStats();
- /// Check that the current state is BRS_FINISHED.
- void assertFinished();
+ /**
+ * Check that the current state is BRS_FINISHED.
+ */
+ void assertFinished() const;
//
// Functions called by the worker threads, through instances of BenchRunWorker.
@@ -363,13 +363,13 @@ public:
* Predicate that workers call to see if they should finish (as a result of a call
* to tellWorkersToFinish()).
*/
- bool shouldWorkerFinish();
+ bool shouldWorkerFinish() const;
/**
* Predicate that workers call to see if they should start collecting stats (as a result
* of a call to tellWorkersToCollectStats()).
*/
- bool shouldWorkerCollectStats();
+ bool shouldWorkerCollectStats() const;
/**
* Called by each BenchRunWorker from within its thread context, immediately before it
@@ -384,10 +384,13 @@ public:
void onWorkerFinished();
private:
- stdx::mutex _mutex;
+ mutable stdx::mutex _mutex;
+
stdx::condition_variable _stateChangeCondition;
+
unsigned _numUnstartedWorkers;
unsigned _numActiveWorkers;
+
AtomicUInt32 _isShuttingDown;
AtomicUInt32 _isCollectingStats;
};
@@ -410,7 +413,7 @@ public:
*/
BenchRunWorker(size_t id,
const BenchRunConfig* config,
- BenchRunState* brState,
+ BenchRunState& brState,
int64_t randomSeed);
~BenchRunWorker();
@@ -444,12 +447,16 @@ private:
const size_t _id;
const BenchRunConfig* _config;
- BenchRunState* _brState;
- BenchRunStats _stats;
+
+ BenchRunState& _brState;
+
+ const int64_t _randomSeed;
// Dummy stats to use before observation period.
BenchRunStats _statsBlackHole;
- int64_t _randomSeed;
+
+ // Actual stats collected during the run
+ BenchRunStats _stats;
};
/**
@@ -506,7 +513,7 @@ public:
*
* Illegal to call until after stop() returns.
*/
- void populateStats(BenchRunStats* stats);
+ BenchRunStats gatherStats() const;
OID oid() const {
return _oid;
@@ -528,10 +535,12 @@ private:
OID _oid;
BenchRunState _brState;
- Timer* _brTimer;
+
+ boost::optional<Timer> _brTimer;
unsigned long long _microsElapsed;
+
std::unique_ptr<BenchRunConfig> _config;
- std::vector<BenchRunWorker*> _workers;
+ std::vector<std::unique_ptr<BenchRunWorker>> _workers;
};
} // namespace mongo