diff options
-rw-r--r-- | buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml | 8 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/router_exec_stage.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_merge.h | 4 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_pipeline.h | 2 |
6 files changed, 10 insertions, 12 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml index 11e1e110db3..db39ec22c09 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml @@ -3,14 +3,6 @@ test_kind: js_test selector: roots: - jstests/change_streams/**/*.js - exclude_files: - # TODO SERVER-63771 unblock all these tests. - - jstests/change_streams/metadata_notifications.js - - jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js - - jstests/change_streams/report_post_batch_resume_token.js - - jstests/change_streams/resume_from_high_water_mark_token.js - - jstests/change_streams/start_after_invalidation_exception.js - - jstests/change_streams/whole_db_metadata_notifications.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 9100df3c57d..56f2deff96e 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -168,7 +168,9 @@ bool AsyncResultsMerger::remotesExhausted() const { bool AsyncResultsMerger::_remotesExhausted(WithLock) const { for (const auto& remote : _remotes) { - if (!remote.exhausted()) { + // If any remote has been invalidated, we must force the batch-building code to make another + // attempt to retrieve more results. This will (correctly) throw via _assertNotInvalidated. + if (!remote.exhausted() || remote.invalidated) { return false; } } diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 2b83cc393f0..553fdf5c294 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -128,7 +128,7 @@ public: * Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream; * otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child. */ - virtual BSONObj getPostBatchResumeToken() const { + virtual BSONObj getPostBatchResumeToken() { return _child ? _child->getPostBatchResumeToken() : BSONObj(); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index e2786ebb1a6..72f1aec26fb 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -71,6 +71,10 @@ public: return _resultsMerger.getNumRemotes(); } + BSONObj getPostBatchResumeToken() final { + return _resultsMerger.getHighWaterMark(); + } + protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final { return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout); diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 65ec99085e8..253f28d84ee 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -82,7 +82,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const { return 0; } -BSONObj RouterStagePipeline::getPostBatchResumeToken() const { +BSONObj RouterStagePipeline::getPostBatchResumeToken() { return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index 2d3c5e5725c..a853f5caa41 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -53,7 +53,7 @@ public: std::size_t getNumRemotes() const final; - BSONObj getPostBatchResumeToken() const final; + BSONObj getPostBatchResumeToken() final; protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; |