summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJennifer Peshansky <jennifer.peshansky@mongodb.com>2022-03-10 19:58:43 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-10 20:46:28 +0000
commit11d01816f743d6764c4f12c42697f5edf813ce27 (patch)
tree0e22860addeff160b720646a4d04e7430ddf497b
parenta0724afb1d6e5e5f1c4c9921ec156496e58be59a (diff)
downloadmongo-11d01816f743d6764c4f12c42697f5edf813ce27.tar.gz
SERVER-63773 Add getPostBatchResumeToken override to router_stage_merge
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml8
-rw-r--r--src/mongo/s/query/async_results_merger.cpp4
-rw-r--r--src/mongo/s/query/router_exec_stage.h2
-rw-r--r--src/mongo/s/query/router_stage_merge.h4
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp2
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h2
6 files changed, 10 insertions, 12 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
index 11e1e110db3..db39ec22c09 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
@@ -3,14 +3,6 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
- exclude_files:
- # TODO SERVER-63771 unblock all these tests.
- - jstests/change_streams/metadata_notifications.js
- - jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
- - jstests/change_streams/report_post_batch_resume_token.js
- - jstests/change_streams/resume_from_high_water_mark_token.js
- - jstests/change_streams/start_after_invalidation_exception.js
- - jstests/change_streams/whole_db_metadata_notifications.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 9100df3c57d..56f2deff96e 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -168,7 +168,9 @@ bool AsyncResultsMerger::remotesExhausted() const {
bool AsyncResultsMerger::_remotesExhausted(WithLock) const {
for (const auto& remote : _remotes) {
- if (!remote.exhausted()) {
+ // If any remote has been invalidated, we must force the batch-building code to make another
+ // attempt to retrieve more results. This will (correctly) throw via _assertNotInvalidated.
+ if (!remote.exhausted() || remote.invalidated) {
return false;
}
}
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 2b83cc393f0..553fdf5c294 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -128,7 +128,7 @@ public:
* Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream;
* otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child.
*/
- virtual BSONObj getPostBatchResumeToken() const {
+ virtual BSONObj getPostBatchResumeToken() {
return _child ? _child->getPostBatchResumeToken() : BSONObj();
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index e2786ebb1a6..72f1aec26fb 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -71,6 +71,10 @@ public:
return _resultsMerger.getNumRemotes();
}
+ BSONObj getPostBatchResumeToken() final {
+ return _resultsMerger.getHighWaterMark();
+ }
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final {
return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout);
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 65ec99085e8..253f28d84ee 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -82,7 +82,7 @@ std::size_t RouterStagePipeline::getNumRemotes() const {
return 0;
}
-BSONObj RouterStagePipeline::getPostBatchResumeToken() const {
+BSONObj RouterStagePipeline::getPostBatchResumeToken() {
return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 2d3c5e5725c..a853f5caa41 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -53,7 +53,7 @@ public:
std::size_t getNumRemotes() const final;
- BSONObj getPostBatchResumeToken() const final;
+ BSONObj getPostBatchResumeToken() final;
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;