// client.cpp
/**
* Copyright (C) 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
/* Client represents a connection to the database (the server-side) and corresponds
to an open socket (or logical connection if pooling on sockets) from a client.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include "mongo/db/client.h"
#include
#include
#include "mongo/base/status.h"
#include "mongo/bson/mutable/document.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/authz_session_external_state_d.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/commands.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbwebserver.h"
#include "mongo/db/instance.h"
#include "mongo/db/json.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/storage_options.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/d_state.h"
#include "mongo/scripting/engine.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/file_allocator.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/log.h"
namespace mongo {
using logger::LogComponent;
// Mutex taken around every insert into / erase from Client::clients in this file.
boost::mutex Client::clientsMutex;
// Set of Client objects; entries are added by initThread() and removed by
// shutdown() or ~Client().
ClientSet Client::clients;
// Per-thread slot holding the calling thread's Client; filled in by initThread().
TSP_DEFINE(Client, currentClient)
/* each thread which does db operations has a Client object in TLS.
call this when your thread starts.
*/
void Client::initThread(const char *desc, AbstractMessagingPort *mp) {
invariant(currentClient.get() == 0);
string fullDesc;
if (mp != NULL) {
fullDesc = str::stream() << desc << mp->connectionId();
}
else {
fullDesc = desc;
}
setThreadName(fullDesc.c_str());
mongo::lastError.initThread();
// Create the client obj, attach to thread
Client* client = new Client(fullDesc, mp);
client->setAuthorizationSession(
new AuthorizationSession(
new AuthzSessionExternalStateMongod(getGlobalAuthorizationManager())));
currentClient.reset(client);
// This makes the client visible to maintenance threads
boost::mutex::scoped_lock clientLock(clientsMutex);
clients.insert(client);
}
// Builds a Client described by 'desc' for messaging port 'p' (which may be NULL).
// The connection id is taken from 'p' when it is non-NULL and is 0 otherwise; the
// recorded thread id is that of the constructing thread.
Client::Client(const string& desc, AbstractMessagingPort *p)
: ClientBasic(p),
_desc(desc),
_threadId(boost::this_thread::get_id()),
_connectionId(p ? p->connectionId() : 0),
_god(0),
_txn(NULL),
_lastOp(0),
_shutdown(false) {
// Every Client starts with its own CurOp; ~Client() deletes it unless the
// process is already in shutdown.
_curOp = new CurOp( this );
}
// Removes this Client from Client::clients (unless shutdown() was already called)
// and deletes its CurOp(s). Nothing is cleaned up once inShutdown() is true.
Client::~Client() {
_god = 0;
if ( ! inShutdown() ) {
// we can't clean up safely once we're in shutdown
{
boost::mutex::scoped_lock clientLock(clientsMutex);
// shutdown() sets _shutdown and performs the erase itself (when not
// inShutdown()), so the erase is not repeated here.
if ( ! _shutdown )
clients.erase(this);
}
// Keep deleting until _curOp stops changing.
// NOTE(review): this relies on CurOp's destructor repointing _curOp at the
// wrapped op, which is not visible in this file -- confirm in curop.h.
CurOp* last;
do {
last = _curOp;
delete _curOp;
// _curOp may have been reset to _curOp->_wrapped
} while (_curOp != last);
}
}
/**
 * Marks this client as shut down and, unless the process itself is already in
 * shutdown, removes it from the Client::clients registry.
 *
 * @return always false
 */
bool Client::shutdown() {
    _shutdown = true;

    if ( ! inShutdown() ) {
        boost::mutex::scoped_lock registryLock(clientsMutex);
        clients.erase(this);
    }

    return false;
}
// Static placeholder object parsed once from JSON; its "$msg" field reads
// "query not recording (too large)".
BSONObj CachedBSONObjBase::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}");
// Creates a Context for namespace 'ns' on the calling thread's Client using a
// Database pointer supplied by the caller. This overload only stores its
// arguments: it does not call _finishInit(), so no database lookup, shard
// version check or CurOp::enter() happens here.
Client::Context::Context(OperationContext* txn, const std::string& ns, Database * db)
: _client(currentClient.get()),
_justCreated(false),
_doVersion(true),
_ns(ns),
_db(db),
_txn(txn) {
}
// Creates a Context for namespace 'ns' from a caller-supplied Database pointer
// and justCreated flag, then runs _finishInit().
// NOTE(review): _finishInit() looks the database up again through dbHolder() and
// assigns both _db and _justCreated, so the values passed in here may be
// overwritten -- confirm this is intended.
Client::Context::Context(OperationContext* txn,
const std::string& ns,
Database* db,
bool justCreated)
: _client(currentClient.get()),
_justCreated(justCreated),
_doVersion(true),
_ns(ns),
_db(db),
_txn(txn) {
_finishInit();
}
// Creates a Context for namespace 'ns' and lets _finishInit() resolve (or open)
// the database. 'doVersion' controls whether _finishInit() calls checkNotStale().
Client::Context::Context(OperationContext* txn,
const string& ns,
bool doVersion)
: _client(currentClient.get()),
_justCreated(false), // set for real in finishInit
_doVersion(doVersion),
_ns(ns),
_db(NULL),
_txn(txn) {
_finishInit();
}
// Constructs the database lock for 'ns' in the requested mode and stores whatever
// dbHolder().get() returns for 'ns'; no database is opened or created here.
// NOTE(review): the lock is acquired before the lookup only if _dbLock is declared
// before _db in the class -- confirm in the header.
AutoGetDb::AutoGetDb(OperationContext* txn, const StringData& ns, LockMode mode)
: _dbLock(txn->lockState(), ns, mode),
_db(dbHolder().get(txn, ns)) {
}
/**
 * Locks database 'ns' and resolves its Database object, opening the database
 * when dbHolder() does not have it yet.
 *
 * 'mode' must be MODE_IX or MODE_X. When the database has to be opened, the
 * database lock is first re-taken in MODE_X (unless it already is MODE_X).
 * _justCreated records whether the database was opened by this constructor.
 */
AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* txn,
                                     const StringData& ns,
                                     LockMode mode)
    : _transaction(txn, MODE_IX),
      _dbLock(txn->lockState(), ns, mode),
      _db(dbHolder().get(txn, ns)) {

    invariant(mode == MODE_IX || mode == MODE_X);
    _justCreated = false;

    if (_db != NULL) {
        // Database already exists; nothing more to do
        return;
    }

    // The database didn't exist, so relock in MODE_X before opening it
    if (mode != MODE_X) {
        _dbLock.relockWithMode(MODE_X);
    }

    _db = dbHolder().openDb(txn, ns);
    _justCreated = true;
}
// Takes the database lock and the collection lock for 'ns', both in MODE_IS, and
// then calls _init() with the full namespace and its collection part.
// NOTE(review): _db and _collLock are initialized from the member _txn rather than
// the 'txn' parameter, so _txn must be declared before them -- confirm in the header.
AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn,
const std::string& ns)
: _txn(txn),
_db(_txn, nsToDatabaseSubstring(ns), MODE_IS),
_collLock(_txn->lockState(), ns, MODE_IS),
_coll(NULL) {
_init(ns, nsToCollectionSubstring(ns));
}
// NamespaceString overload: takes the database lock (on nss.db()) and the
// collection lock (on nss.toString()), both in MODE_IS, and then calls _init()
// with the full namespace string and nss.coll().
// NOTE(review): _db and _collLock are initialized from the member _txn rather than
// the 'txn' parameter, so _txn must be declared before them -- confirm in the header.
AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn,
const NamespaceString& nss)
: _txn(txn),
_db(_txn, nss.db(), MODE_IS),
_collLock(_txn->lockState(), nss.toString(), MODE_IS),
_coll(NULL) {
_init(nss.toString(), nss.coll());
}
/**
 * Shared tail of the constructors: validates the collection name, primes the
 * current op, checks the shard version and, when the database exists, looks up
 * the collection.
 *
 * @param ns    full namespace string
 * @param coll  collection part of the namespace; must be non-empty (massert 28535)
 */
void AutoGetCollectionForRead::_init(const std::string& ns, const StringData& coll) {
    massert(28535, "need a non-empty collection name", !coll.empty());

    // TODO: Client::Context legacy, needs to be removed
    _txn->getCurOp()->ensureStarted();
    _txn->getCurOp()->setNS(ns);

    // We have both the DB and collection locked, which is the prerequisite to do a stable
    // shard version check.
    ensureShardVersionOKOrThrow(ns);

    // The DB lock taken in the constructor holds the database in shared mode at this point,
    // so it is safe to load the DB pointer.
    if (!_db.getDb()) {
        // No such database: _coll keeps the value the constructor gave it
        return;
    }

    // TODO: Client::Context legacy, needs to be removed
    _txn->getCurOp()->enter(ns.c_str(), _db.getDb()->getProfilingLevel());

    _coll = _db.getDb()->getCollection(_txn, ns);
}
/**
 * Reports the time measured by _timer to the current op, flagged as time spent
 * under a read (non-write) lock.
 */
AutoGetCollectionForRead::~AutoGetCollectionForRead() {
    // This guard only takes MODE_IS locks, so the time is never write-lock time
    const bool isWriteLocked = false;
    _txn->getCurOp()->recordGlobalTime(isWriteLocked, _timer.micros());
}
// Sets up everything needed to write to namespace 'ns': gets or creates the
// database under a MODE_IX lock (_autodb), takes the collection lock in MODE_IX
// (_collk), builds a Client::Context (_c) and looks up the collection.
// NOTE(review): the initializers use members built earlier in the list (_nss,
// _autodb), so correctness depends on the declaration order in the header.
Client::WriteContext::WriteContext(OperationContext* opCtx, const std::string& ns)
: _txn(opCtx),
_nss(ns),
_autodb(opCtx, _nss.db(), MODE_IX),
_collk(opCtx->lockState(), ns, MODE_IX),
_c(opCtx, ns, _autodb.getDb(), _autodb.justCreated()) {
_collection = _c.db()->getCollection( _txn, ns );
// The collection is missing but the database already existed: upgrade the
// collection lock to MODE_X.
if ( !_collection && !_autodb.justCreated() ) {
// relock in MODE_X
_collk.relockWithMode( MODE_X, _autodb.lock() );
// Check that the Database pointer held by the context is still the one
// dbHolder() returns after relocking.
Database* db = dbHolder().get(_txn, ns );
invariant( db == _c.db() );
}
}
/**
 * Runs the shard version check for this context's namespace, except for
 * getMore, update and delete operations, which skip it here.
 */
void Client::Context::checkNotStale() const {
    switch ( _client->_curOp->getOp() ) {
    case dbGetMore: // getMore's are special and should be handled elsewhere
    case dbUpdate:  // update & delete check shard version in instance.cpp,
    case dbDelete:  // so don't check here as well
        return;
    default:
        break;
    }

    ensureShardVersionOKOrThrow(_ns);
}
/**
 * Common constructor tail: resolves _db for _ns (opening the database when it
 * is not found, which requires the database to be locked in MODE_X), optionally
 * checks the shard version, and enters the namespace on the current op.
 */
void Client::Context::_finishInit() {
    _db = dbHolder().get(_txn, _ns);

    if (!_db) {
        // Not open yet: opening is only allowed under an exclusive database lock
        invariant(_txn->lockState()->isDbLockedForMode(nsToDatabaseSubstring(_ns), MODE_X));
        _db = dbHolder().openDb(_txn, _ns, &_justCreated);
        invariant(_db);
    }
    else {
        _justCreated = false;
    }

    if ( _doVersion ) {
        checkNotStale();
    }

    _client->_curOp->enter(_ns.c_str(), _db->getProfilingLevel());
}
/**
 * Reports the time measured by _timer to the client's current op, tagged with
 * whether a write lock is held. The lock must still be held at this point.
 */
Client::Context::~Context() {
    DEV verify( _client == currentClient.get() );

    // Lock must still be held
    invariant(_txn->lockState()->isLocked());

    const bool heldWriteLock = _txn->lockState()->isWriteLocked();
    _client->_curOp->recordGlobalTime(heldWriteLock, _timer.micros());
}
/**
 * Appends this client's last op time to 'b' as the timestamp field "lastOp".
 * The field is written when the node runs in replica-set mode or when _lastOp
 * is non-null; otherwise 'b' is left untouched.
 */
void Client::appendLastOp( BSONObjBuilder& b ) const {
    // _lastOp is never set if replication is off
    const bool inReplSetMode =
        repl::getGlobalReplicationCoordinator()->getReplicationMode() ==
            repl::ReplicationCoordinator::modeReplSet;

    if (!inReplSetMode && _lastOp.isNull()) {
        return;
    }

    b.appendTimestamp( "lastOp" , _lastOp.asDate() );
}
/**
 * Appends this client's description ("desc"), thread id ("threadId", as a
 * string) and, when it is non-zero, connection id ("connectionId") to 'builder'.
 */
void Client::reportState(BSONObjBuilder& builder) {
    builder.append("desc", desc());

    // Render the thread id to text through a stringstream
    std::stringstream threadIdText;
    threadIdText << _threadId;
    builder.append("threadId", threadIdText.str());

    if (!_connectionId) {
        return;
    }
    builder.appendNumber("connectionId", _connectionId);
}
// Attaches 'txn' to this client, or detaches the current one when 'txn' is NULL.
// Exactly one of 'txn' and the stored _txn must be NULL on entry.
void Client::setOperationContext(OperationContext* txn) {
// We can only set or unset the OperationContexts, never swap them.
invariant((txn == NULL) ^ (_txn == NULL));
// The assignment is done while holding _lock.
// NOTE(review): boost::unique_lock is a class template and appears to have lost
// its template argument here -- restore it to match the declared type of _lock.
boost::unique_lock uniqueLock(_lock);
_txn = txn;
}
/**
 * Returns the remote address string reported by the current op.
 *
 * @param includePort  forwarded to CurOp::getRemoteString()
 * @return the remote string, or "" when this client has no current op
 */
string Client::clientAddress(bool includePort) const {
    if ( !_curOp ) {
        return "";
    }
    return _curOp->getRemoteString(includePort);
}
/**
 * Returns the calling thread's Client, i.e. whatever the thread's currentClient
 * slot holds, as a ClientBasic pointer.
 */
ClientBasic* ClientBasic::getCurrent() {
    Client* const threadClient = currentClient.get();
    return threadClient;
}
class HandshakeCmd : public Command {
public:
void help(stringstream& h) const { h << "internal"; }
HandshakeCmd() : Command( "handshake" ) {}
virtual bool isWriteCommandForConfigServer() const { return false; }
virtual bool slaveOk() const { return true; }
virtual bool adminOnly() const { return false; }
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector* out) {
ActionSet actions;
actions.addAction(ActionType::internal);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
virtual bool run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
repl::HandshakeArgs handshake;
Status status = handshake.initialize(cmdObj);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
// TODO(dannenberg) move this into actual processing for both version
txn->getClient()->setRemoteID(handshake.getRid());
status = repl::getGlobalReplicationCoordinator()->processHandshake(txn,
handshake);
return appendCommandStatus(result, status);
}
} handshakeCmd;
/**
 * Returns every field of this OpDebug to its "not set" state: counters that
 * use -1 as "absent" go back to -1, flags to false, strings and objects to
 * empty, and the remaining numbers to 0.
 */
void OpDebug::reset() {
    // Identity of the operation
    op = 0;
    iscommand = false;
    ns = "";
    query = BSONObj();
    updateobj = BSONObj();
    planSummary = "";

    // Free-form and structured extras
    extra.reset();
    execStats.reset();
    exceptionInfo.reset();

    // Counters where -1 means "not recorded"
    cursorid = -1;
    ntoreturn = -1;
    ntoskip = -1;
    nscanned = -1;
    nscannedObjects = -1;
    nMatched = -1;
    nModified = -1;
    ninserted = -1;
    ndeleted = -1;
    nmoved = -1;
    nreturned = -1;
    responseLength = -1;

    // Counters that start at zero
    keyUpdates = 0; // unsigned, so -1 not possible
    executionTime = 0;

    // Flags
    exhaust = false;
    idhack = false;
    scanAndOrder = false;
    fastmod = false;
    fastmodinsert = false;
    upsert = false;
}
// Helpers for OpDebug::report(): stream " <name>:<value>" for field x into the
// StringBuilder named 's' in the enclosing scope. The numeric form prints only
// when x >= 0; the bool form prints only when x is true.
// NOTE(review): both expand to an unbraced 'if', so use them only as complete
// statements and never directly before an 'else'.
#define OPDEBUG_TOSTRING_HELP(x) if( x >= 0 ) s << " " #x ":" << (x)
#define OPDEBUG_TOSTRING_HELP_BOOL(x) if( x ) s << " " #x ":" << (x)
/**
 * Builds the one-line, human-readable summary of this operation: op type and
 * namespace, the query or (redacted) command, plan summary, update object, the
 * recorded counters and flags, any exception, and finally the execution time
 * in milliseconds.
 *
 * @param curop  supplies the Command used for redaction and the yield count
 * @return the formatted line
 */
string OpDebug::report( const CurOp& curop ) const {
    // NOTE: the OPDEBUG_TOSTRING_HELP* macros write to a local named 's'
    StringBuilder s;

    if ( !iscommand )
        s << opToString( op ) << ' ';
    else
        s << "command ";

    s << ns.toString();

    if ( ! query.isEmpty() ) {
        if ( !iscommand ) {
            s << " query: ";
            s << query.toString();
        }
        else {
            s << " command: ";

            Command* const cmd = curop.getCommand();
            if (cmd == NULL) {
                // Should not happen but we need to handle a NULL command gracefully
                s << query.toString();
            }
            else {
                // Let the command redact the document before it is logged
                mutablebson::Document redacted(query, mutablebson::Document::kInPlaceDisabled);
                cmd->redactForLogging(&redacted);
                s << cmd->name << " ";
                s << redacted.toString();
            }
        }
    }

    if (!planSummary.empty()) {
        s << " planSummary: " << planSummary.toString();
    }

    if ( ! updateobj.isEmpty() ) {
        s << " update: ";
        updateobj.toString( s );
    }

    // Counters and flags, in their fixed output order
    OPDEBUG_TOSTRING_HELP( cursorid );
    OPDEBUG_TOSTRING_HELP( ntoreturn );
    OPDEBUG_TOSTRING_HELP( ntoskip );
    OPDEBUG_TOSTRING_HELP_BOOL( exhaust );

    OPDEBUG_TOSTRING_HELP( nscanned );
    OPDEBUG_TOSTRING_HELP( nscannedObjects );
    OPDEBUG_TOSTRING_HELP_BOOL( idhack );
    OPDEBUG_TOSTRING_HELP_BOOL( scanAndOrder );
    OPDEBUG_TOSTRING_HELP( nmoved );
    OPDEBUG_TOSTRING_HELP( nMatched );
    OPDEBUG_TOSTRING_HELP( nModified );
    OPDEBUG_TOSTRING_HELP( ninserted );
    OPDEBUG_TOSTRING_HELP( ndeleted );
    OPDEBUG_TOSTRING_HELP_BOOL( fastmod );
    OPDEBUG_TOSTRING_HELP_BOOL( fastmodinsert );
    OPDEBUG_TOSTRING_HELP_BOOL( upsert );
    OPDEBUG_TOSTRING_HELP( keyUpdates );

    if ( extra.len() )
        s << " " << extra.str();

    if ( ! exceptionInfo.empty() ) {
        s << " exception: " << exceptionInfo.msg;
        if ( exceptionInfo.code )
            s << " code:" << exceptionInfo.code;
    }

    // Yield counts are only printed for storage engines without document locking
    if (!getGlobalEnvironment()->getGlobalStorageEngine()->supportsDocLocking())
        s << " numYields:" << curop.numYields();

    s << " ";

    OPDEBUG_TOSTRING_HELP( nreturned );
    if ( responseLength > 0 )
        s << " reslen:" << responseLength;
    s << " " << executionTime << "ms";

    return s.str();
}
namespace {

/**
 * Appends {name: obj} to the provided builder. If obj is greater than maxSize, appends a
 * string summary of obj instead of the object itself.
 *
 * The summary is the abbreviated obj.toString() form. If that is still longer than
 * maxSize it is cut to exactly maxSize characters, the last three of which are "...".
 * When maxSize is smaller than 3, only maxSize dots are written, so the function never
 * indexes before the start of the string.
 *
 * @param name     field name to append under
 * @param obj      object to append
 * @param maxSize  size limit, applied to obj.objsize() and to the summary's length
 * @param builder  destination builder
 */
void appendAsObjOrString(const StringData& name,
                         const BSONObj& obj,
                         size_t maxSize,
                         BSONObjBuilder* builder) {
    if (static_cast<size_t>(obj.objsize()) <= maxSize) {
        builder->append(name, obj);
        return;
    }

    // Generate an abbreviated serialization for the object, by passing false as the
    // "full" argument to obj.toString().
    const bool isArray = false;
    const bool full = false;
    std::string objToString = obj.toString(isArray, full);

    if (objToString.size() <= maxSize) {
        builder->append(name, objToString);
        return;
    }

    // objToString is still too long, so we append a truncated form ending in "...".
    // Instead of creating a new string temporary, mutate objToString in place: its
    // size exceeds maxSize here, so every index below maxSize is valid. The ellipsis
    // length is clamped so that 'maxSize - ellipsisLen' cannot wrap around when
    // maxSize < 3.
    const size_t ellipsisLen = maxSize < 3 ? maxSize : 3;
    for (size_t i = maxSize - ellipsisLen; i < maxSize; ++i) {
        objToString[i] = '.';
    }
    builder->append(name, StringData(objToString).substr(0, maxSize));
}

} // namespace
// Helpers for OpDebug::append(): add field x to the BSONObjBuilder named 'b' in
// the enclosing scope, using the identifier's own spelling as the field name.
// The number form is skipped when x == -1; the bool form is skipped when x is false.
// NOTE(review): both expand to an unbraced 'if', so use them only as complete
// statements and never directly before an 'else'.
#define OPDEBUG_APPEND_NUMBER(x) if( x != -1 ) b.appendNumber( #x , (x) )
#define OPDEBUG_APPEND_BOOL(x) if( x ) b.appendBool( #x , (x) )
/**
 * Writes this OpDebug into 'b' as BSON fields: op type, namespace, query or
 * command, update object, the recorded counters and flags, exception info,
 * elapsed milliseconds and execution stats.
 *
 * @param curop  supplies the fallback query and the yield count
 * @param b      destination builder (the OPDEBUG_APPEND_* macros refer to it by name)
 */
void OpDebug::append(const CurOp& curop, BSONObjBuilder& b) const {
    // Size limit handed to appendAsObjOrString() for each embedded object
    const size_t maxElementSize = 50 * 1024;

    b.append( "op" , iscommand ? "command" : opToString( op ) );
    b.append( "ns" , ns.toString() );

    if (query.isEmpty()) {
        // Nothing recorded here; fall back to the query held by the CurOp, if any
        if (!iscommand && curop.haveQuery()) {
            appendAsObjOrString("query", curop.query(), maxElementSize, &b);
        }
    }
    else {
        const char* const queryFieldName = iscommand ? "command" : "query";
        appendAsObjOrString(queryFieldName, query, maxElementSize, &b);
    }

    if (!updateobj.isEmpty()) {
        appendAsObjOrString("updateobj", updateobj, maxElementSize, &b);
    }

    OPDEBUG_APPEND_NUMBER( cursorid );
    OPDEBUG_APPEND_NUMBER( ntoreturn );
    OPDEBUG_APPEND_NUMBER( ntoskip );
    OPDEBUG_APPEND_BOOL( exhaust );

    OPDEBUG_APPEND_NUMBER( nscanned );
    OPDEBUG_APPEND_NUMBER( nscannedObjects );
    OPDEBUG_APPEND_BOOL( idhack );
    OPDEBUG_APPEND_BOOL( scanAndOrder );

    // The local must be called 'moved': the macro uses the identifier as the field name
    const bool moved = (nmoved >= 1);
    OPDEBUG_APPEND_BOOL( moved );
    OPDEBUG_APPEND_NUMBER( nmoved );

    OPDEBUG_APPEND_NUMBER( nMatched );
    OPDEBUG_APPEND_NUMBER( nModified );
    OPDEBUG_APPEND_NUMBER( ninserted );
    OPDEBUG_APPEND_NUMBER( ndeleted );
    OPDEBUG_APPEND_BOOL( fastmod );
    OPDEBUG_APPEND_BOOL( fastmodinsert );
    OPDEBUG_APPEND_BOOL( upsert );
    OPDEBUG_APPEND_NUMBER( keyUpdates );

    // Yield counts are only reported for storage engines without document locking
    if (!getGlobalEnvironment()->getGlobalStorageEngine()->supportsDocLocking())
        b.appendNumber( "numYield" , curop.numYields() );

    if ( ! exceptionInfo.empty() )
        exceptionInfo.append( b , "exception" , "exceptionCode" );

    OPDEBUG_APPEND_NUMBER( nreturned );
    OPDEBUG_APPEND_NUMBER( responseLength );
    b.append( "millis" , executionTime );

    execStats.append(b, "execStats");
}
// Intentionally a no-op in this file: both arguments are ignored and nothing
// is recorded.
void saveGLEStats(const BSONObj& result, const std::string& conn) {
// This can be called in mongod, which is unfortunate. To fix this,
// we can redesign how connection pooling works on mongod for sharded operations.
}
}