summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_aggregation_planner.cpp
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2021-04-29 12:49:04 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-30 00:42:46 +0000
commit8e30a28df2f10b53ee83f034f26b805c5e980914 (patch)
treefd57ca827372e5c317efec00a1a838bbb3f00abe /src/mongo/s/query/cluster_aggregation_planner.cpp
parente5e1ac3798b786d179ec16d4deec3744c5dc3be7 (diff)
downloadmongo-8e30a28df2f10b53ee83f034f26b805c5e980914.tar.gz
SERVER-54937 Removes DocumentSourceChangeStreamCloseCursor dependencies on invalidate events
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp13
1 files changed, 13 insertions, 0 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 90e2b822178..8827907d5a2 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
@@ -290,6 +291,18 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
// error.
cursorState = ClusterCursorManager::CursorState::Exhausted;
break;
+ } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) {
+ // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT
+ // to the resume token of the invalidating event, and mark the cursor response as
+ // invalidated. We always expect to have ExtraInfo for this error code.
+ const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>();
+ tassert(
+ 5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr);
+
+ responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken());
+ responseBuilder.setInvalidated();
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
}
// Check whether we have exhausted the pipeline's results.