// instance.cpp
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"
#include
#include
#include
#include
#include "mongo/base/status.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/update.h"
#include "mongo/db/service_context.h"
#include "mongo/db/global_timestamp.h"
#include "mongo/db/instance.h"
#include "mongo/db/introspect.h"
#include "mongo/db/json.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/matcher/matcher.h"
#include "mongo/db/mongod_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/parsed_delete.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_driver.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage_options.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/process_id.h"
#include "mongo/s/d_state.h"
#include "mongo/s/stale_exception.h" // for SendStaleConfigException
#include "mongo/scripting/engine.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/time_support.h"
namespace mongo {
using boost::scoped_ptr;
using logger::LogComponent;
using std::auto_ptr;
using std::endl;
using std::hex;
using std::ios;
using std::ofstream;
using std::string;
using std::stringstream;
using std::vector;
// for diaglog
// Diaglog hook for read-type operations: when bit 2 of the diaglog level is set, passes the
// message's single data buffer and its header length to _diaglog.readop().
inline void opread(Message& m) {
    const bool readLoggingOn = (_diaglog.getLevel() & 2) != 0;
    if (!readLoggingOn) {
        return;
    }
    _diaglog.readop(m.singleData().view2ptr(), m.header().getLen());
}
// Diaglog hook for write-type operations: when bit 1 of the diaglog level is set, passes the
// message's single data buffer and its header length to _diaglog.writeop().
inline void opwrite(Message& m) {
    const bool writeLoggingOn = (_diaglog.getLevel() & 1) != 0;
    if (!writeLoggingOn) {
        return;
    }
    _diaglog.writeop(m.singleData().view2ptr(), m.header().getLen());
}
// Forward declarations of the per-opcode handlers defined later in this file.
// assembleResponse() dispatches to them based on the message's operation code.

// Handles dbKillCursors.
void receivedKillCursors(OperationContext* txn, Message& m);
// Handles dbUpdate against 'nsString'; 'op' receives debug/statistics information.
void receivedUpdate(OperationContext* txn,
const NamespaceString& nsString,
Message& m,
CurOp& op);
// Handles dbDelete against 'nsString'; 'op' receives debug/statistics information.
void receivedDelete(OperationContext* txn,
const NamespaceString& nsString,
Message& m,
CurOp& op);
// Handles dbInsert against 'nsString'; 'op' receives debug/statistics information.
void receivedInsert(OperationContext* txn,
const NamespaceString& nsString,
Message& m,
CurOp& op);
// Handles dbGetMore and fills in 'dbresponse'. Returns false when the getMore failed with an
// assertion (an error reply is still produced in that case).
bool receivedGetMore(OperationContext* txn,
DbResponse& dbresponse,
Message& m,
CurOp& curop);
// Counter incremented by every use of LOGWITHRATELIMIT (plain int, no synchronization).
int nloggedsome = 0;
// Statement prefix that rate-limits whatever follows it: the guarded statement runs while the
// incremented counter is below 1000, and afterwards only when the counter is a multiple of 100.
#define LOGWITHRATELIMIT if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 )
// NOTE(review): not referenced anywhere in the visible part of this file; presumably set and
// read by other translation units -- confirm before removing.
string dbExecCommand;
// Fail point polled by receivedGetMore(): while it is enabled, getMores on "local.oplog.*"
// namespaces spin instead of proceeding.
MONGO_FP_DECLARE(rsStopGetMore);
namespace {
/**
 * Converts 'exception' into a legacy error reply for the query described by 'queryMessage'.
 *
 * Side effects:
 *  - stores the exception info in curop->debug().exceptionInfo;
 *  - logs the assertion (and ntoskip/ntoreturn when either is non-zero) under the query
 *    log component;
 *  - stores in 'response' an opReply message holding exactly one error document, with
 *    ResultFlag_ErrSet set, cursor id 0 and startingFrom 0.
 *
 * When the exception code is SendStaleConfigCode the exception is treated as a
 * SendStaleConfigException: the error document additionally carries "ns", "vReceived" and
 * "vWanted", the stale version is logged, and ResultFlag_ShardConfigStale is set as well.
 */
void generateErrorResponse(const AssertionException* exception,
                           const QueryMessage& queryMessage,
                           CurOp* curop,
                           Message* response) {
    curop->debug().exceptionInfo = exception->getInfo();

    log(LogComponent::kQuery) << "assertion " << exception->toString()
                              << " ns:" << queryMessage.ns << " query:"
                              << (queryMessage.query.valid() ? queryMessage.query.toString()
                                                             : "query object is corrupt");
    if (queryMessage.ntoskip || queryMessage.ntoreturn) {
        log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip
                                  << " ntoreturn:" << queryMessage.ntoreturn;
    }

    // The downcast is only performed when the error code identifies the exception as a
    // stale-config exception; otherwise scex stays NULL.
    const SendStaleConfigException* scex = (exception->getCode() == SendStaleConfigCode)
        ? static_cast<const SendStaleConfigException*>(exception)
        : NULL;

    BSONObjBuilder err;
    exception->getInfo().append(err);
    if (scex) {
        err.append("ns", scex->getns());
        scex->getVersionReceived().addToBSON(err, "vReceived");
        scex->getVersionWanted().addToBSON(err, "vWanted");
    }
    BSONObj errObj = err.done();

    if (scex) {
        log(LogComponent::kQuery) << "stale version detected during query over "
                                  << queryMessage.ns << " : " << errObj;
    }

    // Reserve room for the reply header, then append the error document behind it.
    BufBuilder bb;
    bb.skip(sizeof(QueryResult::Value));
    bb.appendBuf(errObj.objdata(), errObj.objsize());

    // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h
    QueryResult::View msgdata = bb.buf();
    // The buffer is detached from the builder here; 'response' takes it over in setData()
    // below.
    bb.decouple();
    QueryResult::View qr = msgdata;
    qr.setResultFlags(ResultFlag_ErrSet);
    if (scex) qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale);
    qr.msgdata().setLen(bb.len());
    qr.msgdata().setOperation(opReply);
    qr.setCursorId(0);
    qr.setStartingFrom(0);
    qr.setNReturned(1);
    response->setData(msgdata.view2ptr(), true);
}
} // namespace
/**
 * Runs a legacy OP_QUERY-style command addressed to a command namespace ('nss' must satisfy
 * isCommand()) and stores the reply in 'dbResponse'.
 *
 * Any AssertionException raised while validating or running the command is converted into
 * an error reply via generateErrorResponse() instead of propagating, so the connection is
 * not terminated. 'dbResponse.response' receives ownership of the reply message and
 * 'dbResponse.responseTo' is set to the id of the request.
 */
static void receivedCommand(OperationContext* txn,
                            const NamespaceString& nss,
                            Client& client,
                            DbResponse& dbResponse,
                            Message& message) {
    invariant(nss.isCommand());

    const MSGID responseTo = message.header().getId();

    DbMessage dbMessage(message);
    QueryMessage queryMessage(dbMessage);

    CurOp* op = CurOp::get(client);

    std::unique_ptr<Message> response(new Message());

    try {
        // Do the namespace validity check under the try/catch block so it does not cause the
        // connection to be terminated.
        uassert(ErrorCodes::InvalidNamespace,
                str::stream() << "Invalid ns [" << nss.ns() << "]",
                nss.isValid());

        // Auth checking for Commands happens later.
        int nToReturn = queryMessage.ntoreturn;
        beginQueryOp(nss, queryMessage.query, nToReturn, queryMessage.ntoskip, op);
        op->markCommand();

        uassert(16979, str::stream() << "bad numberToReturn (" << nToReturn
                                     << ") for $cmd type ns - can only be 1 or -1",
                nToReturn == 1 || nToReturn == -1);

        // Reserve room for the reply header; the command writes its result behind it.
        BufBuilder bb;
        bb.skip(sizeof(QueryResult::Value));

        BSONObjBuilder cmdResBuf;
        if (!runCommands(txn, queryMessage.ns, queryMessage.query, *op, bb, cmdResBuf, false,
                         queryMessage.queryOptions)) {
            uasserted(13530, "bad or malformed command request?");
        }

        op->debug().iscommand = true;
        // TODO: Does this get overwritten/do we really need to set this twice?
        op->debug().query = queryMessage.query;

        QueryResult::View qr = bb.buf();
        // The buffer is detached from the builder; 'response' takes it over in setData().
        bb.decouple();
        qr.setResultFlagsToOk();
        qr.msgdata().setLen(bb.len());
        op->debug().responseLength = bb.len();
        qr.msgdata().setOperation(opReply);
        qr.setCursorId(0);
        qr.setStartingFrom(0);
        qr.setNReturned(1);
        response->setData(qr.view2ptr(), true);
        invariant(!response->empty());
    }
    catch (const AssertionException& exception) {
        // Discard whatever was built so far and reply with an error document instead.
        response.reset(new Message());
        generateErrorResponse(&exception, queryMessage, op, response.get());
    }

    op->debug().responseLength = response->header().dataLen();

    dbResponse.response = response.release();
    dbResponse.responseTo = responseTo;
}
namespace {
// In SERVER-7775 we reimplemented the pseudo-commands fsyncUnlock, inProg, and killOp
// as ordinary commands. To support old clients for another release, this helper serves
// to execute the real command from the legacy pseudo-command codepath.
// TODO: remove after MongoDB 3.2 is released
/**
 * Re-issues a legacy pseudo-command as the real command 'realCommandName'.
 *
 * Reads the parameter document from the original query message, builds a new OP_QUERY-style
 * command message addressed to "admin.$cmd" whose command object is
 * { <realCommandName>: 1, <original parameters...> }, copies over the original flags and
 * message id, and runs it through receivedCommand(), which fills in 'dbResponse'.
 */
void receivedPseudoCommand(OperationContext* txn,
                           Client& client,
                           DbResponse& dbResponse,
                           Message& message,
                           StringData realCommandName) {
    DbMessage originalDbm(message);
    originalDbm.pullInt(); // ntoskip
    originalDbm.pullInt(); // ntoreturn
    auto cmdParams = originalDbm.nextJsObj();

    Message interposed;
    // HACK:
    // legacy pseudo-commands could run on any database. The command replacements
    // can only run on 'admin'. To avoid breaking old shells and a multitude
    // of third-party tools, we rewrite the namespace. As auth is checked
    // later in Command::_checkAuthorizationImpl, we will still properly
    // reject the request if the client is not authorized.
    NamespaceString interposedNss("admin", "$cmd");

    BSONObjBuilder cmdBob;
    cmdBob.append(realCommandName, 1);
    cmdBob.appendElements(cmdParams);
    auto cmd = cmdBob.done();

    // TODO: use OP_COMMAND here instead of constructing
    // a legacy OP_QUERY style command
    BufBuilder cmdMsgBuf;

    // The flags are the first 32-bit integer of the original message body; carry them over
    // unchanged.
    int32_t flags = DataView(message.header().data()).read<LittleEndian<int32_t>>();
    cmdMsgBuf.appendNum(flags);

    cmdMsgBuf.appendStr(interposedNss.db(), false); // not including null byte
    cmdMsgBuf.appendStr(".$cmd");
    cmdMsgBuf.appendNum(0); // ntoskip
    cmdMsgBuf.appendNum(1); // ntoreturn
    cmdMsgBuf.appendBuf(cmd.objdata(), cmd.objsize());

    interposed.setData(dbQuery, cmdMsgBuf.buf(), cmdMsgBuf.len());
    // Keep the original id so that the reply's responseTo matches the client's request.
    interposed.header().setId(message.header().getId());

    receivedCommand(txn, interposedNss, client, dbResponse, interposed);
}
} // namespace
/**
 * Handles a legacy OP_QUERY against a non-command namespace ('nss' must not satisfy
 * isCommand()).
 *
 * Checks and audits query authorization, runs the query, and stores the reply in
 * 'dbResponse'. Any AssertionException (including an authorization failure) is converted
 * into an error reply via generateErrorResponse() rather than propagating.
 * 'dbResponse.exhaustNS' is set from the result of runQuery() when the query succeeds.
 */
static void receivedQuery(OperationContext* txn,
                          const NamespaceString& nss,
                          Client& c,
                          DbResponse& dbResponse,
                          Message& m) {
    invariant(!nss.isCommand());

    MSGID responseTo = m.header().getId();

    DbMessage d(m);
    QueryMessage q(d);
    // std::unique_ptr rather than the deprecated auto_ptr, matching receivedCommand().
    std::unique_ptr<Message> resp(new Message());

    CurOp& op = *CurOp::get(c);

    try {
        Client* client = txn->getClient();
        Status status = client->getAuthorizationSession()->checkAuthForQuery(nss, q.query);
        // The authorization outcome is audited whether or not it succeeded.
        audit::logQueryAuthzCheck(client, nss, q.query, status.code());
        uassertStatusOK(status);

        dbResponse.exhaustNS = runQuery(txn, q, nss, op, *resp);
        verify( !resp->empty() );
    }
    catch (const AssertionException& exception) {
        // Discard any partial reply and answer with an error document instead.
        resp.reset(new Message());
        generateErrorResponse(&exception, q, &op, resp.get());
    }

    op.debug().responseLength = resp->header().dataLen();

    dbResponse.response = resp.release();
    dbResponse.responseTo = responseTo;
}
// Optional hook for reporting a fatal message to the operating system. Mongod on win32
// installs a function here; in all other executables it stays NULL.
void (*reportEventToSystem)(const char *msg) = 0;

// Reports 'msg' through the system hook (when one is installed), logs it at severe level and
// aborts the process. Never returns.
void mongoAbort(const char *msg) {
    if ( reportEventToSystem != 0 ) {
        reportEventToSystem(msg);
    }
    severe() << msg;
    ::abort();
}
/**
 * Entry point for one legacy wire-protocol message 'm' received from 'remote'.
 *
 * In order, this function:
 *  1. starts an authorization request (unless running inside a direct client) and checks
 *     that no locks are held;
 *  2. records the raw message in the diaglog as a read or a write;
 *  3. forwards the legacy pseudo-commands ($cmd.sys.inprog / killop / unlock) to their real
 *     command replacements and returns;
 *  4. bumps the global op counters that can be counted up front;
 *  5. dispatches to the handler for the opcode (command, query, getMore, dbMsg, killCursors,
 *     insert, update, delete). For write opcodes, assertions are caught here, recorded via
 *     setLastError() and do not propagate;
 *  6. logs the operation if requested or slower than the threshold, profiles it when
 *     profiling applies, and records/reset the op's debug stats.
 *
 * Replies, when the opcode produces one, are returned through 'dbresponse'. The function
 * itself returns nothing.
 */
void assembleResponse( OperationContext* txn,
                       Message& m,
                       DbResponse& dbresponse,
                       const HostAndPort& remote) {
    // before we lock...
    int op = m.operation();
    bool isCommand = false;

    DbMessage dbmsg(m);

    Client& c = *txn->getClient();
    if (!txn->getClient()->isInDirectClient()) {
        c.getAuthorizationSession()->startRequest(txn);

        // We should not be holding any locks at this point
        invariant(!txn->lockState()->isLocked());
    }

    // Only some opcodes carry a namespace; for the others nsString stays default-constructed.
    const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;
    const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();

    if ( op == dbQuery ) {
        if (nsString.isCommand()) {
            isCommand = true;
            opwrite(m);
        }
        // TODO: remove this entire code path after 3.2. Refs SERVER-7775
        else if (nsString.isSpecialCommand()) {
            opwrite(m);

            if (nsString.coll() == "$cmd.sys.inprog") {
                receivedPseudoCommand(txn, c, dbresponse, m, "currentOp");
                return;
            }
            if (nsString.coll() == "$cmd.sys.killop") {
                receivedPseudoCommand(txn, c, dbresponse, m, "killOp");
                return;
            }
            if (nsString.coll() == "$cmd.sys.unlock") {
                receivedPseudoCommand(txn, c, dbresponse, m, "fsyncUnlock");
                return;
            }
            // Any other special-command namespace falls through and is handled as an
            // ordinary query below.
        }
        else {
            opread(m);
        }
    }
    else if( op == dbGetMore ) {
        opread(m);
    }
    else {
        opwrite(m);
    }

    // Increment op counters.
    switch (op) {
    case dbQuery:
        if (!isCommand) {
            globalOpCounters.gotQuery();
        }
        else {
            // Command counting is deferred, since it is not known yet whether the command
            // needs counting.
        }
        break;
    case dbGetMore:
        globalOpCounters.gotGetMore();
        break;
    case dbInsert:
        // Insert counting is deferred, since it is not known yet whether the insert contains
        // multiple documents (each of which needs to be counted).
        break;
    case dbUpdate:
        globalOpCounters.gotUpdate();
        break;
    case dbDelete:
        globalOpCounters.gotDelete();
        break;
    }

    // If the client already has an active CurOp, create a nested one for this message; it
    // is destroyed when this function returns.
    scoped_ptr<CurOp> nestedOp;
    if (CurOp::get(c)->active()) {
        nestedOp.reset(new CurOp(&c));
    }

    CurOp& currentOp = *CurOp::get(c);
    currentOp.reset(remote,op);

    OpDebug& debug = currentOp.debug();
    debug.op = op;

    long long logThreshold = serverGlobalParams.slowMS;
    LogComponent responseComponent(LogComponent::kQuery);
    if (op == dbInsert ||
        op == dbDelete ||
        op == dbUpdate) {
        responseComponent = LogComponent::kWrite;
    }
    else if (isCommand) {
        responseComponent = LogComponent::kCommand;
    }

    bool shouldLog = logger::globalLogDomain()->shouldLog(responseComponent,
                                                          logger::LogSeverity::Debug(1));

    if ( op == dbQuery ) {
        if (isCommand) {
            receivedCommand(txn, nsString, c, dbresponse, m);
        }
        else {
            receivedQuery(txn, nsString, c, dbresponse, m);
        }
    }
    else if ( op == dbGetMore ) {
        if ( ! receivedGetMore(txn, dbresponse, m, currentOp) )
            shouldLog = true;
    }
    else if ( op == dbMsg ) {
        // deprecated - replaced by commands
        const char *p = dbmsg.getns();

        int len = strlen(p);
        if ( len > 400 )
            log(LogComponent::kQuery) << curTimeMillis64() % 10000 <<
                " long msg received, len:" << len << endl;

        // Ownership of the reply passes to dbresponse.
        Message *resp = new Message();
        if ( strcmp( "end" , p ) == 0 )
            resp->setData( opReply , "dbMsg end no longer supported" );
        else
            resp->setData( opReply , "i am fine - dbMsg deprecated");

        dbresponse.response = resp;
        dbresponse.responseTo = m.header().getId();
    }
    else {
        try {
            // The following operations all require authorization.
            // dbInsert, dbUpdate and dbDelete can be easily pre-authorized,
            // here, but dbKillCursors cannot.
            if ( op == dbKillCursors ) {
                currentOp.ensureStarted();
                logThreshold = 10;
                receivedKillCursors(txn, m);
            }
            else if (op != dbInsert && op != dbUpdate && op != dbDelete) {
                log(LogComponent::kQuery) << " operation isn't supported: " << op << endl;
                currentOp.done();
                shouldLog = true;
            }
            else {
                if (remote != DBDirectClient::dummyHost) {
                    const ShardedConnectionInfo* connInfo = ShardedConnectionInfo::get(false);
                    uassert(18663,
                            str::stream() << "legacy writeOps not longer supported for "
                                          << "versioned connections, ns: " << nsString.ns()
                                          << ", op: " << opToString(op)
                                          << ", remote: " << remote.toString(),
                            connInfo == NULL);
                }

                if (!nsString.isValid()) {
                    uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);
                }
                else if (op == dbInsert) {
                    receivedInsert(txn, nsString, m, currentOp);
                }
                else if (op == dbUpdate) {
                    receivedUpdate(txn, nsString, m, currentOp);
                }
                else if (op == dbDelete) {
                    receivedDelete(txn, nsString, m, currentOp);
                }
                else {
                    invariant(false);
                }
            }
        }
        catch (const UserException& ue) {
            // User errors are recorded for getLastError but do not force logging.
            setLastError(ue.getCode(), ue.getInfo().msg.c_str());
            MONGO_LOG_COMPONENT(3, responseComponent)
                   << " Caught Assertion in " << opToString(op) << ", continuing "
                   << ue.toString() << endl;
            debug.exceptionInfo = ue.getInfo();
        }
        catch (const AssertionException& e) {
            // Any other assertion is recorded the same way and also forces the op to be
            // logged.
            setLastError(e.getCode(), e.getInfo().msg.c_str());
            MONGO_LOG_COMPONENT(3, responseComponent)
                   << " Caught Assertion in " << opToString(op) << ", continuing "
                   << e.toString() << endl;
            debug.exceptionInfo = e.getInfo();
            shouldLog = true;
        }
    }
    currentOp.ensureStarted();
    currentOp.done();
    debug.executionTime = currentOp.totalTimeMillis();

    // Handlers may declare latency they expect to incur (see receivedGetMore); that amount
    // is not counted as "slow".
    logThreshold += currentOp.getExpectedLatencyMs();

    if ( shouldLog || debug.executionTime > logThreshold ) {
        Locker::LockerInfo lockerInfo;
        txn->lockState()->getLockerInfo(&lockerInfo);

        MONGO_LOG_COMPONENT(0, responseComponent) << debug.report(currentOp, lockerInfo.stats);
    }

    if (currentOp.shouldDBProfile(debug.executionTime)) {
        // Performance profiling is on
        if (txn->lockState()->isReadLocked()) {
            MONGO_LOG_COMPONENT(1, responseComponent)
                    << "note: not profiling because recursive read lock";
        }
        else if (lockedForWriting()) {
            MONGO_LOG_COMPONENT(1, responseComponent)
                    << "note: not profiling because doing fsync+lock";
        }
        else {
            profile(txn, op);
        }
    }

    debug.recordStats();
    debug.reset();
}
/**
 * Handles a dbKillCursors message: validates the cursor count against the message size and
 * asks the CursorManager to erase each listed cursor the client is authorized to kill.
 *
 * Throws (uassert/massert) on a zero or negative count or a size mismatch, and verify()s that
 * fewer than 30000 cursors were sent.
 */
void receivedKillCursors(OperationContext* txn, Message& m) {
    DbMessage killMsg(m);
    const int numCursors = killMsg.pullInt();

    uassert( 13659 , "sent 0 cursors to kill" , numCursors != 0 );
    // The body must hold 8 bytes ahead of the array plus one 8-byte id per cursor.
    massert( 13658 ,
             str::stream() << "bad kill cursors size: " << m.dataSize() ,
             m.dataSize() == 8 + ( 8 * numCursors ) );
    uassert( 13004 ,
             str::stream() << "sent negative cursors to kill: " << numCursors ,
             numCursors >= 1 );

    if ( numCursors > 2000 ) {
        // Large batches are logged; batches of 30000 or more are rejected outright.
        if ( numCursors < 30000 ) {
            warning() << "receivedKillCursors, n=" << numCursors << endl;
        }
        else {
            error() << "receivedKillCursors, n=" << numCursors << endl;
        }
        verify( numCursors < 30000 );
    }

    const char* cursorIds = killMsg.getArray(numCursors);

    const int numKilled =
        CursorManager::eraseCursorGlobalIfAuthorized(txn, numCursors, cursorIds);
    const bool killedAll = ( numKilled == numCursors );

    // A partial kill is always reported; a complete one only at debug level 1.
    if ( shouldLog(logger::LogSeverity::Debug(1)) || !killedAll ) {
        LOG( killedAll ? 1 : 0 ) << "killcursors: found " << numKilled << " of " << numCursors << endl;
    }
}
/**
 * Handles a legacy dbUpdate message against 'nsString'.
 *
 * Parses the flags, query and update documents from 'm', checks and audits authorization,
 * and then runs the update in up to two phases:
 *
 *  1. Under intent locks (MODE_IX on the database; MODE_X on the collection only for
 *     isolated updates). This covers every case where no collection needs to be created.
 *     Write conflicts are retried with backoff, except for multi-updates, which rethrow.
 *  2. Under an exclusive database lock, creating the collection first if it is still
 *     missing, then running the update.
 *
 * The outcome is recorded in the client's last-error state for getLastError. Errors are
 * reported by throwing; the caller (assembleResponse) catches them.
 */
void receivedUpdate(OperationContext* txn,
                    const NamespaceString& nsString,
                    Message& m,
                    CurOp& op) {
    DbMessage d(m);
    uassertStatusOK(userAllowedWriteNS(nsString));
    op.debug().ns = nsString.ns();
    int flags = d.pullInt();
    BSONObj query = d.nextJsObj();

    // Sanity checks on the wire message: both documents must be present and must fit inside
    // the message they were read from.
    verify( d.moreJSObjs() );
    verify( query.objsize() < m.header().dataLen() );
    BSONObj toupdate = d.nextJsObj();
    uassert( 10055 , "update object too large", toupdate.objsize() <= BSONObjMaxUserSize);
    verify( toupdate.objsize() < m.header().dataLen() );
    verify( query.objsize() + toupdate.objsize() < m.header().dataLen() );
    bool upsert = flags & UpdateOption_Upsert;
    bool multi = flags & UpdateOption_Multi;
    bool broadcast = flags & UpdateOption_Broadcast;

    Status status = txn->getClient()->getAuthorizationSession()->checkAuthForUpdate(nsString,
                                                                                    query,
                                                                                    toupdate,
                                                                                    upsert);
    // The authorization outcome is audited whether or not it succeeded.
    audit::logUpdateAuthzCheck(txn->getClient(), nsString, query, toupdate, upsert, multi,
                               status.code());
    uassertStatusOK(status);

    op.debug().query = query;
    op.setQuery(query);

    UpdateRequest request(nsString);

    request.setUpsert(upsert);
    request.setMulti(multi);
    request.setQuery(query);
    request.setUpdates(toupdate);
    UpdateLifecycleImpl updateLifecycle(broadcast, nsString);
    request.setLifecycle(&updateLifecycle);

    request.setYieldPolicy(PlanExecutor::YIELD_AUTO);

    int attempt = 1;
    while ( 1 ) {
        try {
            ParsedUpdate parsedUpdate(txn, &request);
            uassertStatusOK(parsedUpdate.parseRequest());

            // Tentatively take an intent lock, fix up if we need to create the collection
            ScopedTransaction transaction(txn, MODE_IX);
            Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
            if (dbHolder().get(txn, nsString.db()) == NULL) {
                // If DB doesn't exist, don't implicitly create it in OldClientContext
                break;
            }
            Lock::CollectionLock collLock(txn->lockState(),
                                          nsString.ns(),
                                          parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
            OldClientContext ctx(txn, nsString);

            // The common case: no implicit collection creation
            if (!upsert || ctx.db()->getCollection(nsString) != NULL) {
                PlanExecutor* rawExec;
                uassertStatusOK(getExecutorUpdate(txn,
                                                  ctx.db()->getCollection(nsString),
                                                  &parsedUpdate,
                                                  &op.debug(),
                                                  &rawExec));
                // Take ownership of the executor so that it is destroyed on every exit path.
                boost::scoped_ptr<PlanExecutor> exec(rawExec);

                // Run the plan and get stats out.
                uassertStatusOK(exec->executePlan());
                UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), &op.debug());

                // for getlasterror
                lastError.getSafe()->recordUpdate( res.existing , res.numMatched , res.upserted );
                return;
            }
            break;
        }
        catch ( const WriteConflictException& ) {
            op.debug().writeConflicts++;
            if ( multi ) {
                log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
                throw;
            }
            WriteConflictException::logAndBackoff( attempt++, "update", nsString.toString() );
        }
    }

    // This is an upsert into a non-existing database, so need an exclusive lock
    // to avoid deadlock
    //
    // NOTE(review): this point is also reached by a NON-upsert update when the database
    // does not exist (the first 'break' above does not look at 'upsert'), in which case the
    // code below still creates the collection. Confirm that this is intended.
    MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
        ParsedUpdate parsedUpdate(txn, &request);
        uassertStatusOK(parsedUpdate.parseRequest());

        ScopedTransaction transaction(txn, MODE_IX);
        Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
        OldClientContext ctx(txn, nsString);
        uassert(ErrorCodes::NotMaster,
                str::stream() << "Not primary while performing update on " << nsString.ns(),
                repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
                    nsString.db()));

        Database* db = ctx.db();
        if (db->getCollection(nsString)) {
            // someone else beat us to it, that's ok
            // we might race while we unlock if someone drops
            // but that's ok, we'll just do nothing and error out
        }
        else {
            WriteUnitOfWork wuow(txn);
            uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
            wuow.commit();
        }

        PlanExecutor* rawExec;
        uassertStatusOK(getExecutorUpdate(txn,
                                          ctx.db()->getCollection(nsString),
                                          &parsedUpdate,
                                          &op.debug(),
                                          &rawExec));
        // Take ownership of the executor so that it is destroyed on every exit path.
        boost::scoped_ptr<PlanExecutor> exec(rawExec);

        // Run the plan and get stats out.
        uassertStatusOK(exec->executePlan());
        UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), &op.debug());

        lastError.getSafe()->recordUpdate( res.existing , res.numMatched , res.upserted );
    } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
}
/**
 * Handles a legacy dbDelete message against 'nsString'.
 *
 * Parses the flags and the match pattern from 'm', checks and audits authorization, then
 * runs the delete under intent locks (MODE_X on the collection only for isolated deletes).
 * If the database does not exist the delete is a no-op. Write conflicts are retried with
 * backoff. The number of deleted documents is recorded in the client's last-error state
 * and in op.debug().ndeleted.
 *
 * Errors are reported by throwing; the caller (assembleResponse) catches them.
 */
void receivedDelete(OperationContext* txn,
                    const NamespaceString& nsString,
                    Message& m,
                    CurOp& op) {
    DbMessage d(m);
    uassertStatusOK(userAllowedWriteNS(nsString));

    op.debug().ns = nsString.ns();
    int flags = d.pullInt();
    bool justOne = flags & RemoveOption_JustOne;
    verify( d.moreJSObjs() );
    BSONObj pattern = d.nextJsObj();

    Status status = txn->getClient()->getAuthorizationSession()->checkAuthForDelete(nsString,
                                                                                    pattern);
    // The authorization outcome is audited whether or not it succeeded.
    audit::logDeleteAuthzCheck(txn->getClient(), nsString, pattern, status.code());
    uassertStatusOK(status);

    op.debug().query = pattern;
    op.setQuery(pattern);

    DeleteRequest request(nsString);
    request.setQuery(pattern);
    request.setMulti(!justOne);

    request.setYieldPolicy(PlanExecutor::YIELD_AUTO);

    int attempt = 1;
    while ( 1 ) {
        try {
            ParsedDelete parsedDelete(txn, &request);
            uassertStatusOK(parsedDelete.parseRequest());

            ScopedTransaction scopedXact(txn, MODE_IX);
            AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
            if (!autoDb.getDb()) {
                // Nothing to delete from a database that does not exist.
                break;
            }

            Lock::CollectionLock collLock(txn->lockState(),
                                          nsString.ns(),
                                          parsedDelete.isIsolated() ? MODE_X : MODE_IX);
            OldClientContext ctx(txn, nsString);

            PlanExecutor* rawExec;
            uassertStatusOK(getExecutorDelete(txn,
                                              ctx.db()->getCollection(nsString),
                                              &parsedDelete,
                                              &rawExec));
            // Take ownership of the executor so that it is destroyed on every exit path.
            boost::scoped_ptr<PlanExecutor> exec(rawExec);

            // Run the plan and get the number of docs deleted.
            uassertStatusOK(exec->executePlan());
            long long n = DeleteStage::getNumDeleted(exec.get());
            lastError.getSafe()->recordDelete(n);
            op.debug().ndeleted = n;

            break;
        }
        catch ( const WriteConflictException& ) {
            op.debug().writeConflicts++;
            WriteConflictException::logAndBackoff( attempt++, "delete", nsString.toString() );
        }
    }
}
QueryResult::View emptyMoreResult(long long);

/**
 * Handles a legacy dbGetMore message and stores the reply in 'dbresponse'.
 *
 * Validates the namespace, checks and audits authorization for the cursor, and calls
 * getMore(). When getMore() yields no data (which should only happen for awaitData
 * cursors) the call is retried after a short sleep; after about 4 seconds of waiting the
 * pass counter is pushed to 10000 so that getMore() returns.
 *
 * If an AssertionException is raised, an error reply is produced instead, the cursor is
 * erased when it had already been authorized, and false is returned. Otherwise returns
 * true. A massert is thrown (and propagates) if the server is shutting down while waiting.
 */
bool receivedGetMore(OperationContext* txn,
                     DbResponse& dbresponse,
                     Message& m,
                     CurOp& curop) {
    bool ok = true;

    DbMessage d(m);

    const char *ns = d.getns();
    int ntoreturn = d.pullInt();
    long long cursorid = d.pullInt64();

    curop.debug().ns = ns;
    curop.debug().ntoreturn = ntoreturn;
    curop.debug().cursorid = cursorid;

    // Holds a copy of the failure, if any, so that the error reply is built after the loop.
    scoped_ptr<AssertionException> ex;
    // Created lazily on the first empty result; measures how long we have been waiting.
    scoped_ptr<Timer> timer;
    int pass = 0;
    bool exhaust = false;
    QueryResult::View msgdata = 0;
    Timestamp last;
    while( 1 ) {
        bool isCursorAuthorized = false;
        try {
            const NamespaceString nsString( ns );
            uassert( 16258, str::stream() << "Invalid ns [" << ns << "]", nsString.isValid() );

            Status status = txn->getClient()->getAuthorizationSession()->checkAuthForGetMore(
                    nsString, cursorid);
            // The authorization outcome is audited whether or not it succeeded.
            audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code());
            uassertStatusOK(status);

            if (str::startsWith(ns, "local.oplog.")){
                // Spin for as long as the rsStopGetMore fail point is enabled.
                while (MONGO_FAIL_POINT(rsStopGetMore)) {
                    sleepmillis(0);
                }

                if (pass == 0) {
                    last = getLastSetTimestamp();
                }
                else {
                    repl::waitUpToOneSecondForTimestampChange(last);
                }
            }

            msgdata = getMore(txn,
                              ns,
                              ntoreturn,
                              cursorid,
                              curop,
                              pass,
                              exhaust,
                              &isCursorAuthorized);
        }
        catch ( const AssertionException& e ) {
            if ( isCursorAuthorized ) {
                // If a cursor with id 'cursorid' was authorized, it may have been advanced
                // before an exception terminated processGetMore. Erase the ClientCursor
                // because it may now be out of sync with the client's iteration state.
                // SERVER-7952
                // TODO Temporary code, see SERVER-4563 for a cleanup overview.
                CursorManager::eraseCursorGlobal(txn, cursorid );
            }
            ex.reset( new AssertionException( e.getInfo().msg, e.getCode() ) );
            ok = false;
            break;
        }

        if (msgdata.view2ptr() == 0) {
            // this should only happen with QueryOption_AwaitData
            exhaust = false;
            massert(13073, "shutting down", !inShutdown() );
            if ( ! timer ) {
                timer.reset( new Timer() );
            }
            else {
                if ( timer->seconds() >= 4 ) {
                    // after about 4 seconds, return. pass stops at 1000 normally.
                    // we want to return occasionally so slave can checkpoint.
                    pass = 10000;
                }
            }
            pass++;
            if (kDebugBuild)
                sleepmillis(20);
            else
                sleepmillis(2);

            // note: the 1100 is because of the waitForDifferent above
            // should eventually clean this up a bit
            curop.setExpectedLatencyMs( 1100 + timer->millis() );

            continue;
        }
        break;
    }

    if (ex) {
        BSONObjBuilder err;
        ex->getInfo().append( err );
        BSONObj errObj = err.done();

        curop.debug().exceptionInfo = ex->getInfo();

        replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
        curop.debug().responseLength = dbresponse.response->header().dataLen();
        curop.debug().nreturned = 1;
        return ok;
    }

    // Ownership of the reply passes to dbresponse.
    Message *resp = new Message();
    resp->setData(msgdata.view2ptr(), true);
    curop.debug().responseLength = resp->header().dataLen();
    curop.debug().nreturned = msgdata.getNReturned();

    dbresponse.response = resp;
    dbresponse.responseTo = m.header().getId();

    if( exhaust ) {
        curop.debug().exhaust = true;
        dbresponse.exhaustNS = ns;
    }

    return ok;
}
/**
 * Inserts the single document 'js' into collection 'ns' of the database held by 'ctx',
 * creating the collection first if it does not exist.
 *
 * 'js' is first passed through fixDocumentForInsert(); if that yields a non-empty
 * replacement document, 'js' is overwritten with it (hence the non-const reference).
 * Write conflicts are retried with backoff until the insert succeeds. Any other failure is
 * reported by throwing.
 */
void checkAndInsert(OperationContext* txn,
                    OldClientContext& ctx,
                    const char *ns,
                    /*modifies*/BSONObj& js) {

    StatusWith<BSONObj> fixed = fixDocumentForInsert( js );
    uassertStatusOK( fixed.getStatus() );
    if ( !fixed.getValue().isEmpty() )
        js = fixed.getValue();

    int attempt = 0;
    while ( true ) {
        try {
            WriteUnitOfWork wunit(txn);
            Collection* collection = ctx.db()->getCollection( ns );
            if ( !collection ) {
                collection = ctx.db()->createCollection( txn, ns );
                verify( collection );
            }

            // Only the status of the insert is needed here, not its value.
            uassertStatusOK( collection->insertDocument( txn, js, true ).getStatus() );
            wunit.commit();
            break;
        }
        catch( const WriteConflictException& ) {
            txn->getCurOp()->debug().writeConflicts++;
            txn->recoveryUnit()->commitAndRestart();
            WriteConflictException::logAndBackoff( attempt++, "insert", ns);
        }
    }
}
// NOTE(review): the text of this region is damaged. The header below starts insertMulti(),
// but its loop is cut off in the middle of the 'for' statement, and the code that follows
// uses 'd', 'spec', 'nsToIndex', 'cmdObjBuilder' and 'allCmdsBuilder' without any visible
// declaration. The callers in this file (insertSystemIndexes and receivedInsert) expect both
// insertMulti(txn, ctx, keepGoing, ns, objs, op) and
// convertSystemIndexInsertsToCommands(d, &builder) to exist, so the end of the former and the
// start of the latter appear to be missing. Restore from version control before editing.
NOINLINE_DECL void insertMulti(OperationContext* txn,
OldClientContext& ctx,
bool keepGoing,
const char *ns,
vector& objs,
CurOp& op) {
size_t i;
// NOTE(review): the next line is truncated and is not valid C++ as it stands.
for (i=0; isubobjStart());
// Builds one command object of the form { createIndexes: <collection>, indexes: [ ... ] }.
cmdObjBuilder << "createIndexes" << nsToCollectionSubstring(nsToIndex);
BSONArrayBuilder specArrayBuilder(cmdObjBuilder.subarrayStart("indexes"));
// Consume consecutive index specs for as long as their "ns" field is a string equal to
// nsToIndex.
while (true) {
BSONObjBuilder specBuilder(specArrayBuilder.subobjStart());
BSONElement specNsElement = spec["ns"];
if ((specNsElement.type() != String) ||
(specNsElement.valueStringData() != nsToIndex)) {
break;
}
// Copy every field of the spec except "ns" into the command's index spec.
for (BSONObjIterator iter(spec); iter.more();) {
BSONElement element = iter.next();
if (element.fieldNameStringData() != "ns") {
specBuilder.append(element);
}
}
if (!d.moreJSObjs()) {
break;
}
spec = d.nextJsObj();
}
}
}
/**
 * Handles legacy inserts into a system.indexes namespace by turning the inserted index specs
 * into createIndexes commands and executing them one by one.
 *
 * Failures do not propagate: they are stored via setLastError() and in
 * curOp.debug().exceptionInfo. A failure during conversion aborts the whole request. A failed
 * command aborts the remaining commands unless the message has InsertOption_ContinueOnError
 * set.
 */
static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curOp) {
    BSONArrayBuilder commandsBuilder;
    try {
        convertSystemIndexInsertsToCommands(d, &commandsBuilder);
    }
    catch (const DBException& conversionError) {
        setLastError(conversionError.getCode(), conversionError.getInfo().msg.c_str());
        curOp.debug().exceptionInfo = conversionError.getInfo();
        return;
    }
    BSONArray createIndexesCommands(commandsBuilder.done());

    Command* createIndexesCmd = Command::findCommand("createIndexes");
    invariant(createIndexesCmd);

    const bool stopOnError = !(d.reservedField() & InsertOption_ContinueOnError);

    BSONObjIterator commandIter(createIndexesCommands);
    while (commandIter.more()) {
        try {
            BSONObjBuilder resultBuilder;
            BSONObj cmdObj = commandIter.next().Obj();
            Command::execCommand(
                    txn,
                    createIndexesCmd,
                    0, /* what should I use for query option? */
                    d.getns(),
                    cmdObj,
                    resultBuilder,
                    false /* fromRepl */);
            // A command that reports failure in its result document is treated like a throw.
            uassertStatusOK(Command::getStatusFromCommandResult(resultBuilder.done()));
        }
        catch (const DBException& commandError) {
            setLastError(commandError.getCode(), commandError.getInfo().msg.c_str());
            curOp.debug().exceptionInfo = commandError.getInfo();
            if (stopOnError) {
                return;
            }
        }
    }
}
/**
 * Handles a legacy dbInsert message against 'nsString'.
 *
 * Inserts into a system.indexes namespace are redirected to insertSystemIndexes(). Otherwise
 * every document in the message is read and authorized (and the check audited) up front,
 * and the insert is attempted in up to two phases:
 *
 *  1. Under intent locks, if the database and the collection already exist.
 *  2. Under an exclusive database lock, which allows the collection to be created.
 *
 * A message with more than one document goes through insertMulti(); a single document goes
 * through checkAndInsert() and is counted here. Errors are reported by throwing; the caller
 * (assembleResponse) catches them.
 */
void receivedInsert(OperationContext* txn,
                    const NamespaceString& nsString,
                    Message& m,
                    CurOp& op) {
    DbMessage d(m);
    const char* ns = d.getns();
    op.debug().ns = ns;
    uassertStatusOK(userAllowedWriteNS(nsString.ns()));

    if (nsString.isSystemDotIndexes()) {
        insertSystemIndexes(txn, d, op);
        return;
    }

    if( !d.moreJSObjs() ) {
        // strange. should we complain?
        return;
    }

    vector<BSONObj> multi;
    while (d.moreJSObjs()){
        BSONObj obj = d.nextJsObj();
        multi.push_back(obj);

        // Check auth for insert (also handles checking if this is an index build and checks
        // for the proper privileges in that case).
        Status status = txn->getClient()->getAuthorizationSession()->checkAuthForInsert(nsString, obj);
        audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code());
        uassertStatusOK(status);
    }

    const int notMasterCodeForInsert = 10058; // This is different from ErrorCodes::NotMaster
    {
        ScopedTransaction transaction(txn, MODE_IX);
        Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
        Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);

        // CONCURRENCY TODO: is being read locked in big log sufficient here?
        // writelock is used to synchronize stepdowns w/ writes
        uassert(notMasterCodeForInsert, "not master",
                repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsString.db()));

        // OldClientContext may implicitly create a database, so check existence
        if (dbHolder().get(txn, nsString.db()) != NULL) {
            OldClientContext ctx(txn, ns);
            if (ctx.db()->getCollection(nsString)) {
                if (multi.size() > 1) {
                    const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
                    insertMulti(txn, ctx, keepGoing, ns, multi, op);
                }
                else {
                    checkAndInsert(txn, ctx, ns, multi[0]);
                    globalOpCounters.incInsertInWriteLock(1);
                    op.debug().ninserted = 1;
                }
                return;
            }
        }
        // The intent locks taken in this scope are released here, before the exclusive
        // lock is requested below.
    }

    // Collection didn't exist so try again with MODE_X
    ScopedTransaction transaction(txn, MODE_IX);
    Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);

    // CONCURRENCY TODO: is being read locked in big log sufficient here?
    // writelock is used to synchronize stepdowns w/ writes
    uassert(notMasterCodeForInsert, "not master",
            repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsString.db()));

    OldClientContext ctx(txn, ns);

    if (multi.size() > 1) {
        const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
        insertMulti(txn, ctx, keepGoing, ns, multi, op);
    } else {
        checkAndInsert(txn, ctx, ns, multi[0]);
        globalOpCounters.incInsertInWriteLock(1);
        op.debug().ninserted = 1;
    }
}
// Zero until shutdown has been requested; signalShutdown() and exitCleanly() increment it.
static AtomicUInt32 shutdownInProgress(0);

// Reports whether shutdown has been requested, using a relaxed load of the flag.
bool inShutdown() {
    const bool shutdownRequested = ( shutdownInProgress.loadRelaxed() != 0 );
    return shutdownRequested;
}

// Same answer as inShutdown(), but reads the flag with a regular (non-relaxed) load.
bool inShutdownStrict() {
    const bool shutdownRequested = ( shutdownInProgress.load() != 0 );
    return shutdownRequested;
}
// Performs the shutdown steps that exitCleanly() runs while holding the global lock, in this
// order: close the listening sockets, flush the diaglog, start closing all client sockets on
// a separate thread, and shut the global storage engine down cleanly.
static void shutdownServer() {
log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl;
ListeningSockets::get()->closeAll();
log(LogComponent::kNetwork) << "shutdown: going to flush diaglog..." << endl;
_diaglog.flush();
/* must do this before unmapping mem or you may get a seg fault */
log(LogComponent::kNetwork) << "shutdown: going to close sockets..." << endl;
// The socket-closing thread is started here and is never joined in this function.
boost::thread close_socket_thread( stdx::bind(MessagingPort::closeAllSockets, 0) );
getGlobalServiceContext()->shutdownGlobalStorageEngineCleanly();
}
// shutdownLock
//
// Protects:
// The shutdown sequence in exitCleanly(): the lock is taken there so that concurrent
// callers cannot run shutdown at the same time (shutdown is single threaded).
// Lock Ordering:
// No restrictions
boost::mutex shutdownLock;
// Marks shutdown as started without performing it: after this call inShutdown() and
// inShutdownStrict() return true.
void signalShutdown() {
// Notify all threads shutdown has started
shutdownInProgress.fetchAndAdd(1);
}
/**
 * Shuts the server down in an orderly fashion and exits with 'code'.
 *
 * Marks shutdown as in progress, serializes concurrent callers on shutdownLock, and then:
 *  - if no global storage engine was ever started, goes straight to dbexit();
 *  - otherwise kills all operations, shuts down replication, acquires the global lock in
 *    MODE_X, runs shutdownServer() and finally calls dbexit().
 *
 * Any exception escaping shutdownServer() is logged and terminates the process. The
 * function only returns when dbexit() returns, which is the Windows service stop case.
 */
void exitCleanly(ExitCode code) {
    // Notify all threads shutdown has started
    shutdownInProgress.fetchAndAdd(1);

    // Grab the shutdown lock to prevent concurrent callers
    boost::lock_guard<boost::mutex> lockguard(shutdownLock);

    // Global storage engine may not be started in all cases before we exit
    if (getGlobalServiceContext()->getGlobalStorageEngine() == NULL) {
        dbexit(code); // returns only under a windows service
        invariant(code == EXIT_WINDOWS_SERVICE_STOP);
        return;
    }

    getGlobalServiceContext()->setKillAllOperations();

    repl::getGlobalReplicationCoordinator()->shutdown();

    // We should always be able to acquire the global lock at shutdown.
    //
    // TODO: This call chain uses the locker directly, because we do not want to start an
    // operation context, which also instantiates a recovery unit. Also, using the
    // lockGlobalBegin/lockGlobalComplete sequence, we avoid taking the flush lock. This will
    // all go away if we start acquiring the global/flush lock as part of ScopedTransaction.
    //
    // For a Windows service, dbexit does not call exit(), so we must leak the lock outside
    // of this function to prevent any operations from running that need a lock.
    //
    // The locker is therefore allocated with 'new' and intentionally never deleted.
    DefaultLockerImpl* globalLocker = new DefaultLockerImpl();
    LockResult result = globalLocker->lockGlobalBegin(MODE_X);
    if (result == LOCK_WAITING) {
        result = globalLocker->lockGlobalComplete(UINT_MAX);
    }

    invariant(LOCK_OK == result);

    log(LogComponent::kControl) << "now exiting" << endl;

    // Execute the graceful shutdown tasks, such as flushing the outstanding journal
    // and data files, close sockets, etc.
    try {
        shutdownServer();
    }
    catch (const DBException& ex) {
        severe() << "shutdown failed with DBException " << ex;
        std::terminate();
    }
    catch (const std::exception& ex) {
        severe() << "shutdown failed with std::exception: " << ex.what();
        std::terminate();
    }
    catch (...) {
        severe() << "shutdown failed with exception";
        std::terminate();
    }

    dbexit( code );
}
// Final step of process exit: audits the shutdown, logs the reason 'why' and the exit code
// 'rc', then leaves the process through quickExit(rc). On Windows, when 'rc' is
// EXIT_WINDOWS_SERVICE_STOP, the function returns instead of exiting.
NOINLINE_DECL void dbexit( ExitCode rc, const char *why ) {
    audit::logShutdown(currentClient.get());

    log(LogComponent::kControl) << "dbexit: " << why << " rc: " << rc;

#ifdef _WIN32
    // Windows Service Controller wants to be told when we are down,
    // so don't call quickExit() yet, or say "really exiting now"
    //
    const bool stoppingWindowsService = ( rc == EXIT_WINDOWS_SERVICE_STOP );
    if ( !stoppingWindowsService ) {
        quickExit(rc);
    }
#else
    quickExit(rc);
#endif
}
// ----- BEGIN Diaglog -----
// Starts with no log file open (f == 0) and diagnostic logging switched off (level == 0).
DiagLog::DiagLog() : f(0), level(0) {}
void DiagLog::openFile() {
verify( f == 0 );
stringstream ss;
ss << storageGlobalParams.dbpath << "/diaglog." << hex << time(0);
string name = ss.str();
f = new ofstream(name.c_str(), ios::out | ios::binary);
if ( ! f->good() ) {
log() << "diagLogging couldn't open " << name << endl;
// todo what is this? :
throw 1717;
}
else {
log() << "diagLogging using file " << name << endl;
}
}
int DiagLog::setLevel( int newLevel ) {
boost::lock_guard lk(mutex);
int old = level;
log() << "diagLogging level=" << newLevel << endl;
if( f == 0 ) {
openFile();
}
level = newLevel; // must be done AFTER f is set
return old;
}
void DiagLog::flush() {
if ( level ) {
log() << "flushing diag log" << endl;
boost::lock_guard lk(mutex);
f->flush();
}
}
void DiagLog::writeop(char *data,int len) {
if ( level & 1 ) {
boost::lock_guard lk(mutex);
f->write(data,len);
}
}
void DiagLog::readop(char *data, int len) {
if ( level & 2 ) {
bool log = (level & 4) == 0;
OCCASIONALLY log = true;
if ( log ) {
boost::lock_guard lk(mutex);
verify( f );
f->write(data,len);
}
}
}
// The process-wide diagnostic log instance used by opread(), opwrite() and shutdownServer().
DiagLog _diaglog;
// ----- END Diaglog -----
} // namespace mongo