summaryrefslogtreecommitdiff
path: root/src/mongo/db/instance.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/instance.cpp')
-rw-r--r--src/mongo/db/instance.cpp2049
1 files changed, 979 insertions, 1070 deletions
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index f207624c4a3..79214191ca8 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -1,4 +1,4 @@
-// instance.cpp
+// instance.cpp
/**
* Copyright (C) 2008 10gen Inc.
@@ -96,7 +96,7 @@
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/d_state.h"
#include "mongo/s/grid.h"
-#include "mongo/s/stale_exception.h" // for SendStaleConfigException
+#include "mongo/s/stale_exception.h" // for SendStaleConfigException
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
@@ -109,381 +109,357 @@
namespace mongo {
- using logger::LogComponent;
- using std::endl;
- using std::hex;
- using std::ios;
- using std::ofstream;
- using std::string;
- using std::stringstream;
- using std::unique_ptr;
- using std::vector;
-
- // for diaglog
- inline void opread(Message& m) {
- if (_diaglog.getLevel() & 2) {
- _diaglog.readop(m.singleData().view2ptr(), m.header().getLen());
- }
+using logger::LogComponent;
+using std::endl;
+using std::hex;
+using std::ios;
+using std::ofstream;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+using std::vector;
+
+// for diaglog
+inline void opread(Message& m) {
+ if (_diaglog.getLevel() & 2) {
+ _diaglog.readop(m.singleData().view2ptr(), m.header().getLen());
}
+}
- inline void opwrite(Message& m) {
- if (_diaglog.getLevel() & 1) {
- _diaglog.writeop(m.singleData().view2ptr(), m.header().getLen());
- }
+inline void opwrite(Message& m) {
+ if (_diaglog.getLevel() & 1) {
+ _diaglog.writeop(m.singleData().view2ptr(), m.header().getLen());
}
+}
- void receivedKillCursors(OperationContext* txn, Message& m);
+void receivedKillCursors(OperationContext* txn, Message& m);
- void receivedUpdate(OperationContext* txn,
- const NamespaceString& nsString,
- Message& m,
- CurOp& op);
+void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
- void receivedDelete(OperationContext* txn,
- const NamespaceString& nsString,
- Message& m,
- CurOp& op);
+void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
- void receivedInsert(OperationContext* txn,
- const NamespaceString& nsString,
- Message& m,
- CurOp& op);
+void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
- bool receivedGetMore(OperationContext* txn,
- DbResponse& dbresponse,
- Message& m,
- CurOp& curop);
+bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop);
- int nloggedsome = 0;
-#define LOGWITHRATELIMIT if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 )
+int nloggedsome = 0;
+#define LOGWITHRATELIMIT if (++nloggedsome < 1000 || nloggedsome % 100 == 0)
- string dbExecCommand;
+string dbExecCommand;
- MONGO_FP_DECLARE(rsStopGetMore);
+MONGO_FP_DECLARE(rsStopGetMore);
namespace {
- std::unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongod() {
- return stdx::make_unique<AuthzManagerExternalStateMongod>();
+std::unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongod() {
+ return stdx::make_unique<AuthzManagerExternalStateMongod>();
+}
+
+MONGO_INITIALIZER(CreateAuthorizationExternalStateFactory)(InitializerContext* context) {
+ AuthzManagerExternalState::create = &createAuthzManagerExternalStateMongod;
+ return Status::OK();
+}
+
+void generateLegacyQueryErrorResponse(const AssertionException* exception,
+ const QueryMessage& queryMessage,
+ CurOp* curop,
+ Message* response) {
+ curop->debug().exceptionInfo = exception->getInfo();
+
+ log(LogComponent::kQuery) << "assertion " << exception->toString() << " ns:" << queryMessage.ns
+ << " query:" << (queryMessage.query.valid()
+ ? queryMessage.query.toString()
+ : "query object is corrupt");
+ if (queryMessage.ntoskip || queryMessage.ntoreturn) {
+ log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip
+ << " ntoreturn:" << queryMessage.ntoreturn;
}
- MONGO_INITIALIZER(CreateAuthorizationExternalStateFactory) (InitializerContext* context) {
- AuthzManagerExternalState::create = &createAuthzManagerExternalStateMongod;
- return Status::OK();
+ const SendStaleConfigException* scex = (exception->getCode() == SendStaleConfigCode)
+ ? static_cast<const SendStaleConfigException*>(exception)
+ : NULL;
+
+ BSONObjBuilder err;
+ exception->getInfo().append(err);
+ if (scex) {
+ err.append("ok", 0.0);
+ err.append("ns", scex->getns());
+ scex->getVersionReceived().addToBSON(err, "vReceived");
+ scex->getVersionWanted().addToBSON(err, "vWanted");
}
+ BSONObj errObj = err.done();
- void generateLegacyQueryErrorResponse(const AssertionException* exception,
- const QueryMessage& queryMessage,
- CurOp* curop,
- Message* response) {
- curop->debug().exceptionInfo = exception->getInfo();
-
- log(LogComponent::kQuery) << "assertion " << exception->toString()
- << " ns:" << queryMessage.ns << " query:"
- << (queryMessage.query.valid() ? queryMessage.query.toString()
- : "query object is corrupt");
- if (queryMessage.ntoskip || queryMessage.ntoreturn) {
- log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip
- << " ntoreturn:" << queryMessage.ntoreturn;
- }
-
- const SendStaleConfigException* scex = (exception->getCode() == SendStaleConfigCode)
- ? static_cast<const SendStaleConfigException*>(exception)
- : NULL;
-
- BSONObjBuilder err;
- exception->getInfo().append(err);
- if (scex) {
- err.append("ok", 0.0);
- err.append("ns", scex->getns());
- scex->getVersionReceived().addToBSON(err, "vReceived");
- scex->getVersionWanted().addToBSON(err, "vWanted");
- }
- BSONObj errObj = err.done();
-
- if (scex) {
- log(LogComponent::kQuery) << "stale version detected during query over "
- << queryMessage.ns << " : " << errObj;
- }
-
- BufBuilder bb;
- bb.skip(sizeof(QueryResult::Value));
- bb.appendBuf((void*) errObj.objdata(), errObj.objsize());
-
- // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h
- QueryResult::View msgdata = bb.buf();
- bb.decouple();
- QueryResult::View qr = msgdata;
- qr.setResultFlags(ResultFlag_ErrSet);
- if (scex) qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale);
- qr.msgdata().setLen(bb.len());
- qr.msgdata().setOperation(opReply);
- qr.setCursorId(0);
- qr.setStartingFrom(0);
- qr.setNReturned(1);
- response->setData(msgdata.view2ptr(), true);
+ if (scex) {
+ log(LogComponent::kQuery) << "stale version detected during query over " << queryMessage.ns
+ << " : " << errObj;
}
-} // namespace
-
- static void receivedCommand(OperationContext* txn,
- const NamespaceString& nss,
- Client& client,
- DbResponse& dbResponse,
- Message& message) {
-
- invariant(nss.isCommand());
+ BufBuilder bb;
+ bb.skip(sizeof(QueryResult::Value));
+ bb.appendBuf((void*)errObj.objdata(), errObj.objsize());
+
+ // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h
+ QueryResult::View msgdata = bb.buf();
+ bb.decouple();
+ QueryResult::View qr = msgdata;
+ qr.setResultFlags(ResultFlag_ErrSet);
+ if (scex)
+ qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale);
+ qr.msgdata().setLen(bb.len());
+ qr.msgdata().setOperation(opReply);
+ qr.setCursorId(0);
+ qr.setStartingFrom(0);
+ qr.setNReturned(1);
+ response->setData(msgdata.view2ptr(), true);
+}
- const MSGID responseTo = message.header().getId();
-
- DbMessage dbMessage(message);
- QueryMessage queryMessage(dbMessage);
+} // namespace
- CurOp* op = CurOp::get(txn);
+static void receivedCommand(OperationContext* txn,
+ const NamespaceString& nss,
+ Client& client,
+ DbResponse& dbResponse,
+ Message& message) {
+ invariant(nss.isCommand());
- rpc::LegacyReplyBuilder builder{};
+ const MSGID responseTo = message.header().getId();
- try {
- // This will throw if the request is on an invalid namespace.
- rpc::LegacyRequest request{&message};
- // Auth checking for Commands happens later.
- int nToReturn = queryMessage.ntoreturn;
- beginQueryOp(txn, nss, queryMessage.query, nToReturn, queryMessage.ntoskip);
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- op->markCommand_inlock();
- }
+ DbMessage dbMessage(message);
+ QueryMessage queryMessage(dbMessage);
- uassert(16979, str::stream() << "bad numberToReturn (" << nToReturn
- << ") for $cmd type ns - can only be 1 or -1",
- nToReturn == 1 || nToReturn == -1);
+ CurOp* op = CurOp::get(txn);
- runCommands(txn, request, &builder);
+ rpc::LegacyReplyBuilder builder{};
- op->debug().iscommand = true;
- // TODO: Does this get overwritten/do we really need to set this twice?
- op->debug().query = request.getCommandArgs();
- }
- catch (const DBException& exception) {
- Command::generateErrorResponse(txn, &builder, exception);
+ try {
+ // This will throw if the request is on an invalid namespace.
+ rpc::LegacyRequest request{&message};
+ // Auth checking for Commands happens later.
+ int nToReturn = queryMessage.ntoreturn;
+ beginQueryOp(txn, nss, queryMessage.query, nToReturn, queryMessage.ntoskip);
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ op->markCommand_inlock();
}
- auto response = builder.done();
+ uassert(16979,
+ str::stream() << "bad numberToReturn (" << nToReturn
+ << ") for $cmd type ns - can only be 1 or -1",
+ nToReturn == 1 || nToReturn == -1);
- op->debug().responseLength = response->header().dataLen();
+ runCommands(txn, request, &builder);
- dbResponse.response = response.release();
- dbResponse.responseTo = responseTo;
+ op->debug().iscommand = true;
+ // TODO: Does this get overwritten/do we really need to set this twice?
+ op->debug().query = request.getCommandArgs();
+ } catch (const DBException& exception) {
+ Command::generateErrorResponse(txn, &builder, exception);
}
-namespace {
+ auto response = builder.done();
- void receivedRpc(OperationContext* txn,
- Client& client,
- DbResponse& dbResponse,
- Message& message) {
+ op->debug().responseLength = response->header().dataLen();
- invariant(message.operation() == dbCommand);
+ dbResponse.response = response.release();
+ dbResponse.responseTo = responseTo;
+}
- const MSGID responseTo = message.header().getId();
+namespace {
- rpc::CommandReplyBuilder replyBuilder{};
+void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, Message& message) {
+ invariant(message.operation() == dbCommand);
- auto curOp = CurOp::get(txn);
+ const MSGID responseTo = message.header().getId();
- try {
- // database is validated here
- rpc::CommandRequest request{&message};
-
- // We construct a legacy $cmd namespace so we can fill in curOp using
- // the existing logic that existed for OP_QUERY commands
- NamespaceString nss(request.getDatabase(), "$cmd");
- beginQueryOp(txn, nss, request.getCommandArgs(), 1, 0);
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- curOp->markCommand_inlock();
- }
+ rpc::CommandReplyBuilder replyBuilder{};
- runCommands(txn, request, &replyBuilder);
+ auto curOp = CurOp::get(txn);
- curOp->debug().iscommand = true;
- curOp->debug().query = request.getCommandArgs();
+ try {
+ // database is validated here
+ rpc::CommandRequest request{&message};
- }
- catch (const DBException& exception) {
- Command::generateErrorResponse(txn, &replyBuilder, exception);
+ // We construct a legacy $cmd namespace so we can fill in curOp using
+ // the existing logic that existed for OP_QUERY commands
+ NamespaceString nss(request.getDatabase(), "$cmd");
+ beginQueryOp(txn, nss, request.getCommandArgs(), 1, 0);
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ curOp->markCommand_inlock();
}
- auto response = replyBuilder.done();
+ runCommands(txn, request, &replyBuilder);
- curOp->debug().responseLength = response->header().dataLen();
+ curOp->debug().iscommand = true;
+ curOp->debug().query = request.getCommandArgs();
- dbResponse.response = response.release();
- dbResponse.responseTo = responseTo;
+ } catch (const DBException& exception) {
+ Command::generateErrorResponse(txn, &replyBuilder, exception);
}
- // In SERVER-7775 we reimplemented the pseudo-commands fsyncUnlock, inProg, and killOp
- // as ordinary commands. To support old clients for another release, this helper serves
- // to execute the real command from the legacy pseudo-command codepath.
- // TODO: remove after MongoDB 3.2 is released
- void receivedPseudoCommand(OperationContext* txn,
- Client& client,
- DbResponse& dbResponse,
- Message& message,
- StringData realCommandName) {
-
- DbMessage originalDbm(message);
- originalDbm.pullInt(); // ntoskip
- originalDbm.pullInt(); // ntoreturn
- auto cmdParams = originalDbm.nextJsObj();
-
- Message interposed;
- // HACK:
- // legacy pseudo-commands could run on any database. The command replacements
- // can only run on 'admin'. To avoid breaking old shells and a multitude
- // of third-party tools, we rewrite the namespace. As auth is checked
- // later in Command::_checkAuthorizationImpl, we will still properly
- // reject the request if the client is not authorized.
- NamespaceString interposedNss("admin", "$cmd");
-
- BSONObjBuilder cmdBob;
- cmdBob.append(realCommandName, 1);
- cmdBob.appendElements(cmdParams);
- auto cmd = cmdBob.done();
-
- // TODO: use OP_COMMAND here instead of constructing
- // a legacy OP_QUERY style command
- BufBuilder cmdMsgBuf;
-
- int32_t flags = DataView(message.header().data()).read<LittleEndian<int32_t>>();
- cmdMsgBuf.appendNum(flags);
-
- cmdMsgBuf.appendStr(interposedNss.db(), false); // not including null byte
- cmdMsgBuf.appendStr(".$cmd");
- cmdMsgBuf.appendNum(0); // ntoskip
- cmdMsgBuf.appendNum(1); // ntoreturn
- cmdMsgBuf.appendBuf(cmd.objdata(), cmd.objsize());
-
- interposed.setData(dbQuery, cmdMsgBuf.buf(), cmdMsgBuf.len());
- interposed.header().setId(message.header().getId());
-
- receivedCommand(txn, interposedNss, client, dbResponse, interposed);
- }
+ auto response = replyBuilder.done();
+
+ curOp->debug().responseLength = response->header().dataLen();
+
+ dbResponse.response = response.release();
+ dbResponse.responseTo = responseTo;
+}
+
+// In SERVER-7775 we reimplemented the pseudo-commands fsyncUnlock, inProg, and killOp
+// as ordinary commands. To support old clients for another release, this helper serves
+// to execute the real command from the legacy pseudo-command codepath.
+// TODO: remove after MongoDB 3.2 is released
+void receivedPseudoCommand(OperationContext* txn,
+ Client& client,
+ DbResponse& dbResponse,
+ Message& message,
+ StringData realCommandName) {
+ DbMessage originalDbm(message);
+ originalDbm.pullInt(); // ntoskip
+ originalDbm.pullInt(); // ntoreturn
+ auto cmdParams = originalDbm.nextJsObj();
+
+ Message interposed;
+ // HACK:
+ // legacy pseudo-commands could run on any database. The command replacements
+ // can only run on 'admin'. To avoid breaking old shells and a multitude
+ // of third-party tools, we rewrite the namespace. As auth is checked
+ // later in Command::_checkAuthorizationImpl, we will still properly
+ // reject the request if the client is not authorized.
+ NamespaceString interposedNss("admin", "$cmd");
+
+ BSONObjBuilder cmdBob;
+ cmdBob.append(realCommandName, 1);
+ cmdBob.appendElements(cmdParams);
+ auto cmd = cmdBob.done();
+
+ // TODO: use OP_COMMAND here instead of constructing
+ // a legacy OP_QUERY style command
+ BufBuilder cmdMsgBuf;
+
+ int32_t flags = DataView(message.header().data()).read<LittleEndian<int32_t>>();
+ cmdMsgBuf.appendNum(flags);
+
+ cmdMsgBuf.appendStr(interposedNss.db(), false); // not including null byte
+ cmdMsgBuf.appendStr(".$cmd");
+ cmdMsgBuf.appendNum(0); // ntoskip
+ cmdMsgBuf.appendNum(1); // ntoreturn
+ cmdMsgBuf.appendBuf(cmd.objdata(), cmd.objsize());
+
+ interposed.setData(dbQuery, cmdMsgBuf.buf(), cmdMsgBuf.len());
+ interposed.header().setId(message.header().getId());
+
+ receivedCommand(txn, interposedNss, client, dbResponse, interposed);
+}
} // namespace
- static void receivedQuery(OperationContext* txn,
- const NamespaceString& nss,
- Client& c,
- DbResponse& dbResponse,
- Message& m) {
- invariant(!nss.isCommand());
-
- MSGID responseTo = m.header().getId();
+static void receivedQuery(OperationContext* txn,
+ const NamespaceString& nss,
+ Client& c,
+ DbResponse& dbResponse,
+ Message& m) {
+ invariant(!nss.isCommand());
- DbMessage d(m);
- QueryMessage q(d);
- unique_ptr< Message > resp( new Message() );
+ MSGID responseTo = m.header().getId();
- CurOp& op = *CurOp::get(txn);
+ DbMessage d(m);
+ QueryMessage q(d);
+ unique_ptr<Message> resp(new Message());
- try {
- Client* client = txn->getClient();
- Status status = AuthorizationSession::get(client)->checkAuthForQuery(nss, q.query);
- audit::logQueryAuthzCheck(client, nss, q.query, status.code());
- uassertStatusOK(status);
+ CurOp& op = *CurOp::get(txn);
- dbResponse.exhaustNS = runQuery(txn, q, nss, *resp);
- verify( !resp->empty() );
- }
- catch (const AssertionException& exception) {
- resp.reset(new Message());
- generateLegacyQueryErrorResponse(&exception, q, &op, resp.get());
- }
-
- op.debug().responseLength = resp->header().dataLen();
+ try {
+ Client* client = txn->getClient();
+ Status status = AuthorizationSession::get(client)->checkAuthForQuery(nss, q.query);
+ audit::logQueryAuthzCheck(client, nss, q.query, status.code());
+ uassertStatusOK(status);
- dbResponse.response = resp.release();
- dbResponse.responseTo = responseTo;
+ dbResponse.exhaustNS = runQuery(txn, q, nss, *resp);
+ verify(!resp->empty());
+ } catch (const AssertionException& exception) {
+ resp.reset(new Message());
+ generateLegacyQueryErrorResponse(&exception, q, &op, resp.get());
}
- // Mongod on win32 defines a value for this function. In all other executables it is NULL.
- void (*reportEventToSystem)(const char *msg) = 0;
+ op.debug().responseLength = resp->header().dataLen();
- void mongoAbort(const char *msg) {
- if( reportEventToSystem )
- reportEventToSystem(msg);
- severe() << msg;
- ::abort();
- }
+ dbResponse.response = resp.release();
+ dbResponse.responseTo = responseTo;
+}
- // Returns false when request includes 'end'
- void assembleResponse( OperationContext* txn,
- Message& m,
- DbResponse& dbresponse,
- const HostAndPort& remote) {
- // before we lock...
- int op = m.operation();
- bool isCommand = false;
+// Mongod on win32 defines a value for this function. In all other executables it is NULL.
+void (*reportEventToSystem)(const char* msg) = 0;
- DbMessage dbmsg(m);
+void mongoAbort(const char* msg) {
+ if (reportEventToSystem)
+ reportEventToSystem(msg);
+ severe() << msg;
+ ::abort();
+}
- Client& c = *txn->getClient();
- if (!c.isInDirectClient()) {
- LastError::get(c).startRequest();
- AuthorizationSession::get(c)->startRequest(txn);
+// Returns false when request includes 'end'
+void assembleResponse(OperationContext* txn,
+ Message& m,
+ DbResponse& dbresponse,
+ const HostAndPort& remote) {
+ // before we lock...
+ int op = m.operation();
+ bool isCommand = false;
- // We should not be holding any locks at this point
- invariant(!txn->lockState()->isLocked());
- }
+ DbMessage dbmsg(m);
- const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;
- const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();
+ Client& c = *txn->getClient();
+ if (!c.isInDirectClient()) {
+ LastError::get(c).startRequest();
+ AuthorizationSession::get(c)->startRequest(txn);
- if ( op == dbQuery ) {
- if (nsString.isCommand()) {
- isCommand = true;
- opwrite(m);
- }
- // TODO: remove this entire code path after 3.2. Refs SERVER-7775
- else if (nsString.isSpecialCommand()) {
- opwrite(m);
+ // We should not be holding any locks at this point
+ invariant(!txn->lockState()->isLocked());
+ }
- if (nsString.coll() == "$cmd.sys.inprog") {
- receivedPseudoCommand(txn, c, dbresponse, m, "currentOp");
- return;
- }
- if (nsString.coll() == "$cmd.sys.killop") {
- receivedPseudoCommand(txn, c, dbresponse, m, "killOp");
- return;
- }
- if (nsString.coll() == "$cmd.sys.unlock") {
- receivedPseudoCommand(txn, c, dbresponse, m, "fsyncUnlock");
- return;
- }
- }
- else {
- opread(m);
- }
- }
- else if( op == dbGetMore ) {
- opread(m);
- }
- else if ( op == dbCommand ) {
+ const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;
+ const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();
+
+ if (op == dbQuery) {
+ if (nsString.isCommand()) {
isCommand = true;
opwrite(m);
}
- else {
+ // TODO: remove this entire code path after 3.2. Refs SERVER-7775
+ else if (nsString.isSpecialCommand()) {
opwrite(m);
+
+ if (nsString.coll() == "$cmd.sys.inprog") {
+ receivedPseudoCommand(txn, c, dbresponse, m, "currentOp");
+ return;
+ }
+ if (nsString.coll() == "$cmd.sys.killop") {
+ receivedPseudoCommand(txn, c, dbresponse, m, "killOp");
+ return;
+ }
+ if (nsString.coll() == "$cmd.sys.unlock") {
+ receivedPseudoCommand(txn, c, dbresponse, m, "fsyncUnlock");
+ return;
+ }
+ } else {
+ opread(m);
}
+ } else if (op == dbGetMore) {
+ opread(m);
+ } else if (op == dbCommand) {
+ isCommand = true;
+ opwrite(m);
+ } else {
+ opwrite(m);
+ }
- // Increment op counters.
- switch (op) {
+ // Increment op counters.
+ switch (op) {
case dbQuery:
if (!isCommand) {
globalOpCounters.gotQuery();
- }
- else {
+ } else {
// Command counting is deferred, since it is not known yet whether the command
// needs counting.
}
@@ -501,909 +477,842 @@ namespace {
case dbDelete:
globalOpCounters.gotDelete();
break;
- }
+ }
- CurOp& currentOp = *CurOp::get(txn);
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- currentOp.setOp_inlock(op);
- }
+ CurOp& currentOp = *CurOp::get(txn);
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ currentOp.setOp_inlock(op);
+ }
- OpDebug& debug = currentOp.debug();
- debug.op = op;
+ OpDebug& debug = currentOp.debug();
+ debug.op = op;
- long long logThreshold = serverGlobalParams.slowMS;
- LogComponent responseComponent(LogComponent::kQuery);
- if (op == dbInsert ||
- op == dbDelete ||
- op == dbUpdate) {
- responseComponent = LogComponent::kWrite;
- }
- else if (isCommand) {
- responseComponent = LogComponent::kCommand;
- }
+ long long logThreshold = serverGlobalParams.slowMS;
+ LogComponent responseComponent(LogComponent::kQuery);
+ if (op == dbInsert || op == dbDelete || op == dbUpdate) {
+ responseComponent = LogComponent::kWrite;
+ } else if (isCommand) {
+ responseComponent = LogComponent::kCommand;
+ }
- bool shouldLog = logger::globalLogDomain()->shouldLog(responseComponent,
- logger::LogSeverity::Debug(1));
+ bool shouldLog =
+ logger::globalLogDomain()->shouldLog(responseComponent, logger::LogSeverity::Debug(1));
- if ( op == dbQuery ) {
- if (isCommand) {
- receivedCommand(txn, nsString, c, dbresponse, m);
- }
- else {
- receivedQuery(txn, nsString, c, dbresponse, m);
- }
- }
- else if ( op == dbCommand ) {
- receivedRpc(txn, c, dbresponse, m);
- }
- else if ( op == dbGetMore ) {
- if ( ! receivedGetMore(txn, dbresponse, m, currentOp) )
- shouldLog = true;
+ if (op == dbQuery) {
+ if (isCommand) {
+ receivedCommand(txn, nsString, c, dbresponse, m);
+ } else {
+ receivedQuery(txn, nsString, c, dbresponse, m);
}
- else if ( op == dbMsg ) {
- // deprecated - replaced by commands
- const char *p = dbmsg.getns();
-
- int len = strlen(p);
- if ( len > 400 )
- log(LogComponent::kQuery) << curTimeMillis64() % 10000 <<
- " long msg received, len:" << len << endl;
-
- Message *resp = new Message();
- if ( strcmp( "end" , p ) == 0 )
- resp->setData( opReply , "dbMsg end no longer supported" );
- else
- resp->setData( opReply , "i am fine - dbMsg deprecated");
+ } else if (op == dbCommand) {
+ receivedRpc(txn, c, dbresponse, m);
+ } else if (op == dbGetMore) {
+ if (!receivedGetMore(txn, dbresponse, m, currentOp))
+ shouldLog = true;
+ } else if (op == dbMsg) {
+ // deprecated - replaced by commands
+ const char* p = dbmsg.getns();
+
+ int len = strlen(p);
+ if (len > 400)
+ log(LogComponent::kQuery) << curTimeMillis64() % 10000
+ << " long msg received, len:" << len << endl;
+
+ Message* resp = new Message();
+ if (strcmp("end", p) == 0)
+ resp->setData(opReply, "dbMsg end no longer supported");
+ else
+ resp->setData(opReply, "i am fine - dbMsg deprecated");
- dbresponse.response = resp;
- dbresponse.responseTo = m.header().getId();
- }
- else {
- try {
- // The following operations all require authorization.
- // dbInsert, dbUpdate and dbDelete can be easily pre-authorized,
- // here, but dbKillCursors cannot.
- if ( op == dbKillCursors ) {
- currentOp.ensureStarted();
- logThreshold = 10;
- receivedKillCursors(txn, m);
- }
- else if (op != dbInsert && op != dbUpdate && op != dbDelete) {
- log(LogComponent::kQuery) << " operation isn't supported: " << op << endl;
- currentOp.done();
- shouldLog = true;
+ dbresponse.response = resp;
+ dbresponse.responseTo = m.header().getId();
+ } else {
+ try {
+ // The following operations all require authorization.
+ // dbInsert, dbUpdate and dbDelete can be easily pre-authorized,
+ // here, but dbKillCursors cannot.
+ if (op == dbKillCursors) {
+ currentOp.ensureStarted();
+ logThreshold = 10;
+ receivedKillCursors(txn, m);
+ } else if (op != dbInsert && op != dbUpdate && op != dbDelete) {
+ log(LogComponent::kQuery) << " operation isn't supported: " << op << endl;
+ currentOp.done();
+ shouldLog = true;
+ } else {
+ if (remote != DBDirectClient::dummyHost) {
+ const ShardedConnectionInfo* connInfo = ShardedConnectionInfo::get(&c, false);
+ uassert(18663,
+ str::stream() << "legacy writeOps not longer supported for "
+ << "versioned connections, ns: " << nsString.ns()
+ << ", op: " << opToString(op)
+ << ", remote: " << remote.toString(),
+ connInfo == NULL);
}
- else {
- if (remote != DBDirectClient::dummyHost) {
- const ShardedConnectionInfo* connInfo =
- ShardedConnectionInfo::get(&c, false);
- uassert(18663,
- str::stream() << "legacy writeOps not longer supported for "
- << "versioned connections, ns: " << nsString.ns()
- << ", op: " << opToString(op)
- << ", remote: " << remote.toString(),
- connInfo == NULL);
- }
-
- if (!nsString.isValid()) {
- uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);
- }
- else if (op == dbInsert) {
- receivedInsert(txn, nsString, m, currentOp);
- }
- else if (op == dbUpdate) {
- receivedUpdate(txn, nsString, m, currentOp);
- }
- else if (op == dbDelete) {
- receivedDelete(txn, nsString, m, currentOp);
- }
- else {
- invariant(false);
- }
+
+ if (!nsString.isValid()) {
+ uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);
+ } else if (op == dbInsert) {
+ receivedInsert(txn, nsString, m, currentOp);
+ } else if (op == dbUpdate) {
+ receivedUpdate(txn, nsString, m, currentOp);
+ } else if (op == dbDelete) {
+ receivedDelete(txn, nsString, m, currentOp);
+ } else {
+ invariant(false);
}
- }
- catch (const UserException& ue) {
- LastError::get(c).setLastError(ue.getCode(), ue.getInfo().msg);
- MONGO_LOG_COMPONENT(3, responseComponent)
- << " Caught Assertion in " << opToString(op) << ", continuing "
- << ue.toString() << endl;
- debug.exceptionInfo = ue.getInfo();
- }
- catch (const AssertionException& e) {
- LastError::get(c).setLastError(e.getCode(), e.getInfo().msg);
- MONGO_LOG_COMPONENT(3, responseComponent)
- << " Caught Assertion in " << opToString(op) << ", continuing "
- << e.toString() << endl;
- debug.exceptionInfo = e.getInfo();
- shouldLog = true;
}
+ } catch (const UserException& ue) {
+ LastError::get(c).setLastError(ue.getCode(), ue.getInfo().msg);
+ MONGO_LOG_COMPONENT(3, responseComponent) << " Caught Assertion in " << opToString(op)
+ << ", continuing " << ue.toString() << endl;
+ debug.exceptionInfo = ue.getInfo();
+ } catch (const AssertionException& e) {
+ LastError::get(c).setLastError(e.getCode(), e.getInfo().msg);
+ MONGO_LOG_COMPONENT(3, responseComponent) << " Caught Assertion in " << opToString(op)
+ << ", continuing " << e.toString() << endl;
+ debug.exceptionInfo = e.getInfo();
+ shouldLog = true;
}
- currentOp.ensureStarted();
- currentOp.done();
- debug.executionTime = currentOp.totalTimeMillis();
+ }
+ currentOp.ensureStarted();
+ currentOp.done();
+ debug.executionTime = currentOp.totalTimeMillis();
- logThreshold += currentOp.getExpectedLatencyMs();
+ logThreshold += currentOp.getExpectedLatencyMs();
- if ( shouldLog || debug.executionTime > logThreshold ) {
- Locker::LockerInfo lockerInfo;
- txn->lockState()->getLockerInfo(&lockerInfo);
+ if (shouldLog || debug.executionTime > logThreshold) {
+ Locker::LockerInfo lockerInfo;
+ txn->lockState()->getLockerInfo(&lockerInfo);
- MONGO_LOG_COMPONENT(0, responseComponent) << debug.report(currentOp, lockerInfo.stats);
- }
+ MONGO_LOG_COMPONENT(0, responseComponent) << debug.report(currentOp, lockerInfo.stats);
+ }
- if (currentOp.shouldDBProfile(debug.executionTime)) {
- // Performance profiling is on
- if (txn->lockState()->isReadLocked()) {
- MONGO_LOG_COMPONENT(1, responseComponent)
- << "note: not profiling because recursive read lock";
- }
- else if (lockedForWriting()) {
- MONGO_LOG_COMPONENT(1, responseComponent)
- << "note: not profiling because doing fsync+lock";
- }
- else {
- profile(txn, op);
- }
+ if (currentOp.shouldDBProfile(debug.executionTime)) {
+ // Performance profiling is on
+ if (txn->lockState()->isReadLocked()) {
+ MONGO_LOG_COMPONENT(1, responseComponent)
+ << "note: not profiling because recursive read lock";
+ } else if (lockedForWriting()) {
+ MONGO_LOG_COMPONENT(1, responseComponent)
+ << "note: not profiling because doing fsync+lock";
+ } else {
+ profile(txn, op);
}
-
- recordCurOpMetrics(txn);
- debug.reset();
}
- void receivedKillCursors(OperationContext* txn, Message& m) {
- LastError::get(txn->getClient()).disable();
- DbMessage dbmessage(m);
- int n = dbmessage.pullInt();
+ recordCurOpMetrics(txn);
+ debug.reset();
+}
- uassert( 13659 , "sent 0 cursors to kill" , n != 0 );
- massert( 13658 , str::stream() << "bad kill cursors size: " << m.dataSize() , m.dataSize() == 8 + ( 8 * n ) );
- uassert( 13004 , str::stream() << "sent negative cursors to kill: " << n , n >= 1 );
+void receivedKillCursors(OperationContext* txn, Message& m) {
+ LastError::get(txn->getClient()).disable();
+ DbMessage dbmessage(m);
+ int n = dbmessage.pullInt();
- if ( n > 2000 ) {
- ( n < 30000 ? warning() : error() ) << "receivedKillCursors, n=" << n << endl;
- verify( n < 30000 );
- }
+ uassert(13659, "sent 0 cursors to kill", n != 0);
+ massert(13658,
+ str::stream() << "bad kill cursors size: " << m.dataSize(),
+ m.dataSize() == 8 + (8 * n));
+ uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1);
- const char* cursorArray = dbmessage.getArray(n);
+ if (n > 2000) {
+ (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl;
+ verify(n < 30000);
+ }
- int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray);
+ const char* cursorArray = dbmessage.getArray(n);
- if ( shouldLog(logger::LogSeverity::Debug(1)) || found != n ) {
- LOG( found == n ? 1 : 0 ) << "killcursors: found " << found << " of " << n << endl;
- }
+ int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray);
+ if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) {
+ LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl;
+ }
+}
+
+void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
+ DbMessage d(m);
+ uassertStatusOK(userAllowedWriteNS(nsString));
+ op.debug().ns = nsString.ns();
+ int flags = d.pullInt();
+ BSONObj query = d.nextJsObj();
+
+ verify(d.moreJSObjs());
+ verify(query.objsize() < m.header().dataLen());
+ BSONObj toupdate = d.nextJsObj();
+ uassert(10055, "update object too large", toupdate.objsize() <= BSONObjMaxUserSize);
+ verify(toupdate.objsize() < m.header().dataLen());
+ verify(query.objsize() + toupdate.objsize() < m.header().dataLen());
+ bool upsert = flags & UpdateOption_Upsert;
+ bool multi = flags & UpdateOption_Multi;
+ bool broadcast = flags & UpdateOption_Broadcast;
+
+ Status status = AuthorizationSession::get(txn->getClient())
+ ->checkAuthForUpdate(nsString, query, toupdate, upsert);
+ audit::logUpdateAuthzCheck(
+ txn->getClient(), nsString, query, toupdate, upsert, multi, status.code());
+ uassertStatusOK(status);
+
+ op.debug().query = query;
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ op.setQuery_inlock(query);
}
- void receivedUpdate(OperationContext* txn,
- const NamespaceString& nsString,
- Message& m,
- CurOp& op) {
- DbMessage d(m);
- uassertStatusOK(userAllowedWriteNS(nsString));
- op.debug().ns = nsString.ns();
- int flags = d.pullInt();
- BSONObj query = d.nextJsObj();
-
- verify( d.moreJSObjs() );
- verify( query.objsize() < m.header().dataLen() );
- BSONObj toupdate = d.nextJsObj();
- uassert( 10055 , "update object too large", toupdate.objsize() <= BSONObjMaxUserSize);
- verify( toupdate.objsize() < m.header().dataLen() );
- verify( query.objsize() + toupdate.objsize() < m.header().dataLen() );
- bool upsert = flags & UpdateOption_Upsert;
- bool multi = flags & UpdateOption_Multi;
- bool broadcast = flags & UpdateOption_Broadcast;
-
- Status status = AuthorizationSession::get(txn->getClient())->checkAuthForUpdate(nsString,
- query,
- toupdate,
- upsert);
- audit::logUpdateAuthzCheck(txn->getClient(), nsString, query, toupdate, upsert, multi,
- status.code());
- uassertStatusOK(status);
-
- op.debug().query = query;
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- op.setQuery_inlock(query);
- }
+ UpdateRequest request(nsString);
+ request.setUpsert(upsert);
+ request.setMulti(multi);
+ request.setQuery(query);
+ request.setUpdates(toupdate);
+ UpdateLifecycleImpl updateLifecycle(broadcast, nsString);
+ request.setLifecycle(&updateLifecycle);
- UpdateRequest request(nsString);
- request.setUpsert(upsert);
- request.setMulti(multi);
- request.setQuery(query);
- request.setUpdates(toupdate);
- UpdateLifecycleImpl updateLifecycle(broadcast, nsString);
- request.setLifecycle(&updateLifecycle);
-
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- int attempt = 1;
- while ( 1 ) {
- try {
- ParsedUpdate parsedUpdate(txn, &request);
- uassertStatusOK(parsedUpdate.parseRequest());
-
- // Tentatively take an intent lock, fix up if we need to create the collection
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- if (dbHolder().get(txn, nsString.db()) == NULL) {
- // If DB doesn't exist, don't implicitly create it in OldClientContext
- break;
- }
- Lock::CollectionLock collLock(txn->lockState(),
- nsString.ns(),
- parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
- OldClientContext ctx(txn, nsString);
-
- // The common case: no implicit collection creation
- if (!upsert || ctx.db()->getCollection(nsString) != NULL) {
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutorUpdate(txn,
- ctx.db()->getCollection(nsString),
- &parsedUpdate,
- &op.debug(),
- &rawExec));
- std::unique_ptr<PlanExecutor> exec(rawExec);
-
- // Run the plan and get stats out.
- uassertStatusOK(exec->executePlan());
- UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), &op.debug());
-
- // for getlasterror
- LastError::get(txn->getClient()).recordUpdate(
- res.existing, res.numMatched, res.upserted);
- return;
- }
- break;
- }
- catch ( const WriteConflictException& dle ) {
- op.debug().writeConflicts++;
- if ( multi ) {
- log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
- throw;
- }
- WriteConflictException::logAndBackoff( attempt++, "update", nsString.toString() );
- }
- }
+ request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
- // This is an upsert into a non-existing database, so need an exclusive lock
- // to avoid deadlock
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ int attempt = 1;
+ while (1) {
+ try {
ParsedUpdate parsedUpdate(txn, &request);
uassertStatusOK(parsedUpdate.parseRequest());
+ // Tentatively take an intent lock, fix up if we need to create the collection
ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
- OldClientContext ctx(txn, nsString);
- uassert(ErrorCodes::NotMaster,
- str::stream() << "Not primary while performing update on " << nsString.ns(),
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
-
- Database* db = ctx.db();
- if (db->getCollection(nsString)) {
- // someone else beat us to it, that's ok
- // we might race while we unlock if someone drops
- // but that's ok, we'll just do nothing and error out
- }
- else {
- WriteUnitOfWork wuow(txn);
- uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
- wuow.commit();
+ Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
+ if (dbHolder().get(txn, nsString.db()) == NULL) {
+ // If DB doesn't exist, don't implicitly create it in OldClientContext
+ break;
}
+ Lock::CollectionLock collLock(
+ txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
+ OldClientContext ctx(txn, nsString);
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutorUpdate(txn,
- ctx.db()->getCollection(nsString),
- &parsedUpdate,
- &op.debug(),
- &rawExec));
- std::unique_ptr<PlanExecutor> exec(rawExec);
+ // The common case: no implicit collection creation
+ if (!upsert || ctx.db()->getCollection(nsString) != NULL) {
+ PlanExecutor* rawExec;
+ uassertStatusOK(getExecutorUpdate(
+ txn, ctx.db()->getCollection(nsString), &parsedUpdate, &op.debug(), &rawExec));
+ std::unique_ptr<PlanExecutor> exec(rawExec);
- // Run the plan and get stats out.
- uassertStatusOK(exec->executePlan());
- UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), &op.debug());
+ // Run the plan and get stats out.
+ uassertStatusOK(exec->executePlan());
+ UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), &op.debug());
- LastError::get(txn->getClient()).recordUpdate(
- res.existing, res.numMatched, res.upserted);
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
+ // for getlasterror
+ LastError::get(txn->getClient())
+ .recordUpdate(res.existing, res.numMatched, res.upserted);
+ return;
+ }
+ break;
+ } catch (const WriteConflictException& dle) {
+ op.debug().writeConflicts++;
+ if (multi) {
+ log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
+ throw;
+ }
+ WriteConflictException::logAndBackoff(attempt++, "update", nsString.toString());
+ }
}
- void receivedDelete(OperationContext* txn,
- const NamespaceString& nsString,
- Message& m,
- CurOp& op) {
- DbMessage d(m);
- uassertStatusOK(userAllowedWriteNS(nsString));
-
- op.debug().ns = nsString.ns();
- int flags = d.pullInt();
- bool justOne = flags & RemoveOption_JustOne;
- verify( d.moreJSObjs() );
- BSONObj pattern = d.nextJsObj();
-
- Status status = AuthorizationSession::get(txn->getClient())->checkAuthForDelete(nsString,
- pattern);
- audit::logDeleteAuthzCheck(txn->getClient(), nsString, pattern, status.code());
- uassertStatusOK(status);
+ // This is an upsert into a non-existing database, so need an exclusive lock
+ // to avoid deadlock
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ParsedUpdate parsedUpdate(txn, &request);
+ uassertStatusOK(parsedUpdate.parseRequest());
- op.debug().query = pattern;
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- op.setQuery_inlock(pattern);
- }
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
+ OldClientContext ctx(txn, nsString);
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Not primary while performing update on " << nsString.ns(),
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
- DeleteRequest request(nsString);
- request.setQuery(pattern);
- request.setMulti(!justOne);
+ Database* db = ctx.db();
+ if (db->getCollection(nsString)) {
+ // someone else beat us to it, that's ok
+ // we might race while we unlock if someone drops
+ // but that's ok, we'll just do nothing and error out
+ } else {
+ WriteUnitOfWork wuow(txn);
+ uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
+ wuow.commit();
+ }
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ PlanExecutor* rawExec;
+ uassertStatusOK(getExecutorUpdate(
+ txn, ctx.db()->getCollection(nsString), &parsedUpdate, &op.debug(), &rawExec));
+ std::unique_ptr<PlanExecutor> exec(rawExec);
- int attempt = 1;
- while ( 1 ) {
- try {
- ParsedDelete parsedDelete(txn, &request);
- uassertStatusOK(parsedDelete.parseRequest());
+ // Run the plan and get stats out.
+ uassertStatusOK(exec->executePlan());
+ UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), &op.debug());
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
- if (!autoDb.getDb()) {
- break;
- }
+ LastError::get(txn->getClient()).recordUpdate(res.existing, res.numMatched, res.upserted);
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
+}
+
+void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
+ DbMessage d(m);
+ uassertStatusOK(userAllowedWriteNS(nsString));
+
+ op.debug().ns = nsString.ns();
+ int flags = d.pullInt();
+ bool justOne = flags & RemoveOption_JustOne;
+ verify(d.moreJSObjs());
+ BSONObj pattern = d.nextJsObj();
+
+ Status status =
+ AuthorizationSession::get(txn->getClient())->checkAuthForDelete(nsString, pattern);
+ audit::logDeleteAuthzCheck(txn->getClient(), nsString, pattern, status.code());
+ uassertStatusOK(status);
+
+ op.debug().query = pattern;
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ op.setQuery_inlock(pattern);
+ }
- Lock::CollectionLock collLock(txn->lockState(),
- nsString.ns(),
- parsedDelete.isIsolated() ? MODE_X : MODE_IX);
- OldClientContext ctx(txn, nsString);
+ DeleteRequest request(nsString);
+ request.setQuery(pattern);
+ request.setMulti(!justOne);
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutorDelete(txn,
- ctx.db()->getCollection(nsString),
- &parsedDelete,
- &rawExec));
- std::unique_ptr<PlanExecutor> exec(rawExec);
+ request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
- // Run the plan and get the number of docs deleted.
- uassertStatusOK(exec->executePlan());
- long long n = DeleteStage::getNumDeleted(exec.get());
- LastError::get(txn->getClient()).recordDelete(n);
- op.debug().ndeleted = n;
+ int attempt = 1;
+ while (1) {
+ try {
+ ParsedDelete parsedDelete(txn, &request);
+ uassertStatusOK(parsedDelete.parseRequest());
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
+ if (!autoDb.getDb()) {
break;
}
- catch ( const WriteConflictException& dle ) {
- op.debug().writeConflicts++;
- WriteConflictException::logAndBackoff( attempt++, "delete", nsString.toString() );
- }
+
+ Lock::CollectionLock collLock(
+ txn->lockState(), nsString.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX);
+ OldClientContext ctx(txn, nsString);
+
+ PlanExecutor* rawExec;
+ uassertStatusOK(
+ getExecutorDelete(txn, ctx.db()->getCollection(nsString), &parsedDelete, &rawExec));
+ std::unique_ptr<PlanExecutor> exec(rawExec);
+
+ // Run the plan and get the number of docs deleted.
+ uassertStatusOK(exec->executePlan());
+ long long n = DeleteStage::getNumDeleted(exec.get());
+ LastError::get(txn->getClient()).recordDelete(n);
+ op.debug().ndeleted = n;
+
+ break;
+ } catch (const WriteConflictException& dle) {
+ op.debug().writeConflicts++;
+ WriteConflictException::logAndBackoff(attempt++, "delete", nsString.toString());
}
}
+}
+
+QueryResult::View emptyMoreResult(long long);
+
+bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) {
+ bool ok = true;
+
+ DbMessage d(m);
+
+ const char* ns = d.getns();
+ int ntoreturn = d.pullInt();
+ long long cursorid = d.pullInt64();
+
+ curop.debug().ns = ns;
+ curop.debug().ntoreturn = ntoreturn;
+ curop.debug().cursorid = cursorid;
+
+ unique_ptr<AssertionException> ex;
+ unique_ptr<Timer> timer;
+ int pass = 0;
+ bool exhaust = false;
+ QueryResult::View msgdata = 0;
+ Timestamp last;
+ while (1) {
+ bool isCursorAuthorized = false;
+ try {
+ const NamespaceString nsString(ns);
+ uassert(16258, str::stream() << "Invalid ns [" << ns << "]", nsString.isValid());
- QueryResult::View emptyMoreResult(long long);
-
- bool receivedGetMore(OperationContext* txn,
- DbResponse& dbresponse,
- Message& m,
- CurOp& curop) {
- bool ok = true;
-
- DbMessage d(m);
-
- const char *ns = d.getns();
- int ntoreturn = d.pullInt();
- long long cursorid = d.pullInt64();
-
- curop.debug().ns = ns;
- curop.debug().ntoreturn = ntoreturn;
- curop.debug().cursorid = cursorid;
-
- unique_ptr<AssertionException> ex;
- unique_ptr<Timer> timer;
- int pass = 0;
- bool exhaust = false;
- QueryResult::View msgdata = 0;
- Timestamp last;
- while( 1 ) {
- bool isCursorAuthorized = false;
- try {
- const NamespaceString nsString( ns );
- uassert( 16258, str::stream() << "Invalid ns [" << ns << "]", nsString.isValid() );
-
- Status status = AuthorizationSession::get(txn->getClient())->checkAuthForGetMore(
- nsString, cursorid);
- audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code());
- uassertStatusOK(status);
-
- if (str::startsWith(ns, "local.oplog.")){
- while (MONGO_FAIL_POINT(rsStopGetMore)) {
- sleepmillis(0);
- }
-
- if (pass == 0) {
- last = getLastSetTimestamp();
- }
- else {
- repl::waitUpToOneSecondForTimestampChange(last);
- }
+ Status status = AuthorizationSession::get(txn->getClient())
+ ->checkAuthForGetMore(nsString, cursorid);
+ audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code());
+ uassertStatusOK(status);
+
+ if (str::startsWith(ns, "local.oplog.")) {
+ while (MONGO_FAIL_POINT(rsStopGetMore)) {
+ sleepmillis(0);
}
- msgdata = getMore(txn,
- ns,
- ntoreturn,
- cursorid,
- pass,
- exhaust,
- &isCursorAuthorized);
- }
- catch ( AssertionException& e ) {
- if ( isCursorAuthorized ) {
- // If a cursor with id 'cursorid' was authorized, it may have been advanced
- // before an exception terminated processGetMore. Erase the ClientCursor
- // because it may now be out of sync with the client's iteration state.
- // SERVER-7952
- // TODO Temporary code, see SERVER-4563 for a cleanup overview.
- CursorManager::eraseCursorGlobal(txn, cursorid );
+ if (pass == 0) {
+ last = getLastSetTimestamp();
+ } else {
+ repl::waitUpToOneSecondForTimestampChange(last);
}
- ex.reset( new AssertionException( e.getInfo().msg, e.getCode() ) );
- ok = false;
- break;
}
-
- if (msgdata.view2ptr() == 0) {
- // this should only happen with QueryOption_AwaitData
- exhaust = false;
- massert(13073, "shutting down", !inShutdown() );
- if ( ! timer ) {
- timer.reset( new Timer() );
- }
- else {
- if ( timer->seconds() >= 4 ) {
- // after about 4 seconds, return. pass stops at 1000 normally.
- // we want to return occasionally so slave can checkpoint.
- pass = 10000;
- }
- }
- pass++;
- if (kDebugBuild)
- sleepmillis(20);
- else
- sleepmillis(2);
-
- // note: the 1100 is beacuse of the waitForDifferent above
- // should eventually clean this up a bit
- curop.setExpectedLatencyMs( 1100 + timer->millis() );
-
- continue;
+
+ msgdata = getMore(txn, ns, ntoreturn, cursorid, pass, exhaust, &isCursorAuthorized);
+ } catch (AssertionException& e) {
+ if (isCursorAuthorized) {
+ // If a cursor with id 'cursorid' was authorized, it may have been advanced
+ // before an exception terminated processGetMore. Erase the ClientCursor
+ // because it may now be out of sync with the client's iteration state.
+ // SERVER-7952
+ // TODO Temporary code, see SERVER-4563 for a cleanup overview.
+ CursorManager::eraseCursorGlobal(txn, cursorid);
}
+ ex.reset(new AssertionException(e.getInfo().msg, e.getCode()));
+ ok = false;
break;
- };
+ }
- if (ex) {
- BSONObjBuilder err;
- ex->getInfo().append( err );
- BSONObj errObj = err.done();
+ if (msgdata.view2ptr() == 0) {
+ // this should only happen with QueryOption_AwaitData
+ exhaust = false;
+ massert(13073, "shutting down", !inShutdown());
+ if (!timer) {
+ timer.reset(new Timer());
+ } else {
+ if (timer->seconds() >= 4) {
+ // after about 4 seconds, return. pass stops at 1000 normally.
+ // we want to return occasionally so slave can checkpoint.
+ pass = 10000;
+ }
+ }
+ pass++;
+ if (kDebugBuild)
+ sleepmillis(20);
+ else
+ sleepmillis(2);
- curop.debug().exceptionInfo = ex->getInfo();
+ // note: the 1100 is beacuse of the waitForDifferent above
+ // should eventually clean this up a bit
+ curop.setExpectedLatencyMs(1100 + timer->millis());
- replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
- curop.debug().responseLength = dbresponse.response->header().dataLen();
- curop.debug().nreturned = 1;
- return ok;
+ continue;
}
+ break;
+ };
- Message *resp = new Message();
- resp->setData(msgdata.view2ptr(), true);
- curop.debug().responseLength = resp->header().dataLen();
- curop.debug().nreturned = msgdata.getNReturned();
-
- dbresponse.response = resp;
- dbresponse.responseTo = m.header().getId();
+ if (ex) {
+ BSONObjBuilder err;
+ ex->getInfo().append(err);
+ BSONObj errObj = err.done();
- if( exhaust ) {
- curop.debug().exhaust = true;
- dbresponse.exhaustNS = ns;
- }
+ curop.debug().exceptionInfo = ex->getInfo();
+ replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
+ curop.debug().responseLength = dbresponse.response->header().dataLen();
+ curop.debug().nreturned = 1;
return ok;
}
- void checkAndInsert(OperationContext* txn,
- OldClientContext& ctx,
- const char *ns,
- /*modifies*/BSONObj& js) {
-
- StatusWith<BSONObj> fixed = fixDocumentForInsert( js );
- uassertStatusOK( fixed.getStatus() );
- if ( !fixed.getValue().isEmpty() )
- js = fixed.getValue();
-
- int attempt = 0;
- while ( true ) {
- try {
- WriteUnitOfWork wunit(txn);
- Collection* collection = ctx.db()->getCollection( ns );
- if ( !collection ) {
- collection = ctx.db()->createCollection( txn, ns );
- verify( collection );
- }
+ Message* resp = new Message();
+ resp->setData(msgdata.view2ptr(), true);
+ curop.debug().responseLength = resp->header().dataLen();
+ curop.debug().nreturned = msgdata.getNReturned();
- StatusWith<RecordId> status = collection->insertDocument( txn, js, true );
- uassertStatusOK( status.getStatus() );
- wunit.commit();
- break;
- }
- catch( const WriteConflictException& e ) {
- CurOp::get(txn)->debug().writeConflicts++;
- txn->recoveryUnit()->abandonSnapshot();
- WriteConflictException::logAndBackoff( attempt++, "insert", ns);
- }
- }
+ dbresponse.response = resp;
+ dbresponse.responseTo = m.header().getId();
+
+ if (exhaust) {
+ curop.debug().exhaust = true;
+ dbresponse.exhaustNS = ns;
}
- NOINLINE_DECL void insertMulti(OperationContext* txn,
- OldClientContext& ctx,
- bool keepGoing,
- const char *ns,
- vector<BSONObj>& objs,
- CurOp& op) {
- size_t i;
- for (i=0; i<objs.size(); i++){
- try {
- checkAndInsert(txn, ctx, ns, objs[i]);
+ return ok;
+}
+
+void checkAndInsert(OperationContext* txn,
+ OldClientContext& ctx,
+ const char* ns,
+ /*modifies*/ BSONObj& js) {
+ StatusWith<BSONObj> fixed = fixDocumentForInsert(js);
+ uassertStatusOK(fixed.getStatus());
+ if (!fixed.getValue().isEmpty())
+ js = fixed.getValue();
+
+ int attempt = 0;
+ while (true) {
+ try {
+ WriteUnitOfWork wunit(txn);
+ Collection* collection = ctx.db()->getCollection(ns);
+ if (!collection) {
+ collection = ctx.db()->createCollection(txn, ns);
+ verify(collection);
}
- catch (const UserException& ex) {
- if (!keepGoing || i == objs.size()-1){
- globalOpCounters.incInsertInWriteLock(i);
- throw;
- }
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- // otherwise ignore and keep going
+
+ StatusWith<RecordId> status = collection->insertDocument(txn, js, true);
+ uassertStatusOK(status.getStatus());
+ wunit.commit();
+ break;
+ } catch (const WriteConflictException& e) {
+ CurOp::get(txn)->debug().writeConflicts++;
+ txn->recoveryUnit()->abandonSnapshot();
+ WriteConflictException::logAndBackoff(attempt++, "insert", ns);
+ }
+ }
+}
+
+NOINLINE_DECL void insertMulti(OperationContext* txn,
+ OldClientContext& ctx,
+ bool keepGoing,
+ const char* ns,
+ vector<BSONObj>& objs,
+ CurOp& op) {
+ size_t i;
+ for (i = 0; i < objs.size(); i++) {
+ try {
+ checkAndInsert(txn, ctx, ns, objs[i]);
+ } catch (const UserException& ex) {
+ if (!keepGoing || i == objs.size() - 1) {
+ globalOpCounters.incInsertInWriteLock(i);
+ throw;
}
+ LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
+ // otherwise ignore and keep going
}
-
- globalOpCounters.incInsertInWriteLock(i);
- op.debug().ninserted = i;
}
- static void convertSystemIndexInsertsToCommands(
- DbMessage& d,
- BSONArrayBuilder* allCmdsBuilder) {
- while (d.moreJSObjs()) {
- BSONObj spec = d.nextJsObj();
- BSONElement indexNsElement = spec["ns"];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Missing \"ns\" field while inserting into " << d.getns(),
- !indexNsElement.eoo());
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "Expected \"ns\" field to have type String, not " <<
- typeName(indexNsElement.type()) << " while inserting into " << d.getns(),
- indexNsElement.type() == String);
- const StringData nsToIndex(indexNsElement.valueStringData());
- BSONObjBuilder cmdObjBuilder(allCmdsBuilder->subobjStart());
- cmdObjBuilder << "createIndexes" << nsToCollectionSubstring(nsToIndex);
- BSONArrayBuilder specArrayBuilder(cmdObjBuilder.subarrayStart("indexes"));
- while (true) {
- BSONObjBuilder specBuilder(specArrayBuilder.subobjStart());
- BSONElement specNsElement = spec["ns"];
- if ((specNsElement.type() != String) ||
- (specNsElement.valueStringData() != nsToIndex)) {
-
- break;
- }
- for (BSONObjIterator iter(spec); iter.more();) {
- BSONElement element = iter.next();
- if (element.fieldNameStringData() != "ns") {
- specBuilder.append(element);
- }
- }
- if (!d.moreJSObjs()) {
- break;
+ globalOpCounters.incInsertInWriteLock(i);
+ op.debug().ninserted = i;
+}
+
+static void convertSystemIndexInsertsToCommands(DbMessage& d, BSONArrayBuilder* allCmdsBuilder) {
+ while (d.moreJSObjs()) {
+ BSONObj spec = d.nextJsObj();
+ BSONElement indexNsElement = spec["ns"];
+ uassert(ErrorCodes::NoSuchKey,
+ str::stream() << "Missing \"ns\" field while inserting into " << d.getns(),
+ !indexNsElement.eoo());
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "Expected \"ns\" field to have type String, not "
+ << typeName(indexNsElement.type()) << " while inserting into "
+ << d.getns(),
+ indexNsElement.type() == String);
+ const StringData nsToIndex(indexNsElement.valueStringData());
+ BSONObjBuilder cmdObjBuilder(allCmdsBuilder->subobjStart());
+ cmdObjBuilder << "createIndexes" << nsToCollectionSubstring(nsToIndex);
+ BSONArrayBuilder specArrayBuilder(cmdObjBuilder.subarrayStart("indexes"));
+ while (true) {
+ BSONObjBuilder specBuilder(specArrayBuilder.subobjStart());
+ BSONElement specNsElement = spec["ns"];
+ if ((specNsElement.type() != String) ||
+ (specNsElement.valueStringData() != nsToIndex)) {
+ break;
+ }
+ for (BSONObjIterator iter(spec); iter.more();) {
+ BSONElement element = iter.next();
+ if (element.fieldNameStringData() != "ns") {
+ specBuilder.append(element);
}
- spec = d.nextJsObj();
}
+ if (!d.moreJSObjs()) {
+ break;
+ }
+ spec = d.nextJsObj();
}
}
-
- static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curOp) {
- BSONArrayBuilder allCmdsBuilder;
+}
+
+static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curOp) {
+ BSONArrayBuilder allCmdsBuilder;
+ try {
+ convertSystemIndexInsertsToCommands(d, &allCmdsBuilder);
+ } catch (const DBException& ex) {
+ LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
+ curOp.debug().exceptionInfo = ex.getInfo();
+ return;
+ }
+ BSONArray allCmds(allCmdsBuilder.done());
+ Command* createIndexesCmd = Command::findCommand("createIndexes");
+ invariant(createIndexesCmd);
+ const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
+ for (BSONObjIterator iter(allCmds); iter.more();) {
try {
- convertSystemIndexInsertsToCommands(d, &allCmdsBuilder);
- }
- catch (const DBException& ex) {
+ BSONObj cmdObj = iter.next().Obj();
+
+ rpc::LegacyRequestBuilder requestBuilder{};
+ auto indexNs = NamespaceString(d.getns());
+ auto cmdRequestMsg = requestBuilder.setDatabase(indexNs.db())
+ .setCommandName("createIndexes")
+ .setMetadata(rpc::makeEmptyMetadata())
+ .setCommandArgs(cmdObj)
+ .done();
+ rpc::LegacyRequest cmdRequest{cmdRequestMsg.get()};
+ rpc::LegacyReplyBuilder cmdReplyBuilder{};
+ Command::execCommand(txn, createIndexesCmd, cmdRequest, &cmdReplyBuilder);
+ auto cmdReplyMsg = cmdReplyBuilder.done();
+ rpc::LegacyReply cmdReply{cmdReplyMsg.get()};
+ uassertStatusOK(Command::getStatusFromCommandResult(cmdReply.getCommandReply()));
+ } catch (const DBException& ex) {
LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
curOp.debug().exceptionInfo = ex.getInfo();
- return;
- }
- BSONArray allCmds(allCmdsBuilder.done());
- Command* createIndexesCmd = Command::findCommand("createIndexes");
- invariant(createIndexesCmd);
- const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
- for (BSONObjIterator iter(allCmds); iter.more();) {
- try {
- BSONObj cmdObj = iter.next().Obj();
-
- rpc::LegacyRequestBuilder requestBuilder{};
- auto indexNs = NamespaceString(d.getns());
- auto cmdRequestMsg = requestBuilder.setDatabase(indexNs.db())
- .setCommandName("createIndexes")
- .setMetadata(rpc::makeEmptyMetadata())
- .setCommandArgs(cmdObj).done();
- rpc::LegacyRequest cmdRequest{cmdRequestMsg.get()};
- rpc::LegacyReplyBuilder cmdReplyBuilder{};
- Command::execCommand(txn,
- createIndexesCmd,
- cmdRequest,
- &cmdReplyBuilder);
- auto cmdReplyMsg = cmdReplyBuilder.done();
- rpc::LegacyReply cmdReply{cmdReplyMsg.get()};
- uassertStatusOK(Command::getStatusFromCommandResult(cmdReply.getCommandReply()));
- }
- catch (const DBException& ex) {
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- curOp.debug().exceptionInfo = ex.getInfo();
- if (!keepGoing) {
- return;
- }
+ if (!keepGoing) {
+ return;
}
}
}
+}
+
+void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
+ DbMessage d(m);
+ const char* ns = d.getns();
+ op.debug().ns = ns;
+ uassertStatusOK(userAllowedWriteNS(nsString.ns()));
+ if (nsString.isSystemDotIndexes()) {
+ insertSystemIndexes(txn, d, op);
+ return;
+ }
- void receivedInsert(OperationContext* txn,
- const NamespaceString& nsString,
- Message& m,
- CurOp& op) {
- DbMessage d(m);
- const char* ns = d.getns();
- op.debug().ns = ns;
- uassertStatusOK(userAllowedWriteNS(nsString.ns()));
- if (nsString.isSystemDotIndexes()) {
- insertSystemIndexes(txn, d, op);
- return;
- }
-
- if( !d.moreJSObjs() ) {
- // strange. should we complain?
- return;
- }
-
- vector<BSONObj> multi;
- while (d.moreJSObjs()){
- BSONObj obj = d.nextJsObj();
- multi.push_back(obj);
+ if (!d.moreJSObjs()) {
+ // strange. should we complain?
+ return;
+ }
- // Check auth for insert (also handles checking if this is an index build and checks
- // for the proper privileges in that case).
- Status status = AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj);
- audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code());
- uassertStatusOK(status);
- }
+ vector<BSONObj> multi;
+ while (d.moreJSObjs()) {
+ BSONObj obj = d.nextJsObj();
+ multi.push_back(obj);
- const int notMasterCodeForInsert = 10058; // This is different from ErrorCodes::NotMaster
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
-
- // CONCURRENCY TODO: is being read locked in big log sufficient here?
- // writelock is used to synchronize stepdowns w/ writes
- uassert(notMasterCodeForInsert, "not master",
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
-
- // OldClientContext may implicitly create a database, so check existence
- if (dbHolder().get(txn, nsString.db()) != NULL) {
- OldClientContext ctx(txn, ns);
- if (ctx.db()->getCollection(nsString)) {
- if (multi.size() > 1) {
- const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
- insertMulti(txn, ctx, keepGoing, ns, multi, op);
- }
- else {
- checkAndInsert(txn, ctx, ns, multi[0]);
- globalOpCounters.incInsertInWriteLock(1);
- op.debug().ninserted = 1;
- }
- return;
- }
- }
- }
+ // Check auth for insert (also handles checking if this is an index build and checks
+ // for the proper privileges in that case).
+ Status status =
+ AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj);
+ audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code());
+ uassertStatusOK(status);
+ }
- // Collection didn't exist so try again with MODE_X
+ const int notMasterCodeForInsert = 10058; // This is different from ErrorCodes::NotMaster
+ {
ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
+ Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
// CONCURRENCY TODO: is being read locked in big log sufficient here?
// writelock is used to synchronize stepdowns w/ writes
- uassert(notMasterCodeForInsert, "not master",
+ uassert(notMasterCodeForInsert,
+ "not master",
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
- OldClientContext ctx(txn, ns);
-
- if (multi.size() > 1) {
- const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
- insertMulti(txn, ctx, keepGoing, ns, multi, op);
- } else {
- checkAndInsert(txn, ctx, ns, multi[0]);
- globalOpCounters.incInsertInWriteLock(1);
- op.debug().ninserted = 1;
+ // OldClientContext may implicitly create a database, so check existence
+ if (dbHolder().get(txn, nsString.db()) != NULL) {
+ OldClientContext ctx(txn, ns);
+ if (ctx.db()->getCollection(nsString)) {
+ if (multi.size() > 1) {
+ const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
+ insertMulti(txn, ctx, keepGoing, ns, multi, op);
+ } else {
+ checkAndInsert(txn, ctx, ns, multi[0]);
+ globalOpCounters.incInsertInWriteLock(1);
+ op.debug().ninserted = 1;
+ }
+ return;
+ }
}
}
- static AtomicUInt32 shutdownInProgress(0);
+ // Collection didn't exist so try again with MODE_X
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
- bool inShutdown() {
- return shutdownInProgress.loadRelaxed() != 0;
- }
+ // CONCURRENCY TODO: is being read locked in big log sufficient here?
+ // writelock is used to synchronize stepdowns w/ writes
+ uassert(notMasterCodeForInsert,
+ "not master",
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
+
+ OldClientContext ctx(txn, ns);
- bool inShutdownStrict() {
- return shutdownInProgress.load() != 0;
+ if (multi.size() > 1) {
+ const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
+ insertMulti(txn, ctx, keepGoing, ns, multi, op);
+ } else {
+ checkAndInsert(txn, ctx, ns, multi[0]);
+ globalOpCounters.incInsertInWriteLock(1);
+ op.debug().ninserted = 1;
}
+}
- static void shutdownServer() {
- log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl;
- ListeningSockets::get()->closeAll();
+static AtomicUInt32 shutdownInProgress(0);
- log(LogComponent::kNetwork) << "shutdown: going to flush diaglog..." << endl;
- _diaglog.flush();
+bool inShutdown() {
+ return shutdownInProgress.loadRelaxed() != 0;
+}
- /* must do this before unmapping mem or you may get a seg fault */
- log(LogComponent::kNetwork) << "shutdown: going to close sockets..." << endl;
- stdx::thread close_socket_thread( stdx::bind(MessagingPort::closeAllSockets, 0) );
+bool inShutdownStrict() {
+ return shutdownInProgress.load() != 0;
+}
- getGlobalServiceContext()->shutdownGlobalStorageEngineCleanly();
- }
+static void shutdownServer() {
+ log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..." << endl;
+ ListeningSockets::get()->closeAll();
- // shutdownLock
- //
- // Protects:
- // Ensures shutdown is single threaded.
- // Lock Ordering:
- // No restrictions
- stdx::mutex shutdownLock;
-
- void signalShutdown() {
- // Notify all threads shutdown has started
- shutdownInProgress.fetchAndAdd(1);
- }
+ log(LogComponent::kNetwork) << "shutdown: going to flush diaglog..." << endl;
+ _diaglog.flush();
- void exitCleanly(ExitCode code) {
- // Notify all threads shutdown has started
- shutdownInProgress.fetchAndAdd(1);
+ /* must do this before unmapping mem or you may get a seg fault */
+ log(LogComponent::kNetwork) << "shutdown: going to close sockets..." << endl;
+ stdx::thread close_socket_thread(stdx::bind(MessagingPort::closeAllSockets, 0));
- // Grab the shutdown lock to prevent concurrent callers
- stdx::lock_guard<stdx::mutex> lockguard(shutdownLock);
+ getGlobalServiceContext()->shutdownGlobalStorageEngineCleanly();
+}
- // Global storage engine may not be started in all cases before we exit
- if (getGlobalServiceContext()->getGlobalStorageEngine() == NULL) {
- dbexit(code); // returns only under a windows service
- invariant(code == EXIT_WINDOWS_SERVICE_STOP);
- return;
- }
+// shutdownLock
+//
+// Protects:
+// Ensures shutdown is single threaded.
+// Lock Ordering:
+// No restrictions
+stdx::mutex shutdownLock;
- getGlobalServiceContext()->setKillAllOperations();
+void signalShutdown() {
+ // Notify all threads shutdown has started
+ shutdownInProgress.fetchAndAdd(1);
+}
- repl::getGlobalReplicationCoordinator()->shutdown();
- auto catalogMgr = grid.catalogManager();
- if (catalogMgr) {
- catalogMgr->shutDown();
- }
+void exitCleanly(ExitCode code) {
+ // Notify all threads shutdown has started
+ shutdownInProgress.fetchAndAdd(1);
- // We should always be able to acquire the global lock at shutdown.
- //
- // TODO: This call chain uses the locker directly, because we do not want to start an
- // operation context, which also instantiates a recovery unit. Also, using the
- // lockGlobalBegin/lockGlobalComplete sequence, we avoid taking the flush lock. This will
- // all go away if we start acquiring the global/flush lock as part of ScopedTransaction.
- //
- // For a Windows service, dbexit does not call exit(), so we must leak the lock outside
- // of this function to prevent any operations from running that need a lock.
- //
- DefaultLockerImpl* globalLocker = new DefaultLockerImpl();
- LockResult result = globalLocker->lockGlobalBegin(MODE_X);
- if (result == LOCK_WAITING) {
- result = globalLocker->lockGlobalComplete(UINT_MAX);
- }
+ // Grab the shutdown lock to prevent concurrent callers
+ stdx::lock_guard<stdx::mutex> lockguard(shutdownLock);
- invariant(LOCK_OK == result);
+ // Global storage engine may not be started in all cases before we exit
+ if (getGlobalServiceContext()->getGlobalStorageEngine() == NULL) {
+ dbexit(code); // returns only under a windows service
+ invariant(code == EXIT_WINDOWS_SERVICE_STOP);
+ return;
+ }
- log(LogComponent::kControl) << "now exiting" << endl;
+ getGlobalServiceContext()->setKillAllOperations();
- // Execute the graceful shutdown tasks, such as flushing the outstanding journal
- // and data files, close sockets, etc.
- try {
- shutdownServer();
- }
- catch (const DBException& ex) {
- severe() << "shutdown failed with DBException " << ex;
- std::terminate();
- }
- catch (const std::exception& ex) {
- severe() << "shutdown failed with std::exception: " << ex.what();
- std::terminate();
- }
- catch (...) {
- severe() << "shutdown failed with exception";
- std::terminate();
- }
+ repl::getGlobalReplicationCoordinator()->shutdown();
+ auto catalogMgr = grid.catalogManager();
+ if (catalogMgr) {
+ catalogMgr->shutDown();
+ }
+
+ // We should always be able to acquire the global lock at shutdown.
+ //
+ // TODO: This call chain uses the locker directly, because we do not want to start an
+ // operation context, which also instantiates a recovery unit. Also, using the
+ // lockGlobalBegin/lockGlobalComplete sequence, we avoid taking the flush lock. This will
+ // all go away if we start acquiring the global/flush lock as part of ScopedTransaction.
+ //
+ // For a Windows service, dbexit does not call exit(), so we must leak the lock outside
+ // of this function to prevent any operations from running that need a lock.
+ //
+ DefaultLockerImpl* globalLocker = new DefaultLockerImpl();
+ LockResult result = globalLocker->lockGlobalBegin(MODE_X);
+ if (result == LOCK_WAITING) {
+ result = globalLocker->lockGlobalComplete(UINT_MAX);
+ }
- dbexit( code );
+ invariant(LOCK_OK == result);
+
+ log(LogComponent::kControl) << "now exiting" << endl;
+
+ // Execute the graceful shutdown tasks, such as flushing the outstanding journal
+ // and data files, close sockets, etc.
+ try {
+ shutdownServer();
+ } catch (const DBException& ex) {
+ severe() << "shutdown failed with DBException " << ex;
+ std::terminate();
+ } catch (const std::exception& ex) {
+ severe() << "shutdown failed with std::exception: " << ex.what();
+ std::terminate();
+ } catch (...) {
+ severe() << "shutdown failed with exception";
+ std::terminate();
}
- NOINLINE_DECL void dbexit( ExitCode rc, const char *why ) {
- audit::logShutdown(&cc());
+ dbexit(code);
+}
+
+NOINLINE_DECL void dbexit(ExitCode rc, const char* why) {
+ audit::logShutdown(&cc());
- log(LogComponent::kControl) << "dbexit: " << why << " rc: " << rc;
+ log(LogComponent::kControl) << "dbexit: " << why << " rc: " << rc;
#ifdef _WIN32
- // Windows Service Controller wants to be told when we are down,
- // so don't call quickExit() yet, or say "really exiting now"
- //
- if ( rc == EXIT_WINDOWS_SERVICE_STOP ) {
- return;
- }
+ // Windows Service Controller wants to be told when we are down,
+ // so don't call quickExit() yet, or say "really exiting now"
+ //
+ if (rc == EXIT_WINDOWS_SERVICE_STOP) {
+ return;
+ }
#endif
- quickExit(rc);
+ quickExit(rc);
+}
+
+// ----- BEGIN Diaglog -----
+DiagLog::DiagLog() : f(0), level(0) {}
+
+void DiagLog::openFile() {
+ verify(f == 0);
+ stringstream ss;
+ ss << storageGlobalParams.dbpath << "/diaglog." << hex << time(0);
+ string name = ss.str();
+ f = new ofstream(name.c_str(), ios::out | ios::binary);
+ if (!f->good()) {
+ log() << "diagLogging couldn't open " << name << endl;
+ // todo what is this? :
+ throw 1717;
+ } else {
+ log() << "diagLogging using file " << name << endl;
}
-
- // ----- BEGIN Diaglog -----
- DiagLog::DiagLog() : f(0), level(0) {}
-
- void DiagLog::openFile() {
- verify( f == 0 );
- stringstream ss;
- ss << storageGlobalParams.dbpath << "/diaglog." << hex << time(0);
- string name = ss.str();
- f = new ofstream(name.c_str(), ios::out | ios::binary);
- if ( ! f->good() ) {
- log() << "diagLogging couldn't open " << name << endl;
- // todo what is this? :
- throw 1717;
- }
- else {
- log() << "diagLogging using file " << name << endl;
- }
+}
+
+int DiagLog::setLevel(int newLevel) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ int old = level;
+ log() << "diagLogging level=" << newLevel << endl;
+ if (f == 0) {
+ openFile();
}
+ level = newLevel; // must be done AFTER f is set
+ return old;
+}
- int DiagLog::setLevel( int newLevel ) {
+void DiagLog::flush() {
+ if (level) {
+ log() << "flushing diag log" << endl;
stdx::lock_guard<stdx::mutex> lk(mutex);
- int old = level;
- log() << "diagLogging level=" << newLevel << endl;
- if( f == 0 ) {
- openFile();
- }
- level = newLevel; // must be done AFTER f is set
- return old;
+ f->flush();
}
+}
- void DiagLog::flush() {
- if ( level ) {
- log() << "flushing diag log" << endl;
- stdx::lock_guard<stdx::mutex> lk(mutex);
- f->flush();
- }
+void DiagLog::writeop(char* data, int len) {
+ if (level & 1) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ f->write(data, len);
}
+}
- void DiagLog::writeop(char *data,int len) {
- if ( level & 1 ) {
+void DiagLog::readop(char* data, int len) {
+ if (level & 2) {
+ bool log = (level & 4) == 0;
+ OCCASIONALLY log = true;
+ if (log) {
stdx::lock_guard<stdx::mutex> lk(mutex);
- f->write(data,len);
- }
- }
-
- void DiagLog::readop(char *data, int len) {
- if ( level & 2 ) {
- bool log = (level & 4) == 0;
- OCCASIONALLY log = true;
- if ( log ) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- verify( f );
- f->write(data,len);
- }
+ verify(f);
+ f->write(data, len);
}
}
+}
- DiagLog _diaglog;
+DiagLog _diaglog;
- // ----- END Diaglog -----
+// ----- END Diaglog -----
-} // namespace mongo
+} // namespace mongo