summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/clientcursor.cpp3
-rw-r--r--src/mongo/db/clientcursor.h10
-rw-r--r--src/mongo/db/commands/find_cmd.cpp1
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp62
-rw-r--r--src/mongo/db/commands/list_collections.cpp1
-rw-r--r--src/mongo/db/commands/list_indexes.cpp1
-rw-r--r--src/mongo/db/commands/repair_cursor.cpp1
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp1
-rw-r--r--src/mongo/db/query/find.cpp8
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp12
-rw-r--r--src/mongo/db/service_entry_point_common.cpp76
-rw-r--r--src/mongo/db/write_concern_options.h4
12 files changed, 124 insertions, 56 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index c3a06b9676c..0ebcff34f09 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -87,7 +87,8 @@ ClientCursor::ClientCursor(ClientCursorParams params,
_authenticatedUsers(std::move(params.authenticatedUsers)),
_lsid(operationUsingCursor->getLogicalSessionId()),
_txnNumber(operationUsingCursor->getTxnNumber()),
- _readConcernArgs(params.readConcernArgs),
+ _writeConcernOptions(std::move(params.writeConcernOptions)),
+ _readConcernArgs(std::move(params.readConcernArgs)),
_originatingCommand(params.originatingCommandObj),
_originatingPrivileges(std::move(params.originatingPrivileges)),
_queryOptions(params.queryOptions),
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 92f51255ed5..4694031ccc1 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -78,13 +78,15 @@ struct ClientCursorParams {
ClientCursorParams(std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor,
NamespaceString nss,
UserNameIterator authenticatedUsersIter,
+ WriteConcernOptions writeConcernOptions,
repl::ReadConcernArgs readConcernArgs,
BSONObj originatingCommandObj,
LockPolicy lockPolicy,
PrivilegeVector originatingPrivileges)
: exec(std::move(planExecutor)),
nss(std::move(nss)),
- readConcernArgs(readConcernArgs),
+ writeConcernOptions(std::move(writeConcernOptions)),
+ readConcernArgs(std::move(readConcernArgs)),
queryOptions(exec->getCanonicalQuery()
? exec->getCanonicalQuery()->getQueryRequest().getOptions()
: 0),
@@ -113,6 +115,7 @@ struct ClientCursorParams {
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
const NamespaceString nss;
std::vector<UserName> authenticatedUsers;
+ const WriteConcernOptions writeConcernOptions;
const repl::ReadConcernArgs readConcernArgs;
int queryOptions = 0;
BSONObj originatingCommandObj;
@@ -165,6 +168,10 @@ public:
return _readConcernArgs;
}
+ WriteConcernOptions getWriteConcernOptions() const {
+ return _writeConcernOptions;
+ }
+
/**
* Returns a pointer to the underlying query plan executor. All cursors manage a PlanExecutor,
* so this method never returns a null pointer.
@@ -370,6 +377,7 @@ private:
// A transaction number for this cursor, if it was provided in the originating command.
const boost::optional<TxnNumber> _txnNumber;
+ const WriteConcernOptions _writeConcernOptions;
const repl::ReadConcernArgs _readConcernArgs;
// Tracks whether dispose() has been called, to make sure it happens before destruction. It is
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 90ded58d209..a3d0b8ccff1 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -500,6 +500,7 @@ public:
{std::move(exec),
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
_request.body,
ClientCursorParams::LockPolicy::kLockExternally,
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 27aff323124..93884a84988 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -169,6 +169,45 @@ void applyCursorReadConcern(OperationContext* opCtx, repl::ReadConcernArgs rcArg
}
/**
+ * Sets a deadline on the operation if the originating command had a maxTimeMS specified or if this
+ * is a tailable, awaitData cursor.
+ */
+void setUpOperationDeadline(OperationContext* opCtx,
+ const ClientCursor& cursor,
+ const GetMoreRequest& request,
+ bool disableAwaitDataFailpointActive) {
+
+ // We assume that cursors created through a DBDirectClient are always used from their
+ // original OperationContext, so we do not need to move time to and from the cursor.
+ if (!opCtx->getClient()->isInDirectClient()) {
+ // There is no time limit set directly on this getMore command. If the cursor is
+ // awaitData, then we supply a default time of one second. Otherwise we roll over
+ // any leftover time from the maxTimeMS of the operation that spawned this cursor,
+ // applying it to this getMore.
+ if (cursor.isAwaitData() && !disableAwaitDataFailpointActive) {
+ awaitDataState(opCtx).waitForInsertsDeadline =
+ opCtx->getServiceContext()->getPreciseClockSource()->now() +
+ request.awaitDataTimeout.value_or(Seconds{1});
+ } else if (cursor.getLeftoverMaxTimeMicros() < Microseconds::max()) {
+ opCtx->setDeadlineAfterNowBy(cursor.getLeftoverMaxTimeMicros(),
+ ErrorCodes::MaxTimeMSExpired);
+ }
+ }
+}
+/**
+ * Sets up the OperationContext in order to correctly inherit options like the read concern from the
+ * cursor to this operation.
+ */
+void setUpOperationContextStateForGetMore(OperationContext* opCtx,
+ const ClientCursor& cursor,
+ const GetMoreRequest& request,
+ bool disableAwaitDataFailpointActive) {
+ applyCursorReadConcern(opCtx, cursor.getReadConcernArgs());
+ opCtx->setWriteConcern(cursor.getWriteConcernOptions());
+ setUpOperationDeadline(opCtx, cursor, request, disableAwaitDataFailpointActive);
+}
+
+/**
* A command for running getMore() against an existing cursor registered with a CursorManager.
* Used to generate the next batch of results for a ClientCursor.
*
@@ -455,28 +494,13 @@ public:
_request.nss);
}
- // We must respect the read concern from the cursor.
- applyCursorReadConcern(opCtx, cursorPin->getReadConcernArgs());
-
const bool disableAwaitDataFailpointActive =
MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd);
- // We assume that cursors created through a DBDirectClient are always used from their
- // original OperationContext, so we do not need to move time to and from the cursor.
- if (!opCtx->getClient()->isInDirectClient()) {
- // There is no time limit set directly on this getMore command. If the cursor is
- // awaitData, then we supply a default time of one second. Otherwise we roll over
- // any leftover time from the maxTimeMS of the operation that spawned this cursor,
- // applying it to this getMore.
- if (cursorPin->isAwaitData() && !disableAwaitDataFailpointActive) {
- awaitDataState(opCtx).waitForInsertsDeadline =
- opCtx->getServiceContext()->getPreciseClockSource()->now() +
- _request.awaitDataTimeout.value_or(Seconds{1});
- } else if (cursorPin->getLeftoverMaxTimeMicros() < Microseconds::max()) {
- opCtx->setDeadlineAfterNowBy(cursorPin->getLeftoverMaxTimeMicros(),
- ErrorCodes::MaxTimeMSExpired);
- }
- }
+ // Inherit properties like readConcern and maxTimeMS from our originating cursor.
+ setUpOperationContextStateForGetMore(
+ opCtx, *cursorPin.getCursor(), _request, disableAwaitDataFailpointActive);
+
if (!cursorPin->isAwaitData()) {
opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
}
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index 35a2e0d8008..9c5ddbaab52 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -394,6 +394,7 @@ public:
{std::move(exec),
cursorNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
jsobj,
ClientCursorParams::LockPolicy::kLocksInternally,
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 9b4a1c0f0fe..dac2a6197b6 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -229,6 +229,7 @@ public:
{std::move(exec),
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
cmdObj,
ClientCursorParams::LockPolicy::kLocksInternally,
diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp
index ffbaf61f66e..9618543a0a9 100644
--- a/src/mongo/db/commands/repair_cursor.cpp
+++ b/src/mongo/db/commands/repair_cursor.cpp
@@ -103,6 +103,7 @@ public:
{std::move(exec),
ns,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
cmdObj,
ClientCursorParams::LockPolicy::kLockExternally,
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index fcb8d01237d..8c1be73e152 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -732,6 +732,7 @@ Status runAggregate(OperationContext* opCtx,
std::move(exec),
origNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
cmdObj,
lockPolicy,
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index ce24825218a..7485d1a7260 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -397,6 +397,13 @@ Message getMore(OperationContext* opCtx,
"OP_GET_MORE operations are not supported on tailable aggregations. Only clients "
"which support the getMore command can be used on tailable aggregations.",
readLock || !cursorPin->isAwaitData());
+ uassert(
+ 31124,
+ str::stream()
+ << "OP_GET_MORE does not support cursors with a write concern other than the default."
+ " Use the getMore command instead. Write concern was: "
+ << cursorPin->getWriteConcernOptions().toBSON(),
+ cursorPin->getWriteConcernOptions().usedDefault);
// If the operation that spawned this cursor had a time limit set, apply leftover time to this
// getmore.
@@ -724,6 +731,7 @@ std::string runQuery(OperationContext* opCtx,
{std::move(exec),
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->getWriteConcern(),
readConcernArgs,
upconvertedQuery,
ClientCursorParams::LockPolicy::kLockExternally,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp
index 6f0e42503ed..43d6a7f4304 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_drop_coll_test.cpp
@@ -111,9 +111,7 @@ public:
onCommand([this, shard](const RemoteCommandRequest& request) {
ASSERT_EQ(HostAndPort(shard.getHost()), request.target);
ASSERT_EQ(_dropNS.db(), request.dbname);
- ASSERT_BSONOBJ_EQ(BSON("drop" << _dropNS.coll() << "writeConcern"
- << BSON("w" << 0 << "wtimeout" << 0)),
- request.cmdObj);
+ ASSERT_BSONOBJ_EQ(BSON("drop" << _dropNS.coll()), request.cmdObj);
ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(),
rpc::TrackingMetadata::removeTrackingData(request.metadata));
@@ -222,9 +220,7 @@ TEST_F(DropColl2ShardTest, NSNotFound) {
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(HostAndPort(shard1().getHost()), request.target);
ASSERT_EQ(dropNS().db(), request.dbname);
- ASSERT_BSONOBJ_EQ(
- BSON("drop" << dropNS().coll() << "writeConcern" << BSON("w" << 0 << "wtimeout" << 0)),
- request.cmdObj);
+ ASSERT_BSONOBJ_EQ(BSON("drop" << dropNS().coll()), request.cmdObj);
ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(),
rpc::TrackingMetadata::removeTrackingData(request.metadata));
@@ -235,9 +231,7 @@ TEST_F(DropColl2ShardTest, NSNotFound) {
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(HostAndPort(shard2().getHost()), request.target);
ASSERT_EQ(dropNS().db(), request.dbname);
- ASSERT_BSONOBJ_EQ(
- BSON("drop" << dropNS().coll() << "writeConcern" << BSON("w" << 0 << "wtimeout" << 0)),
- request.cmdObj);
+ ASSERT_BSONOBJ_EQ(BSON("drop" << dropNS().coll()), request.cmdObj);
ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(),
rpc::TrackingMetadata::removeTrackingData(request.metadata));
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 498b6e8c2c6..5c48ade9178 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -478,35 +478,33 @@ bool runCommandImpl(OperationContext* opCtx,
const bool shouldCheckOutSession =
sessionOptions.getTxnNumber() && !shouldCommandSkipSessionCheckout(command->getName());
- if (!invocation->supportsWriteConcern()) {
- behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
- if (shouldCheckOutSession) {
- invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder);
- } else {
- invocation->run(opCtx, replyBuilder);
- MONGO_FAIL_POINT_BLOCK(waitAfterReadCommandFinishesExecution, options) {
- const BSONObj& data = options.getData();
- auto db = data["db"].str();
- if (db.empty() || request.getDatabase() == db) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &waitAfterReadCommandFinishesExecution,
- opCtx,
- "waitAfterReadCommandFinishesExecution");
- }
- }
- }
- } else {
- auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body));
- if (sessionOptions.getAutocommit()) {
- validateWriteConcernForTransaction(wcResult, invocation->definition()->getName());
- }
+ // getMore operations inherit a WriteConcern from their originating cursor. For example, if the
+ // originating command was an aggregate with a $out and batchSize: 0. Note that if the command
+ // only performed reads then we will not need to wait at all.
+ const bool shouldWaitForWriteConcern =
+ invocation->supportsWriteConcern() || command->getLogicalOp() == LogicalOp::opGetMore;
+ if (shouldWaitForWriteConcern) {
auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
// Change the write concern while running the command.
const auto oldWC = opCtx->getWriteConcern();
ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
- opCtx->setWriteConcern(wcResult);
+
+ boost::optional<WriteConcernOptions> extractedWriteConcern;
+ if (command->getLogicalOp() == LogicalOp::opGetMore) {
+ // WriteConcern will be set up during command processing, it must not be specified on
+ // the command body.
+ behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
+ } else {
+ extractedWriteConcern.emplace(
+ uassertStatusOK(extractWriteConcern(opCtx, request.body)));
+ if (sessionOptions.getAutocommit()) {
+ validateWriteConcernForTransaction(*extractedWriteConcern,
+ invocation->definition()->getName());
+ }
+ opCtx->setWriteConcern(*extractedWriteConcern);
+ }
auto waitForWriteConcern = [&](auto&& bb) {
MONGO_FAIL_POINT_BLOCK_IF(failCommand, data, [&](const BSONObj& data) {
@@ -539,9 +537,35 @@ bool runCommandImpl(OperationContext* opCtx,
waitForWriteConcern(replyBuilder->getBodyBuilder());
- // Nothing in run() should change the writeConcern.
- dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() ==
- wcResult.toBSON()));
+ // With the exception of getMores inheriting the WriteConcern from the originating command,
+ // nothing in run() should change the writeConcern.
+ dassert(command->getLogicalOp() == LogicalOp::opGetMore
+ ? !extractedWriteConcern
+ : (extractedWriteConcern &&
+ SimpleBSONObjComparator::kInstance.evaluate(
+ opCtx->getWriteConcern().toBSON() == extractedWriteConcern->toBSON())));
+ } else {
+ behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
+ if (shouldCheckOutSession) {
+ invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder);
+ } else {
+ invocation->run(opCtx, replyBuilder);
+ }
+ }
+
+ // This failpoint should affect both getMores and commands which are read-only and thus don't
+ // support writeConcern.
+ if (!shouldWaitForWriteConcern || command->getLogicalOp() == LogicalOp::opGetMore) {
+ MONGO_FAIL_POINT_BLOCK(waitAfterReadCommandFinishesExecution, options) {
+ const BSONObj& data = options.getData();
+ auto db = data["db"].str();
+ if (db.empty() || request.getDatabase() == db) {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitAfterReadCommandFinishesExecution,
+ opCtx,
+ "waitAfterReadCommandFinishesExecution");
+ }
+ }
}
behaviors.waitForLinearizableReadConcern(opCtx);
diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h
index 6ea362abab4..99a6bff9fc2 100644
--- a/src/mongo/db/write_concern_options.h
+++ b/src/mongo/db/write_concern_options.h
@@ -59,6 +59,10 @@ public:
WriteConcernOptions() {
reset();
+ // It is assumed that a default-constructed WriteConcernOptions will be populated with the
+ // default options. If it is subsequently populated with non-default options, it is the
+ // caller's responsibility to set this flag accordingly.
+ usedDefault = true;
}
WriteConcernOptions(int numNodes, SyncMode sync, int timeout);