path: root/src/mongo/db/instance.cpp
diff options
Diffstat (limited to 'src/mongo/db/instance.cpp')
1 files changed, 172 insertions, 675 deletions
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index c71fc2c814d..e9de5d5b127 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -40,10 +40,8 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/authz_manager_external_state_d.h"
-#include "mongo/db/background.h"
-#include "mongo/db/catalog/index_create.h"
#include "mongo/db/client.h"
-#include "mongo/db/clientcursor.h"
+#include "mongo/db/catalog/cursor_manager.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -53,10 +51,7 @@
#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/dbhelpers.h"
#include "mongo/db/dbmessage.h"
-#include "mongo/db/exec/delete.h"
-#include "mongo/db/exec/update.h"
#include "mongo/db/ftdc/ftdc_mongod.h"
#include "mongo/db/global_timestamp.h"
#include "mongo/db/instance.h"
@@ -67,22 +62,11 @@
#include "mongo/db/mongod_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/ops/delete_request.h"
-#include "mongo/db/ops/insert.h"
-#include "mongo/db/ops/parsed_delete.h"
-#include "mongo/db/ops/parsed_update.h"
-#include "mongo/db/ops/update_driver.h"
-#include "mongo/db/ops/update_lifecycle_impl.h"
-#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/get_executor.h"
-#include "mongo/db/query/plan_summary_stats.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/run_commands.h"
-#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
@@ -93,7 +77,6 @@
#include "mongo/platform/process_id.h"
#include "mongo/rpc/command_reply_builder.h"
#include "mongo/rpc/command_request.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/legacy_reply.h"
#include "mongo/rpc/legacy_reply_builder.h"
#include "mongo/rpc/legacy_request.h"
@@ -115,7 +98,6 @@
#include "mongo/util/time_support.h"
namespace mongo {
using logger::LogComponent;
using std::endl;
using std::hex;
@@ -126,6 +108,12 @@ using std::stringstream;
using std::unique_ptr;
using std::vector;
+string dbExecCommand;
+namespace {
// for diaglog
inline void opread(Message& m) {
if (_diaglog.getLevel() & 2) {
@@ -139,25 +127,6 @@ inline void opwrite(Message& m) {
-void receivedKillCursors(OperationContext* txn, Message& m);
-void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
-void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
-void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
-bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop);
-int nloggedsome = 0;
-#define LOGWITHRATELIMIT if (++nloggedsome < 1000 || nloggedsome % 100 == 0)
-string dbExecCommand;
-namespace {
unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongod() {
return stdx::make_unique<AuthzManagerExternalStateMongod>();
@@ -231,13 +200,11 @@ void beginCommandOp(OperationContext* txn, const NamespaceString& nss, const BSO
-} // namespace
-static void receivedCommand(OperationContext* txn,
- const NamespaceString& nss,
- Client& client,
- DbResponse& dbResponse,
- Message& message) {
+void receivedCommand(OperationContext* txn,
+ const NamespaceString& nss,
+ Client& client,
+ DbResponse& dbResponse,
+ Message& message) {
const int32_t responseToMsgId = message.header().getId();
@@ -282,8 +249,6 @@ static void receivedCommand(OperationContext* txn,
dbResponse.responseToMsgId = responseToMsgId;
-namespace {
void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, Message& message) {
invariant(message.operation() == dbCommand);
@@ -381,14 +346,13 @@ void receivedPseudoCommand(OperationContext* txn,
receivedCommand(txn, interposedNss, client, dbResponse, interposed);
-} // namespace
-static void receivedQuery(OperationContext* txn,
- const NamespaceString& nss,
- Client& c,
- DbResponse& dbResponse,
- Message& m) {
+void receivedQuery(OperationContext* txn,
+ const NamespaceString& nss,
+ Client& c,
+ DbResponse& dbResponse,
+ Message& m) {
+ globalOpCounters.gotQuery();
int32_t responseToMsgId = m.header().getId();
@@ -420,6 +384,152 @@ static void receivedQuery(OperationContext* txn,
dbResponse.responseToMsgId = responseToMsgId;
+// Handles a legacy kill-cursors message.
+//
+// Reads the cursor count 'n' from 'm', validates it against the message size and against the
+// hard limit of 30000, then passes the cursor-id array to
+// CursorManager::eraseCursorGlobalIfAuthorized(). Throws via uassert/massert/verify when the
+// request is malformed or too large.
+void receivedKillCursors(OperationContext* txn, Message& m) {
+    LastError::get(txn->getClient()).disable();
+    DbMessage dbmessage(m);
+    int n = dbmessage.pullInt();
+    uassert(13659, "sent 0 cursors to kill", n != 0);
+    // 'n' comes straight from the message. Do the size arithmetic in 64 bits: '8 * n' evaluated
+    // in 'int' overflows (undefined behavior) for large magnitudes and could wrap to a value
+    // that happens to equal the real data size.
+    massert(13658,
+            str::stream() << "bad kill cursors size: " << m.dataSize(),
+            m.dataSize() == 8 + (8LL * n));
+    // Kept as a defensive check on the sign of 'n'.
+    uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1);
+    if (n > 2000) {
+        // Large requests are logged (warning below 30000, error otherwise); 30000 or more is
+        // rejected outright by the verify().
+        (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl;
+        verify(n < 30000);
+    }
+    const char* cursorArray = dbmessage.getArray(n);
+    int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray);
+    // A mismatch between 'found' and 'n' is logged at level 0; a full match only at debug level 1.
+    if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) {
+        LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl;
+    }
+}
+// Legacy insert entry point: parses the message, runs the insert authorization check (with an
+// audit record) for every document, then hands the parsed operation to performInserts().
+void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m) {
+    auto legacyInsert = parseLegacyInsert(m);
+    invariant(legacyInsert.ns == nsString);
+    auto client = txn->getClient();
+    for (const auto& doc : legacyInsert.documents) {
+        // The audit call receives the check's status code before a failed check throws.
+        Status authStatus = AuthorizationSession::get(client)->checkAuthForInsert(nsString, doc);
+        audit::logInsertAuthzCheck(client, nsString, doc, authStatus.code());
+        uassertStatusOK(authStatus);
+    }
+    performInserts(txn, legacyInsert);
+}
+// Legacy update entry point: parses the message, runs the update authorization check (with an
+// audit record) on its first update entry, then hands the parsed operation to performUpdates().
+void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m) {
+    auto legacyUpdate = parseLegacyUpdate(m);
+    invariant(legacyUpdate.ns == nsString);
+    // Only entry 0 is inspected here.
+    // NOTE(review): presumably parseLegacyUpdate always yields exactly one entry - confirm.
+    auto& entry = legacyUpdate.updates[0];
+    auto client = txn->getClient();
+    Status authStatus = AuthorizationSession::get(client)->checkAuthForUpdate(
+        nsString, entry.query, entry.update, entry.upsert);
+    // The audit call receives the check's status code before a failed check throws.
+    audit::logUpdateAuthzCheck(
+        client, nsString, entry.query, entry.update, entry.upsert, entry.multi, authStatus.code());
+    uassertStatusOK(authStatus);
+    performUpdates(txn, legacyUpdate);
+}
+// Legacy delete entry point: parses the message, runs the delete authorization check (with an
+// audit record) on its first delete entry, then hands the parsed operation to performDeletes().
+void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m) {
+    auto legacyDelete = parseLegacyDelete(m);
+    invariant(legacyDelete.ns == nsString);
+    // Only entry 0 is inspected here.
+    // NOTE(review): presumably parseLegacyDelete always yields exactly one entry - confirm.
+    auto& entry = legacyDelete.deletes[0];
+    auto client = txn->getClient();
+    Status authStatus = AuthorizationSession::get(client)->checkAuthForDelete(nsString, entry.query);
+    // The audit call receives the check's status code before a failed check throws.
+    audit::logDeleteAuthzCheck(client, nsString, entry.query, authStatus.code());
+    uassertStatusOK(authStatus);
+    performDeletes(txn, legacyDelete);
+}
+// Handles a legacy OP_GET_MORE message.
+//
+// Reads the namespace, 'ntoreturn' and the cursor id from 'm', checks (and audits) getMore
+// authorization, then calls getMore(). On success the result is stored in 'dbresponse' and true
+// is returned. If an AssertionException is thrown inside the try block, an error reply is built
+// into 'dbresponse' and false is returned; the cursor is erased first if getMore() had already
+// reported it as authorized. The uassert on 'ntoreturn' sits outside the try block and so
+// propagates to the caller.
+bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) {
+    globalOpCounters.gotGetMore();
+    DbMessage d(m);
+    const char* ns = d.getns();
+    int ntoreturn = d.pullInt();
+    uassert(
+        34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0);
+    long long cursorid = d.pullInt64();
+    curop.debug().ntoreturn = ntoreturn;
+    curop.debug().cursorid = cursorid;
+    {
+        // The guard must be a named object. An unnamed temporary is destroyed at the end of its
+        // own statement, which would release the Client lock before setNS_inlock() runs.
+        stdx::lock_guard<Client> lk(*txn->getClient());
+        CurOp::get(txn)->setNS_inlock(ns);
+    }
+    bool exhaust = false;
+    QueryResult::View msgdata = 0;
+    bool isCursorAuthorized = false;
+    try {
+        const NamespaceString nsString(ns);
+        uassert(ErrorCodes::InvalidNamespace,
+                str::stream() << "Invalid ns [" << ns << "]",
+                nsString.isValid());
+        Status status = AuthorizationSession::get(txn->getClient())
+                            ->checkAuthForGetMore(nsString, cursorid, false);
+        // The audit call receives the check's status code before a failed check throws.
+        audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code());
+        uassertStatusOK(status);
+        // Spin for as long as the rsStopGetMore fail point is enabled.
+        while (MONGO_FAIL_POINT(rsStopGetMore)) {
+            sleepmillis(0);
+        }
+        msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
+    } catch (AssertionException& e) {
+        if (isCursorAuthorized) {
+            // If a cursor with id 'cursorid' was authorized, it may have been advanced
+            // before an exception terminated getMore. Erase the ClientCursor
+            // because it may now be out of sync with the client's iteration state.
+            // SERVER-7952
+            // TODO Temporary code, see SERVER-4563 for a cleanup overview.
+            CursorManager::eraseCursorGlobal(txn, cursorid);
+        }
+        // Reply with a single error document describing the exception.
+        BSONObjBuilder err;
+        e.getInfo().append(err);
+        BSONObj errObj = err.done();
+        curop.debug().exceptionInfo = e.getInfo();
+        replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
+        curop.debug().responseLength = dbresponse.response.header().dataLen();
+        curop.debug().nreturned = 1;
+        return false;
+    }
+    dbresponse.response.setData(msgdata.view2ptr(), true);
+    curop.debug().responseLength = dbresponse.response.header().dataLen();
+    curop.debug().nreturned = msgdata.getNReturned();
+    dbresponse.responseToMsgId = m.header().getId();
+    if (exhaust) {
+        curop.debug().exhaust = true;
+        dbresponse.exhaustNS = ns;
+    }
+    return true;
+}
+} // namespace
// Mongod on win32 defines a value for this function. In all other executables it is NULL.
void (*reportEventToSystem)(const char* msg) = 0;
@@ -442,7 +552,9 @@ void assembleResponse(OperationContext* txn,
DbMessage dbmsg(m);
Client& c = *txn->getClient();
- if (!c.isInDirectClient()) {
+ if (c.isInDirectClient()) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork());
+ } else {
@@ -486,33 +598,6 @@ void assembleResponse(OperationContext* txn,
- // Increment op counters.
- switch (op) {
- case dbQuery:
- if (!isCommand) {
- globalOpCounters.gotQuery();
- } else {
- // Command counting is deferred, since it is not known yet whether the command
- // needs counting.
- }
- break;
- case dbGetMore:
- globalOpCounters.gotGetMore();
- break;
- case dbInsert:
- // Insert counting is deferred, since it is not known yet whether the insert contains
- // multiple documents (each of which needs to be counted).
- break;
- case dbUpdate:
- globalOpCounters.gotUpdate();
- break;
- case dbDelete:
- globalOpCounters.gotDelete();
- break;
- default:
- break;
- }
CurOp& currentOp = *CurOp::get(txn);
stdx::lock_guard<Client> lk(*txn->getClient());
@@ -553,10 +638,8 @@ void assembleResponse(OperationContext* txn,
dbresponse.responseToMsgId = m.header().getId();
} else {
+ // The remaining operations do not return any response. They are fire-and-forget.
try {
- // The following operations all require authorization.
- // dbInsert, dbUpdate and dbDelete can be easily pre-authorized,
- // here, but dbKillCursors cannot.
if (op == dbKillCursors) {
logThreshold = 10;
@@ -579,11 +662,11 @@ void assembleResponse(OperationContext* txn,
if (!nsString.isValid()) {
uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);
} else if (op == dbInsert) {
- receivedInsert(txn, nsString, m, currentOp);
+ receivedInsert(txn, nsString, m);
} else if (op == dbUpdate) {
- receivedUpdate(txn, nsString, m, currentOp);
+ receivedUpdate(txn, nsString, m);
} else if (op == dbDelete) {
- receivedDelete(txn, nsString, m, currentOp);
+ receivedDelete(txn, nsString, m);
} else {
@@ -630,592 +713,6 @@ void assembleResponse(OperationContext* txn,
-void receivedKillCursors(OperationContext* txn, Message& m) {
- LastError::get(txn->getClient()).disable();
- DbMessage dbmessage(m);
- int n = dbmessage.pullInt();
- uassert(13659, "sent 0 cursors to kill", n != 0);
- massert(13658,
- str::stream() << "bad kill cursors size: " << m.dataSize(),
- m.dataSize() == 8 + (8 * n));
- uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1);
- if (n > 2000) {
- (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl;
- verify(n < 30000);
- }
- const char* cursorArray = dbmessage.getArray(n);
- int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray);
- if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) {
- LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl;
- }
-void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- uassertStatusOK(userAllowedWriteNS(nsString));
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
- auto updateOp = parseLegacyUpdate(m);
- auto& singleUpdate = updateOp.updates[0];
- uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize);
- op.debug().query = singleUpdate.query;
- {
- stdx::lock_guard<Client> lk(*client);
- op.setNS_inlock(nsString.ns());
- op.setQuery_inlock(singleUpdate.query);
- }
- Status status = AuthorizationSession::get(client)->checkAuthForUpdate(
- nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert);
- audit::logUpdateAuthzCheck(client,
- nsString,
- singleUpdate.query,
- singleUpdate.update,
- singleUpdate.upsert,
- singleUpdate.multi,
- status.code());
- uassertStatusOK(status);
- UpdateRequest request(nsString);
- request.setUpsert(singleUpdate.upsert);
- request.setMulti(singleUpdate.multi);
- request.setQuery(singleUpdate.query);
- request.setUpdates(singleUpdate.update);
- UpdateLifecycleImpl updateLifecycle(nsString);
- request.setLifecycle(&updateLifecycle);
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
- int attempt = 1;
- while (1) {
- try {
- ParsedUpdate parsedUpdate(txn, &request);
- uassertStatusOK(parsedUpdate.parseRequest());
- // Tentatively take an intent lock, fix up if we need to create the collection
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- if (dbHolder().get(txn, nsString.db()) == NULL) {
- // If DB doesn't exist, don't implicitly create it in OldClientContext
- break;
- }
- Lock::CollectionLock collLock(
- txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
- OldClientContext ctx(txn, nsString.ns());
- auto collection = ctx.db()->getCollection(nsString);
- // The common case: no implicit collection creation
- if (!singleUpdate.upsert || collection != NULL) {
- unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate));
- // Run the plan and get stats out.
- uassertStatusOK(exec->executePlan());
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- if (collection) {
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
- }
- const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
- UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug());
- op.debug().setPlanSummaryMetrics(summary);
- UpdateResult res = UpdateStage::makeUpdateResult(updateStats);
- // for getlasterror
- size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U;
- LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- return;
- }
- break;
- } catch (const WriteConflictException& dle) {
- op.debug().writeConflicts++;
- if (singleUpdate.multi) {
- log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
- throw;
- }
- WriteConflictException::logAndBackoff(attempt++, "update", nsString.toString());
- }
- }
- // This is an upsert into a non-existing database, so need an exclusive lock
- // to avoid deadlock
- ParsedUpdate parsedUpdate(txn, &request);
- uassertStatusOK(parsedUpdate.parseRequest());
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
- OldClientContext ctx(txn, nsString.ns());
- uassert(ErrorCodes::NotMaster,
- str::stream() << "Not primary while performing update on " << nsString.ns(),
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
- Database* db = ctx.db();
- if (db->getCollection(nsString)) {
- // someone else beat us to it, that's ok
- // we might race while we unlock if someone drops
- // but that's ok, we'll just do nothing and error out
- } else {
- WriteUnitOfWork wuow(txn);
- uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
- wuow.commit();
- }
- auto collection = ctx.db()->getCollection(nsString);
- invariant(collection);
- unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate));
- // Run the plan and get stats out.
- uassertStatusOK(exec->executePlan());
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
- const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
- UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug());
- op.debug().setPlanSummaryMetrics(summary);
- UpdateResult res = UpdateStage::makeUpdateResult(updateStats);
- size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U;
- LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
-void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- uassertStatusOK(userAllowedWriteNS(nsString));
- auto deleteOp = parseLegacyDelete(m);
- auto& singleDelete = deleteOp.deletes[0];
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
- op.debug().query = singleDelete.query;
- {
- stdx::lock_guard<Client> lk(*client);
- op.setQuery_inlock(singleDelete.query);
- op.setNS_inlock(nsString.ns());
- }
- Status status =
- AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query);
- audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code());
- uassertStatusOK(status);
- DeleteRequest request(nsString);
- request.setQuery(singleDelete.query);
- request.setMulti(singleDelete.multi);
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
- int attempt = 1;
- while (1) {
- try {
- ParsedDelete parsedDelete(txn, &request);
- uassertStatusOK(parsedDelete.parseRequest());
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
- if (!autoDb.getDb()) {
- break;
- }
- Lock::CollectionLock collLock(
- txn->lockState(), nsString.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX);
- OldClientContext ctx(txn, nsString.ns());
- auto collection = ctx.db()->getCollection(nsString);
- unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorDelete(txn, &op.debug(), collection, &parsedDelete));
- // Run the plan and get the number of docs deleted.
- uassertStatusOK(exec->executePlan());
- long long n = DeleteStage::getNumDeleted(*exec);
- LastError::get(client).recordDelete(n);
- op.debug().ndeleted = n;
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- if (collection) {
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
- }
- CurOp::get(txn)->debug().setPlanSummaryMetrics(summary);
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- break;
- } catch (const WriteConflictException& dle) {
- op.debug().writeConflicts++;
- WriteConflictException::logAndBackoff(attempt++, "delete", nsString.toString());
- }
- }
-bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) {
- DbMessage d(m);
- const char* ns = d.getns();
- int ntoreturn = d.pullInt();
- uassert(
- 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0);
- long long cursorid = d.pullInt64();
- curop.debug().ntoreturn = ntoreturn;
- curop.debug().cursorid = cursorid;
- {
- stdx::lock_guard<Client>(*txn->getClient());
- CurOp::get(txn)->setNS_inlock(ns);
- }
- bool exhaust = false;
- QueryResult::View msgdata = 0;
- bool isCursorAuthorized = false;
- try {
- const NamespaceString nsString(ns);
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid ns [" << ns << "]",
- nsString.isValid());
- Status status = AuthorizationSession::get(txn->getClient())
- ->checkAuthForGetMore(nsString, cursorid, false);
- audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code());
- uassertStatusOK(status);
- while (MONGO_FAIL_POINT(rsStopGetMore)) {
- sleepmillis(0);
- }
- msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
- } catch (AssertionException& e) {
- if (isCursorAuthorized) {
- // If a cursor with id 'cursorid' was authorized, it may have been advanced
- // before an exception terminated processGetMore. Erase the ClientCursor
- // because it may now be out of sync with the client's iteration state.
- // SERVER-7952
- // TODO Temporary code, see SERVER-4563 for a cleanup overview.
- CursorManager::eraseCursorGlobal(txn, cursorid);
- }
- BSONObjBuilder err;
- e.getInfo().append(err);
- BSONObj errObj = err.done();
- curop.debug().exceptionInfo = e.getInfo();
- replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
- curop.debug().responseLength = dbresponse.response.header().dataLen();
- curop.debug().nreturned = 1;
- return false;
- }
- dbresponse.response.setData(msgdata.view2ptr(), true);
- curop.debug().responseLength = dbresponse.response.header().dataLen();
- curop.debug().nreturned = msgdata.getNReturned();
- dbresponse.responseToMsgId = m.header().getId();
- if (exhaust) {
- curop.debug().exhaust = true;
- dbresponse.exhaustNS = ns;
- }
- return true;
-void insertMultiSingletons(OperationContext* txn,
- OldClientContext& ctx,
- bool keepGoing,
- StringData ns,
- CurOp& op,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end) {
- for (vector<BSONObj>::iterator it = begin; it != end; it++) {
- try {
- WriteUnitOfWork wouw(txn);
- Collection* collection = ctx.db()->getCollection(ns);
- if (!collection) {
- collection = ctx.db()->createCollection(txn, ns);
- invariant(collection);
- }
- uassertStatusOK(collection->insertDocument(txn, *it, &op.debug(), true));
- wouw.commit();
- }
- globalOpCounters.incInsertInWriteLock(1);
- op.debug().ninserted++;
- } catch (const UserException& ex) {
- if (!keepGoing)
- throw;
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- }
- }
-void insertMultiVector(OperationContext* txn,
- OldClientContext& ctx,
- bool keepGoing,
- StringData ns,
- CurOp& op,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end) {
- try {
- WriteUnitOfWork wunit(txn);
- Collection* collection = ctx.db()->getCollection(ns);
- if (!collection) {
- collection = ctx.db()->createCollection(txn, ns);
- invariant(collection);
- }
- uassertStatusOK(collection->insertDocuments(txn, begin, end, &op.debug(), true, false));
- wunit.commit();
- int inserted = end - begin;
- globalOpCounters.incInsertInWriteLock(inserted);
- op.debug().ninserted = inserted;
- } catch (UserException&) {
- txn->recoveryUnit()->abandonSnapshot();
- insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end);
- } catch (WriteConflictException&) {
- CurOp::get(txn)->debug().writeConflicts++;
- txn->recoveryUnit()->abandonSnapshot();
- WriteConflictException::logAndBackoff(0, "insert", ns);
- insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end);
- }
-NOINLINE_DECL void insertMulti(OperationContext* txn,
- OldClientContext& ctx,
- const InsertOp& insertOp,
- CurOp& op) {
- std::vector<BSONObj> docs;
- docs.reserve(insertOp.documents.size());
- for (auto&& doc : insertOp.documents) {
- // TODO don't fail yet on invalid documents. They should be treated like other errors.
- BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc));
- docs.push_back(fixed.isEmpty() ? doc : std::move(fixed));
- }
- vector<BSONObj>::iterator chunkBegin = docs.begin();
- int64_t chunkCount = 0;
- int64_t chunkSize = 0;
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
- for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) {
- chunkSize += (*it).objsize();
- // Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient,
- // but smaller chunk sizes allow yielding to other threads and lower chance of WCEs
- if ((++chunkCount >= internalQueryExecYieldIterations / 2) ||
- (chunkSize >= insertVectorMaxBytes)) {
- if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure
- insertMultiSingletons(
- txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
- else
- insertMultiVector(
- txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
- chunkBegin = it + 1;
- chunkCount = 0;
- chunkSize = 0;
- }
- }
- if (chunkBegin != docs.end())
- insertMultiVector(
- txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end());
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op inserts will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
-static void convertSystemIndexInsertsToCommands(DbMessage& d, BSONArrayBuilder* allCmdsBuilder) {
- while (d.moreJSObjs()) {
- BSONObj spec = d.nextJsObj();
- BSONElement indexNsElement = spec["ns"];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Missing \"ns\" field while inserting into " << d.getns(),
- !indexNsElement.eoo());
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "Expected \"ns\" field to have type String, not "
- << typeName(indexNsElement.type()) << " while inserting into "
- << d.getns(),
- indexNsElement.type() == String);
- const StringData nsToIndex(indexNsElement.valueStringData());
- BSONObjBuilder cmdObjBuilder(allCmdsBuilder->subobjStart());
- cmdObjBuilder << "createIndexes" << nsToCollectionSubstring(nsToIndex);
- BSONArrayBuilder specArrayBuilder(cmdObjBuilder.subarrayStart("indexes"));
- while (true) {
- BSONObjBuilder specBuilder(specArrayBuilder.subobjStart());
- BSONElement specNsElement = spec["ns"];
- if ((specNsElement.type() != String) ||
- (specNsElement.valueStringData() != nsToIndex)) {
- break;
- }
- for (BSONObjIterator iter(spec); iter.more();) {
- BSONElement element =;
- if (element.fieldNameStringData() != "ns") {
- specBuilder.append(element);
- }
- }
- if (!d.moreJSObjs()) {
- break;
- }
- spec = d.nextJsObj();
- }
- }
-static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curOp) {
- BSONArrayBuilder allCmdsBuilder;
- try {
- convertSystemIndexInsertsToCommands(d, &allCmdsBuilder);
- } catch (const DBException& ex) {
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- curOp.debug().exceptionInfo = ex.getInfo();
- return;
- }
- BSONArray allCmds(allCmdsBuilder.done());
- Command* createIndexesCmd = Command::findCommand("createIndexes");
- invariant(createIndexesCmd);
- const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
- for (BSONObjIterator iter(allCmds); iter.more();) {
- try {
- BSONObj cmdObj =;
- rpc::LegacyRequestBuilder requestBuilder{};
- auto indexNs = NamespaceString(d.getns());
- auto cmdRequestMsg = requestBuilder.setDatabase(indexNs.db())
- .setCommandName("createIndexes")
- .setCommandArgs(cmdObj)
- .setMetadata(rpc::makeEmptyMetadata())
- .done();
- rpc::LegacyRequest cmdRequest{&cmdRequestMsg};
- rpc::LegacyReplyBuilder cmdReplyBuilder{};
- Command::execCommand(txn, createIndexesCmd, cmdRequest, &cmdReplyBuilder);
- auto cmdReplyMsg = cmdReplyBuilder.done();
- rpc::LegacyReply cmdReply{&cmdReplyMsg};
- uassertStatusOK(getStatusFromCommandResult(cmdReply.getCommandReply()));
- } catch (const DBException& ex) {
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- curOp.debug().exceptionInfo = ex.getInfo();
- if (!keepGoing) {
- return;
- }
- }
- }
-bool _receivedInsert(OperationContext* txn,
- const NamespaceString& nsString,
- const InsertOp& insertOp,
- CurOp& op,
- bool checkCollection) {
- // CONCURRENCY TODO: is being read locked in big log sufficient here?
- // writelock is used to synchronize stepdowns w/ writes
- uassert(
- 10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
- OldClientContext ctx(txn, insertOp.ns.ns());
- if (checkCollection && !ctx.db()->getCollection(nsString))
- return false;
- insertMulti(txn, ctx, insertOp, op);
- return true;
-void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- {
- stdx::lock_guard<Client>(*txn->getClient());
- CurOp::get(txn)->setNS_inlock(nsString.ns());
- }
- uassertStatusOK(userAllowedWriteNS(nsString.ns()));
- if (nsString.isSystemDotIndexes()) {
- DbMessage d(m);
- insertSystemIndexes(txn, d, op);
- return;
- }
- auto insertOp = parseLegacyInsert(m);
- for (const auto& obj : insertOp.documents) {
- // Check auth for insert.
- Status status =
- AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj);
- audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code());
- uassertStatusOK(status);
- }
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
- // OldClientContext may implicitly create a database, so check existence
- if (dbHolder().get(txn, nsString.db()) != NULL) {
- if (_receivedInsert(txn, nsString, insertOp, op, true))
- return;
- }
- }
- // Collection didn't exist so try again with MODE_X
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
- _receivedInsert(txn, nsString, insertOp, op, false);
// ----- BEGIN Diaglog -----
DiagLog::DiagLog() : f(0), level(0) {}