diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/client.cpp | 188 | ||||
-rw-r--r-- | src/mongo/db/client.h | 64 | ||||
-rw-r--r-- | src/mongo/db/curop.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/dbhelpers.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/dbhelpers.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 6 |
7 files changed, 142 insertions, 150 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 69dc31f8f1c..f39d00ec750 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -56,7 +56,6 @@ #include "mongo/db/dbwebserver.h" #include "mongo/db/instance.h" #include "mongo/db/json.h" -#include "mongo/db/jsobj.h" #include "mongo/db/lasterror.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -77,14 +76,70 @@ namespace mongo { using logger::LogComponent; +namespace { + + class HandshakeCmd : public Command { + public: + void help(stringstream& h) const { h << "internal"; } + HandshakeCmd() : Command("handshake") {} + virtual bool isWriteCommandForConfigServer() const { return false; } + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return false; } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + + virtual bool run(OperationContext* txn, + const string& ns, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result, + bool fromRepl) { + + repl::HandshakeArgs handshake; + Status status = handshake.initialize(cmdObj); + if (!status.isOK()) { + return appendCommandStatus(result, status); + } + + // TODO(dannenberg) move this into actual processing for both version + txn->getClient()->setRemoteID(handshake.getRid()); + + status = repl::getGlobalReplicationCoordinator()->processHandshake(txn, handshake); + return appendCommandStatus(result, status); + } + + } handshakeCmd; + + + /** + * Create an appropriate new locker for the storage engine in use. Caller owns the return. + */ + Locker* newLocker() { + if (isMMAPV1()) { + return new MMAPV1LockerImpl(); + } + + return new LockerImpl<false>(); + } + +} // namespace + + boost::mutex Client::clientsMutex; ClientSet Client::clients; TSP_DEFINE(Client, currentClient) - /* each thread which does db operations has a Client object in TLS. - call this when your thread starts. - */ + /** + * This must be called whenever a new thread is started, so that active threads can be tracked + * so each thread has a Client object in TLS. + */ void Client::initThread(const char *desc, AbstractMessagingPort *mp) { invariant(currentClient.get() == 0); @@ -112,17 +167,6 @@ namespace mongo { clients.insert(client); } -namespace { - // Create an appropriate new locker for the storage engine in use. Caller owns. - Locker* newLocker() { - if (isMMAPV1()) { - return new MMAPV1LockerImpl(); - } - - return new LockerImpl<false>(); - } -} - Client::Client(const string& desc, AbstractMessagingPort *p) : ClientBasic(p), _desc(desc), @@ -169,11 +213,9 @@ namespace { return false; } - BSONObj CachedBSONObjBase::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}"); Client::Context::Context(OperationContext* txn, const std::string& ns, Database * db) - : _client(currentClient.get()), - _justCreated(false), + : _justCreated(false), _doVersion(true), _ns(ns), _db(db), @@ -184,8 +226,7 @@ namespace { const std::string& ns, Database* db, bool justCreated) - : _client(currentClient.get()), - _justCreated(justCreated), + : _justCreated(justCreated), _doVersion(true), _ns(ns), _db(db), @@ -196,8 +237,7 @@ namespace { Client::Context::Context(OperationContext* txn, const string& ns, bool doVersion) - : _client(currentClient.get()), - _justCreated(false), // set for real in finishInit + : _justCreated(false), // set for real in finishInit _doVersion(doVersion), _ns(ns), _db(NULL), @@ -206,6 +246,42 @@ namespace { _finishInit(); } + void Client::Context::_finishInit() { + _db = dbHolder().get(_txn, _ns); + if (_db) { + _justCreated = false; + } + else { + invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X)); + _db = dbHolder().openDb(_txn, _ns, &_justCreated); + invariant(_db); + } + + if (_doVersion) { + _checkNotStale(); + } + + _txn->getCurOp()->enter(_ns.c_str(), _db->getProfilingLevel()); + } + + void Client::Context::_checkNotStale() const { + switch (_txn->getCurOp()->getOp()) { + case dbGetMore: // getMore is special and should be handled elsewhere. + case dbUpdate: // update & delete check shard version in instance.cpp, so don't check + case dbDelete: // here as well. + break; + default: + ensureShardVersionOKOrThrow(_ns); + } + } + + Client::Context::~Context() { + // Lock must still be held + invariant(_txn->lockState()->isLocked()); + + _txn->getCurOp()->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros()); + } + AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode) : _dbLock(txn->lockState(), ns, mode), @@ -294,42 +370,6 @@ namespace { } } - void Client::Context::checkNotStale() const { - switch ( _client->_curOp->getOp() ) { - case dbGetMore: // getMore's are special and should be handled else where - case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well - case dbDelete: - break; - default: { - ensureShardVersionOKOrThrow(_ns); - } - } - } - - void Client::Context::_finishInit() { - _db = dbHolder().get(_txn, _ns); - if (_db) { - _justCreated = false; - } - else { - invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X)); - _db = dbHolder().openDb(_txn, _ns, &_justCreated); - invariant(_db); - } - - if( _doVersion ) checkNotStale(); - - _client->_curOp->enter(_ns.c_str(), _db->getProfilingLevel()); - } - - Client::Context::~Context() { - DEV verify( _client == currentClient.get() ); - - // Lock must still be held - invariant(_txn->lockState()->isLocked()); - - _client->_curOp->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros()); - } void Client::appendLastOp( BSONObjBuilder& b ) const { // _lastOp is never set if replication is off @@ -375,38 +415,6 @@ namespace { return currentClient.get(); } - class HandshakeCmd : public Command { - public: - void help(stringstream& h) const { h << "internal"; } - HandshakeCmd() : Command( "handshake" ) {} - virtual bool isWriteCommandForConfigServer() const { return false; } - virtual bool slaveOk() const { return true; } - virtual bool adminOnly() const { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - virtual bool run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - repl::HandshakeArgs handshake; - Status status = handshake.initialize(cmdObj); - if (!status.isOK()) { - return appendCommandStatus(result, status); - } - - // TODO(dannenberg) move this into actual processing for both version - txn->getClient()->setRemoteID(handshake.getRid()); - - status = repl::getGlobalReplicationCoordinator()->processHandshake(txn, - handshake); - return appendCommandStatus(result, status); - } - - } handshakeCmd; - - void OpDebug::reset() { extra.reset(); diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index f9de90fe203..94a11fb3080 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -36,7 +36,6 @@ #pragma once -#include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> #include <boost/thread/thread.hpp> @@ -54,7 +53,6 @@ namespace mongo { - class AuthenticationInfo; class CurOp; class Collection; class AbstractMessagingPort; @@ -104,11 +102,11 @@ namespace mongo { public: AutoGetOrCreateDb(OperationContext* txn, StringData ns, LockMode mode); - Database* getDb() { + Database* getDb() const { return _db; } - bool justCreated() { + bool justCreated() const { return _justCreated; } @@ -178,7 +176,7 @@ namespace mongo { * Inits a thread if that thread has not already been init'd, setting the thread name to * "desc". */ - static void initThreadIfNotAlready(const char *desc) { + static void initThreadIfNotAlready(const char* desc) { if (currentClient.get()) return; initThread(desc); @@ -198,17 +196,17 @@ namespace mongo { */ bool shutdown(); - std::string clientAddress(bool includePort=false) const; + std::string clientAddress(bool includePort = false) const; CurOp* curop() const { return _curOp; } const std::string& desc() const { return _desc; } - void setLastOp( OpTime op ) { _lastOp = op; } + void setLastOp(OpTime op) { _lastOp = op; } OpTime getLastOp() const { return _lastOp; } // Return a reference to the Locker for this client. Client retains ownership. Locker* getLocker() const { return _locker.get(); } /* report what the last operation was. used by getlasterror */ - void appendLastOp( BSONObjBuilder& b ) const; + void appendLastOp(BSONObjBuilder& b) const; void reportState(BSONObjBuilder& builder); // Ensures stability of the client's OperationContext. When the client is locked, @@ -226,15 +224,19 @@ namespace mongo { bool isGod() const { return _god; } /* this is for map/reduce writes */ bool setGod(bool newVal) { const bool prev = _god; _god = newVal; return prev; } - void setRemoteID(const OID& rid) { _remoteId = rid; } // Only used for master/slave - OID getRemoteID() const { return _remoteId; } // Only used for master/slave + // Only used for master/slave + void setRemoteID(const OID& rid) { _remoteId = rid; } + OID getRemoteID() const { return _remoteId; } + ConnectionId getConnectionId() const { return _connectionId; } bool isFromUserConnection() const { return _connectionId > 0; } private: - Client(const std::string& desc, AbstractMessagingPort *p = 0); friend class CurOp; + Client(const std::string& desc, AbstractMessagingPort *p = 0); + + // Description for the client (e.g. conn8) const std::string _desc; @@ -262,16 +264,19 @@ namespace mongo { // Used by replication OpTime _lastOp; - OID _remoteId; // Only used by master-slave + + // Only used by master-slave + OID _remoteId; // Tracks if Client::shutdown() gets called (TODO: Is this necessary?) bool _shutdown; public: - /* Set database we want to use, then, restores when we finish (are out of scope) - Note this is also helpful if an exception happens as the state if fixed up. - */ + /** + * Opens the database that we want to use and sets the appropriate namespace on the + * current operation. + */ class Context { MONGO_DISALLOW_COPYING(Context); public: @@ -295,26 +300,18 @@ namespace mongo { Context(OperationContext* txn, const std::string& ns, Database * db); ~Context(); - Client* getClient() const { return _client; } + Database* db() const { return _db; } - const char * ns() const { return _ns.c_str(); } + const char* ns() const { return _ns.c_str(); } /** @return if the db was created by this Context */ bool justCreated() const { return _justCreated; } - /** call before unlocking, so clear any non-thread safe state - * _db gets restored on the relock - */ - void unlocked() { _db = 0; } - - /** call after going back into the lock, will re-establish non-thread safe stuff */ - void relocked() { _finishInit(); } - private: friend class CurOp; void _finishInit(); - void checkNotStale() const; - Client * const _client; + void _checkNotStale() const; + bool _justCreated; bool _doVersion; const std::string _ns; @@ -325,7 +322,8 @@ namespace mongo { }; // class Client::Context - class WriteContext : boost::noncopyable { + class WriteContext { + MONGO_DISALLOW_COPYING(WriteContext); public: WriteContext(OperationContext* opCtx, const std::string& ns); @@ -335,17 +333,17 @@ namespace mongo { return _c.db()->getCollection(_nss.ns()); } - Context& ctx() { return _c; } - private: - OperationContext* _txn; - NamespaceString _nss; + OperationContext* const _txn; + const NamespaceString _nss; + AutoGetOrCreateDb _autodb; Lock::CollectionLock _collk; Context _c; Collection* _collection; }; - }; // class Client + + }; /** get the Client object for this thread. */ inline Client& cc() { diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index f69d6196575..ae26aa84dd2 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -37,6 +37,7 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/catalog/database.h" #include "mongo/db/global_environment_experiment.h" +#include "mongo/db/json.h" #include "mongo/db/stats/top.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -45,9 +46,9 @@ namespace mongo { using std::string; - // Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a valid - // non-zero max time to fail immediately. Any getmore operation on a cursor already created - // with a valid non-zero max time will also fail immediately. + // Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a + // valid non-zero max time to fail immediately. Any getmore operation on a cursor already + // created with a valid non-zero max time will also fail immediately. // // This fail point cannot be used with the maxTimeNeverTimeOut fail point. MONGO_FP_DECLARE(maxTimeAlwaysTimeOut); @@ -58,7 +59,10 @@ namespace mongo { // This fail point cannot be used with the maxTimeAlwaysTimeOut fail point. MONGO_FP_DECLARE(maxTimeNeverTimeOut); - // todo : move more here + + BSONObj CachedBSONObjBase::_tooBig = + fromjson("{\"$msg\":\"query not recording (too large)\"}"); + CurOp::CurOp( Client * client , CurOp * wrapped ) : _client(client), diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 32fa31f3725..4a271717406 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -271,24 +271,7 @@ namespace mongo { update(txn, context.db(), request, &debug); - context.getClient()->curop()->done(); - } - - void Helpers::putSingletonGod(OperationContext* txn, const char *ns, BSONObj obj, bool logTheOp) { - OpDebug debug; - Client::Context context(txn, ns); - - const NamespaceString requestNs(ns); - UpdateRequest request(requestNs); - - request.setGod(); - request.setUpdates(obj); - request.setUpsert(); - request.setUpdateOpLog(logTheOp); - - update(txn, context.db(), request, &debug); - - context.getClient()->curop()->done(); + txn->getClient()->curop()->done(); } BSONObj Helpers::toKeyFormat( const BSONObj& o ) { diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h index f9ead76bc20..e75a99aa9b6 100644 --- a/src/mongo/db/dbhelpers.h +++ b/src/mongo/db/dbhelpers.h @@ -116,7 +116,6 @@ namespace mongo { */ static bool getSingleton(OperationContext* txn, const char *ns, BSONObj& result); static void putSingleton(OperationContext* txn, const char *ns, BSONObj obj); - static void putSingletonGod(OperationContext* txn, const char *ns, BSONObj obj, bool logTheOp); /** * get last object int he collection; e.g. {$natural : -1} diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 8f9fe332453..ae90609016e 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -688,7 +688,7 @@ namespace repl { // mongos will not send requests there. That's why the last argument is false (do not do // version checking). Client::Context ctx(txn, ns, false); - ctx.getClient()->curop()->reset(); + txn->getCurOp()->reset(); bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData(); bool incompleteClone = incompleteCloneDbs.count( clientName ) != 0; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index beca5f37083..5086c6b3943 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -269,7 +269,7 @@ namespace { OplogDocWriter writer( partial, obj ); checkOplogInsert( localOplogRSCollection->insertDocument( txn, &writer, false ) ); - ctx.getClient()->setLastOp( slot.first ); + txn->getClient()->setLastOp( slot.first ); wunit.commit(); @@ -335,7 +335,7 @@ namespace { OplogDocWriter writer( partial, obj ); checkOplogInsert( localOplogMainCollection->insertDocument( txn, &writer, false ) ); - ctx.getClient()->setLastOp( slot.first ); + txn->getClient()->setLastOp(slot.first); wunit.commit(); } @@ -462,7 +462,7 @@ namespace { long long hash = ops.back()["h"].numberLong(); bgsync->setLastAppliedHash(hash); - ctx.getClient()->setLastOp(lastOptime); + txn->getClient()->setLastOp(lastOptime); replCoord->setMyLastOptime(lastOptime); setNewOptime(lastOptime); |