diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/clientcursor.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.h | 10 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/commands/list_collections.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/list_indexes.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/repair_cursor.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/write_concern_options.h | 4 |
12 files changed, 124 insertions, 56 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index c3a06b9676c..0ebcff34f09 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -87,7 +87,8 @@ ClientCursor::ClientCursor(ClientCursorParams params, _authenticatedUsers(std::move(params.authenticatedUsers)), _lsid(operationUsingCursor->getLogicalSessionId()), _txnNumber(operationUsingCursor->getTxnNumber()), - _readConcernArgs(params.readConcernArgs), + _writeConcernOptions(std::move(params.writeConcernOptions)), + _readConcernArgs(std::move(params.readConcernArgs)), _originatingCommand(params.originatingCommandObj), _originatingPrivileges(std::move(params.originatingPrivileges)), _queryOptions(params.queryOptions), diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 92f51255ed5..4694031ccc1 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -78,13 +78,15 @@ struct ClientCursorParams { ClientCursorParams(std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor, NamespaceString nss, UserNameIterator authenticatedUsersIter, + WriteConcernOptions writeConcernOptions, repl::ReadConcernArgs readConcernArgs, BSONObj originatingCommandObj, LockPolicy lockPolicy, PrivilegeVector originatingPrivileges) : exec(std::move(planExecutor)), nss(std::move(nss)), - readConcernArgs(readConcernArgs), + writeConcernOptions(std::move(writeConcernOptions)), + readConcernArgs(std::move(readConcernArgs)), queryOptions(exec->getCanonicalQuery() ? exec->getCanonicalQuery()->getQueryRequest().getOptions() : 0), @@ -113,6 +115,7 @@ struct ClientCursorParams { std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; const NamespaceString nss; std::vector<UserName> authenticatedUsers; + const WriteConcernOptions writeConcernOptions; const repl::ReadConcernArgs readConcernArgs; int queryOptions = 0; BSONObj originatingCommandObj; @@ -165,6 +168,10 @@ public: return _readConcernArgs; } + WriteConcernOptions getWriteConcernOptions() const { + return _writeConcernOptions; + } + /** * Returns a pointer to the underlying query plan executor. All cursors manage a PlanExecutor, * so this method never returns a null pointer. @@ -370,6 +377,7 @@ private: // A transaction number for this cursor, if it was provided in the originating command. const boost::optional<TxnNumber> _txnNumber; + const WriteConcernOptions _writeConcernOptions; const repl::ReadConcernArgs _readConcernArgs; // Tracks whether dispose() has been called, to make sure it happens before destruction. It is diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 90ded58d209..a3d0b8ccff1 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -500,6 +500,7 @@ public: {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), _request.body, ClientCursorParams::LockPolicy::kLockExternally, diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 27aff323124..93884a84988 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -169,6 +169,45 @@ void applyCursorReadConcern(OperationContext* opCtx, repl::ReadConcernArgs rcArg } /** + * Sets a deadline on the operation if the originating command had a maxTimeMS specified or if this + * is a tailable, awaitData cursor. + */ +void setUpOperationDeadline(OperationContext* opCtx, + const ClientCursor& cursor, + const GetMoreRequest& request, + bool disableAwaitDataFailpointActive) { + + // We assume that cursors created through a DBDirectClient are always used from their + // original OperationContext, so we do not need to move time to and from the cursor. + if (!opCtx->getClient()->isInDirectClient()) { + // There is no time limit set directly on this getMore command. If the cursor is + // awaitData, then we supply a default time of one second. Otherwise we roll over + // any leftover time from the maxTimeMS of the operation that spawned this cursor, + // applying it to this getMore. + if (cursor.isAwaitData() && !disableAwaitDataFailpointActive) { + awaitDataState(opCtx).waitForInsertsDeadline = + opCtx->getServiceContext()->getPreciseClockSource()->now() + + request.awaitDataTimeout.value_or(Seconds{1}); + } else if (cursor.getLeftoverMaxTimeMicros() < Microseconds::max()) { + opCtx->setDeadlineAfterNowBy(cursor.getLeftoverMaxTimeMicros(), + ErrorCodes::MaxTimeMSExpired); + } + } +} +/** + * Sets up the OperationContext in order to correctly inherit options like the read concern from the + * cursor to this operation. + */ +void setUpOperationContextStateForGetMore(OperationContext* opCtx, + const ClientCursor& cursor, + const GetMoreRequest& request, + bool disableAwaitDataFailpointActive) { + applyCursorReadConcern(opCtx, cursor.getReadConcernArgs()); + opCtx->setWriteConcern(cursor.getWriteConcernOptions()); + setUpOperationDeadline(opCtx, cursor, request, disableAwaitDataFailpointActive); +} + +/** * A command for running getMore() against an existing cursor registered with a CursorManager. * Used to generate the next batch of results for a ClientCursor. * @@ -455,28 +494,13 @@ public: _request.nss); } - // We must respect the read concern from the cursor. - applyCursorReadConcern(opCtx, cursorPin->getReadConcernArgs()); - const bool disableAwaitDataFailpointActive = MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); - // We assume that cursors created through a DBDirectClient are always used from their - // original OperationContext, so we do not need to move time to and from the cursor. - if (!opCtx->getClient()->isInDirectClient()) { - // There is no time limit set directly on this getMore command. If the cursor is - // awaitData, then we supply a default time of one second. Otherwise we roll over - // any leftover time from the maxTimeMS of the operation that spawned this cursor, - // applying it to this getMore. - if (cursorPin->isAwaitData() && !disableAwaitDataFailpointActive) { - awaitDataState(opCtx).waitForInsertsDeadline = - opCtx->getServiceContext()->getPreciseClockSource()->now() + - _request.awaitDataTimeout.value_or(Seconds{1}); - } else if (cursorPin->getLeftoverMaxTimeMicros() < Microseconds::max()) { - opCtx->setDeadlineAfterNowBy(cursorPin->getLeftoverMaxTimeMicros(), - ErrorCodes::MaxTimeMSExpired); - } - } + // Inherit properties like readConcern and maxTimeMS from our originating cursor. + setUpOperationContextStateForGetMore( + opCtx, *cursorPin.getCursor(), _request, disableAwaitDataFailpointActive); + if (!cursorPin->isAwaitData()) { opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. } diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index 35a2e0d8008..9c5ddbaab52 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -394,6 +394,7 @@ public: {std::move(exec), cursorNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), jsobj, ClientCursorParams::LockPolicy::kLocksInternally, diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 9b4a1c0f0fe..dac2a6197b6 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -229,6 +229,7 @@ public: {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), cmdObj, ClientCursorParams::LockPolicy::kLocksInternally, diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index ffbaf61f66e..9618543a0a9 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -103,6 +103,7 @@ public: {std::move(exec), ns, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), cmdObj, ClientCursorParams::LockPolicy::kLockExternally, diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index fcb8d01237d..8c1be73e152 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -732,6 +732,7 @@ Status runAggregate(OperationContext* opCtx, std::move(exec), origNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), cmdObj, lockPolicy, diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index ce24825218a..7485d1a7260 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -397,6 +397,13 @@ Message getMore(OperationContext* opCtx, "OP_GET_MORE operations are not supported on tailable aggregations. Only clients " "which support the getMore command can be used on tailable aggregations.", readLock || !cursorPin->isAwaitData()); + uassert( + 31124, + str::stream() + << "OP_GET_MORE does not support cursors with a write concern other than the default." + " Use the getMore command instead. Write concern was: " + << cursorPin->getWriteConcernOptions().toBSON(), + cursorPin->getWriteConcernOptions().usedDefault); // If the operation that spawned this cursor had a time limit set, apply leftover time to this // getmore. @@ -724,6 +731,7 @@ std::string runQuery(OperationContext* opCtx, {std::move(exec), nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), readConcernArgs, upconvertedQuery, ClientCursorParams::LockPolicy::kLockExternally, diff --git a/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp index 6f0e42503ed..43d6a7f4304 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp @@ -111,9 +111,7 @@ public: onCommand([this, shard](const RemoteCommandRequest& request) { ASSERT_EQ(HostAndPort(shard.getHost()), request.target); ASSERT_EQ(_dropNS.db(), request.dbname); - ASSERT_BSONOBJ_EQ(BSON("drop" << _dropNS.coll() << "writeConcern" - << BSON("w" << 0 << "wtimeout" << 0)), - request.cmdObj); + ASSERT_BSONOBJ_EQ(BSON("drop" << _dropNS.coll()), request.cmdObj); ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(), rpc::TrackingMetadata::removeTrackingData(request.metadata)); @@ -222,9 +220,7 @@ TEST_F(DropColl2ShardTest, NSNotFound) { onCommand([this](const RemoteCommandRequest& request) { ASSERT_EQ(HostAndPort(shard1().getHost()), request.target); ASSERT_EQ(dropNS().db(), request.dbname); - ASSERT_BSONOBJ_EQ( - BSON("drop" << dropNS().coll() << "writeConcern" << BSON("w" << 0 << "wtimeout" << 0)), - request.cmdObj); + ASSERT_BSONOBJ_EQ(BSON("drop" << dropNS().coll()), request.cmdObj); ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(), rpc::TrackingMetadata::removeTrackingData(request.metadata)); @@ -235,9 +231,7 @@ TEST_F(DropColl2ShardTest, NSNotFound) { onCommand([this](const RemoteCommandRequest& request) { ASSERT_EQ(HostAndPort(shard2().getHost()), request.target); ASSERT_EQ(dropNS().db(), request.dbname); - ASSERT_BSONOBJ_EQ( - BSON("drop" << dropNS().coll() << "writeConcern" << BSON("w" << 0 << "wtimeout" << 0)), - request.cmdObj); + ASSERT_BSONOBJ_EQ(BSON("drop" << dropNS().coll()), request.cmdObj); ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(), rpc::TrackingMetadata::removeTrackingData(request.metadata)); diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 498b6e8c2c6..5c48ade9178 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -478,35 +478,33 @@ bool runCommandImpl(OperationContext* opCtx, const bool shouldCheckOutSession = sessionOptions.getTxnNumber() && !shouldCommandSkipSessionCheckout(command->getName()); - if (!invocation->supportsWriteConcern()) { - behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); - if (shouldCheckOutSession) { - invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder); - } else { - invocation->run(opCtx, replyBuilder); - MONGO_FAIL_POINT_BLOCK(waitAfterReadCommandFinishesExecution, options) { - const BSONObj& data = options.getData(); - auto db = data["db"].str(); - if (db.empty() || request.getDatabase() == db) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &waitAfterReadCommandFinishesExecution, - opCtx, - "waitAfterReadCommandFinishesExecution"); - } - } - } - } else { - auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body)); - if (sessionOptions.getAutocommit()) { - validateWriteConcernForTransaction(wcResult, invocation->definition()->getName()); - } + // getMore operations inherit a WriteConcern from their originating cursor. For example, if the + // originating command was an aggregate with a $out and batchSize: 0. Note that if the command + // only performed reads then we will not need to wait at all. + const bool shouldWaitForWriteConcern = + invocation->supportsWriteConcern() || command->getLogicalOp() == LogicalOp::opGetMore; + if (shouldWaitForWriteConcern) { auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); // Change the write concern while running the command. const auto oldWC = opCtx->getWriteConcern(); ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - opCtx->setWriteConcern(wcResult); + + boost::optional<WriteConcernOptions> extractedWriteConcern; + if (command->getLogicalOp() == LogicalOp::opGetMore) { + // WriteConcern will be set up during command processing, it must not be specified on + // the command body. + behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); + } else { + extractedWriteConcern.emplace( + uassertStatusOK(extractWriteConcern(opCtx, request.body))); + if (sessionOptions.getAutocommit()) { + validateWriteConcernForTransaction(*extractedWriteConcern, + invocation->definition()->getName()); + } + opCtx->setWriteConcern(*extractedWriteConcern); + } auto waitForWriteConcern = [&](auto&& bb) { MONGO_FAIL_POINT_BLOCK_IF(failCommand, data, [&](const BSONObj& data) { @@ -539,9 +537,35 @@ bool runCommandImpl(OperationContext* opCtx, waitForWriteConcern(replyBuilder->getBodyBuilder()); - // Nothing in run() should change the writeConcern. - dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == - wcResult.toBSON())); + // With the exception of getMores inheriting the WriteConcern from the originating command, + // nothing in run() should change the writeConcern. + dassert(command->getLogicalOp() == LogicalOp::opGetMore + ? !extractedWriteConcern + : (extractedWriteConcern && + SimpleBSONObjComparator::kInstance.evaluate( + opCtx->getWriteConcern().toBSON() == extractedWriteConcern->toBSON()))); + } else { + behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); + if (shouldCheckOutSession) { + invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder); + } else { + invocation->run(opCtx, replyBuilder); + } + } + + // This failpoint should affect both getMores and commands which are read-only and thus don't + // support writeConcern. + if (!shouldWaitForWriteConcern || command->getLogicalOp() == LogicalOp::opGetMore) { + MONGO_FAIL_POINT_BLOCK(waitAfterReadCommandFinishesExecution, options) { + const BSONObj& data = options.getData(); + auto db = data["db"].str(); + if (db.empty() || request.getDatabase() == db) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitAfterReadCommandFinishesExecution, + opCtx, + "waitAfterReadCommandFinishesExecution"); + } + } } behaviors.waitForLinearizableReadConcern(opCtx); diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h index 6ea362abab4..99a6bff9fc2 100644 --- a/src/mongo/db/write_concern_options.h +++ b/src/mongo/db/write_concern_options.h @@ -59,6 +59,10 @@ public: WriteConcernOptions() { reset(); + // It is assumed that a default-constructed WriteConcernOptions will be populated with the + // default options. If it is subsequently populated with non-default options, it is the + // caller's responsibility to set this flag accordingly. + usedDefault = true; } WriteConcernOptions(int numNodes, SyncMode sync, int timeout); |