summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/client.cpp188
-rw-r--r--src/mongo/db/client.h64
-rw-r--r--src/mongo/db/curop.cpp12
-rw-r--r--src/mongo/db/dbhelpers.cpp19
-rw-r--r--src/mongo/db/dbhelpers.h1
-rw-r--r--src/mongo/db/repl/master_slave.cpp2
-rw-r--r--src/mongo/db/repl/oplog.cpp6
7 files changed, 142 insertions, 150 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 69dc31f8f1c..f39d00ec750 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -56,7 +56,6 @@
#include "mongo/db/dbwebserver.h"
#include "mongo/db/instance.h"
#include "mongo/db/json.h"
-#include "mongo/db/jsobj.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -77,14 +76,70 @@ namespace mongo {
using logger::LogComponent;
+namespace {
+
+ class HandshakeCmd : public Command {
+ public:
+ void help(stringstream& h) const { h << "internal"; }
+ HandshakeCmd() : Command("handshake") {}
+ virtual bool isWriteCommandForConfigServer() const { return false; }
+ virtual bool slaveOk() const { return true; }
+ virtual bool adminOnly() const { return false; }
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+
+ virtual bool run(OperationContext* txn,
+ const string& ns,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result,
+ bool fromRepl) {
+
+ repl::HandshakeArgs handshake;
+ Status status = handshake.initialize(cmdObj);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
+ }
+
+ // TODO(dannenberg) move this into actual processing for both version
+ txn->getClient()->setRemoteID(handshake.getRid());
+
+ status = repl::getGlobalReplicationCoordinator()->processHandshake(txn, handshake);
+ return appendCommandStatus(result, status);
+ }
+
+ } handshakeCmd;
+
+
+ /**
+ * Create an appropriate new locker for the storage engine in use. Caller owns the return.
+ */
+ Locker* newLocker() {
+ if (isMMAPV1()) {
+ return new MMAPV1LockerImpl();
+ }
+
+ return new LockerImpl<false>();
+ }
+
+} // namespace
+
+
boost::mutex Client::clientsMutex;
ClientSet Client::clients;
TSP_DEFINE(Client, currentClient)
- /* each thread which does db operations has a Client object in TLS.
- call this when your thread starts.
- */
+ /**
+ * This must be called whenever a new thread is started, so that active threads can be tracked
+ * so each thread has a Client object in TLS.
+ */
void Client::initThread(const char *desc, AbstractMessagingPort *mp) {
invariant(currentClient.get() == 0);
@@ -112,17 +167,6 @@ namespace mongo {
clients.insert(client);
}
-namespace {
- // Create an appropriate new locker for the storage engine in use. Caller owns.
- Locker* newLocker() {
- if (isMMAPV1()) {
- return new MMAPV1LockerImpl();
- }
-
- return new LockerImpl<false>();
- }
-}
-
Client::Client(const string& desc, AbstractMessagingPort *p)
: ClientBasic(p),
_desc(desc),
@@ -169,11 +213,9 @@ namespace {
return false;
}
- BSONObj CachedBSONObjBase::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}");
Client::Context::Context(OperationContext* txn, const std::string& ns, Database * db)
- : _client(currentClient.get()),
- _justCreated(false),
+ : _justCreated(false),
_doVersion(true),
_ns(ns),
_db(db),
@@ -184,8 +226,7 @@ namespace {
const std::string& ns,
Database* db,
bool justCreated)
- : _client(currentClient.get()),
- _justCreated(justCreated),
+ : _justCreated(justCreated),
_doVersion(true),
_ns(ns),
_db(db),
@@ -196,8 +237,7 @@ namespace {
Client::Context::Context(OperationContext* txn,
const string& ns,
bool doVersion)
- : _client(currentClient.get()),
- _justCreated(false), // set for real in finishInit
+ : _justCreated(false), // set for real in finishInit
_doVersion(doVersion),
_ns(ns),
_db(NULL),
@@ -206,6 +246,42 @@ namespace {
_finishInit();
}
+ void Client::Context::_finishInit() {
+ _db = dbHolder().get(_txn, _ns);
+ if (_db) {
+ _justCreated = false;
+ }
+ else {
+ invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X));
+ _db = dbHolder().openDb(_txn, _ns, &_justCreated);
+ invariant(_db);
+ }
+
+ if (_doVersion) {
+ _checkNotStale();
+ }
+
+ _txn->getCurOp()->enter(_ns.c_str(), _db->getProfilingLevel());
+ }
+
+ void Client::Context::_checkNotStale() const {
+ switch (_txn->getCurOp()->getOp()) {
+ case dbGetMore: // getMore is special and should be handled elsewhere.
+ case dbUpdate: // update & delete check shard version in instance.cpp, so don't check
+ case dbDelete: // here as well.
+ break;
+ default:
+ ensureShardVersionOKOrThrow(_ns);
+ }
+ }
+
+ Client::Context::~Context() {
+ // Lock must still be held
+ invariant(_txn->lockState()->isLocked());
+
+ _txn->getCurOp()->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros());
+ }
+
AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode)
: _dbLock(txn->lockState(), ns, mode),
@@ -294,42 +370,6 @@ namespace {
}
}
- void Client::Context::checkNotStale() const {
- switch ( _client->_curOp->getOp() ) {
- case dbGetMore: // getMore's are special and should be handled else where
- case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well
- case dbDelete:
- break;
- default: {
- ensureShardVersionOKOrThrow(_ns);
- }
- }
- }
-
- void Client::Context::_finishInit() {
- _db = dbHolder().get(_txn, _ns);
- if (_db) {
- _justCreated = false;
- }
- else {
- invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X));
- _db = dbHolder().openDb(_txn, _ns, &_justCreated);
- invariant(_db);
- }
-
- if( _doVersion ) checkNotStale();
-
- _client->_curOp->enter(_ns.c_str(), _db->getProfilingLevel());
- }
-
- Client::Context::~Context() {
- DEV verify( _client == currentClient.get() );
-
- // Lock must still be held
- invariant(_txn->lockState()->isLocked());
-
- _client->_curOp->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros());
- }
void Client::appendLastOp( BSONObjBuilder& b ) const {
// _lastOp is never set if replication is off
@@ -375,38 +415,6 @@ namespace {
return currentClient.get();
}
- class HandshakeCmd : public Command {
- public:
- void help(stringstream& h) const { h << "internal"; }
- HandshakeCmd() : Command( "handshake" ) {}
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual bool slaveOk() const { return true; }
- virtual bool adminOnly() const { return false; }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- virtual bool run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- repl::HandshakeArgs handshake;
- Status status = handshake.initialize(cmdObj);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
-
- // TODO(dannenberg) move this into actual processing for both version
- txn->getClient()->setRemoteID(handshake.getRid());
-
- status = repl::getGlobalReplicationCoordinator()->processHandshake(txn,
- handshake);
- return appendCommandStatus(result, status);
- }
-
- } handshakeCmd;
-
-
void OpDebug::reset() {
extra.reset();
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index f9de90fe203..94a11fb3080 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -36,7 +36,6 @@
#pragma once
-#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
@@ -54,7 +53,6 @@
namespace mongo {
- class AuthenticationInfo;
class CurOp;
class Collection;
class AbstractMessagingPort;
@@ -104,11 +102,11 @@ namespace mongo {
public:
AutoGetOrCreateDb(OperationContext* txn, StringData ns, LockMode mode);
- Database* getDb() {
+ Database* getDb() const {
return _db;
}
- bool justCreated() {
+ bool justCreated() const {
return _justCreated;
}
@@ -178,7 +176,7 @@ namespace mongo {
* Inits a thread if that thread has not already been init'd, setting the thread name to
* "desc".
*/
- static void initThreadIfNotAlready(const char *desc) {
+ static void initThreadIfNotAlready(const char* desc) {
if (currentClient.get())
return;
initThread(desc);
@@ -198,17 +196,17 @@ namespace mongo {
*/
bool shutdown();
- std::string clientAddress(bool includePort=false) const;
+ std::string clientAddress(bool includePort = false) const;
CurOp* curop() const { return _curOp; }
const std::string& desc() const { return _desc; }
- void setLastOp( OpTime op ) { _lastOp = op; }
+ void setLastOp(OpTime op) { _lastOp = op; }
OpTime getLastOp() const { return _lastOp; }
// Return a reference to the Locker for this client. Client retains ownership.
Locker* getLocker() const { return _locker.get(); }
/* report what the last operation was. used by getlasterror */
- void appendLastOp( BSONObjBuilder& b ) const;
+ void appendLastOp(BSONObjBuilder& b) const;
void reportState(BSONObjBuilder& builder);
// Ensures stability of the client's OperationContext. When the client is locked,
@@ -226,15 +224,19 @@ namespace mongo {
bool isGod() const { return _god; } /* this is for map/reduce writes */
bool setGod(bool newVal) { const bool prev = _god; _god = newVal; return prev; }
- void setRemoteID(const OID& rid) { _remoteId = rid; } // Only used for master/slave
- OID getRemoteID() const { return _remoteId; } // Only used for master/slave
+ // Only used for master/slave
+ void setRemoteID(const OID& rid) { _remoteId = rid; }
+ OID getRemoteID() const { return _remoteId; }
+
ConnectionId getConnectionId() const { return _connectionId; }
bool isFromUserConnection() const { return _connectionId > 0; }
private:
- Client(const std::string& desc, AbstractMessagingPort *p = 0);
friend class CurOp;
+ Client(const std::string& desc, AbstractMessagingPort *p = 0);
+
+
// Description for the client (e.g. conn8)
const std::string _desc;
@@ -262,16 +264,19 @@ namespace mongo {
// Used by replication
OpTime _lastOp;
- OID _remoteId; // Only used by master-slave
+
+ // Only used by master-slave
+ OID _remoteId;
// Tracks if Client::shutdown() gets called (TODO: Is this necessary?)
bool _shutdown;
public:
- /* Set database we want to use, then, restores when we finish (are out of scope)
- Note this is also helpful if an exception happens as the state if fixed up.
- */
+ /**
+ * Opens the database that we want to use and sets the appropriate namespace on the
+ * current operation.
+ */
class Context {
MONGO_DISALLOW_COPYING(Context);
public:
@@ -295,26 +300,18 @@ namespace mongo {
Context(OperationContext* txn, const std::string& ns, Database * db);
~Context();
- Client* getClient() const { return _client; }
+
Database* db() const { return _db; }
- const char * ns() const { return _ns.c_str(); }
+ const char* ns() const { return _ns.c_str(); }
/** @return if the db was created by this Context */
bool justCreated() const { return _justCreated; }
- /** call before unlocking, so clear any non-thread safe state
- * _db gets restored on the relock
- */
- void unlocked() { _db = 0; }
-
- /** call after going back into the lock, will re-establish non-thread safe stuff */
- void relocked() { _finishInit(); }
-
private:
friend class CurOp;
void _finishInit();
- void checkNotStale() const;
- Client * const _client;
+ void _checkNotStale() const;
+
bool _justCreated;
bool _doVersion;
const std::string _ns;
@@ -325,7 +322,8 @@ namespace mongo {
}; // class Client::Context
- class WriteContext : boost::noncopyable {
+ class WriteContext {
+ MONGO_DISALLOW_COPYING(WriteContext);
public:
WriteContext(OperationContext* opCtx, const std::string& ns);
@@ -335,17 +333,17 @@ namespace mongo {
return _c.db()->getCollection(_nss.ns());
}
- Context& ctx() { return _c; }
-
private:
- OperationContext* _txn;
- NamespaceString _nss;
+ OperationContext* const _txn;
+ const NamespaceString _nss;
+
AutoGetOrCreateDb _autodb;
Lock::CollectionLock _collk;
Context _c;
Collection* _collection;
};
- }; // class Client
+
+ };
/** get the Client object for this thread. */
inline Client& cc() {
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index f69d6196575..ae26aa84dd2 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/global_environment_experiment.h"
+#include "mongo/db/json.h"
#include "mongo/db/stats/top.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -45,9 +46,9 @@ namespace mongo {
using std::string;
- // Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a valid
- // non-zero max time to fail immediately. Any getmore operation on a cursor already created
- // with a valid non-zero max time will also fail immediately.
+ // Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a
+ // valid non-zero max time to fail immediately. Any getmore operation on a cursor already
+ // created with a valid non-zero max time will also fail immediately.
//
// This fail point cannot be used with the maxTimeNeverTimeOut fail point.
MONGO_FP_DECLARE(maxTimeAlwaysTimeOut);
@@ -58,7 +59,10 @@ namespace mongo {
// This fail point cannot be used with the maxTimeAlwaysTimeOut fail point.
MONGO_FP_DECLARE(maxTimeNeverTimeOut);
- // todo : move more here
+
+ BSONObj CachedBSONObjBase::_tooBig =
+ fromjson("{\"$msg\":\"query not recording (too large)\"}");
+
CurOp::CurOp( Client * client , CurOp * wrapped ) :
_client(client),
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 32fa31f3725..4a271717406 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -271,24 +271,7 @@ namespace mongo {
update(txn, context.db(), request, &debug);
- context.getClient()->curop()->done();
- }
-
- void Helpers::putSingletonGod(OperationContext* txn, const char *ns, BSONObj obj, bool logTheOp) {
- OpDebug debug;
- Client::Context context(txn, ns);
-
- const NamespaceString requestNs(ns);
- UpdateRequest request(requestNs);
-
- request.setGod();
- request.setUpdates(obj);
- request.setUpsert();
- request.setUpdateOpLog(logTheOp);
-
- update(txn, context.db(), request, &debug);
-
- context.getClient()->curop()->done();
+ txn->getClient()->curop()->done();
}
BSONObj Helpers::toKeyFormat( const BSONObj& o ) {
diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h
index f9ead76bc20..e75a99aa9b6 100644
--- a/src/mongo/db/dbhelpers.h
+++ b/src/mongo/db/dbhelpers.h
@@ -116,7 +116,6 @@ namespace mongo {
*/
static bool getSingleton(OperationContext* txn, const char *ns, BSONObj& result);
static void putSingleton(OperationContext* txn, const char *ns, BSONObj obj);
- static void putSingletonGod(OperationContext* txn, const char *ns, BSONObj obj, bool logTheOp);
/**
* get last object int he collection; e.g. {$natural : -1}
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index 8f9fe332453..ae90609016e 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -688,7 +688,7 @@ namespace repl {
// mongos will not send requests there. That's why the last argument is false (do not do
// version checking).
Client::Context ctx(txn, ns, false);
- ctx.getClient()->curop()->reset();
+ txn->getCurOp()->reset();
bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData();
bool incompleteClone = incompleteCloneDbs.count( clientName ) != 0;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index beca5f37083..5086c6b3943 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -269,7 +269,7 @@ namespace {
OplogDocWriter writer( partial, obj );
checkOplogInsert( localOplogRSCollection->insertDocument( txn, &writer, false ) );
- ctx.getClient()->setLastOp( slot.first );
+ txn->getClient()->setLastOp( slot.first );
wunit.commit();
@@ -335,7 +335,7 @@ namespace {
OplogDocWriter writer( partial, obj );
checkOplogInsert( localOplogMainCollection->insertDocument( txn, &writer, false ) );
- ctx.getClient()->setLastOp( slot.first );
+ txn->getClient()->setLastOp(slot.first);
wunit.commit();
}
@@ -462,7 +462,7 @@ namespace {
long long hash = ops.back()["h"].numberLong();
bgsync->setLastAppliedHash(hash);
- ctx.getClient()->setLastOp(lastOptime);
+ txn->getClient()->setLastOp(lastOptime);
replCoord->setMyLastOptime(lastOptime);
setNewOptime(lastOptime);