summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2023-05-03 17:46:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-03 19:31:43 +0000
commit01e26506a0981b4141bc65c9d016666ada60319e (patch)
tree7a4f33c4aba91b4587ee8c2b5d0c07fa2a563c0c /src/mongo/db/commands/run_aggregate.cpp
parentd45cbc7a64e6e3846c3646c841c21cb3371b46ca (diff)
downloadmongo-01e26506a0981b4141bc65c9d016666ada60319e.tar.gz
SERVER-76289 Block opening new change streams during shard split or shard merge critical section
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp13
1 files changed, 13 insertions, 0 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index b03eae16dc9..7db34d9f041 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -81,6 +81,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/speculative_majority_read_info.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/query_analysis_writer.h"
@@ -1010,6 +1011,18 @@ Status runAggregate(OperationContext* opCtx,
curOp->beginQueryPlanningTimer();
expCtx->stopExpressionCounters();
+ // This prevents opening a new change stream in the critical section of a serverless shard
+ // split or merge operation to prevent resuming on the recipient with a resume token higher
+ // than that operation's blockTimestamp.
+ //
+ // If we do this check before picking a startTime for a change stream then the primary could
+ // go into a blocking state between the check and getting the timestamp resulting in a
+ // startTime greater than blockTimestamp. Therefore we must do this check here, after the
+ // pipeline has been parsed and startTime has been initialized.
+ if (liteParsedPipeline.hasChangeStream()) {
+ tenant_migration_access_blocker::assertCanOpenChangeStream(expCtx->opCtx, nss.dbName());
+ }
+
// After parsing to detect if $$USER_ROLES is referenced in the query, set the value of
// $$USER_ROLES for the aggregation.
expCtx->setUserRoles();