/** @file bench.cpp */

/*
 *    Copyright (C) 2010 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/platform/basic.h" #include "mongo/shell/bench.h" #include #include #include #include #include #include "mongo/db/namespace_string.h" #include "mongo/client/dbclientcursor.h" #include "mongo/scripting/bson_template_evaluator.h" #include "mongo/scripting/engine.h" #include "mongo/util/log.h" #include "mongo/util/md5.h" #include "mongo/util/timer.h" #include "mongo/util/time_support.h" #include "mongo/util/version.h" // --------------------------------- // ---- benchmarking system -------- // --------------------------------- // TODO: Maybe extract as library to avoid code duplication? namespace { inline pcrecpp::RE_Options flags2options(const char* flags) { pcrecpp::RE_Options options; options.set_utf8(true); while ( flags && *flags ) { if ( *flags == 'i' ) options.set_caseless(true); else if ( *flags == 'm' ) options.set_multiline(true); else if ( *flags == 'x' ) options.set_extended(true); flags++; } return options; } } namespace mongo { using std::unique_ptr; using std::cout; using std::endl; using std::map; BenchRunEventCounter::BenchRunEventCounter() { reset(); } BenchRunEventCounter::~BenchRunEventCounter() {} void BenchRunEventCounter::reset() { _numEvents = 0; _totalTimeMicros = 0; } void BenchRunEventCounter::updateFrom(const BenchRunEventCounter &other) { _numEvents += other._numEvents; _totalTimeMicros += other._totalTimeMicros; } BenchRunStats::BenchRunStats() { reset(); } BenchRunStats::~BenchRunStats() {} void BenchRunStats::reset() { error = false; errCount = 0; findOneCounter.reset(); updateCounter.reset(); insertCounter.reset(); deleteCounter.reset(); queryCounter.reset(); trappedErrors.clear(); } void BenchRunStats::updateFrom(const BenchRunStats &other) { if (other.error) error = true; errCount += other.errCount; findOneCounter.updateFrom(other.findOneCounter); updateCounter.updateFrom(other.updateCounter); insertCounter.updateFrom(other.insertCounter); 
deleteCounter.updateFrom(other.deleteCounter); queryCounter.updateFrom(other.queryCounter); for (size_t i = 0; i < other.trappedErrors.size(); ++i) trappedErrors.push_back(other.trappedErrors[i]); } BenchRunConfig::BenchRunConfig() { initializeToDefaults(); } void BenchRunConfig::initializeToDefaults() { host = "localhost"; db = "test"; username = ""; password = ""; parallel = 1; seconds = 1; hideResults = true; handleErrors = false; hideErrors = false; trapPattern.reset(); noTrapPattern.reset(); watchPattern.reset(); noWatchPattern.reset(); ops = BSONObj(); throwGLE = false; breakOnTrap = true; randomSeed = 1314159265358979323; } BenchRunConfig *BenchRunConfig::createFromBson( const BSONObj &args ) { BenchRunConfig *config = new BenchRunConfig(); config->initializeFromBson( args ); return config; } void BenchRunConfig::initializeFromBson( const BSONObj &args ) { initializeToDefaults(); if ( args["host"].type() == String ) this->host = args["host"].String(); if ( args["db"].type() == String ) this->db = args["db"].String(); if ( args["username"].type() == String ) this->username = args["username"].String(); if ( args["password"].type() == String ) this->password = args["password"].String(); if ( args["parallel"].isNumber() ) this->parallel = args["parallel"].numberInt(); if ( args["randomSeed"].isNumber() ) this->randomSeed = args["randomSeed"].numberInt(); if ( args["seconds"].isNumber() ) this->seconds = args["seconds"].number(); if ( ! args["hideResults"].eoo() ) this->hideResults = args["hideResults"].trueValue(); if ( ! args["handleErrors"].eoo() ) this->handleErrors = args["handleErrors"].trueValue(); if ( ! args["hideErrors"].eoo() ) this->hideErrors = args["hideErrors"].trueValue(); if ( ! args["throwGLE"].eoo() ) this->throwGLE = args["throwGLE"].trueValue(); if ( ! args["breakOnTrap"].eoo() ) this->breakOnTrap = args["breakOnTrap"].trueValue(); uassert(16164, "loopCommands config not supported", args["loopCommands"].eoo()); if ( ! 
args["trapPattern"].eoo() ){ const char* regex = args["trapPattern"].regex(); const char* flags = args["trapPattern"].regexFlags(); this->trapPattern = boost::shared_ptr< pcrecpp::RE >( new pcrecpp::RE( regex, flags2options( flags ) ) ); } if ( ! args["noTrapPattern"].eoo() ){ const char* regex = args["noTrapPattern"].regex(); const char* flags = args["noTrapPattern"].regexFlags(); this->noTrapPattern = boost::shared_ptr< pcrecpp::RE >( new pcrecpp::RE( regex, flags2options( flags ) ) ); } if ( ! args["watchPattern"].eoo() ){ const char* regex = args["watchPattern"].regex(); const char* flags = args["watchPattern"].regexFlags(); this->watchPattern = boost::shared_ptr< pcrecpp::RE >( new pcrecpp::RE( regex, flags2options( flags ) ) ); } if ( ! args["noWatchPattern"].eoo() ){ const char* regex = args["noWatchPattern"].regex(); const char* flags = args["noWatchPattern"].regexFlags(); this->noWatchPattern = boost::shared_ptr< pcrecpp::RE >( new pcrecpp::RE( regex, flags2options( flags ) ) ); } this->ops = args["ops"].Obj().getOwned(); } DBClientBase *BenchRunConfig::createConnection() const { const ConnectionString connectionString = uassertStatusOK(ConnectionString::parse(host)); std::string errorMessage; DBClientBase *connection = connectionString.connect(errorMessage); uassert( 16158, errorMessage, connection != NULL ); return connection; } BenchRunState::BenchRunState( unsigned numWorkers ) : _mutex(), _numUnstartedWorkers( numWorkers ), _numActiveWorkers( 0 ), _isShuttingDown( 0 ) { } BenchRunState::~BenchRunState() { wassert(_numActiveWorkers == 0 && _numUnstartedWorkers == 0); } void BenchRunState::waitForState(State awaitedState) { boost::lock_guard lk(_mutex); switch ( awaitedState ) { case BRS_RUNNING: while ( _numUnstartedWorkers > 0 ) { massert( 16147, "Already finished.", _numUnstartedWorkers + _numActiveWorkers > 0 ); _stateChangeCondition.wait( _mutex ); } break; case BRS_FINISHED: while ( _numUnstartedWorkers + _numActiveWorkers > 0 ) { 
_stateChangeCondition.wait( _mutex ); } break; default: msgasserted(16152, mongoutils::str::stream() << "Cannot wait for state " << awaitedState); } } void BenchRunState::tellWorkersToFinish() { _isShuttingDown.store( 1 ); } void BenchRunState::assertFinished() { boost::lock_guard lk(_mutex); verify(0 == _numUnstartedWorkers + _numActiveWorkers); } bool BenchRunState::shouldWorkerFinish() { return (_isShuttingDown.loadRelaxed() == 1); } void BenchRunState::onWorkerStarted() { boost::lock_guard lk(_mutex); verify( _numUnstartedWorkers > 0 ); --_numUnstartedWorkers; ++_numActiveWorkers; if (_numUnstartedWorkers == 0) { _stateChangeCondition.notify_all(); } } void BenchRunState::onWorkerFinished() { boost::lock_guard lk(_mutex); verify( _numActiveWorkers > 0 ); --_numActiveWorkers; if (_numActiveWorkers + _numUnstartedWorkers == 0) { _stateChangeCondition.notify_all(); } } BSONObj benchStart( const BSONObj& , void* ); BSONObj benchFinish( const BSONObj& , void* ); static bool _hasSpecial( const BSONObj& obj ) { BSONObjIterator i( obj ); while ( i.more() ) { BSONElement e = i.next(); if ( e.fieldName()[0] == '#' ) return true; if ( ! e.isABSONObj() ) continue; if ( _hasSpecial( e.Obj() ) ) return true; } return false; } static BSONObj fixQuery( const BSONObj& obj, BsonTemplateEvaluator& btl ) { if ( ! 
_hasSpecial( obj ) ) return obj; BSONObjBuilder b( obj.objsize() + 128 ); verify(BsonTemplateEvaluator::StatusSuccess == btl.evaluate(obj, b)); return b.obj(); } BenchRunWorker::BenchRunWorker(size_t id, const BenchRunConfig *config, BenchRunState *brState, int64_t randomSeed) : _id(id), _config(config), _brState(brState), _randomSeed(randomSeed) { } BenchRunWorker::~BenchRunWorker() {} void BenchRunWorker::start() { boost::thread(stdx::bind(&BenchRunWorker::run, this)); } bool BenchRunWorker::shouldStop() const { return _brState->shouldWorkerFinish(); } void doNothing(const BSONObj&) { } void BenchRunWorker::generateLoadOnConnection( DBClientBase* conn ) { verify( conn ); long long count = 0; mongo::Timer timer; BsonTemplateEvaluator bsonTemplateEvaluator(_randomSeed); invariant(bsonTemplateEvaluator.setId(_id) == BsonTemplateEvaluator::StatusSuccess); if (_config->username != "") { string errmsg; if (!conn->auth("admin", _config->username, _config->password, errmsg)) { uasserted(15931, "Authenticating to connection for _benchThread failed: " + errmsg); } } while ( !shouldStop() ) { BSONObjIterator i( _config->ops ); while ( i.more() ) { if ( shouldStop() ) break; BSONElement e = i.next(); string ns = e["ns"].String(); string op = e["op"].String(); int delay = e["delay"].eoo() ? 0 : e["delay"].Int(); // Let's default to writeCmd == false. bool useWriteCmd = e["writeCmd"].eoo() ? false : e["writeCmd"].Bool(); BSONObj context = e["context"].eoo() ? BSONObj() : e["context"].Obj(); unique_ptr scope; ScriptingFunction scopeFunc = 0; BSONObj scopeObj; bool check = ! 
e["check"].eoo(); if( check ){ if ( e["check"].type() == CodeWScope || e["check"].type() == Code || e["check"].type() == String ) { scope = globalScriptEngine->getPooledScope(NULL, ns, "benchrun"); verify( scope.get() ); if ( e.type() == CodeWScope ) { scopeFunc = scope->createFunction( e["check"].codeWScopeCode() ); scopeObj = BSONObj( e.codeWScopeScopeDataUnsafe() ); } else { scopeFunc = scope->createFunction( e["check"].valuestr() ); } scope->init( &scopeObj ); invariant(scopeFunc); } else { warning() << "Invalid check type detected in benchRun op : " << e << endl; check = false; } } try { if ( op == "nop") { // do nothing } else if ( op == "findOne" ) { BSONObj result; { BenchRunEventTrace _bret(&_stats.findOneCounter); result = conn->findOne( ns , fixQuery( e["query"].Obj(), bsonTemplateEvaluator ) ); } if( check ){ int err = scope->invoke( scopeFunc , 0 , &result, 1000 * 60 , false ); if( err ){ log() << "Error checking in benchRun thread [findOne]" << causedBy( scope->getError() ) << endl; _stats.errCount++; return; } } if( ! _config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [findOne] : " << result << endl; } else if ( op == "command" ) { BSONObj result; conn->runCommand( ns, fixQuery( e["command"].Obj(), bsonTemplateEvaluator ), result, e["options"].numberInt() ); if( check ){ int err = scope->invoke( scopeFunc , 0 , &result, 1000 * 60 , false ); if( err ){ log() << "Error checking in benchRun thread [command]" << causedBy( scope->getError() ) << endl; _stats.errCount++; return; } } if( ! _config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [command] : " << result << endl; } else if( op == "find" || op == "query" ) { int limit = e["limit"].eoo() ? 0 : e["limit"].numberInt(); int skip = e["skip"].eoo() ? 0 : e["skip"].Int(); int options = e["options"].eoo() ? 0 : e["options"].Int(); int batchSize = e["batchSize"].eoo() ? 
0 : e["batchSize"].Int(); BSONObj filter = e["filter"].eoo() ? BSONObj() : e["filter"].Obj(); int expected = e["expected"].eoo() ? -1 : e["expected"].Int(); unique_ptr cursor; int count; BSONObj fixedQuery = fixQuery(e["query"].Obj(), bsonTemplateEvaluator); // use special query function for exhaust query option if (options & QueryOption_Exhaust) { BenchRunEventTrace _bret(&_stats.queryCounter); stdx::function castedDoNothing(doNothing); count = conn->query(castedDoNothing, ns, fixedQuery, &filter, options); } else { BenchRunEventTrace _bret(&_stats.queryCounter); cursor = conn->query(ns, fixedQuery, limit, skip, &filter, options, batchSize); count = cursor->itcount(); } if ( expected >= 0 && count != expected ) { cout << "bench query on: " << ns << " expected: " << expected << " got: " << count << endl; verify(false); } if( check ){ BSONObj thisValue = BSON( "count" << count << "context" << context ); int err = scope->invoke( scopeFunc , 0 , &thisValue, 1000 * 60 , false ); if( err ){ log() << "Error checking in benchRun thread [find]" << causedBy( scope->getError() ) << endl; _stats.errCount++; return; } } if( ! _config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [query] : " << count << endl; } else if( op == "update" ) { bool multi = e["multi"].trueValue(); bool upsert = e["upsert"].trueValue(); BSONObj queryOrginal = e["query"].eoo() ? BSONObj() : e["query"].Obj(); BSONObj updateOriginal = e["update"].Obj(); BSONObj result; bool safe = e["safe"].trueValue(); { BenchRunEventTrace _bret(&_stats.updateCounter); BSONObj query = fixQuery(queryOrginal, bsonTemplateEvaluator); BSONObj update = fixQuery(updateOriginal, bsonTemplateEvaluator); if (useWriteCmd) { // TODO: Replace after SERVER-11774. 
BSONObjBuilder builder; builder.append("update", nsToCollectionSubstring(ns)); BSONArrayBuilder docBuilder( builder.subarrayStart("updates")); docBuilder.append(BSON("q" << query << "u" << update << "multi" << multi << "upsert" << upsert)); docBuilder.done(); conn->runCommand( nsToDatabaseSubstring(ns).toString(), builder.done(), result); } else { conn->update(ns, query, update, upsert , multi); if (safe) result = conn->getLastErrorDetailed(); } } if( safe ){ if( check ){ int err = scope->invoke( scopeFunc , 0 , &result, 1000 * 60 , false ); if( err ){ log() << "Error checking in benchRun thread [update]" << causedBy( scope->getError() ) << endl; _stats.errCount++; return; } } if( ! _config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [safe update] : " << result << endl; if( ! result["err"].eoo() && result["err"].type() == String && ( _config->throwGLE || e["throwGLE"].trueValue() ) ) throw DBException( (string)"From benchRun GLE" + causedBy( result["err"].String() ), result["code"].eoo() ? 0 : result["code"].Int() ); } } else if( op == "insert" ) { bool safe = e["safe"].trueValue(); BSONObj result; { BenchRunEventTrace _bret(&_stats.insertCounter); BSONObj insertDoc = fixQuery(e["doc"].Obj(), bsonTemplateEvaluator); if (useWriteCmd) { BSONObjBuilder builder; builder.append("insert", nsToCollectionSubstring(ns)); BSONArrayBuilder docBuilder( builder.subarrayStart("documents")); docBuilder.append(insertDoc); docBuilder.done(); // TODO: Replace after SERVER-11774. conn->runCommand( nsToDatabaseSubstring(ns).toString(), builder.done(), result); } else { conn->insert(ns, insertDoc); if (safe) result = conn->getLastErrorDetailed(); } } if( safe ){ if( check ){ int err = scope->invoke( scopeFunc , 0 , &result, 1000 * 60 , false ); if( err ){ log() << "Error checking in benchRun thread [insert]" << causedBy( scope->getError() ) << endl; _stats.errCount++; return; } } if( ! 
_config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [safe insert] : " << result << endl; if( ! result["err"].eoo() && result["err"].type() == String && ( _config->throwGLE || e["throwGLE"].trueValue() ) ) throw DBException( (string)"From benchRun GLE" + causedBy( result["err"].String() ), result["code"].eoo() ? 0 : result["code"].Int() ); } } else if( op == "delete" || op == "remove" ) { bool multi = e["multi"].eoo() ? true : e["multi"].trueValue(); BSONObj query = e["query"].eoo() ? BSONObj() : e["query"].Obj(); bool safe = e["safe"].trueValue(); BSONObj result; { BenchRunEventTrace _bret(&_stats.deleteCounter); BSONObj predicate = fixQuery(query, bsonTemplateEvaluator); if (useWriteCmd) { // TODO: Replace after SERVER-11774. BSONObjBuilder builder; builder.append("delete", nsToCollectionSubstring(ns)); BSONArrayBuilder docBuilder( builder.subarrayStart("deletes")); int limit = (multi == true) ? 0 : 1; docBuilder.append( BSON("q" << predicate << "limit" << limit)); docBuilder.done(); conn->runCommand( nsToDatabaseSubstring(ns).toString(), builder.done(), result); } else { conn->remove(ns, predicate, !multi); if (safe) result = conn->getLastErrorDetailed(); } } if( safe ){ if( check ){ int err = scope->invoke( scopeFunc , 0 , &result, 1000 * 60 , false ); if( err ){ log() << "Error checking in benchRun thread [delete]" << causedBy( scope->getError() ) << endl; _stats.errCount++; return; } } if( ! _config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [safe remove] : " << result << endl; if( ! result["err"].eoo() && result["err"].type() == String && ( _config->throwGLE || e["throwGLE"].trueValue() ) ) throw DBException( (string)"From benchRun GLE " + causedBy( result["err"].String() ), result["code"].eoo() ? 
0 : result["code"].Int() ); } } else if ( op == "createIndex" ) { conn->ensureIndex(ns, e["key"].Obj(), false, "", false); } else if ( op == "dropIndex" ) { conn->dropIndex( ns , e["key"].Obj() ); } else if( op == "let" ) { string target = e["target"].eoo() ? string() : e["target"].String(); BSONElement value = e["value"].eoo() ? BSONElement() : e["value"]; BSONObjBuilder valBuilder; BSONObjBuilder templateBuilder; valBuilder.append(value); bsonTemplateEvaluator.evaluate(valBuilder.done(), templateBuilder); bsonTemplateEvaluator.setVariable(target, templateBuilder.done().firstElement()); } else { log() << "don't understand op: " << op << endl; _stats.error = true; return; } } catch( DBException& ex ){ if( ! _config->hideErrors || e["showError"].trueValue() ){ bool yesWatch = ( _config->watchPattern && _config->watchPattern->FullMatch( ex.what() ) ); bool noWatch = ( _config->noWatchPattern && _config->noWatchPattern->FullMatch( ex.what() ) ); if( ( ! _config->watchPattern && _config->noWatchPattern && ! noWatch ) || // If we're just ignoring things ( ! _config->noWatchPattern && _config->watchPattern && yesWatch ) || // If we're just watching things ( _config->watchPattern && _config->noWatchPattern && yesWatch && ! noWatch ) ) log() << "Error in benchRun thread for op " << e << causedBy( ex ) << endl; } bool yesTrap = ( _config->trapPattern && _config->trapPattern->FullMatch( ex.what() ) ); bool noTrap = ( _config->noTrapPattern && _config->noTrapPattern->FullMatch( ex.what() ) ); if( ( ! _config->trapPattern && _config->noTrapPattern && ! noTrap ) || ( ! _config->noTrapPattern && _config->trapPattern && yesTrap ) || ( _config->trapPattern && _config->noTrapPattern && yesTrap && ! noTrap ) ){ { _stats.trappedErrors.push_back( BSON( "error" << ex.what() << "op" << e << "count" << count ) ); } if( _config->breakOnTrap ) return; } if( ! _config->handleErrors && ! e["handleError"].trueValue() ) return; _stats.errCount++; } catch( ... ){ if( ! 
_config->hideErrors || e["showError"].trueValue() ) log() << "Error in benchRun thread caused by unknown error for op " << e << endl; if( ! _config->handleErrors && ! e["handleError"].trueValue() ) return; _stats.errCount++; } if (++count % 100 == 0 && !useWriteCmd) { conn->getLastError(); } if (delay > 0) sleepmillis( delay ); } } conn->getLastError(); } namespace { class BenchRunWorkerStateGuard : private boost::noncopyable { public: explicit BenchRunWorkerStateGuard( BenchRunState *brState ) : _brState( brState ) { _brState->onWorkerStarted(); } ~BenchRunWorkerStateGuard() { _brState->onWorkerFinished(); } private: BenchRunState *_brState; }; } // namespace void BenchRunWorker::run() { try { BenchRunWorkerStateGuard _workerStateGuard( _brState ); boost::scoped_ptr conn( _config->createConnection() ); if ( !_config->username.empty() ) { string errmsg; if (!conn->auth("admin", _config->username, _config->password, errmsg)) { uasserted(15932, "Authenticating to connection for benchThread failed: " + errmsg); } } generateLoadOnConnection( conn.get() ); } catch( DBException& e ){ error() << "DBException not handled in benchRun thread" << causedBy( e ) << endl; } catch( std::exception& e ){ error() << "std::exception not handled in benchRun thread" << causedBy( e ) << endl; } catch( ... ){ error() << "Unknown exception not handled in benchRun thread." 
<< endl; } } BenchRunner::BenchRunner( BenchRunConfig *config ) : _brState(config->parallel), _config(config) { _oid.init(); boost::lock_guard lk(_staticMutex); _activeRuns[_oid] = this; } BenchRunner::~BenchRunner() { for (size_t i = 0; i < _workers.size(); ++i) delete _workers[i]; } void BenchRunner::start( ) { { boost::scoped_ptr conn( _config->createConnection() ); // Must authenticate to admin db in order to run serverStatus command if (_config->username != "") { string errmsg; if (!conn->auth("admin", _config->username, _config->password, errmsg)) { uasserted(16704, str::stream() << "User " << _config->username << " could not authenticate to admin db; admin db access is " "required to use benchRun with auth enabled"); } } // Start threads for ( int64_t i = 0; i < _config->parallel; i++ ) { // Make a unique random seed for the worker. int64_t seed = _config->randomSeed + i; BenchRunWorker *worker = new BenchRunWorker(i, _config.get(), &_brState, seed); worker->start(); _workers.push_back(worker); } _brState.waitForState(BenchRunState::BRS_RUNNING); // initial stats conn->simpleCommand( "admin" , &before , "serverStatus" ); before = before.getOwned(); _brTimer = new mongo::Timer(); } } void BenchRunner::stop() { _brState.tellWorkersToFinish(); _brState.waitForState(BenchRunState::BRS_FINISHED); _microsElapsed = _brTimer->micros(); delete _brTimer; { boost::scoped_ptr conn( _config->createConnection() ); if (_config->username != "") { string errmsg; // this can only fail if admin access was revoked since start of run if (!conn->auth("admin", _config->username, _config->password, errmsg)) { uasserted(16705, str::stream() << "User " << _config->username << " could not authenticate to admin db; admin db access is " "still required to use benchRun with auth enabled"); } } // Get final stats conn->simpleCommand( "admin" , &after , "serverStatus" ); after = after.getOwned(); } { boost::lock_guard lk(_staticMutex); _activeRuns.erase( _oid ); } } BenchRunner* 
BenchRunner::createWithConfig( const BSONObj &configArgs ) { BenchRunConfig *config = BenchRunConfig::createFromBson( configArgs ); return new BenchRunner(config); } BenchRunner* BenchRunner::get( OID oid ) { boost::lock_guard lk(_staticMutex); return _activeRuns[ oid ]; } void BenchRunner::populateStats( BenchRunStats *stats ) { _brState.assertFinished(); stats->reset(); for ( size_t i = 0; i < _workers.size(); ++i ) stats->updateFrom( _workers[i]->stats() ); BSONObj before = this->before["opcounters"].Obj(); BSONObj after = this->after["opcounters"].Obj(); { BSONObjIterator i( after ); while ( i.more() ) { BSONElement e = i.next(); long long delta = e.numberLong(); delta -= before[e.fieldName()].numberLong(); stats->opcounters[e.fieldName()] = delta; } } } static void appendAverageMicrosIfAvailable( BSONObjBuilder &buf, const std::string &name, const BenchRunEventCounter &counter) { if (counter.getNumEvents() > 0) buf.append(name, static_cast(counter.getTotalTimeMicros()) / counter.getNumEvents()); } BSONObj BenchRunner::finish( BenchRunner* runner ) { runner->stop(); BenchRunStats stats; runner->populateStats(&stats); // vector errors = runner->config.errors; bool error = stats.error; if ( error ) return BSON( "err" << 1 ); // compute actual ops/sec BSONObj before = runner->before["opcounters"].Obj(); BSONObj after = runner->after["opcounters"].Obj(); BSONObjBuilder buf; buf.append( "note" , "values per second" ); buf.append( "errCount", (long long) stats.errCount ); buf.append( "trapped", "error: not implemented" ); appendAverageMicrosIfAvailable(buf, "findOneLatencyAverageMicros", stats.findOneCounter); appendAverageMicrosIfAvailable(buf, "insertLatencyAverageMicros", stats.insertCounter); appendAverageMicrosIfAvailable(buf, "deleteLatencyAverageMicros", stats.deleteCounter); appendAverageMicrosIfAvailable(buf, "updateLatencyAverageMicros", stats.updateCounter); appendAverageMicrosIfAvailable(buf, "queryLatencyAverageMicros", stats.queryCounter); { 
BSONObjIterator i( after ); while ( i.more() ) { BSONElement e = i.next(); double x = e.number(); x -= before[e.fieldName()].number(); std::string s = e.fieldName(); buf.append( s, x / (runner->_microsElapsed / 1000000.0) ); } } BSONObj zoo = buf.obj(); delete runner; return zoo; } boost::mutex BenchRunner::_staticMutex; map< OID, BenchRunner* > BenchRunner::_activeRuns; /** * benchRun( { ops : [] , host : XXX , db : XXXX , parallel : 5 , seconds : 5 } */ BSONObj BenchRunner::benchRunSync( const BSONObj& argsFake, void* data ) { BSONObj start = benchStart( argsFake, data ); OID oid = OID( start.firstElement().String() ); BenchRunner* runner = BenchRunner::get( oid ); sleepmillis( (int)(1000.0 * runner->config().seconds) ); return benchFinish( start, data ); } /** * benchRun( { ops : [] , host : XXX , db : XXXX , parallel : 5 , seconds : 5 } */ BSONObj BenchRunner::benchStart( const BSONObj& argsFake, void* data ) { verify( argsFake.firstElement().isABSONObj() ); BSONObj args = argsFake.firstElement().Obj(); // Get new BenchRunner object BenchRunner* runner = BenchRunner::createWithConfig( args ); runner->start(); return BSON( "" << runner->oid().toString() ); } /** * benchRun( { ops : [] , host : XXX , db : XXXX , parallel : 5 , seconds : 5 } */ BSONObj BenchRunner::benchFinish( const BSONObj& argsFake, void* data ) { OID oid = OID( argsFake.firstElement().String() ); // Get old BenchRunner object BenchRunner* runner = BenchRunner::get( oid ); BSONObj finalObj = BenchRunner::finish( runner ); return BSON( "" << finalObj ); } } // namespace mongo