diff options
Diffstat (limited to 'src/mongo/db/client.cpp')
-rw-r--r-- | src/mongo/db/client.cpp | 188 |
1 files changed, 98 insertions, 90 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 69dc31f8f1c..f39d00ec750 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -56,7 +56,6 @@ #include "mongo/db/dbwebserver.h" #include "mongo/db/instance.h" #include "mongo/db/json.h" -#include "mongo/db/jsobj.h" #include "mongo/db/lasterror.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -77,14 +76,70 @@ namespace mongo { using logger::LogComponent; +namespace { + + class HandshakeCmd : public Command { + public: + void help(stringstream& h) const { h << "internal"; } + HandshakeCmd() : Command("handshake") {} + virtual bool isWriteCommandForConfigServer() const { return false; } + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return false; } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + + virtual bool run(OperationContext* txn, + const string& ns, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result, + bool fromRepl) { + + repl::HandshakeArgs handshake; + Status status = handshake.initialize(cmdObj); + if (!status.isOK()) { + return appendCommandStatus(result, status); + } + + // TODO(dannenberg) move this into actual processing for both version + txn->getClient()->setRemoteID(handshake.getRid()); + + status = repl::getGlobalReplicationCoordinator()->processHandshake(txn, handshake); + return appendCommandStatus(result, status); + } + + } handshakeCmd; + + + /** + * Create an appropriate new locker for the storage engine in use. Caller owns the return. + */ + Locker* newLocker() { + if (isMMAPV1()) { + return new MMAPV1LockerImpl(); + } + + return new LockerImpl<false>(); + } + +} // namespace + + boost::mutex Client::clientsMutex; ClientSet Client::clients; TSP_DEFINE(Client, currentClient) - /* each thread which does db operations has a Client object in TLS. - call this when your thread starts. - */ + /** + * This must be called whenever a new thread is started, so that active threads can be tracked + * so each thread has a Client object in TLS. + */ void Client::initThread(const char *desc, AbstractMessagingPort *mp) { invariant(currentClient.get() == 0); @@ -112,17 +167,6 @@ namespace mongo { clients.insert(client); } -namespace { - // Create an appropriate new locker for the storage engine in use. Caller owns. - Locker* newLocker() { - if (isMMAPV1()) { - return new MMAPV1LockerImpl(); - } - - return new LockerImpl<false>(); - } -} - Client::Client(const string& desc, AbstractMessagingPort *p) : ClientBasic(p), _desc(desc), @@ -169,11 +213,9 @@ namespace { return false; } - BSONObj CachedBSONObjBase::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}"); Client::Context::Context(OperationContext* txn, const std::string& ns, Database * db) - : _client(currentClient.get()), - _justCreated(false), + : _justCreated(false), _doVersion(true), _ns(ns), _db(db), @@ -184,8 +226,7 @@ namespace { const std::string& ns, Database* db, bool justCreated) - : _client(currentClient.get()), - _justCreated(justCreated), + : _justCreated(justCreated), _doVersion(true), _ns(ns), _db(db), @@ -196,8 +237,7 @@ namespace { Client::Context::Context(OperationContext* txn, const string& ns, bool doVersion) - : _client(currentClient.get()), - _justCreated(false), // set for real in finishInit + : _justCreated(false), // set for real in finishInit _doVersion(doVersion), _ns(ns), _db(NULL), @@ -206,6 +246,42 @@ namespace { _finishInit(); } + void Client::Context::_finishInit() { + _db = dbHolder().get(_txn, _ns); + if (_db) { + _justCreated = false; + } + else { + invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X)); + _db = dbHolder().openDb(_txn, _ns, &_justCreated); + invariant(_db); + } + + if (_doVersion) { + _checkNotStale(); + } + + _txn->getCurOp()->enter(_ns.c_str(), _db->getProfilingLevel()); + } + + void Client::Context::_checkNotStale() const { + switch (_txn->getCurOp()->getOp()) { + case dbGetMore: // getMore is special and should be handled elsewhere. + case dbUpdate: // update & delete check shard version in instance.cpp, so don't check + case dbDelete: // here as well. + break; + default: + ensureShardVersionOKOrThrow(_ns); + } + } + + Client::Context::~Context() { + // Lock must still be held + invariant(_txn->lockState()->isLocked()); + + _txn->getCurOp()->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros()); + } + AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode) : _dbLock(txn->lockState(), ns, mode), @@ -294,42 +370,6 @@ namespace { } } - void Client::Context::checkNotStale() const { - switch ( _client->_curOp->getOp() ) { - case dbGetMore: // getMore's are special and should be handled else where - case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well - case dbDelete: - break; - default: { - ensureShardVersionOKOrThrow(_ns); - } - } - } - - void Client::Context::_finishInit() { - _db = dbHolder().get(_txn, _ns); - if (_db) { - _justCreated = false; - } - else { - invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X)); - _db = dbHolder().openDb(_txn, _ns, &_justCreated); - invariant(_db); - } - - if( _doVersion ) checkNotStale(); - - _client->_curOp->enter(_ns.c_str(), _db->getProfilingLevel()); - } - - Client::Context::~Context() { - DEV verify( _client == currentClient.get() ); - - // Lock must still be held - invariant(_txn->lockState()->isLocked()); - - _client->_curOp->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros()); - } void Client::appendLastOp( BSONObjBuilder& b ) const { // _lastOp is never set if replication is off @@ -375,38 +415,6 @@ namespace { return currentClient.get(); } - class HandshakeCmd : public Command { - public: - void help(stringstream& h) const { h << "internal"; } - HandshakeCmd() : Command( "handshake" ) {} - virtual bool isWriteCommandForConfigServer() const { return false; } - virtual bool slaveOk() const { return true; } - virtual bool adminOnly() const { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - virtual bool run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - repl::HandshakeArgs handshake; - Status status = handshake.initialize(cmdObj); - if (!status.isOK()) { - return appendCommandStatus(result, status); - } - - // TODO(dannenberg) move this into actual processing for both version - txn->getClient()->setRemoteID(handshake.getRid()); - - status = repl::getGlobalReplicationCoordinator()->processHandshake(txn, - handshake); - return appendCommandStatus(result, status); - } - - } handshakeCmd; - - void OpDebug::reset() { extra.reset(); |