diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2023-05-03 17:46:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-03 19:31:43 +0000 |
commit | 01e26506a0981b4141bc65c9d016666ada60319e (patch) | |
tree | 7a4f33c4aba91b4587ee8c2b5d0c07fa2a563c0c /src/mongo/db/commands/run_aggregate.cpp | |
parent | d45cbc7a64e6e3846c3646c841c21cb3371b46ca (diff) | |
download | mongo-01e26506a0981b4141bc65c9d016666ada60319e.tar.gz |
SERVER-76289 Block opening new change streams during shard split or shard merge critical section
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index b03eae16dc9..7db34d9f041 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -81,6 +81,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/speculative_majority_read_info.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/query_analysis_writer.h" @@ -1010,6 +1011,18 @@ Status runAggregate(OperationContext* opCtx, curOp->beginQueryPlanningTimer(); expCtx->stopExpressionCounters(); + // This prevents opening a new change stream in the critical section of a serverless shard + // split or merge operation to prevent resuming on the recipient with a resume token higher + // than that operation's blockTimestamp. + // + // If we do this check before picking a startTime for a change stream then the primary could + // go into a blocking state between the check and getting the timestamp resulting in a + // startTime greater than blockTimestamp. Therefore we must do this check here, after the + // pipeline has been parsed and startTime has been initialized. + if (liteParsedPipeline.hasChangeStream()) { + tenant_migration_access_blocker::assertCanOpenChangeStream(expCtx->opCtx, nss.dbName()); + } + // After parsing to detect if $$USER_ROLES is referenced in the query, set the value of // $$USER_ROLES for the aggregation. expCtx->setUserRoles(); |