summaryrefslogtreecommitdiff
path: root/src/mongo/db/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/client.cpp')
-rw-r--r--src/mongo/db/client.cpp188
1 files changed, 98 insertions, 90 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 69dc31f8f1c..f39d00ec750 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -56,7 +56,6 @@
#include "mongo/db/dbwebserver.h"
#include "mongo/db/instance.h"
#include "mongo/db/json.h"
-#include "mongo/db/jsobj.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -77,14 +76,70 @@ namespace mongo {
using logger::LogComponent;
+namespace {
+
+ class HandshakeCmd : public Command {
+ public:
+ void help(stringstream& h) const { h << "internal"; }
+ HandshakeCmd() : Command("handshake") {}
+ virtual bool isWriteCommandForConfigServer() const { return false; }
+ virtual bool slaveOk() const { return true; }
+ virtual bool adminOnly() const { return false; }
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+
+ virtual bool run(OperationContext* txn,
+ const string& ns,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result,
+ bool fromRepl) {
+
+ repl::HandshakeArgs handshake;
+ Status status = handshake.initialize(cmdObj);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
+ }
+
+ // TODO(dannenberg) move this into actual processing for both version
+ txn->getClient()->setRemoteID(handshake.getRid());
+
+ status = repl::getGlobalReplicationCoordinator()->processHandshake(txn, handshake);
+ return appendCommandStatus(result, status);
+ }
+
+ } handshakeCmd;
+
+
+ /**
+ * Create an appropriate new locker for the storage engine in use. Caller owns the return.
+ */
+ Locker* newLocker() {
+ if (isMMAPV1()) {
+ return new MMAPV1LockerImpl();
+ }
+
+ return new LockerImpl<false>();
+ }
+
+} // namespace
+
+
boost::mutex Client::clientsMutex;
ClientSet Client::clients;
TSP_DEFINE(Client, currentClient)
- /* each thread which does db operations has a Client object in TLS.
- call this when your thread starts.
- */
+ /**
+ * This must be called whenever a new thread is started, so that active threads can be tracked
+ * so each thread has a Client object in TLS.
+ */
void Client::initThread(const char *desc, AbstractMessagingPort *mp) {
invariant(currentClient.get() == 0);
@@ -112,17 +167,6 @@ namespace mongo {
clients.insert(client);
}
-namespace {
- // Create an appropriate new locker for the storage engine in use. Caller owns.
- Locker* newLocker() {
- if (isMMAPV1()) {
- return new MMAPV1LockerImpl();
- }
-
- return new LockerImpl<false>();
- }
-}
-
Client::Client(const string& desc, AbstractMessagingPort *p)
: ClientBasic(p),
_desc(desc),
@@ -169,11 +213,9 @@ namespace {
return false;
}
- BSONObj CachedBSONObjBase::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}");
Client::Context::Context(OperationContext* txn, const std::string& ns, Database * db)
- : _client(currentClient.get()),
- _justCreated(false),
+ : _justCreated(false),
_doVersion(true),
_ns(ns),
_db(db),
@@ -184,8 +226,7 @@ namespace {
const std::string& ns,
Database* db,
bool justCreated)
- : _client(currentClient.get()),
- _justCreated(justCreated),
+ : _justCreated(justCreated),
_doVersion(true),
_ns(ns),
_db(db),
@@ -196,8 +237,7 @@ namespace {
Client::Context::Context(OperationContext* txn,
const string& ns,
bool doVersion)
- : _client(currentClient.get()),
- _justCreated(false), // set for real in finishInit
+ : _justCreated(false), // set for real in finishInit
_doVersion(doVersion),
_ns(ns),
_db(NULL),
@@ -206,6 +246,42 @@ namespace {
_finishInit();
}
+ void Client::Context::_finishInit() {
+ _db = dbHolder().get(_txn, _ns);
+ if (_db) {
+ _justCreated = false;
+ }
+ else {
+ invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X));
+ _db = dbHolder().openDb(_txn, _ns, &_justCreated);
+ invariant(_db);
+ }
+
+ if (_doVersion) {
+ _checkNotStale();
+ }
+
+ _txn->getCurOp()->enter(_ns.c_str(), _db->getProfilingLevel());
+ }
+
+ void Client::Context::_checkNotStale() const {
+ switch (_txn->getCurOp()->getOp()) {
+ case dbGetMore: // getMore is special and should be handled elsewhere.
+ case dbUpdate: // update & delete check shard version in instance.cpp, so don't check
+ case dbDelete: // here as well.
+ break;
+ default:
+ ensureShardVersionOKOrThrow(_ns);
+ }
+ }
+
+ Client::Context::~Context() {
+ // Lock must still be held
+ invariant(_txn->lockState()->isLocked());
+
+ _txn->getCurOp()->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros());
+ }
+
AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode)
: _dbLock(txn->lockState(), ns, mode),
@@ -294,42 +370,6 @@ namespace {
}
}
- void Client::Context::checkNotStale() const {
- switch ( _client->_curOp->getOp() ) {
- case dbGetMore: // getMore's are special and should be handled else where
- case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well
- case dbDelete:
- break;
- default: {
- ensureShardVersionOKOrThrow(_ns);
- }
- }
- }
-
- void Client::Context::_finishInit() {
- _db = dbHolder().get(_txn, _ns);
- if (_db) {
- _justCreated = false;
- }
- else {
- invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X));
- _db = dbHolder().openDb(_txn, _ns, &_justCreated);
- invariant(_db);
- }
-
- if( _doVersion ) checkNotStale();
-
- _client->_curOp->enter(_ns.c_str(), _db->getProfilingLevel());
- }
-
- Client::Context::~Context() {
- DEV verify( _client == currentClient.get() );
-
- // Lock must still be held
- invariant(_txn->lockState()->isLocked());
-
- _client->_curOp->recordGlobalTime(_txn->lockState()->isWriteLocked(), _timer.micros());
- }
void Client::appendLastOp( BSONObjBuilder& b ) const {
// _lastOp is never set if replication is off
@@ -375,38 +415,6 @@ namespace {
return currentClient.get();
}
- class HandshakeCmd : public Command {
- public:
- void help(stringstream& h) const { h << "internal"; }
- HandshakeCmd() : Command( "handshake" ) {}
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual bool slaveOk() const { return true; }
- virtual bool adminOnly() const { return false; }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- virtual bool run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- repl::HandshakeArgs handshake;
- Status status = handshake.initialize(cmdObj);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
-
- // TODO(dannenberg) move this into actual processing for both version
- txn->getClient()->setRemoteID(handshake.getRid());
-
- status = repl::getGlobalReplicationCoordinator()->processHandshake(txn,
- handshake);
- return appendCommandStatus(result, status);
- }
-
- } handshakeCmd;
-
-
void OpDebug::reset() {
extra.reset();