diff options
Diffstat (limited to 'src/mongo/shell/bench.cpp')
-rw-r--r-- | src/mongo/shell/bench.cpp | 339 |
1 files changed, 214 insertions, 125 deletions
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp index eb5f83b1f68..19529f4ba04 100644 --- a/src/mongo/shell/bench.cpp +++ b/src/mongo/shell/bench.cpp @@ -51,33 +51,38 @@ namespace mongo { namespace { -const std::map<OpType, std::string> opTypeName{{OpType::NONE, "none"}, - {OpType::NOP, "nop"}, - {OpType::FINDONE, "findOne"}, - {OpType::COMMAND, "command"}, - {OpType::FIND, "find"}, - {OpType::UPDATE, "update"}, - {OpType::INSERT, "insert"}, - {OpType::REMOVE, "remove"}, - {OpType::CREATEINDEX, "createIndex"}, - {OpType::DROPINDEX, "dropIndex"}, - {OpType::LET, "let"}, - {OpType::CPULOAD, "cpuload"}}; +const std::map<OpType, std::string> kOpTypeNames{{OpType::NONE, "none"}, + {OpType::NOP, "nop"}, + {OpType::FINDONE, "findOne"}, + {OpType::COMMAND, "command"}, + {OpType::FIND, "find"}, + {OpType::UPDATE, "update"}, + {OpType::INSERT, "insert"}, + {OpType::REMOVE, "remove"}, + {OpType::CREATEINDEX, "createIndex"}, + {OpType::DROPINDEX, "dropIndex"}, + {OpType::LET, "let"}, + {OpType::CPULOAD, "cpuload"}}; + +// When specified to the connection's 'runCommand' call indicates that the command should be +// executed with no query options. This is only meaningful if a command is run via OP_QUERY against +// '$cmd'. +const int kNoOptions = 0; class BenchRunWorkerStateGuard { MONGO_DISALLOW_COPYING(BenchRunWorkerStateGuard); public: - explicit BenchRunWorkerStateGuard(BenchRunState* brState) : _brState(brState) { - _brState->onWorkerStarted(); + explicit BenchRunWorkerStateGuard(BenchRunState& brState) : _brState(brState) { + _brState.onWorkerStarted(); } ~BenchRunWorkerStateGuard() { - _brState->onWorkerFinished(); + _brState.onWorkerFinished(); } private: - BenchRunState* _brState; + BenchRunState& _brState; }; pcrecpp::RE_Options flags2options(const char* flags) { @@ -95,14 +100,6 @@ pcrecpp::RE_Options flags2options(const char* flags) { return options; } -void appendAverageMicrosIfAvailable(BSONObjBuilder& buf, - const std::string& name, - const BenchRunEventCounter& counter) { - if (counter.getNumEvents() > 0) - buf.append(name, - static_cast<double>(counter.getTotalTimeMicros()) / counter.getNumEvents()); -} - bool hasSpecial(const BSONObj& obj) { BSONObjIterator i(obj); while (i.more()) { @@ -128,6 +125,54 @@ BSONObj fixQuery(const BSONObj& obj, BsonTemplateEvaluator& btl) { return b.obj(); } +bool runCommandWithSession(DBClientBase* conn, + const std::string& dbname, + const BSONObj& cmdObj, + int options, + const boost::optional<LogicalSessionIdToClient>& lsid, + boost::optional<TxnNumber> txnNumber, + BSONObj* result) { + if (!lsid) { + invariant(!txnNumber); + return conn->runCommand(dbname, cmdObj, *result); + } + + BSONObjBuilder cmdObjWithLsidBuilder; + + for (const auto& cmdArg : cmdObj) { + uassert(ErrorCodes::IllegalOperation, + "Command cannot contain session id", + cmdArg.fieldName() != OperationSessionInfo::kSessionIdFieldName); + uassert(ErrorCodes::IllegalOperation, + "Command cannot contain transaction id", + cmdArg.fieldName() != OperationSessionInfo::kTxnNumberFieldName); + + cmdObjWithLsidBuilder.append(cmdArg); + } + + { + BSONObjBuilder lsidBuilder( + cmdObjWithLsidBuilder.subobjStart(OperationSessionInfo::kSessionIdFieldName)); + lsid->serialize(&lsidBuilder); + lsidBuilder.doneFast(); + } + + if (txnNumber) { + cmdObjWithLsidBuilder.append(OperationSessionInfo::kTxnNumberFieldName, *txnNumber); + } + + return conn->runCommand(dbname, cmdObjWithLsidBuilder.done(), *result); +} + +bool runCommandWithSession(DBClientBase* conn, + const std::string& dbname, + const BSONObj& cmdObj, + int options, + const boost::optional<LogicalSessionIdToClient>& lsid, + BSONObj* result) { + return runCommandWithSession(conn, dbname, cmdObj, options, lsid, boost::none, result); +} + /** * Issues the query 'qr' against 'conn' using read commands. Returns the size of the result set * returned by the query. @@ -140,14 +185,16 @@ BSONObj fixQuery(const BSONObj& obj, BsonTemplateEvaluator& btl) { * On error, throws a AssertionException. */ int runQueryWithReadCommands(DBClientBase* conn, + const boost::optional<LogicalSessionIdToClient>& lsid, std::unique_ptr<QueryRequest> qr, - BSONObj* objOut = nullptr) { - std::string dbName = qr->nss().db().toString(); + BSONObj* objOut) { + const auto dbName = qr->nss().db().toString(); + BSONObj findCommandResult; - bool res = conn->runCommand(dbName, qr->asFindCommand(), findCommandResult); uassert(ErrorCodes::CommandFailed, str::stream() << "find command failed; reply was: " << findCommandResult, - res); + runCommandWithSession( + conn, dbName, qr->asFindCommand(), kNoOptions, lsid, &findCommandResult)); CursorResponse cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(findCommandResult)); @@ -169,10 +216,12 @@ int runQueryWithReadCommands(DBClientBase* conn, boost::none, // term boost::none); // lastKnownCommittedOpTime BSONObj getMoreCommandResult; - res = conn->runCommand(dbName, getMoreRequest.toBSON(), getMoreCommandResult); - uassert(ErrorCodes::CommandFailed, - str::stream() << "getMore command failed; reply was: " << getMoreCommandResult, - res); + uassert( + ErrorCodes::CommandFailed, + str::stream() << "getMore command failed; reply was: " << getMoreCommandResult, + runCommandWithSession( + conn, dbName, getMoreRequest.toBSON(), kNoOptions, lsid, &getMoreCommandResult)); + cursorResponse = uassertStatusOK(CursorResponse::parseFromBSON(getMoreCommandResult)); count += cursorResponse.getBatch().size(); } @@ -184,41 +233,16 @@ void doNothing(const BSONObj&) {} } // namespace -BenchRunEventCounter::BenchRunEventCounter() { - reset(); -} - -void BenchRunEventCounter::reset() { - _numEvents = 0; - _totalTimeMicros = 0; -} +BenchRunEventCounter::BenchRunEventCounter() = default; void BenchRunEventCounter::updateFrom(const BenchRunEventCounter& other) { _numEvents += other._numEvents; _totalTimeMicros += other._totalTimeMicros; } -BenchRunStats::BenchRunStats() { - reset(); -} - -void BenchRunStats::reset() { - error = false; - errCount = 0; - opCount = 0; - - findOneCounter.reset(); - updateCounter.reset(); - insertCounter.reset(); - deleteCounter.reset(); - queryCounter.reset(); - commandCounter.reset(); - trappedErrors.clear(); -} - void BenchRunStats::updateFrom(const BenchRunStats& other) { - if (other.error) - error = true; + error = other.error; + errCount += other.errCount; opCount += other.opCount; @@ -229,8 +253,9 @@ void BenchRunStats::updateFrom(const BenchRunStats& other) { queryCounter.updateFrom(other.queryCounter); commandCounter.updateFrom(other.commandCounter); - for (size_t i = 0; i < other.trappedErrors.size(); ++i) - trappedErrors.push_back(other.trappedErrors[i]); + for (const auto& trappedError : other.trappedErrors) { + trappedErrors.push_back(trappedError); + } } BenchRunConfig::BenchRunConfig() { @@ -533,6 +558,18 @@ void BenchRunConfig::initializeFromBson(const BSONObj& args) { << typeName(arg.type()), arg.isNumber()); seconds = arg.number(); + } else if (name == "useSessions") { + uassert(40641, + str::stream() << "Field '" << name << "' should be a boolean. . Type is " + << typeName(arg.type()), + arg.isBoolean()); + useSessions = arg.boolean(); + } else if (name == "useIdempotentWrites") { + uassert(40642, + str::stream() << "Field '" << name << "' should be a boolean. . Type is " + << typeName(arg.type()), + arg.isBoolean()); + useIdempotentWrites = arg.boolean(); } else if (name == "hideResults") { hideResults = arg.trueValue(); } else if (name == "handleErrors") { @@ -628,16 +665,16 @@ void BenchRunState::tellWorkersToCollectStats() { _isCollectingStats.store(1); } -void BenchRunState::assertFinished() { +void BenchRunState::assertFinished() const { stdx::lock_guard<stdx::mutex> lk(_mutex); verify(0 == _numUnstartedWorkers + _numActiveWorkers); } -bool BenchRunState::shouldWorkerFinish() { +bool BenchRunState::shouldWorkerFinish() const { return (_isShuttingDown.loadRelaxed() == 1); } -bool BenchRunState::shouldWorkerCollectStats() { +bool BenchRunState::shouldWorkerCollectStats() const { return (_isCollectingStats.loadRelaxed() == 1); } @@ -662,22 +699,22 @@ void BenchRunState::onWorkerFinished() { BenchRunWorker::BenchRunWorker(size_t id, const BenchRunConfig* config, - BenchRunState* brState, + BenchRunState& brState, int64_t randomSeed) : _id(id), _config(config), _brState(brState), _randomSeed(randomSeed) {} -BenchRunWorker::~BenchRunWorker() {} +BenchRunWorker::~BenchRunWorker() = default; void BenchRunWorker::start() { stdx::thread(stdx::bind(&BenchRunWorker::run, this)).detach(); } bool BenchRunWorker::shouldStop() const { - return _brState->shouldWorkerFinish(); + return _brState.shouldWorkerFinish(); } bool BenchRunWorker::shouldCollectStats() const { - return _brState->shouldWorkerCollectStats(); + return _brState.shouldWorkerCollectStats(); } void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { @@ -695,6 +732,22 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { conn->auth("admin", _config->username, _config->password, errmsg)); } + boost::optional<LogicalSessionIdToClient> lsid; + if (_config->useSessions) { + BSONObj result; + uassert(40640, + str::stream() << "Unable to create session due to error " << result, + conn->runCommand("admin", BSON("startSession" << 1), result)); + + lsid.emplace( + LogicalSessionIdToClient::parse(IDLParserErrorContext("lsid"), result["id"].Obj())); + } + + boost::optional<TxnNumber> txnNumberForWriteCommands; + if (_config->useIdempotentWrites) { + txnNumberForWriteCommands = 0; + } + std::unique_ptr<Scope> scope{getGlobalScriptEngine()->newScopeForCurrentThread()}; verify(scope.get()); @@ -750,7 +803,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { invariantOK(qr->validate()); BenchRunEventTrace _bret(&stats.findOneCounter); - runQueryWithReadCommands(conn, std::move(qr), &result); + runQueryWithReadCommands(conn, lsid, std::move(qr), &result); } else { BenchRunEventTrace _bret(&stats.findOneCounter); result = conn->findOne(op.ns, @@ -779,10 +832,12 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { BSONObj result; { BenchRunEventTrace _bret(&stats.commandCounter); - ok = conn->runCommand(op.ns, - fixQuery(op.command, bsonTemplateEvaluator), - result, - op.options); + ok = runCommandWithSession(conn, + op.ns, + fixQuery(op.command, bsonTemplateEvaluator), + op.options, + lsid, + &result); } if (!ok) { stats.errCount++; @@ -802,12 +857,15 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { boost::none, // term boost::none); // lastKnownCommittedOpTime BSONObj getMoreCommandResult; - ok = conn->runCommand( - op.ns, getMoreRequest.toBSON(), getMoreCommandResult); uassert(ErrorCodes::CommandFailed, str::stream() << "getMore command failed; reply was: " << getMoreCommandResult, - ok); + runCommandWithSession(conn, + op.ns, + getMoreRequest.toBSON(), + kNoOptions, + lsid, + &getMoreCommandResult)); cursorResponse = uassertStatusOK( CursorResponse::parseFromBSON(getMoreCommandResult)); count += cursorResponse.getBatch().size(); @@ -861,7 +919,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { invariantOK(qr->validate()); BenchRunEventTrace _bret(&stats.queryCounter); - count = runQueryWithReadCommands(conn, std::move(qr)); + count = runQueryWithReadCommands(conn, lsid, std::move(qr), nullptr); } else { // Use special query function for exhaust query option. if (op.options & QueryOption_Exhaust) { @@ -917,7 +975,6 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { BSONObj update = fixQuery(op.update, bsonTemplateEvaluator); if (op.useWriteCmd) { - // TODO: Replace after SERVER-11774. BSONObjBuilder builder; builder.append("update", nsToCollectionSubstring(op.ns)); BSONArrayBuilder docBuilder(builder.subarrayStart("updates")); @@ -926,9 +983,16 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { << op.upsert)); docBuilder.done(); builder.append("writeConcern", op.writeConcern); - conn->runCommand(nsToDatabaseSubstring(op.ns).toString(), - builder.done(), - result); + + if (txnNumberForWriteCommands) + ++(*txnNumberForWriteCommands); + runCommandWithSession(conn, + nsToDatabaseSubstring(op.ns).toString(), + builder.done(), + kNoOptions, + lsid, + txnNumberForWriteCommands, + &result); } else { auto toSend = makeUpdateMessage(op.ns, @@ -967,13 +1031,11 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { } break; case OpType::INSERT: { BSONObj result; - { BenchRunEventTrace _bret(&stats.insertCounter); BSONObj insertDoc; if (op.useWriteCmd) { - // TODO: Replace after SERVER-11774. BSONObjBuilder builder; builder.append("insert", nsToCollectionSubstring(op.ns)); BSONArrayBuilder docBuilder(builder.subarrayStart("documents")); @@ -988,9 +1050,16 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { } docBuilder.done(); builder.append("writeConcern", op.writeConcern); - conn->runCommand(nsToDatabaseSubstring(op.ns).toString(), - builder.done(), - result); + + if (txnNumberForWriteCommands) + ++(*txnNumberForWriteCommands); + runCommandWithSession(conn, + nsToDatabaseSubstring(op.ns).toString(), + builder.done(), + kNoOptions, + lsid, + txnNumberForWriteCommands, + &result); } else { std::vector<BSONObj> insertArray; if (op.isDocAnArray) { @@ -1040,7 +1109,6 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { BenchRunEventTrace _bret(&stats.deleteCounter); BSONObj predicate = fixQuery(op.query, bsonTemplateEvaluator); if (op.useWriteCmd) { - // TODO: Replace after SERVER-11774. BSONObjBuilder builder; builder.append("delete", nsToCollectionSubstring(op.ns)); BSONArrayBuilder docBuilder(builder.subarrayStart("deletes")); @@ -1048,9 +1116,16 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { docBuilder.append(BSON("q" << predicate << "limit" << limit)); docBuilder.done(); builder.append("writeConcern", op.writeConcern); - conn->runCommand(nsToDatabaseSubstring(op.ns).toString(), - builder.done(), - result); + + if (txnNumberForWriteCommands) + ++(*txnNumberForWriteCommands); + runCommandWithSession(conn, + nsToDatabaseSubstring(op.ns).toString(), + builder.done(), + kNoOptions, + lsid, + txnNumberForWriteCommands, + &result); } else { auto toSend = makeRemoveMessage( op.ns, predicate, op.multi ? 0 : RemoveOption_JustOne); @@ -1098,9 +1173,10 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { default: uassert(34397, "In benchRun loop and got unknown op type", false); } + // Count 1 for total ops. Successfully got through the try phrase stats.opCount++; - } catch (DBException& ex) { + } catch (const DBException& ex) { if (!_config->hideErrors || op.showError) { bool yesWatch = (_config->watchPattern && _config->watchPattern->FullMatch(ex.what())); @@ -1113,7 +1189,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { yesWatch) || // If we're just watching things (_config->watchPattern && _config->noWatchPattern && yesWatch && !noWatch)) log() << "Error in benchRun thread for op " - << opTypeName.find(op.op)->second << causedBy(ex); + << kOpTypeNames.find(op.op)->second << causedBy(ex); } bool yesTrap = (_config->trapPattern && _config->trapPattern->FullMatch(ex.what())); @@ -1124,10 +1200,10 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { (!_config->noTrapPattern && _config->trapPattern && yesTrap) || (_config->trapPattern && _config->noTrapPattern && yesTrap && !noTrap)) { { - stats.trappedErrors.push_back(BSON("error" << ex.what() << "op" - << opTypeName.find(op.op)->second - << "count" - << count)); + stats.trappedErrors.push_back( + BSON("error" << ex.what() << "op" << kOpTypeNames.find(op.op)->second + << "count" + << count)); } if (_config->breakOnTrap) return; @@ -1139,7 +1215,7 @@ void BenchRunWorker::generateLoadOnConnection(DBClientBase* conn) { } catch (...) { if (!_config->hideErrors || op.showError) log() << "Error in benchRun thread caused by unknown error for op " - << opTypeName.find(op.op)->second; + << kOpTypeNames.find(op.op)->second; if (!_config->handleErrors && !op.handleError) return; @@ -1169,7 +1245,7 @@ void BenchRunWorker::run() { } } - BenchRunWorkerStateGuard _workerStateGuard(_brState); + BenchRunWorkerStateGuard workerStateGuard(_brState); generateLoadOnConnection(conn.get()); } catch (const DBException& e) { error() << "DBException not handled in benchRun thread" << causedBy(e); @@ -1182,14 +1258,12 @@ void BenchRunWorker::run() { BenchRunner::BenchRunner(BenchRunConfig* config) : _brState(config->parallel), _config(config) { _oid.init(); + stdx::lock_guard<stdx::mutex> lk(_staticMutex); _activeRuns[_oid] = this; } -BenchRunner::~BenchRunner() { - for (size_t i = 0; i < _workers.size(); ++i) - delete _workers[i]; -} +BenchRunner::~BenchRunner() = default; void BenchRunner::start() { { @@ -1209,17 +1283,19 @@ void BenchRunner::start() { // Start threads for (int64_t i = 0; i < _config->parallel; i++) { // Make a unique random seed for the worker. - int64_t seed = _config->randomSeed + i; - BenchRunWorker* worker = new BenchRunWorker(i, _config.get(), &_brState, seed); + const int64_t seed = _config->randomSeed + i; + + auto worker = stdx::make_unique<BenchRunWorker>(i, _config.get(), _brState, seed); worker->start(); - _workers.push_back(worker); + + _workers.push_back(std::move(worker)); } _brState.waitForState(BenchRunState::BRS_RUNNING); // initial stats _brState.tellWorkersToCollectStats(); - _brTimer = new Timer(); + _brTimer.emplace(); } } @@ -1227,7 +1303,7 @@ void BenchRunner::stop() { _brState.tellWorkersToFinish(); _brState.waitForState(BenchRunState::BRS_FINISHED); _microsElapsed = _brTimer->micros(); - delete _brTimer; + _brTimer.reset(); { std::unique_ptr<DBClientBase> conn(_config->createConnection()); @@ -1259,39 +1335,52 @@ BenchRunner* BenchRunner::get(OID oid) { stdx::lock_guard<stdx::mutex> lk(_staticMutex); return _activeRuns[oid]; } -void BenchRunner::populateStats(BenchRunStats* stats) { + +BenchRunStats BenchRunner::gatherStats() const { _brState.assertFinished(); - stats->reset(); - for (size_t i = 0; i < _workers.size(); ++i) - stats->updateFrom(_workers[i]->stats()); + + BenchRunStats stats; + + for (size_t i = 0; i < _workers.size(); ++i) { + stats.updateFrom(_workers[i]->stats()); + } + + return stats; } BSONObj BenchRunner::finish(BenchRunner* runner) { runner->stop(); - BenchRunStats stats; - runner->populateStats(&stats); - - // vector<BSONOBj> errors = runner->config.errors; - bool error = stats.error; + const auto stats(runner->gatherStats()); - if (error) + const bool error = stats.error; + if (error) { return BSON("err" << 1); + } BSONObjBuilder buf; buf.append("note", "values per second"); buf.append("errCount", static_cast<long long>(stats.errCount)); buf.append("trapped", "error: not implemented"); - appendAverageMicrosIfAvailable(buf, "findOneLatencyAverageMicros", stats.findOneCounter); - appendAverageMicrosIfAvailable(buf, "insertLatencyAverageMicros", stats.insertCounter); - appendAverageMicrosIfAvailable(buf, "deleteLatencyAverageMicros", stats.deleteCounter); - appendAverageMicrosIfAvailable(buf, "updateLatencyAverageMicros", stats.updateCounter); - appendAverageMicrosIfAvailable(buf, "queryLatencyAverageMicros", stats.queryCounter); - appendAverageMicrosIfAvailable(buf, "commandsLatencyAverageMicros", stats.commandCounter); + + const auto appendAverageMicrosIfAvailable = [&buf](StringData name, + const BenchRunEventCounter& counter) { + if (counter.getNumEvents() > 0) { + buf.append(name, + static_cast<double>(counter.getTotalTimeMicros()) / counter.getNumEvents()); + } + }; + + appendAverageMicrosIfAvailable("findOneLatencyAverageMicros", stats.findOneCounter); + appendAverageMicrosIfAvailable("insertLatencyAverageMicros", stats.insertCounter); + appendAverageMicrosIfAvailable("deleteLatencyAverageMicros", stats.deleteCounter); + appendAverageMicrosIfAvailable("updateLatencyAverageMicros", stats.updateCounter); + appendAverageMicrosIfAvailable("queryLatencyAverageMicros", stats.queryCounter); + appendAverageMicrosIfAvailable("commandsLatencyAverageMicros", stats.commandCounter); buf.append("totalOps", static_cast<long long>(stats.opCount)); - auto appendPerSec = [&buf, runner](StringData name, double total) { + const auto appendPerSec = [&buf, runner](StringData name, double total) { buf.append(name, total / (runner->_microsElapsed / 1000000.0)); }; |