/** @file bench.cpp */
/*
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include "mongo/shell/bench.h"
#include
#include
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
#include "mongo/scripting/bson_template_evaluator.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/log.h"
#include "mongo/util/md5.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
#include "mongo/util/version.h"
// ---------------------------------
// ---- benchmarking system --------
// ---------------------------------
// TODO: Maybe extract as library to avoid code duplication?
namespace {
inline pcrecpp::RE_Options flags2options(const char* flags) {
pcrecpp::RE_Options options;
options.set_utf8(true);
while (flags && *flags) {
if (*flags == 'i')
options.set_caseless(true);
else if (*flags == 'm')
options.set_multiline(true);
else if (*flags == 'x')
options.set_extended(true);
flags++;
}
return options;
}
}
namespace mongo {
using std::unique_ptr;
using std::cout;
using std::map;
const std::map opTypeName{{OpType::NONE, "none"},
{OpType::NOP, "nop"},
{OpType::FINDONE, "findOne"},
{OpType::COMMAND, "command"},
{OpType::FIND, "find"},
{OpType::UPDATE, "update"},
{OpType::INSERT, "insert"},
{OpType::REMOVE, "remove"},
{OpType::CREATEINDEX, "createIndex"},
{OpType::DROPINDEX, "dropIndex"},
{OpType::LET, "let"},
{OpType::CPULOAD, "cpuload"}};
BenchRunEventCounter::BenchRunEventCounter() {
reset();
}
BenchRunEventCounter::~BenchRunEventCounter() {}
void BenchRunEventCounter::reset() {
_numEvents = 0;
_totalTimeMicros = 0;
}
void BenchRunEventCounter::updateFrom(const BenchRunEventCounter& other) {
_numEvents += other._numEvents;
_totalTimeMicros += other._totalTimeMicros;
}
BenchRunStats::BenchRunStats() {
reset();
}
BenchRunStats::~BenchRunStats() {}
void BenchRunStats::reset() {
error = false;
errCount = 0;
opCount = 0;
findOneCounter.reset();
updateCounter.reset();
insertCounter.reset();
deleteCounter.reset();
queryCounter.reset();
commandCounter.reset();
trappedErrors.clear();
}
void BenchRunStats::updateFrom(const BenchRunStats& other) {
if (other.error)
error = true;
errCount += other.errCount;
opCount += other.opCount;
findOneCounter.updateFrom(other.findOneCounter);
updateCounter.updateFrom(other.updateCounter);
insertCounter.updateFrom(other.insertCounter);
deleteCounter.updateFrom(other.deleteCounter);
queryCounter.updateFrom(other.queryCounter);
commandCounter.updateFrom(other.commandCounter);
for (size_t i = 0; i < other.trappedErrors.size(); ++i)
trappedErrors.push_back(other.trappedErrors[i]);
}
BenchRunConfig::BenchRunConfig() {
initializeToDefaults();
}
void BenchRunConfig::initializeToDefaults() {
host = "localhost";
db = "test";
username = "";
password = "";
parallel = 1;
seconds = 1.0;
hideResults = true;
handleErrors = false;
hideErrors = false;
trapPattern.reset();
noTrapPattern.reset();
watchPattern.reset();
noWatchPattern.reset();
throwGLE = false;
breakOnTrap = true;
randomSeed = 1314159265358979323;
}
BenchRunConfig* BenchRunConfig::createFromBson(const BSONObj& args) {
BenchRunConfig* config = new BenchRunConfig();
config->initializeFromBson(args);
return config;
}
BenchRunOp opFromBson(const BSONObj& op) {
BenchRunOp myOp;
myOp.myBsonOp = op.getOwned(); // save an owned copy of the BSON obj
auto opType = myOp.myBsonOp["op"].valueStringData();
for (auto arg : myOp.myBsonOp) {
auto name = arg.fieldNameStringData();
if (name == "batchSize") {
uassert(34377,
str::stream() << "Field 'batchSize' should be a number, instead it's type: "
<< typeName(arg.type()),
arg.isNumber());
uassert(34378,
str::stream() << "Field 'batchSize' only valid for find op types. Type is "
<< opType,
(opType == "find") || (opType == "query"));
myOp.batchSize = arg.numberInt();
} else if (name == "check") {
// check function gets thrown into a scoped function. Leaving that parsing in main loop.
myOp.useCheck = true;
myOp.check = arg;
uassert(
34420,
str::stream()
<< "Check field requires type CodeWScoe, Code, or String, instead its type is: "
<< typeName(myOp.check.type()),
(myOp.check.type() == CodeWScope || myOp.check.type() == Code ||
myOp.check.type() == String));
} else if (name == "command") {
// type needs to be command
uassert(34398,
str::stream() << "Field 'command' only valid for command op type. Type is "
<< opType,
opType == "command");
myOp.command = arg.Obj();
} else if (name == "context") {
myOp.context = arg.Obj();
} else if (name == "cpuFactor") {
uassert(40436,
str::stream() << "Field 'cpuFactor' should be a number, instead it's type: "
<< typeName(arg.type()),
arg.isNumber());
myOp.cpuFactor = arg.numberDouble();
} else if (name == "delay") {
uassert(34379,
str::stream() << "Field 'delay' should be a number, instead it's type: "
<< typeName(arg.type()),
arg.isNumber());
myOp.delay = arg.numberInt();
} else if (name == "doc") {
uassert(34399,
str::stream() << "Field 'doc' only valid for insert op type. Type is "
<< opType,
(opType == "insert"));
myOp.isDocAnArray = arg.type() == Array;
myOp.doc = arg.Obj();
} else if (name == "expected") {
uassert(34380,
str::stream() << "Field 'Expected' should be a number, instead it's type: "
<< typeName(arg.type()),
arg.isNumber());
uassert(34400,
str::stream() << "Field 'Expected' only valid for find op type. Type is "
<< opType,
(opType == "find") || (opType == "query"));
myOp.expected = arg.numberInt();
} else if (name == "filter") {
uassert(
34401,
str::stream()
<< "Field 'Filter' (projection) only valid for find/findOne op type. Type is "
<< opType,
(opType == "find") || (opType == "query") || (opType == "findOne"));
myOp.projection = arg.Obj(); // the name should be switched to projection
// also, but that will break things
} else if (name == "handleError") {
myOp.handleError = arg.trueValue();
} else if (name == "key") {
uassert(34402,
str::stream()
<< "Field 'key' only valid for create or drop index op types. Type is "
<< opType,
(opType == "createIndex") || (opType == "dropIndex"));
myOp.key = arg.Obj();
} else if (name == "limit") {
uassert(34381,
str::stream() << "Field 'limit' is only valid for find op types. Type is "
<< opType,
(opType == "find") || (opType == "query"));
uassert(ErrorCodes::BadValue,
str::stream() << "Field 'limit' should be a number, instead it's type: "
<< typeName(arg.type()),
arg.isNumber());
myOp.limit = arg.numberInt();
} else if (name == "multi") {
uassert(34383,
str::stream()
<< "Field 'multi' is only valid for update/remove/delete types. Type is "
<< opType,
(opType == "update") || (opType == "remove") || (opType == "delete"));
myOp.multi = arg.trueValue();
} else if (name == "ns") {
uassert(34385,
str::stream() << "Field 'ns' should be a string, instead it's type: "
<< typeName(arg.type()),
arg.type() == String);
myOp.ns = arg.String();
} else if (name == "op") {
uassert(ErrorCodes::BadValue,
str::stream() << "Field 'op' is not a string, instead it's type: "
<< typeName(arg.type()),
arg.type() == String);
auto type = arg.valueStringData();
if (type == "nop") {
myOp.op = OpType::NOP;
} else if (type == "findOne") {
myOp.op = OpType::FINDONE;
} else if (type == "command") {
myOp.op = OpType::COMMAND;
} else if (type == "find" || type == "query") {
myOp.op = OpType::FIND;
} else if (type == "update") {
myOp.op = OpType::UPDATE;
} else if (type == "insert") {
myOp.op = OpType::INSERT;
} else if (type == "delete" || type == "remove") {
myOp.op = OpType::REMOVE;
} else if (type == "createIndex") {
myOp.op = OpType::CREATEINDEX;
} else if (type == "dropIndex") {
myOp.op = OpType::DROPINDEX;
} else if (type == "let") {
myOp.op = OpType::LET;
} else if (type == "cpuload") {
myOp.op = OpType::CPULOAD;
} else {
uassert(34387,
str::stream() << "benchRun passed an unsupported op type: " << type,
false);
}
} else if (name == "options") {
uassert(ErrorCodes::BadValue,
str::stream() << "Field 'options' should be a number, instead it's type: "
<< typeName(arg.type()),
arg.isNumber());
uassert(34388,
str::stream() << "Field 'options' but not a command or find type. Type is "
<< opType,
(opType == "command") || (opType == "query") || (opType == "find"));
myOp.options = arg.numberInt();
} else if (name == "query") {
uassert(34389,
str::stream() << "Field 'query' is only valid for findOne, find, update, and "
"remove types. Type is "
<< opType,
(opType == "findOne") || (opType == "query") ||
(opType == "find" || (opType == "update") || (opType == "delete") ||
(opType == "remove")));
myOp.query = arg.Obj();
} else if (name == "safe") {
myOp.safe = arg.trueValue();
} else if (name == "skip") {
uassert(ErrorCodes::BadValue,
str::stream() << "Field 'skip' should be a number, instead it's type: "
<< typeName(arg.type()),
arg.isNumber());
uassert(34390,
str::stream() << "Field 'skip' is only valid for find/query op types. Type is "
<< opType,
(opType == "find") || (opType == "query"));
myOp.skip = arg.numberInt();
} else if (name == "showError") {
myOp.showError = arg.trueValue();
} else if (name == "showResult") {
myOp.showResult = arg.trueValue();
} else if (name == "target") {
uassert(ErrorCodes::BadValue,
str::stream() << "Field 'target' should be a string. It's type: "
<< typeName(arg.type()),
arg.type() == String);
myOp.target = arg.String();
} else if (name == "throwGLE") {
myOp.throwGLE = arg.trueValue();
} else if (name == "update") {
uassert(34391,
str::stream() << "Field 'update' is only valid for update op type. Op type is "
<< opType,
(opType == "update"));
myOp.update = arg.Obj();
} else if (name == "upsert") {
uassert(34392,
str::stream() << "Field 'upsert' is only valid for update op type. Op type is "
<< opType,
(opType == "update"));
myOp.upsert = arg.trueValue();
} else if (name == "readCmd") {
myOp.useReadCmd = arg.trueValue();
} else if (name == "writeCmd") {
myOp.useWriteCmd = arg.trueValue();
} else if (name == "writeConcern") {
// Mongo-perf wants to pass the write concern into all calls. It is only used for
// update, insert, delete
myOp.writeConcern = arg.Obj();
} else if (name == "value") {
uassert(34403,
str::stream() << "Field 'value' is only valid for let op type. Op type is "
<< opType,
opType == "let");
BSONObjBuilder valBuilder;
valBuilder.append(arg);
myOp.value = valBuilder.obj();
} else {
uassert(34394, str::stream() << "Benchrun op has unsupported field: " << name, false);
}
}
uassert(34395, "Benchrun op has an zero length ns", !myOp.ns.empty());
uassert(34396, "Benchrun op doesn't have an optype set", myOp.op != OpType::NONE);
return myOp;
}
void BenchRunConfig::initializeFromBson(const BSONObj& args) {
initializeToDefaults();
for (auto arg : args) {
auto name = arg.fieldNameStringData();
if (name == "host") {
uassert(34404,
str::stream() << "Field '" << name << "' should be a string. . Type is "
<< typeName(arg.type()),
arg.type() == String);
host = arg.String();
} else if (name == "db") {
uassert(34405,
str::stream() << "Field '" << name << "' should be a string. . Type is "
<< typeName(arg.type()),
arg.type() == String);
db = arg.String();
} else if (name == "username") {
uassert(34406,
str::stream() << "Field '" << name << "' should be a string. . Type is "
<< typeName(arg.type()),
arg.type() == String);
username = arg.String();
} else if (name == "password") {
uassert(34407,
str::stream() << "Field '" << name << "' should be a string. . Type is "
<< typeName(arg.type()),
arg.type() == String);
password = arg.String();
} else if (name == "parallel") {
uassert(34409,
str::stream() << "Field '" << name << "' should be a number. . Type is "
<< typeName(arg.type()),
arg.isNumber());
parallel = arg.numberInt();
} else if (name == "randomSeed") {
uassert(34365,
str::stream() << "Field '" << name << "' should be a number. . Type is "
<< typeName(arg.type()),
arg.isNumber());
randomSeed = arg.numberInt();
} else if (name == "seconds") {
uassert(34408,
str::stream() << "Field '" << name << "' should be a number. . Type is "
<< typeName(arg.type()),
arg.isNumber());
seconds = arg.number();
} else if (name == "hideResults") {
hideResults = arg.trueValue();
} else if (name == "handleErrors") {
handleErrors = arg.trueValue();
} else if (name == "hideErrors") {
hideErrors = arg.trueValue();
} else if (name == "throwGLE") {
throwGLE = arg.trueValue();
} else if (name == "breakOnTrap") {
breakOnTrap = arg.trueValue();
} else if (name == "trapPattern") {
const char* regex = arg.regex();
const char* flags = arg.regexFlags();
trapPattern =
std::shared_ptr(new pcrecpp::RE(regex, flags2options(flags)));
} else if (name == "noTrapPattern") {
const char* regex = arg.regex();
const char* flags = arg.regexFlags();
noTrapPattern =
std::shared_ptr(new pcrecpp::RE(regex, flags2options(flags)));
} else if (name == "watchPattern") {
const char* regex = arg.regex();
const char* flags = arg.regexFlags();
watchPattern =
std::shared_ptr(new pcrecpp::RE(regex, flags2options(flags)));
} else if (name == "noWatchPattern") {
const char* regex = arg.regex();
const char* flags = arg.regexFlags();
noWatchPattern =
std::shared_ptr(new pcrecpp::RE(regex, flags2options(flags)));
} else if (name == "ops") {
// iterate through the objects in ops
// create an BenchRunOp per
// put in ops vector.
BSONObjIterator i(arg.Obj());
while (i.more()) {
ops.push_back(opFromBson(i.next().Obj()));
}
} else {
log() << "benchRun passed an unsupported field: " << name;
uassert(34376, "benchRun passed an unsupported configuration field", false);
}
}
}
DBClientBase* BenchRunConfig::createConnection() const {
const ConnectionString connectionString = uassertStatusOK(ConnectionString::parse(host));
std::string errorMessage;
DBClientBase* connection = connectionString.connect("BenchRun", errorMessage);
uassert(16158, errorMessage, connection != NULL);
return connection;
}
BenchRunState::BenchRunState(unsigned numWorkers)
: _mutex(),
_numUnstartedWorkers(numWorkers),
_numActiveWorkers(0),
_isShuttingDown(0),
_isCollectingStats(0) {}
BenchRunState::~BenchRunState() {
wassert(_numActiveWorkers == 0 && _numUnstartedWorkers == 0);
}
void BenchRunState::waitForState(State awaitedState) {
stdx::unique_lock lk(_mutex);
switch (awaitedState) {
case BRS_RUNNING:
while (_numUnstartedWorkers > 0) {
massert(16147, "Already finished.", _numUnstartedWorkers + _numActiveWorkers > 0);
_stateChangeCondition.wait(lk);
}
break;
case BRS_FINISHED:
while (_numUnstartedWorkers + _numActiveWorkers > 0) {
_stateChangeCondition.wait(lk);
}
break;
default:
msgasserted(16152,
mongoutils::str::stream() << "Cannot wait for state " << awaitedState);
}
}
void BenchRunState::tellWorkersToFinish() {
_isShuttingDown.store(1);
}
void BenchRunState::tellWorkersToCollectStats() {
_isCollectingStats.store(1);
}
void BenchRunState::assertFinished() {
stdx::lock_guard lk(_mutex);
verify(0 == _numUnstartedWorkers + _numActiveWorkers);
}
bool BenchRunState::shouldWorkerFinish() {
return (_isShuttingDown.loadRelaxed() == 1);
}
bool BenchRunState::shouldWorkerCollectStats() {
return (_isCollectingStats.loadRelaxed() == 1);
}
void BenchRunState::onWorkerStarted() {
stdx::lock_guard lk(_mutex);
verify(_numUnstartedWorkers > 0);
--_numUnstartedWorkers;
++_numActiveWorkers;
if (_numUnstartedWorkers == 0) {
_stateChangeCondition.notify_all();
}
}
void BenchRunState::onWorkerFinished() {
stdx::lock_guard lk(_mutex);
verify(_numActiveWorkers > 0);
--_numActiveWorkers;
if (_numActiveWorkers + _numUnstartedWorkers == 0) {
_stateChangeCondition.notify_all();
}
}
BSONObj benchStart(const BSONObj&, void*);
BSONObj benchFinish(const BSONObj&, void*);
static bool _hasSpecial(const BSONObj& obj) {
BSONObjIterator i(obj);
while (i.more()) {
BSONElement e = i.next();
if (e.fieldName()[0] == '#')
return true;
if (!e.isABSONObj())
continue;
if (_hasSpecial(e.Obj()))
return true;
}
return false;
}
static BSONObj fixQuery(const BSONObj& obj, BsonTemplateEvaluator& btl) {
if (!_hasSpecial(obj))
return obj;
BSONObjBuilder b(obj.objsize() + 128);
verify(BsonTemplateEvaluator::StatusSuccess == btl.evaluate(obj, b));
return b.obj();
}
BenchRunWorker::BenchRunWorker(size_t id,
const BenchRunConfig* config,
BenchRunState* brState,
int64_t randomSeed)
: _id(id), _config(config), _brState(brState), _randomSeed(randomSeed) {}
BenchRunWorker::~BenchRunWorker() {}
void BenchRunWorker::start() {
stdx::thread(stdx::bind(&BenchRunWorker::run, this)).detach();
}
bool BenchRunWorker::shouldStop() const {
return _brState->shouldWorkerFinish();
}
bool BenchRunWorker::shouldCollectStats() const {
return _brState->shouldWorkerCollectStats();
}
void doNothing(const BSONObj&) {}
/**
* Issues the query 'qr' against 'conn' using read commands. Returns the size of the result set
* returned by the query.
*
* If 'qr' has the 'wantMore' flag set to false and the 'limit' option set to 1LL, then the caller
* may optionally specify a pointer to an object in 'objOut', which will be filled in with the
* single object in the query result set (or the empty object, if the result set is empty).
* If 'qr' doesn't have these options set, then nullptr must be passed for 'objOut'.
*
* On error, throws a UserException.
*/
int runQueryWithReadCommands(DBClientBase* conn,
unique_ptr qr,
BSONObj* objOut = nullptr) {
std::string dbName = qr->nss().db().toString();
BSONObj findCommandResult;
bool res = conn->runCommand(dbName, qr->asFindCommand(), findCommandResult);
uassert(ErrorCodes::CommandFailed,
str::stream() << "find command failed; reply was: " << findCommandResult,
res);
CursorResponse cursorResponse =
uassertStatusOK(CursorResponse::parseFromBSON(findCommandResult));
int count = cursorResponse.getBatch().size();
if (objOut) {
invariant(qr->getLimit() && *qr->getLimit() == 1 && !qr->wantMore());
// Since this is a "single batch" query, we can simply grab the first item in the result set
// and return here.
*objOut = (count > 0) ? cursorResponse.getBatch()[0] : BSONObj();
return count;
}
while (cursorResponse.getCursorId() != 0) {
GetMoreRequest getMoreRequest(qr->nss(),
cursorResponse.getCursorId(),
qr->getBatchSize(),
boost::none, // maxTimeMS
boost::none, // term
boost::none); // lastKnownCommittedOpTime
BSONObj getMoreCommandResult;
res = conn->runCommand(dbName, getMoreRequest.toBSON(), getMoreCommandResult);
uassert(ErrorCodes::CommandFailed,
str::stream() << "getMore command failed; reply was: " << getMoreCommandResult,
res);
cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(getMoreCommandResult));
count += cursorResponse.getBatch().size();
}
return count;
}
void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) {
verify(conn);
long long count = 0;
mongo::Timer timer;
BsonTemplateEvaluator bsonTemplateEvaluator(_randomSeed);
invariant(bsonTemplateEvaluator.setId(_id) == BsonTemplateEvaluator::StatusSuccess);
if (_config->username != "") {
string errmsg;
if (!conn->auth("admin", _config->username, _config->password, errmsg)) {
uasserted(15931, "Authenticating to connection for _benchThread failed: " + errmsg);
}
}
unique_ptr scope{getGlobalScriptEngine()->newScopeForCurrentThread()};
verify(scope.get());
while (!shouldStop()) {
for (const auto& op : _config->ops) {
if (shouldStop())
break;
auto& stats = shouldCollectStats() ? _stats : _statsBlackHole;
ScriptingFunction scopeFunc = 0;
BSONObj scopeObj;
if (op.useCheck) {
auto check = op.check;
if (check.type() == CodeWScope) {
scopeFunc = scope->createFunction(check.codeWScopeCode());
scopeObj = BSONObj(check.codeWScopeScopeDataUnsafe());
} else {
scopeFunc = scope->createFunction(check.valuestr());
}
scope->init(&scopeObj);
invariant(scopeFunc);
}
try {
switch (op.op) {
case OpType::NOP:
break;
case OpType::CPULOAD: {
// perform a tight multiplication loop. The
// performance of this loop should be
// predictable, and this operation can be used
// to test underlying system variability.
long long limit = 10000 * op.cpuFactor;
// volatile used to ensure that loop is not optimized away
volatile uint64_t result = 0; // NOLINT
uint64_t x = 100;
for (long long i = 0; i < limit; i++) {
x *= 13;
}
result = x;
} break;
case OpType::FINDONE: {
BSONObj fixedQuery = fixQuery(op.query, bsonTemplateEvaluator);
BSONObj result;
if (op.useReadCmd) {
auto qr = stdx::make_unique(NamespaceString(op.ns));
qr->setFilter(fixedQuery);
qr->setProj(op.projection);
qr->setLimit(1LL);
qr->setWantMore(false);
invariantOK(qr->validate());
BenchRunEventTrace _bret(&stats.findOneCounter);
runQueryWithReadCommands(conn, std::move(qr), &result);
} else {
BenchRunEventTrace _bret(&stats.findOneCounter);
result = conn->findOne(op.ns, fixedQuery);
}
if (op.useCheck) {
int err = scope->invoke(scopeFunc, 0, &result, 1000 * 60, false);
if (err) {
log() << "Error checking in benchRun thread [findOne]"
<< causedBy(scope->getError());
stats.errCount++;
return;
}
}
if (!_config->hideResults || op.showResult)
log() << "Result from benchRun thread [findOne] : " << result;
} break;
case OpType::COMMAND: {
bool ok;
BSONObj result;
{
BenchRunEventTrace _bret(&stats.commandCounter);
ok = conn->runCommand(op.ns,
fixQuery(op.command, bsonTemplateEvaluator),
result,
op.options);
}
if (!ok) {
stats.errCount++;
}
if (!result["cursor"].eoo()) {
// The command returned a cursor, so iterate all results.
auto cursorResponse =
uassertStatusOK(CursorResponse::parseFromBSON(result));
int count = cursorResponse.getBatch().size();
while (cursorResponse.getCursorId() != 0) {
GetMoreRequest getMoreRequest(
cursorResponse.getNSS(),
cursorResponse.getCursorId(),
boost::none, // batchSize
boost::none, // maxTimeMS
boost::none, // term
boost::none); // lastKnownCommittedOpTime
BSONObj getMoreCommandResult;
ok = conn->runCommand(
op.ns, getMoreRequest.toBSON(), getMoreCommandResult);
uassert(ErrorCodes::CommandFailed,
str::stream() << "getMore command failed; reply was: "
<< getMoreCommandResult,
ok);
cursorResponse = uassertStatusOK(
CursorResponse::parseFromBSON(getMoreCommandResult));
count += cursorResponse.getBatch().size();
}
// Just give the count to the check function.
result = BSON("count" << count << "context" << op.context);
}
if (op.useCheck) {
int err = scope->invoke(scopeFunc, 0, &result, 1000 * 60, false);
if (err) {
log() << "Error checking in benchRun thread [command]"
<< causedBy(scope->getError());
int err = scope->invoke(scopeFunc, 0, &result, 1000 * 60, false);
if (err) {
log() << "Error checking in benchRun thread [command]"
<< causedBy(scope->getError());
stats.errCount++;
return;
}
}
if (!_config->hideResults || op.showResult)
log() << "Result from benchRun thread [command] : " << result;
}
} break;
case OpType::FIND: {
int count;
BSONObj fixedQuery = fixQuery(op.query, bsonTemplateEvaluator);
if (op.useReadCmd) {
uassert(28824,
"cannot use 'options' in combination with read commands",
!op.options);
auto qr = stdx::make_unique(NamespaceString(op.ns));
qr->setFilter(fixedQuery);
qr->setProj(op.projection);
if (op.skip) {
qr->setSkip(op.skip);
}
if (op.limit) {
qr->setLimit(op.limit);
}
if (op.batchSize) {
qr->setBatchSize(op.batchSize);
}
invariantOK(qr->validate());
BenchRunEventTrace _bret(&stats.queryCounter);
count = runQueryWithReadCommands(conn, std::move(qr));
} else {
// Use special query function for exhaust query option.
if (op.options & QueryOption_Exhaust) {
BenchRunEventTrace _bret(&stats.queryCounter);
stdx::function castedDoNothing(doNothing);
count = conn->query(
castedDoNothing, op.ns, fixedQuery, &op.projection, op.options);
} else {
BenchRunEventTrace _bret(&stats.queryCounter);
unique_ptr cursor;
cursor = conn->query(op.ns,
fixedQuery,
op.limit,
op.skip,
&op.projection,
op.options,
op.batchSize);
count = cursor->itcount();
}
}
if (op.expected >= 0 && count != op.expected) {
log() << "bench query on: " << op.ns << " expected: " << op.expected
<< " got: " << count;
verify(false);
}
if (op.useCheck) {
BSONObj thisValue = BSON("count" << count << "context" << op.context);
int err = scope->invoke(scopeFunc, 0, &thisValue, 1000 * 60, false);
if (err) {
log() << "Error checking in benchRun thread [find]"
<< causedBy(scope->getError());
stats.errCount++;
return;
}
}
if (!_config->hideResults || op.showResult)
log() << "Result from benchRun thread [query] : " << count;
} break;
case OpType::UPDATE: {
BSONObj result;
{
BenchRunEventTrace _bret(&stats.updateCounter);
BSONObj query = fixQuery(op.query, bsonTemplateEvaluator);
BSONObj update = fixQuery(op.update, bsonTemplateEvaluator);
if (op.useWriteCmd) {
// TODO: Replace after SERVER-11774.
BSONObjBuilder builder;
builder.append("update", nsToCollectionSubstring(op.ns));
BSONArrayBuilder docBuilder(builder.subarrayStart("updates"));
docBuilder.append(BSON(
"q" << query << "u" << update << "multi" << op.multi << "upsert"
<< op.upsert));
docBuilder.done();
builder.append("writeConcern", op.writeConcern);
conn->runCommand(nsToDatabaseSubstring(op.ns).toString(),
builder.done(),
result);
} else {
conn->update(op.ns, query, update, op.upsert, op.multi);
if (op.safe)
result = conn->getLastErrorDetailed();
}
}
if (op.safe) {
if (op.useCheck) {
int err = scope->invoke(scopeFunc, 0, &result, 1000 * 60, false);
if (err) {
log() << "Error checking in benchRun thread [update]"
<< causedBy(scope->getError());
stats.errCount++;
return;
}
}
if (!_config->hideResults || op.showResult)
log() << "Result from benchRun thread [safe update] : " << result;
if (!result["err"].eoo() && result["err"].type() == String &&
(_config->throwGLE || op.throwGLE))
throw DBException((string) "From benchRun GLE" +
causedBy(result["err"].String()),
result["code"].eoo() ? 0 : result["code"].Int());
}
} break;
case OpType::INSERT: {
BSONObj result;
{
BenchRunEventTrace _bret(&stats.insertCounter);
BSONObj insertDoc;
if (op.useWriteCmd) {
// TODO: Replace after SERVER-11774.
BSONObjBuilder builder;
builder.append("insert", nsToCollectionSubstring(op.ns));
BSONArrayBuilder docBuilder(builder.subarrayStart("documents"));
if (op.isDocAnArray) {
for (const auto& element : op.doc) {
insertDoc = fixQuery(element.Obj(), bsonTemplateEvaluator);
docBuilder.append(insertDoc);
}
} else {
insertDoc = fixQuery(op.doc, bsonTemplateEvaluator);
docBuilder.append(insertDoc);
}
docBuilder.done();
builder.append("writeConcern", op.writeConcern);
conn->runCommand(nsToDatabaseSubstring(op.ns).toString(),
builder.done(),
result);
} else {
if (op.isDocAnArray) {
std::vector insertArray;
for (const auto& element : op.doc) {
BSONObj e = fixQuery(element.Obj(), bsonTemplateEvaluator);
insertArray.push_back(e);
}
conn->insert(op.ns, insertArray);
} else {
insertDoc = fixQuery(op.doc, bsonTemplateEvaluator);
conn->insert(op.ns, insertDoc);
}
if (op.safe)
result = conn->getLastErrorDetailed();
}
}
if (op.safe) {
if (op.useCheck) {
int err = scope->invoke(scopeFunc, 0, &result, 1000 * 60, false);
if (err) {
log() << "Error checking in benchRun thread [insert]"
<< causedBy(scope->getError());
stats.errCount++;
return;
}
}
if (!_config->hideResults || op.showResult)
log() << "Result from benchRun thread [safe insert] : " << result;
if (!result["err"].eoo() && result["err"].type() == String &&
(_config->throwGLE || op.throwGLE))
throw DBException((string) "From benchRun GLE" +
causedBy(result["err"].String()),
result["code"].eoo() ? 0 : result["code"].Int());
}
} break;
case OpType::REMOVE: {
BSONObj result;
{
BenchRunEventTrace _bret(&stats.deleteCounter);
BSONObj predicate = fixQuery(op.query, bsonTemplateEvaluator);
if (op.useWriteCmd) {
// TODO: Replace after SERVER-11774.
BSONObjBuilder builder;
builder.append("delete", nsToCollectionSubstring(op.ns));
BSONArrayBuilder docBuilder(builder.subarrayStart("deletes"));
int limit = (op.multi == true) ? 0 : 1;
docBuilder.append(BSON("q" << predicate << "limit" << limit));
docBuilder.done();
builder.append("writeConcern", op.writeConcern);
conn->runCommand(nsToDatabaseSubstring(op.ns).toString(),
builder.done(),
result);
} else {
conn->remove(op.ns, predicate, !op.multi);
if (op.safe)
result = conn->getLastErrorDetailed();
}
}
if (op.safe) {
if (op.useCheck) {
int err = scope->invoke(scopeFunc, 0, &result, 1000 * 60, false);
if (err) {
log() << "Error checking in benchRun thread [delete]"
<< causedBy(scope->getError());
stats.errCount++;
return;
}
}
if (!_config->hideResults || op.showResult)
log() << "Result from benchRun thread [safe remove] : " << result;
if (!result["err"].eoo() && result["err"].type() == String &&
(_config->throwGLE || op.throwGLE))
throw DBException((string) "From benchRun GLE " +
causedBy(result["err"].String()),
result["code"].eoo() ? 0 : result["code"].Int());
}
} break;
case OpType::CREATEINDEX:
conn->createIndex(op.ns, op.key);
break;
case OpType::DROPINDEX:
conn->dropIndex(op.ns, op.key);
break;
case OpType::LET: {
BSONObjBuilder templateBuilder;
bsonTemplateEvaluator.evaluate(op.value, templateBuilder);
bsonTemplateEvaluator.setVariable(op.target,
templateBuilder.done().firstElement());
} break;
default:
uassert(34397, "In benchRun loop and got unknown op type", false);
}
// Count 1 for total ops. Successfully got through the try phrase
stats.opCount++;
} catch (DBException& ex) {
if (!_config->hideErrors || op.showError) {
bool yesWatch =
(_config->watchPattern && _config->watchPattern->FullMatch(ex.what()));
bool noWatch =
(_config->noWatchPattern && _config->noWatchPattern->FullMatch(ex.what()));
if ((!_config->watchPattern && _config->noWatchPattern &&
!noWatch) || // If we're just ignoring things
(!_config->noWatchPattern && _config->watchPattern &&
yesWatch) || // If we're just watching things
(_config->watchPattern && _config->noWatchPattern && yesWatch && !noWatch))
log() << "Error in benchRun thread for op "
<< opTypeName.find(op.op)->second << causedBy(ex);
}
bool yesTrap = (_config->trapPattern && _config->trapPattern->FullMatch(ex.what()));
bool noTrap =
(_config->noTrapPattern && _config->noTrapPattern->FullMatch(ex.what()));
if ((!_config->trapPattern && _config->noTrapPattern && !noTrap) ||
(!_config->noTrapPattern && _config->trapPattern && yesTrap) ||
(_config->trapPattern && _config->noTrapPattern && yesTrap && !noTrap)) {
{
stats.trappedErrors.push_back(BSON("error" << ex.what() << "op"
<< opTypeName.find(op.op)->second
<< "count"
<< count));
}
if (_config->breakOnTrap)
return;
}
if (!_config->handleErrors && !op.handleError)
return;
stats.errCount++;
} catch (...) {
if (!_config->hideErrors || op.showError)
log() << "Error in benchRun thread caused by unknown error for op "
<< opTypeName.find(op.op)->second;
if (!_config->handleErrors && !op.handleError)
return;
stats.errCount++;
}
if (++count % 100 == 0 && !op.useWriteCmd) {
conn->getLastError();
}
if (op.delay > 0)
sleepmillis(op.delay);
}
}
conn->getLastError();
}
namespace {
class BenchRunWorkerStateGuard {
MONGO_DISALLOW_COPYING(BenchRunWorkerStateGuard);
public:
explicit BenchRunWorkerStateGuard(BenchRunState* brState) : _brState(brState) {
_brState->onWorkerStarted();
}
~BenchRunWorkerStateGuard() {
_brState->onWorkerFinished();
}
private:
BenchRunState* _brState;
};
} // namespace
void BenchRunWorker::run() {
try {
std::unique_ptr conn(_config->createConnection());
if (!_config->username.empty()) {
string errmsg;
if (!conn->auth("admin", _config->username, _config->password, errmsg)) {
uasserted(15932, "Authenticating to connection for benchThread failed: " + errmsg);
}
}
BenchRunWorkerStateGuard _workerStateGuard(_brState);
generateLoadOnConnection(conn.get());
} catch (DBException& e) {
error() << "DBException not handled in benchRun thread" << causedBy(e);
} catch (std::exception& e) {
error() << "std::exception not handled in benchRun thread" << causedBy(e);
} catch (...) {
error() << "Unknown exception not handled in benchRun thread.";
}
}
BenchRunner::BenchRunner(BenchRunConfig* config) : _brState(config->parallel), _config(config) {
_oid.init();
stdx::lock_guard lk(_staticMutex);
_activeRuns[_oid] = this;
}
BenchRunner::~BenchRunner() {
for (size_t i = 0; i < _workers.size(); ++i)
delete _workers[i];
}
void BenchRunner::start() {
{
std::unique_ptr conn(_config->createConnection());
// Must authenticate to admin db in order to run serverStatus command
if (_config->username != "") {
string errmsg;
if (!conn->auth("admin", _config->username, _config->password, errmsg)) {
uasserted(
16704,
str::stream() << "User " << _config->username
<< " could not authenticate to admin db; admin db access is "
"required to use benchRun with auth enabled");
}
}
// Start threads
for (int64_t i = 0; i < _config->parallel; i++) {
// Make a unique random seed for the worker.
int64_t seed = _config->randomSeed + i;
BenchRunWorker* worker = new BenchRunWorker(i, _config.get(), &_brState, seed);
worker->start();
_workers.push_back(worker);
}
_brState.waitForState(BenchRunState::BRS_RUNNING);
// initial stats
_brState.tellWorkersToCollectStats();
_brTimer = new mongo::Timer();
}
}
void BenchRunner::stop() {
_brState.tellWorkersToFinish();
_brState.waitForState(BenchRunState::BRS_FINISHED);
_microsElapsed = _brTimer->micros();
delete _brTimer;
{
std::unique_ptr conn(_config->createConnection());
if (_config->username != "") {
string errmsg;
// this can only fail if admin access was revoked since start of run
if (!conn->auth("admin", _config->username, _config->password, errmsg)) {
uasserted(
16705,
str::stream() << "User " << _config->username
<< " could not authenticate to admin db; admin db access is "
"still required to use benchRun with auth enabled");
}
}
}
{
stdx::lock_guard lk(_staticMutex);
_activeRuns.erase(_oid);
}
}
BenchRunner* BenchRunner::createWithConfig(const BSONObj& configArgs) {
BenchRunConfig* config = BenchRunConfig::createFromBson(configArgs);
return new BenchRunner(config);
}
BenchRunner* BenchRunner::get(OID oid) {
stdx::lock_guard lk(_staticMutex);
return _activeRuns[oid];
}
void BenchRunner::populateStats(BenchRunStats* stats) {
_brState.assertFinished();
stats->reset();
for (size_t i = 0; i < _workers.size(); ++i)
stats->updateFrom(_workers[i]->stats());
}
static void appendAverageMicrosIfAvailable(BSONObjBuilder& buf,
const std::string& name,
const BenchRunEventCounter& counter) {
if (counter.getNumEvents() > 0)
buf.append(name,
static_cast(counter.getTotalTimeMicros()) / counter.getNumEvents());
}
BSONObj BenchRunner::finish(BenchRunner* runner) {
runner->stop();
BenchRunStats stats;
runner->populateStats(&stats);
// vector errors = runner->config.errors;
bool error = stats.error;
if (error)
return BSON("err" << 1);
BSONObjBuilder buf;
buf.append("note", "values per second");
buf.append("errCount", static_cast(stats.errCount));
buf.append("trapped", "error: not implemented");
appendAverageMicrosIfAvailable(buf, "findOneLatencyAverageMicros", stats.findOneCounter);
appendAverageMicrosIfAvailable(buf, "insertLatencyAverageMicros", stats.insertCounter);
appendAverageMicrosIfAvailable(buf, "deleteLatencyAverageMicros", stats.deleteCounter);
appendAverageMicrosIfAvailable(buf, "updateLatencyAverageMicros", stats.updateCounter);
appendAverageMicrosIfAvailable(buf, "queryLatencyAverageMicros", stats.queryCounter);
appendAverageMicrosIfAvailable(buf, "commandsLatencyAverageMicros", stats.commandCounter);
buf.append("totalOps", static_cast(stats.opCount));
auto appendPerSec = [&buf, runner](StringData name, double total) {
buf.append(name, total / (runner->_microsElapsed / 1000000.0));
};
appendPerSec("totalOps/s", stats.opCount);
appendPerSec("findOne", stats.findOneCounter.getNumEvents());
appendPerSec("insert", stats.insertCounter.getNumEvents());
appendPerSec("delete", stats.deleteCounter.getNumEvents());
appendPerSec("update", stats.updateCounter.getNumEvents());
appendPerSec("query", stats.queryCounter.getNumEvents());
appendPerSec("command", stats.commandCounter.getNumEvents());
BSONObj zoo = buf.obj();
delete runner;
return zoo;
}
stdx::mutex BenchRunner::_staticMutex;
map BenchRunner::_activeRuns;
/**
* benchRun( { ops : [] , host : XXX , db : XXXX , parallel : 5 , seconds : 5 }
*/
BSONObj BenchRunner::benchRunSync(const BSONObj& argsFake, void* data) {
BSONObj start = benchStart(argsFake, data);
OID oid = OID(start.firstElement().String());
BenchRunner* runner = BenchRunner::get(oid);
sleepmillis((int)(1000.0 * runner->config().seconds));
return benchFinish(start, data);
}
/**
* benchRun( { ops : [] , host : XXX , db : XXXX , parallel : 5 , seconds : 5 }
*/
BSONObj BenchRunner::benchStart(const BSONObj& argsFake, void* data) {
verify(argsFake.firstElement().isABSONObj());
BSONObj args = argsFake.firstElement().Obj();
// Get new BenchRunner object
BenchRunner* runner = BenchRunner::createWithConfig(args);
runner->start();
return BSON("" << runner->oid().toString());
}
/**
* benchRun( { ops : [] , host : XXX , db : XXXX , parallel : 5 , seconds : 5 }
*/
BSONObj BenchRunner::benchFinish(const BSONObj& argsFake, void* data) {
OID oid = OID(argsFake.firstElement().String());
// Get old BenchRunner object
BenchRunner* runner = BenchRunner::get(oid);
BSONObj finalObj = BenchRunner::finish(runner);
return BSON("" << finalObj);
}
} // namespace mongo