diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2021-04-29 12:49:04 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-30 00:42:46 +0000 |
commit | 8e30a28df2f10b53ee83f034f26b805c5e980914 (patch) | |
tree | fd57ca827372e5c317efec00a1a838bbb3f00abe /src/mongo/s/query/cluster_aggregation_planner.cpp | |
parent | e5e1ac3798b786d179ec16d4deec3744c5dc3be7 (diff) | |
download | mongo-8e30a28df2f10b53ee83f034f26b805c5e980914.tar.gz |
SERVER-54937 Removes DocumentSourceChangeStreamCloseCursor dependencies on invalidate events
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 90e2b822178..8827907d5a2 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" @@ -290,6 +291,18 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, // error. cursorState = ClusterCursorManager::CursorState::Exhausted; break; + } catch (const ExceptionFor<ErrorCodes::ChangeStreamInvalidated>& ex) { + // This exception is thrown when a change-stream cursor is invalidated. Set the PBRT + // to the resume token of the invalidating event, and mark the cursor response as + // invalidated. We always expect to have ExtraInfo for this error code. + const auto extraInfo = ex.extraInfo<ChangeStreamInvalidationInfo>(); + tassert( + 5493706, "Missing ChangeStreamInvalidationInfo on exception", extraInfo != nullptr); + + responseBuilder.setPostBatchResumeToken(extraInfo->getInvalidateResumeToken()); + responseBuilder.setInvalidated(); + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; } // Check whether we have exhausted the pipeline's results. |