diff options
Diffstat (limited to 'src/mongo/db/instance.cpp')
-rw-r--r-- | src/mongo/db/instance.cpp | 847 |
1 files changed, 172 insertions, 675 deletions
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index c71fc2c814d..e9de5d5b127 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -40,10 +40,8 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/authz_manager_external_state_d.h" -#include "mongo/db/background.h" -#include "mongo/db/catalog/index_create.h" #include "mongo/db/client.h" -#include "mongo/db/clientcursor.h" +#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/commands.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -53,10 +51,7 @@ #include "mongo/db/db.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" -#include "mongo/db/dbhelpers.h" #include "mongo/db/dbmessage.h" -#include "mongo/db/exec/delete.h" -#include "mongo/db/exec/update.h" #include "mongo/db/ftdc/ftdc_mongod.h" #include "mongo/db/global_timestamp.h" #include "mongo/db/instance.h" @@ -67,22 +62,11 @@ #include "mongo/db/mongod_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" -#include "mongo/db/ops/delete_request.h" -#include "mongo/db/ops/insert.h" -#include "mongo/db/ops/parsed_delete.h" -#include "mongo/db/ops/parsed_update.h" -#include "mongo/db/ops/update_driver.h" -#include "mongo/db/ops/update_lifecycle_impl.h" -#include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/query/find.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/query/plan_summary_stats.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/run_commands.h" -#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" @@ -93,7 +77,6 @@ #include "mongo/platform/process_id.h" #include "mongo/rpc/command_reply_builder.h" #include "mongo/rpc/command_request.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/legacy_reply.h" #include "mongo/rpc/legacy_reply_builder.h" #include "mongo/rpc/legacy_request.h" @@ -115,7 +98,6 @@ #include "mongo/util/time_support.h" namespace mongo { - using logger::LogComponent; using std::endl; using std::hex; @@ -126,6 +108,12 @@ using std::stringstream; using std::unique_ptr; using std::vector; +string dbExecCommand; + +MONGO_FP_DECLARE(rsStopGetMore); + +namespace { + // for diaglog inline void opread(Message& m) { if (_diaglog.getLevel() & 2) { @@ -139,25 +127,6 @@ inline void opwrite(Message& m) { } } -void receivedKillCursors(OperationContext* txn, Message& m); - -void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op); - -void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op); - -void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op); - -bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop); - -int nloggedsome = 0; -#define LOGWITHRATELIMIT if (++nloggedsome < 1000 || nloggedsome % 100 == 0) - -string dbExecCommand; - -MONGO_FP_DECLARE(rsStopGetMore); - -namespace { - unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongod() { return stdx::make_unique<AuthzManagerExternalStateMongod>(); } @@ -231,13 +200,11 @@ void beginCommandOp(OperationContext* txn, const NamespaceString& nss, const BSO curop->setNS_inlock(nss.ns()); } -} // namespace - -static void receivedCommand(OperationContext* txn, - const NamespaceString& nss, - Client& client, - DbResponse& dbResponse, - Message& message) { +void receivedCommand(OperationContext* txn, + const NamespaceString& nss, + Client& client, + DbResponse& dbResponse, + Message& message) { invariant(nss.isCommand()); const int32_t responseToMsgId = message.header().getId(); @@ -282,8 +249,6 @@ static void receivedCommand(OperationContext* txn, dbResponse.responseToMsgId = responseToMsgId; } -namespace { - void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, Message& message) { invariant(message.operation() == dbCommand); @@ -381,14 +346,13 @@ void receivedPseudoCommand(OperationContext* txn, receivedCommand(txn, interposedNss, client, dbResponse, interposed); } -} // namespace - -static void receivedQuery(OperationContext* txn, - const NamespaceString& nss, - Client& c, - DbResponse& dbResponse, - Message& m) { +void receivedQuery(OperationContext* txn, + const NamespaceString& nss, + Client& c, + DbResponse& dbResponse, + Message& m) { invariant(!nss.isCommand()); + globalOpCounters.gotQuery(); int32_t responseToMsgId = m.header().getId(); @@ -420,6 +384,152 @@ static void receivedQuery(OperationContext* txn, dbResponse.responseToMsgId = responseToMsgId; } +void receivedKillCursors(OperationContext* txn, Message& m) { + LastError::get(txn->getClient()).disable(); + DbMessage dbmessage(m); + int n = dbmessage.pullInt(); + + uassert(13659, "sent 0 cursors to kill", n != 0); + massert(13658, + str::stream() << "bad kill cursors size: " << m.dataSize(), + m.dataSize() == 8 + (8 * n)); + uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1); + + if (n > 2000) { + (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl; + verify(n < 30000); + } + + const char* cursorArray = dbmessage.getArray(n); + + int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray); + + if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { + LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl; + } +} + +void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m) { + auto insertOp = parseLegacyInsert(m); + invariant(insertOp.ns == nsString); + for (const auto& obj : insertOp.documents) { + Status status = + AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj); + audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code()); + uassertStatusOK(status); + } + performInserts(txn, insertOp); +} + +void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m) { + auto updateOp = parseLegacyUpdate(m); + auto& singleUpdate = updateOp.updates[0]; + invariant(updateOp.ns == nsString); + + Status status = AuthorizationSession::get(txn->getClient()) + ->checkAuthForUpdate( + nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); + audit::logUpdateAuthzCheck(txn->getClient(), + nsString, + singleUpdate.query, + singleUpdate.update, + singleUpdate.upsert, + singleUpdate.multi, + status.code()); + uassertStatusOK(status); + + performUpdates(txn, updateOp); +} + +void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m) { + auto deleteOp = parseLegacyDelete(m); + auto& singleDelete = deleteOp.deletes[0]; + invariant(deleteOp.ns == nsString); + + Status status = AuthorizationSession::get(txn->getClient()) + ->checkAuthForDelete(nsString, singleDelete.query); + audit::logDeleteAuthzCheck(txn->getClient(), nsString, singleDelete.query, status.code()); + uassertStatusOK(status); + + performDeletes(txn, deleteOp); +} + +bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) { + globalOpCounters.gotGetMore(); + DbMessage d(m); + + const char* ns = d.getns(); + int ntoreturn = d.pullInt(); + uassert( + 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); + long long cursorid = d.pullInt64(); + + curop.debug().ntoreturn = ntoreturn; + curop.debug().cursorid = cursorid; + + { + stdx::lock_guard<Client>(*txn->getClient()); + CurOp::get(txn)->setNS_inlock(ns); + } + + bool exhaust = false; + QueryResult::View msgdata = 0; + bool isCursorAuthorized = false; + + try { + const NamespaceString nsString(ns); + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid ns [" << ns << "]", + nsString.isValid()); + + Status status = AuthorizationSession::get(txn->getClient()) + ->checkAuthForGetMore(nsString, cursorid, false); + audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code()); + uassertStatusOK(status); + + while (MONGO_FAIL_POINT(rsStopGetMore)) { + sleepmillis(0); + } + + msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); + } catch (AssertionException& e) { + if (isCursorAuthorized) { + // If a cursor with id 'cursorid' was authorized, it may have been advanced + // before an exception terminated processGetMore. Erase the ClientCursor + // because it may now be out of sync with the client's iteration state. + // SERVER-7952 + // TODO Temporary code, see SERVER-4563 for a cleanup overview. + CursorManager::eraseCursorGlobal(txn, cursorid); + } + + BSONObjBuilder err; + e.getInfo().append(err); + BSONObj errObj = err.done(); + + curop.debug().exceptionInfo = e.getInfo(); + + replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj); + curop.debug().responseLength = dbresponse.response.header().dataLen(); + curop.debug().nreturned = 1; + return false; + } + + dbresponse.response.setData(msgdata.view2ptr(), true); + curop.debug().responseLength = dbresponse.response.header().dataLen(); + curop.debug().nreturned = msgdata.getNReturned(); + + dbresponse.responseToMsgId = m.header().getId(); + + if (exhaust) { + curop.debug().exhaust = true; + dbresponse.exhaustNS = ns; + } + + return true; +} + +} // namespace + // Mongod on win32 defines a value for this function. In all other executables it is NULL. void (*reportEventToSystem)(const char* msg) = 0; @@ -442,7 +552,9 @@ void assembleResponse(OperationContext* txn, DbMessage dbmsg(m); Client& c = *txn->getClient(); - if (!c.isInDirectClient()) { + if (c.isInDirectClient()) { + invariant(!txn->lockState()->inAWriteUnitOfWork()); + } else { LastError::get(c).startRequest(); AuthorizationSession::get(c)->startRequest(txn); @@ -486,33 +598,6 @@ void assembleResponse(OperationContext* txn, opwrite(m); } - // Increment op counters. - switch (op) { - case dbQuery: - if (!isCommand) { - globalOpCounters.gotQuery(); - } else { - // Command counting is deferred, since it is not known yet whether the command - // needs counting. - } - break; - case dbGetMore: - globalOpCounters.gotGetMore(); - break; - case dbInsert: - // Insert counting is deferred, since it is not known yet whether the insert contains - // multiple documents (each of which needs to be counted). - break; - case dbUpdate: - globalOpCounters.gotUpdate(); - break; - case dbDelete: - globalOpCounters.gotDelete(); - break; - default: - break; - } - CurOp& currentOp = *CurOp::get(txn); { stdx::lock_guard<Client> lk(*txn->getClient()); @@ -553,10 +638,8 @@ void assembleResponse(OperationContext* txn, dbresponse.responseToMsgId = m.header().getId(); } else { + // The remaining operations do not return any response. They are fire-and-forget. try { - // The following operations all require authorization. - // dbInsert, dbUpdate and dbDelete can be easily pre-authorized, - // here, but dbKillCursors cannot. if (op == dbKillCursors) { currentOp.ensureStarted(); logThreshold = 10; @@ -579,11 +662,11 @@ void assembleResponse(OperationContext* txn, if (!nsString.isValid()) { uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false); } else if (op == dbInsert) { - receivedInsert(txn, nsString, m, currentOp); + receivedInsert(txn, nsString, m); } else if (op == dbUpdate) { - receivedUpdate(txn, nsString, m, currentOp); + receivedUpdate(txn, nsString, m); } else if (op == dbDelete) { - receivedDelete(txn, nsString, m, currentOp); + receivedDelete(txn, nsString, m); } else { invariant(false); } @@ -630,592 +713,6 @@ void assembleResponse(OperationContext* txn, recordCurOpMetrics(txn); } -void receivedKillCursors(OperationContext* txn, Message& m) { - LastError::get(txn->getClient()).disable(); - DbMessage dbmessage(m); - int n = dbmessage.pullInt(); - - uassert(13659, "sent 0 cursors to kill", n != 0); - massert(13658, - str::stream() << "bad kill cursors size: " << m.dataSize(), - m.dataSize() == 8 + (8 * n)); - uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1); - - if (n > 2000) { - (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl; - verify(n < 30000); - } - - const char* cursorArray = dbmessage.getArray(n); - - int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray); - - if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { - LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl; - } -} - -void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - uassertStatusOK(userAllowedWriteNS(nsString)); - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - auto updateOp = parseLegacyUpdate(m); - auto& singleUpdate = updateOp.updates[0]; - - uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize); - - op.debug().query = singleUpdate.query; - { - stdx::lock_guard<Client> lk(*client); - op.setNS_inlock(nsString.ns()); - op.setQuery_inlock(singleUpdate.query); - } - - Status status = AuthorizationSession::get(client)->checkAuthForUpdate( - nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); - audit::logUpdateAuthzCheck(client, - nsString, - singleUpdate.query, - singleUpdate.update, - singleUpdate.upsert, - singleUpdate.multi, - status.code()); - uassertStatusOK(status); - - UpdateRequest request(nsString); - request.setUpsert(singleUpdate.upsert); - request.setMulti(singleUpdate.multi); - request.setQuery(singleUpdate.query); - request.setUpdates(singleUpdate.update); - UpdateLifecycleImpl updateLifecycle(nsString); - request.setLifecycle(&updateLifecycle); - - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - int attempt = 1; - while (1) { - try { - ParsedUpdate parsedUpdate(txn, &request); - uassertStatusOK(parsedUpdate.parseRequest()); - - // Tentatively take an intent lock, fix up if we need to create the collection - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); - if (dbHolder().get(txn, nsString.db()) == NULL) { - // If DB doesn't exist, don't implicitly create it in OldClientContext - break; - } - Lock::CollectionLock collLock( - txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX); - OldClientContext ctx(txn, nsString.ns()); - - auto collection = ctx.db()->getCollection(nsString); - - // The common case: no implicit collection creation - if (!singleUpdate.upsert || collection != NULL) { - unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate)); - - // Run the plan and get stats out. - uassertStatusOK(exec->executePlan()); - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - if (collection) { - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - } - - const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get()); - UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug()); - op.debug().setPlanSummaryMetrics(summary); - - UpdateResult res = UpdateStage::makeUpdateResult(updateStats); - - // for getlasterror - size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U; - LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } - return; - } - break; - } catch (const WriteConflictException& dle) { - op.debug().writeConflicts++; - if (singleUpdate.multi) { - log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting"; - throw; - } - WriteConflictException::logAndBackoff(attempt++, "update", nsString.toString()); - } - } - - // This is an upsert into a non-existing database, so need an exclusive lock - // to avoid deadlock - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ParsedUpdate parsedUpdate(txn, &request); - uassertStatusOK(parsedUpdate.parseRequest()); - - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X); - OldClientContext ctx(txn, nsString.ns()); - uassert(ErrorCodes::NotMaster, - str::stream() << "Not primary while performing update on " << nsString.ns(), - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString)); - - Database* db = ctx.db(); - if (db->getCollection(nsString)) { - // someone else beat us to it, that's ok - // we might race while we unlock if someone drops - // but that's ok, we'll just do nothing and error out - } else { - WriteUnitOfWork wuow(txn); - uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj())); - wuow.commit(); - } - - auto collection = ctx.db()->getCollection(nsString); - invariant(collection); - unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate)); - - // Run the plan and get stats out. - uassertStatusOK(exec->executePlan()); - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - - const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get()); - UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug()); - op.debug().setPlanSummaryMetrics(summary); - - UpdateResult res = UpdateStage::makeUpdateResult(updateStats); - - size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U; - LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns()); -} - -void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - uassertStatusOK(userAllowedWriteNS(nsString)); - - auto deleteOp = parseLegacyDelete(m); - auto& singleDelete = deleteOp.deletes[0]; - - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - op.debug().query = singleDelete.query; - { - stdx::lock_guard<Client> lk(*client); - op.setQuery_inlock(singleDelete.query); - op.setNS_inlock(nsString.ns()); - } - - Status status = - AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query); - audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code()); - uassertStatusOK(status); - - DeleteRequest request(nsString); - request.setQuery(singleDelete.query); - request.setMulti(singleDelete.multi); - - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - int attempt = 1; - while (1) { - try { - ParsedDelete parsedDelete(txn, &request); - uassertStatusOK(parsedDelete.parseRequest()); - - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetDb autoDb(txn, nsString.db(), MODE_IX); - if (!autoDb.getDb()) { - break; - } - - Lock::CollectionLock collLock( - txn->lockState(), nsString.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX); - OldClientContext ctx(txn, nsString.ns()); - - auto collection = ctx.db()->getCollection(nsString); - - unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorDelete(txn, &op.debug(), collection, &parsedDelete)); - - // Run the plan and get the number of docs deleted. - uassertStatusOK(exec->executePlan()); - long long n = DeleteStage::getNumDeleted(*exec); - LastError::get(client).recordDelete(n); - op.debug().ndeleted = n; - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - if (collection) { - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - } - CurOp::get(txn)->debug().setPlanSummaryMetrics(summary); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } - break; - } catch (const WriteConflictException& dle) { - op.debug().writeConflicts++; - WriteConflictException::logAndBackoff(attempt++, "delete", nsString.toString()); - } - } -} - -bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) { - DbMessage d(m); - - const char* ns = d.getns(); - int ntoreturn = d.pullInt(); - uassert( - 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); - long long cursorid = d.pullInt64(); - - curop.debug().ntoreturn = ntoreturn; - curop.debug().cursorid = cursorid; - - { - stdx::lock_guard<Client>(*txn->getClient()); - CurOp::get(txn)->setNS_inlock(ns); - } - - bool exhaust = false; - QueryResult::View msgdata = 0; - bool isCursorAuthorized = false; - - try { - const NamespaceString nsString(ns); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid ns [" << ns << "]", - nsString.isValid()); - - Status status = AuthorizationSession::get(txn->getClient()) - ->checkAuthForGetMore(nsString, cursorid, false); - audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code()); - uassertStatusOK(status); - - while (MONGO_FAIL_POINT(rsStopGetMore)) { - sleepmillis(0); - } - - msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); - } catch (AssertionException& e) { - if (isCursorAuthorized) { - // If a cursor with id 'cursorid' was authorized, it may have been advanced - // before an exception terminated processGetMore. Erase the ClientCursor - // because it may now be out of sync with the client's iteration state. - // SERVER-7952 - // TODO Temporary code, see SERVER-4563 for a cleanup overview. - CursorManager::eraseCursorGlobal(txn, cursorid); - } - - BSONObjBuilder err; - e.getInfo().append(err); - BSONObj errObj = err.done(); - - curop.debug().exceptionInfo = e.getInfo(); - - replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj); - curop.debug().responseLength = dbresponse.response.header().dataLen(); - curop.debug().nreturned = 1; - return false; - } - - dbresponse.response.setData(msgdata.view2ptr(), true); - curop.debug().responseLength = dbresponse.response.header().dataLen(); - curop.debug().nreturned = msgdata.getNReturned(); - - dbresponse.responseToMsgId = m.header().getId(); - - if (exhaust) { - curop.debug().exhaust = true; - dbresponse.exhaustNS = ns; - } - - return true; -} - -void insertMultiSingletons(OperationContext* txn, - OldClientContext& ctx, - bool keepGoing, - StringData ns, - CurOp& op, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end) { - for (vector<BSONObj>::iterator it = begin; it != end; it++) { - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wouw(txn); - Collection* collection = ctx.db()->getCollection(ns); - if (!collection) { - collection = ctx.db()->createCollection(txn, ns); - invariant(collection); - } - - uassertStatusOK(collection->insertDocument(txn, *it, &op.debug(), true)); - wouw.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "insert", ns); - - globalOpCounters.incInsertInWriteLock(1); - op.debug().ninserted++; - - } catch (const UserException& ex) { - if (!keepGoing) - throw; - LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); - } - } -} - -void insertMultiVector(OperationContext* txn, - OldClientContext& ctx, - bool keepGoing, - StringData ns, - CurOp& op, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end) { - try { - WriteUnitOfWork wunit(txn); - Collection* collection = ctx.db()->getCollection(ns); - if (!collection) { - collection = ctx.db()->createCollection(txn, ns); - invariant(collection); - } - - uassertStatusOK(collection->insertDocuments(txn, begin, end, &op.debug(), true, false)); - wunit.commit(); - - int inserted = end - begin; - globalOpCounters.incInsertInWriteLock(inserted); - op.debug().ninserted = inserted; - } catch (UserException&) { - txn->recoveryUnit()->abandonSnapshot(); - insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end); - } catch (WriteConflictException&) { - CurOp::get(txn)->debug().writeConflicts++; - txn->recoveryUnit()->abandonSnapshot(); - WriteConflictException::logAndBackoff(0, "insert", ns); - insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end); - } -} - -NOINLINE_DECL void insertMulti(OperationContext* txn, - OldClientContext& ctx, - const InsertOp& insertOp, - CurOp& op) { - std::vector<BSONObj> docs; - docs.reserve(insertOp.documents.size()); - for (auto&& doc : insertOp.documents) { - // TODO don't fail yet on invalid documents. They should be treated like other errors. - BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc)); - docs.push_back(fixed.isEmpty() ? doc : std::move(fixed)); - } - - vector<BSONObj>::iterator chunkBegin = docs.begin(); - int64_t chunkCount = 0; - int64_t chunkSize = 0; - - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) { - chunkSize += (*it).objsize(); - // Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient, - // but smaller chunk sizes allow yielding to other threads and lower chance of WCEs - if ((++chunkCount >= internalQueryExecYieldIterations / 2) || - (chunkSize >= insertVectorMaxBytes)) { - if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure - insertMultiSingletons( - txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); - else - insertMultiVector( - txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); - chunkBegin = it + 1; - chunkCount = 0; - chunkSize = 0; - } - } - if (chunkBegin != docs.end()) - insertMultiVector( - txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end()); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op inserts will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } -} - -static void convertSystemIndexInsertsToCommands(DbMessage& d, BSONArrayBuilder* allCmdsBuilder) { - while (d.moreJSObjs()) { - BSONObj spec = d.nextJsObj(); - BSONElement indexNsElement = spec["ns"]; - uassert(ErrorCodes::NoSuchKey, - str::stream() << "Missing \"ns\" field while inserting into " << d.getns(), - !indexNsElement.eoo()); - uassert(ErrorCodes::TypeMismatch, - str::stream() << "Expected \"ns\" field to have type String, not " - << typeName(indexNsElement.type()) << " while inserting into " - << d.getns(), - indexNsElement.type() == String); - const StringData nsToIndex(indexNsElement.valueStringData()); - BSONObjBuilder cmdObjBuilder(allCmdsBuilder->subobjStart()); - cmdObjBuilder << "createIndexes" << nsToCollectionSubstring(nsToIndex); - BSONArrayBuilder specArrayBuilder(cmdObjBuilder.subarrayStart("indexes")); - while (true) { - BSONObjBuilder specBuilder(specArrayBuilder.subobjStart()); - BSONElement specNsElement = spec["ns"]; - if ((specNsElement.type() != String) || - (specNsElement.valueStringData() != nsToIndex)) { - break; - } - for (BSONObjIterator iter(spec); iter.more();) { - BSONElement element = iter.next(); - if (element.fieldNameStringData() != "ns") { - specBuilder.append(element); - } - } - if (!d.moreJSObjs()) { - break; - } - spec = d.nextJsObj(); - } - } -} - -static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curOp) { - BSONArrayBuilder allCmdsBuilder; - try { - convertSystemIndexInsertsToCommands(d, &allCmdsBuilder); - } catch (const DBException& ex) { - LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); - curOp.debug().exceptionInfo = ex.getInfo(); - return; - } - BSONArray allCmds(allCmdsBuilder.done()); - Command* createIndexesCmd = Command::findCommand("createIndexes"); - invariant(createIndexesCmd); - const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError; - for (BSONObjIterator iter(allCmds); iter.more();) { - try { - BSONObj cmdObj = iter.next().Obj(); - - rpc::LegacyRequestBuilder requestBuilder{}; - auto indexNs = NamespaceString(d.getns()); - auto cmdRequestMsg = requestBuilder.setDatabase(indexNs.db()) - .setCommandName("createIndexes") - .setCommandArgs(cmdObj) - .setMetadata(rpc::makeEmptyMetadata()) - .done(); - rpc::LegacyRequest cmdRequest{&cmdRequestMsg}; - rpc::LegacyReplyBuilder cmdReplyBuilder{}; - Command::execCommand(txn, createIndexesCmd, cmdRequest, &cmdReplyBuilder); - auto cmdReplyMsg = cmdReplyBuilder.done(); - rpc::LegacyReply cmdReply{&cmdReplyMsg}; - uassertStatusOK(getStatusFromCommandResult(cmdReply.getCommandReply())); - } catch (const DBException& ex) { - LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); - curOp.debug().exceptionInfo = ex.getInfo(); - if (!keepGoing) { - return; - } - } - } -} - -bool _receivedInsert(OperationContext* txn, - const NamespaceString& nsString, - const InsertOp& insertOp, - CurOp& op, - bool checkCollection) { - // CONCURRENCY TODO: is being read locked in big log sufficient here? - // writelock is used to synchronize stepdowns w/ writes - uassert( - 10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString)); - - OldClientContext ctx(txn, insertOp.ns.ns()); - if (checkCollection && !ctx.db()->getCollection(nsString)) - return false; - insertMulti(txn, ctx, insertOp, op); - return true; -} - -void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - { - stdx::lock_guard<Client>(*txn->getClient()); - CurOp::get(txn)->setNS_inlock(nsString.ns()); - } - - uassertStatusOK(userAllowedWriteNS(nsString.ns())); - if (nsString.isSystemDotIndexes()) { - DbMessage d(m); - insertSystemIndexes(txn, d, op); - return; - } - - auto insertOp = parseLegacyInsert(m); - - for (const auto& obj : insertOp.documents) { - // Check auth for insert. - Status status = - AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj); - audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code()); - uassertStatusOK(status); - } - - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - // OldClientContext may implicitly create a database, so check existence - if (dbHolder().get(txn, nsString.db()) != NULL) { - if (_receivedInsert(txn, nsString, insertOp, op, true)) - return; - } - } - - // Collection didn't exist so try again with MODE_X - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X); - - _receivedInsert(txn, nsString, insertOp, op, false); -} - // ----- BEGIN Diaglog ----- DiagLog::DiagLog() : f(0), level(0) {} |