diff options
Diffstat (limited to 'src/mongo/db/assemble_response.cpp')
-rw-r--r-- | src/mongo/db/assemble_response.cpp | 146 |
1 files changed, 75 insertions, 71 deletions
diff --git a/src/mongo/db/assemble_response.cpp b/src/mongo/db/assemble_response.cpp index 4c6121dfb2b..a4f21cc5ae2 100644 --- a/src/mongo/db/assemble_response.cpp +++ b/src/mongo/db/assemble_response.cpp @@ -136,14 +136,14 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception, /** * Fills out CurOp / OpDebug with basic command info. */ -void beginCommandOp(OperationContext* txn, const NamespaceString& nss, const BSONObj& queryObj) { - auto curop = CurOp::get(txn); - stdx::lock_guard<Client> lk(*txn->getClient()); +void beginCommandOp(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& queryObj) { + auto curop = CurOp::get(opCtx); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curop->setQuery_inlock(queryObj); curop->setNS_inlock(nss.ns()); } -void receivedCommand(OperationContext* txn, +void receivedCommand(OperationContext* opCtx, const NamespaceString& nss, Client& client, DbResponse& dbResponse, @@ -155,7 +155,7 @@ void receivedCommand(OperationContext* txn, DbMessage dbMessage(message); QueryMessage queryMessage(dbMessage); - CurOp* op = CurOp::get(txn); + CurOp* op = CurOp::get(opCtx); rpc::LegacyReplyBuilder builder{}; @@ -165,10 +165,10 @@ void receivedCommand(OperationContext* txn, // Auth checking for Commands happens later. int nToReturn = queryMessage.ntoreturn; - beginCommandOp(txn, nss, request.getCommandArgs()); + beginCommandOp(opCtx, nss, request.getCommandArgs()); { - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); op->markCommand_inlock(); } @@ -177,11 +177,11 @@ void receivedCommand(OperationContext* txn, << ") for $cmd type ns - can only be 1 or -1", nToReturn == 1 || nToReturn == -1); - runCommands(txn, request, &builder); + runCommands(opCtx, request, &builder); op->debug().iscommand = true; } catch (const DBException& exception) { - Command::generateErrorResponse(txn, &builder, exception); + Command::generateErrorResponse(opCtx, &builder, exception); } auto response = builder.done(); @@ -192,14 +192,17 @@ void receivedCommand(OperationContext* txn, dbResponse.responseToMsgId = responseToMsgId; } -void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, Message& message) { +void receivedRpc(OperationContext* opCtx, + Client& client, + DbResponse& dbResponse, + Message& message) { invariant(message.operation() == dbCommand); const int32_t responseToMsgId = message.header().getId(); rpc::CommandReplyBuilder replyBuilder{}; - auto curOp = CurOp::get(txn); + auto curOp = CurOp::get(opCtx); try { // database is validated here @@ -208,18 +211,18 @@ void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, // We construct a legacy $cmd namespace so we can fill in curOp using // the existing logic that existed for OP_QUERY commands NamespaceString nss(request.getDatabase(), "$cmd"); - beginCommandOp(txn, nss, request.getCommandArgs()); + beginCommandOp(opCtx, nss, request.getCommandArgs()); { - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->markCommand_inlock(); } - runCommands(txn, request, &replyBuilder); + runCommands(opCtx, request, &replyBuilder); curOp->debug().iscommand = true; } catch (const DBException& exception) { - Command::generateErrorResponse(txn, &replyBuilder, exception); + Command::generateErrorResponse(opCtx, &replyBuilder, exception); } auto response = replyBuilder.done(); @@ -234,7 +237,7 @@ void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, // as ordinary commands. To support old clients for another release, this helper serves // to execute the real command from the legacy pseudo-command codepath. // TODO: remove after MongoDB 3.2 is released -void receivedPseudoCommand(OperationContext* txn, +void receivedPseudoCommand(OperationContext* opCtx, Client& client, DbResponse& dbResponse, Message& message, @@ -285,10 +288,10 @@ void receivedPseudoCommand(OperationContext* txn, interposed.setData(dbQuery, cmdMsgBuf.buf(), cmdMsgBuf.len()); interposed.header().setId(message.header().getId()); - receivedCommand(txn, interposedNss, client, dbResponse, interposed); + receivedCommand(opCtx, interposedNss, client, dbResponse, interposed); } -void receivedQuery(OperationContext* txn, +void receivedQuery(OperationContext* opCtx, const NamespaceString& nss, Client& c, DbResponse& dbResponse, @@ -301,21 +304,21 @@ void receivedQuery(OperationContext* txn, DbMessage d(m); QueryMessage q(d); - CurOp& op = *CurOp::get(txn); + CurOp& op = *CurOp::get(opCtx); try { - Client* client = txn->getClient(); + Client* client = opCtx->getClient(); Status status = AuthorizationSession::get(client)->checkAuthForFind(nss, false); audit::logQueryAuthzCheck(client, nss, q.query, status.code()); uassertStatusOK(status); - dbResponse.exhaustNS = runQuery(txn, q, nss, dbResponse.response); + dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response); } catch (const AssertionException& e) { // If we got a stale config, wait in case the operation is stuck in a critical section if (e.getCode() == ErrorCodes::SendStaleConfig) { auto& sce = static_cast<const StaleConfigException&>(e); - ShardingState::get(txn)->onStaleShardVersion( - txn, NamespaceString(sce.getns()), sce.getVersionReceived()); + ShardingState::get(opCtx)->onStaleShardVersion( + opCtx, NamespaceString(sce.getns()), sce.getVersionReceived()); } dbResponse.response.reset(); @@ -326,8 +329,8 @@ void receivedQuery(OperationContext* txn, dbResponse.responseToMsgId = responseToMsgId; } -void receivedKillCursors(OperationContext* txn, Message& m) { - LastError::get(txn->getClient()).disable(); +void receivedKillCursors(OperationContext* opCtx, Message& m) { + LastError::get(opCtx->getClient()).disable(); DbMessage dbmessage(m); int n = dbmessage.pullInt(); @@ -344,35 +347,35 @@ void receivedKillCursors(OperationContext* txn, Message& m) { const char* cursorArray = dbmessage.getArray(n); - int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray); + int found = CursorManager::eraseCursorGlobalIfAuthorized(opCtx, n, cursorArray); if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n; } } -void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m) { +void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, Message& m) { auto insertOp = parseLegacyInsert(m); invariant(insertOp.ns == nsString); for (const auto& obj : insertOp.documents) { Status status = - AuthorizationSession::get(txn->getClient())->checkAuthForInsert(txn, nsString, obj); - audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code()); + AuthorizationSession::get(opCtx->getClient())->checkAuthForInsert(opCtx, nsString, obj); + audit::logInsertAuthzCheck(opCtx->getClient(), nsString, obj, status.code()); uassertStatusOK(status); } - performInserts(txn, insertOp); + performInserts(opCtx, insertOp); } -void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m) { +void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, Message& m) { auto updateOp = parseLegacyUpdate(m); auto& singleUpdate = updateOp.updates[0]; invariant(updateOp.ns == nsString); Status status = - AuthorizationSession::get(txn->getClient()) + AuthorizationSession::get(opCtx->getClient()) ->checkAuthForUpdate( - txn, nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); - audit::logUpdateAuthzCheck(txn->getClient(), + opCtx, nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); + audit::logUpdateAuthzCheck(opCtx->getClient(), nsString, singleUpdate.query, singleUpdate.update, @@ -381,23 +384,23 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess status.code()); uassertStatusOK(status); - performUpdates(txn, updateOp); + performUpdates(opCtx, updateOp); } -void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m) { +void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, Message& m) { auto deleteOp = parseLegacyDelete(m); auto& singleDelete = deleteOp.deletes[0]; invariant(deleteOp.ns == nsString); - Status status = AuthorizationSession::get(txn->getClient()) - ->checkAuthForDelete(txn, nsString, singleDelete.query); - audit::logDeleteAuthzCheck(txn->getClient(), nsString, singleDelete.query, status.code()); + Status status = AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForDelete(opCtx, nsString, singleDelete.query); + audit::logDeleteAuthzCheck(opCtx->getClient(), nsString, singleDelete.query, status.code()); uassertStatusOK(status); - performDeletes(txn, deleteOp); + performDeletes(opCtx, deleteOp); } -bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) { +bool receivedGetMore(OperationContext* opCtx, DbResponse& dbresponse, Message& m, CurOp& curop) { globalOpCounters.gotGetMore(); DbMessage d(m); @@ -411,8 +414,8 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, curop.debug().cursorid = cursorid; { - stdx::lock_guard<Client> lk(*txn->getClient()); - CurOp::get(txn)->setNS_inlock(ns); + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setNS_inlock(ns); } bool exhaust = false; @@ -424,16 +427,17 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, str::stream() << "Invalid ns [" << ns << "]", nsString.isValid()); - Status status = AuthorizationSession::get(txn->getClient()) + Status status = AuthorizationSession::get(opCtx->getClient()) ->checkAuthForGetMore(nsString, cursorid, false); - audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code()); + audit::logGetMoreAuthzCheck(opCtx->getClient(), nsString, cursorid, status.code()); uassertStatusOK(status); while (MONGO_FAIL_POINT(rsStopGetMore)) { sleepmillis(0); } - dbresponse.response = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); + dbresponse.response = + getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); } catch (AssertionException& e) { if (isCursorAuthorized) { // If a cursor with id 'cursorid' was authorized, it may have been advanced @@ -441,7 +445,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, // because it may now be out of sync with the client's iteration state. // SERVER-7952 // TODO Temporary code, see SERVER-4563 for a cleanup overview. - CursorManager::eraseCursorGlobal(txn, cursorid); + CursorManager::eraseCursorGlobal(opCtx, cursorid); } BSONObjBuilder err; @@ -473,7 +477,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, } // namespace // Returns false when request includes 'end' -void assembleResponse(OperationContext* txn, +void assembleResponse(OperationContext* opCtx, Message& m, DbResponse& dbresponse, const HostAndPort& remote) { @@ -483,15 +487,15 @@ void assembleResponse(OperationContext* txn, DbMessage dbmsg(m); - Client& c = *txn->getClient(); + Client& c = *opCtx->getClient(); if (c.isInDirectClient()) { - invariant(!txn->lockState()->inAWriteUnitOfWork()); + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); } else { LastError::get(c).startRequest(); - AuthorizationSession::get(c)->startRequest(txn); + AuthorizationSession::get(c)->startRequest(opCtx); // We should not be holding any locks at this point - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); } const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL; @@ -507,15 +511,15 @@ void assembleResponse(OperationContext* txn, opwrite(m); if (nsString.coll() == "$cmd.sys.inprog") { - receivedPseudoCommand(txn, c, dbresponse, m, "currentOp"); + receivedPseudoCommand(opCtx, c, dbresponse, m, "currentOp"); return; } if (nsString.coll() == "$cmd.sys.killop") { - receivedPseudoCommand(txn, c, dbresponse, m, "killOp"); + receivedPseudoCommand(opCtx, c, dbresponse, m, "killOp"); return; } if (nsString.coll() == "$cmd.sys.unlock") { - receivedPseudoCommand(txn, c, dbresponse, m, "fsyncUnlock"); + receivedPseudoCommand(opCtx, c, dbresponse, m, "fsyncUnlock"); return; } } else { @@ -530,9 +534,9 @@ void assembleResponse(OperationContext* txn, opwrite(m); } - CurOp& currentOp = *CurOp::get(txn); + CurOp& currentOp = *CurOp::get(opCtx); { - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); // Commands handling code will reset this if the operation is a command // which is logically a basic CRUD operation like query, insert, etc. currentOp.setNetworkOp_inlock(op); @@ -546,14 +550,14 @@ void assembleResponse(OperationContext* txn, if (op == dbQuery) { if (isCommand) { - receivedCommand(txn, nsString, c, dbresponse, m); + receivedCommand(opCtx, nsString, c, dbresponse, m); } else { - receivedQuery(txn, nsString, c, dbresponse, m); + receivedQuery(opCtx, nsString, c, dbresponse, m); } } else if (op == dbCommand) { - receivedRpc(txn, c, dbresponse, m); + receivedRpc(opCtx, c, dbresponse, m); } else if (op == dbGetMore) { - if (!receivedGetMore(txn, dbresponse, m, currentOp)) + if (!receivedGetMore(opCtx, dbresponse, m, currentOp)) shouldLogOpDebug = true; } else if (op == dbMsg) { // deprecated - replaced by commands @@ -575,7 +579,7 @@ void assembleResponse(OperationContext* txn, if (op == dbKillCursors) { currentOp.ensureStarted(); logThresholdMs = 10; - receivedKillCursors(txn, m); + receivedKillCursors(opCtx, m); } else if (op != dbInsert && op != dbUpdate && op != dbDelete) { log() << " operation isn't supported: " << static_cast<int>(op); currentOp.done(); @@ -597,11 +601,11 @@ void assembleResponse(OperationContext* txn, if (!nsString.isValid()) { uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false); } else if (op == dbInsert) { - receivedInsert(txn, nsString, m); + receivedInsert(opCtx, nsString, m); } else if (op == dbUpdate) { - receivedUpdate(txn, nsString, m); + receivedUpdate(opCtx, nsString, m); } else if (op == dbDelete) { - receivedDelete(txn, nsString, m); + receivedDelete(opCtx, nsString, m); } else { invariant(false); } @@ -624,9 +628,9 @@ void assembleResponse(OperationContext* txn, debug.executionTimeMicros = currentOp.totalTimeMicros(); logThresholdMs += currentOp.getExpectedLatencyMs(); - Top::get(txn->getServiceContext()) + Top::get(opCtx->getServiceContext()) .incrementGlobalLatencyStats( - txn, currentOp.totalTimeMicros(), currentOp.getReadWriteType()); + opCtx, currentOp.totalTimeMicros(), currentOp.getReadWriteType()); const bool shouldSample = serverGlobalParams.sampleRate == 1.0 ? true @@ -634,13 +638,13 @@ void assembleResponse(OperationContext* txn, if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { Locker::LockerInfo lockerInfo; - txn->lockState()->getLockerInfo(&lockerInfo); + opCtx->lockState()->getLockerInfo(&lockerInfo); log() << debug.report(&c, currentOp, lockerInfo.stats); } if (shouldSample && currentOp.shouldDBProfile()) { // Performance profiling is on - if (txn->lockState()->isReadLocked()) { + if (opCtx->lockState()->isReadLocked()) { LOG(1) << "note: not profiling because recursive read lock"; } else if (lockedForWriting()) { // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post @@ -649,11 +653,11 @@ void assembleResponse(OperationContext* txn, } else if (storageGlobalParams.readOnly) { LOG(1) << "note: not profiling because server is read-only"; } else { - profile(txn, op); + profile(opCtx, op); } } - recordCurOpMetrics(txn); + recordCurOpMetrics(opCtx); } } // namespace mongo |