author     Charlie Swanson <charlie.swanson@mongodb.com>    2018-04-13 15:13:36 -0400
committer  Charlie Swanson <charlie.swanson@mongodb.com>    2018-04-30 14:09:01 -0400
commit     a43fe9ae73752fbd98107cef5421341fe291ab32 (patch)
tree       8972aabe0b36655e67c8b6c52fa2b9a2916d6eed
parent     7a2217a54d59c5d97e9e79cc40639c2589a18deb (diff)
SERVER-34204 Always pass non-null opCtx when scheduling getMores in ARM
-rw-r--r--    buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml    73
-rw-r--r--    etc/evergreen.yml    20
-rw-r--r--    src/mongo/db/repl/base_cloner_test_fixture.cpp    16
-rw-r--r--    src/mongo/db/repl/collection_cloner.cpp    34
-rw-r--r--    src/mongo/db/repl/collection_cloner_test.cpp    155
-rw-r--r--    src/mongo/s/query/async_results_merger.cpp    50
-rw-r--r--    src/mongo/s/query/async_results_merger.h    24
-rw-r--r--    src/mongo/s/query/async_results_merger_test.cpp    224
-rw-r--r--    src/mongo/s/query/router_stage_merge.cpp    12
9 files changed, 317 insertions, 291 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
new file mode 100644
index 00000000000..fd05367ac35
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -0,0 +1,73 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # This test exercises an internal detail of mongos<->mongod communication and is not expected to
+ # work against a mongos.
+ - jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data.
+ - jstests/change_streams/change_stream_shell_helper.js
+ - jstests/change_streams/include_cluster_time.js
+ - jstests/change_streams/lookup_post_image.js
+ exclude_with_any_tags:
+ ##
+ # The next three tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override read concern of command: ..."
+ - assumes_read_concern_unchanged
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # "Cowardly refusing to run test with overridden write concern when it uses a command that can
+ # only perform w=1 writes: ..."
+ - requires_eval_command
+ # Transactions not supported on sharded clusters.
+ - uses_transactions
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: majority
+ enableMajorityReadConcern: ''
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/enable_sessions.js');
+ readMode: commands
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ # Use two shards to make sure we will only talk to the primary shard for the database and will
+ # not delay changes to wait for notifications or a clock advancement from other shards.
+ num_shards: 2
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ nopreallocj: ''
+ set_parameters:
+ enableTestCommands: 1
+ numInitialSyncAttempts: 1
+ periodicNoopIntervalSecs: 1
+ writePeriodicNoops: true
+ num_rs_nodes_per_shard: 1
+ # This test suite doesn't actually shard any collections, but enabling sharding will prevent
+ # read commands against non-existent databases from unconditionally returning a CursorId of 0.
+ enable_sharding:
+ - test
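The suite defined above is wired into Evergreen below with resmoke_args of --suites=change_streams_mongos_sessions_passthrough --storageEngine=wiredTiger; assuming a standard repository checkout, the same suite can be run locally with resmoke (for example, python buildscripts/resmoke.py --suites=change_streams_mongos_sessions_passthrough --storageEngine=wiredTiger).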
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index e65a88a99f1..ed1e3c18735 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -3603,6 +3603,17 @@ tasks:
run_multiple_jobs: true
- <<: *task_template
+ name: change_streams_mongos_sessions_passthrough
+ depends_on:
+ - name: change_streams
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_mongos_sessions_passthrough --storageEngine=wiredTiger
+ run_multiple_jobs: true
+
+- <<: *task_template
name: change_streams_mongos_passthrough
depends_on:
- name: change_streams
@@ -6529,6 +6540,7 @@ buildvariants:
- name: auth
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_secondary_reads
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
@@ -6719,6 +6731,7 @@ buildvariants:
- name: causally_consistent_jscore_passthrough_auth
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
@@ -7810,6 +7823,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
@@ -8625,6 +8639,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
@@ -9248,6 +9263,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_secondary_reads
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
@@ -9784,6 +9800,7 @@ buildvariants:
- name: bulk_gle_passthrough
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_secondary_reads
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
@@ -11343,6 +11360,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
@@ -11974,6 +11992,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_secondary_reads
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
@@ -12147,6 +12166,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough
- name: change_streams
- name: change_streams_mongos_passthrough
+ - name: change_streams_mongos_sessions_passthrough
- name: change_streams_secondary_reads
- name: change_streams_sharded_collections_passthrough
- name: change_streams_whole_db_passthrough
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index c38663a54cf..8fb69472329 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -104,17 +104,30 @@ BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONA
return createListIndexesResponse(cursorId, specs, "firstBatch");
}
+namespace {
+struct EnsureClientHasBeenInitialized : public executor::ThreadPoolMock::Options {
+ EnsureClientHasBeenInitialized() : executor::ThreadPoolMock::Options() {
+ onCreateThread = []() { Client::initThread("CollectionClonerTestThread"); };
+ }
+};
+} // namespace
+
BaseClonerTest::BaseClonerTest()
- : _mutex(), _setStatusCondition(), _status(getDetectableErrorStatus()) {}
+ : ThreadPoolExecutorTest(EnsureClientHasBeenInitialized()),
+ _mutex(),
+ _setStatusCondition(),
+ _status(getDetectableErrorStatus()) {}
void BaseClonerTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
clear();
launchExecutorThread();
+ Client::initThread("CollectionClonerTest");
ThreadPool::Options options;
options.minThreads = 1U;
options.maxThreads = 1U;
+ options.onCreateThread = [](StringData threadName) { Client::initThread(threadName); };
dbWorkThreadPool = stdx::make_unique<ThreadPool>(options);
dbWorkThreadPool->startup();
@@ -128,6 +141,7 @@ void BaseClonerTest::tearDown() {
storageInterface.reset();
dbWorkThreadPool.reset();
+ Client::releaseCurrent();
}
void BaseClonerTest::clear() {
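The fixture changes above are needed because the new CollectionCloner code paths call cc() and Client::makeOperationContext(), which require every executor and worker thread to have a Client attached. A minimal sketch of the same idea for an arbitrary worker pool (names are illustrative, not taken from this patch):

    // Sketch only: attach a Client to every pool thread so code running on it
    // can call cc() and makeOperationContext(), mirroring the fixture above.
    ThreadPool::Options options;
    options.minThreads = 1U;
    options.maxThreads = 1U;
    options.onCreateThread = [](StringData threadName) {
        Client::initThread(threadName);
    };
    auto workerPool = stdx::make_unique<ThreadPool>(options);
    workerPool->startup();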
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 8aee68a4c58..553cc1d20bf 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -248,8 +248,17 @@ void CollectionCloner::shutdown() {
void CollectionCloner::_cancelRemainingWork_inlock() {
if (_arm) {
- Client::initThreadIfNotAlready();
- _killArmHandle = _arm->kill(cc().getOperationContext());
+ // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The
+ // TaskExecutor should never have an OperationContext attached to the Client, and the
+ // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know
+ // which situation we're in, so we have to handle both.
+ auto& client = cc();
+ if (auto opCtx = client.getOperationContext()) {
+ _killArmHandle = _arm->kill(opCtx);
+ } else {
+ auto newOpCtx = client.makeOperationContext();
+ _killArmHandle = _arm->kill(newOpCtx.get());
+ }
}
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
@@ -628,9 +637,10 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
if (_collectionCloningBatchSize > 0) {
armParams.setBatchSize(_collectionCloningBatchSize);
}
- Client::initThreadIfNotAlready();
- _arm = stdx::make_unique<AsyncResultsMerger>(
- cc().getOperationContext(), _executor, std::move(armParams));
+ auto opCtx = cc().makeOperationContext();
+ _arm = stdx::make_unique<AsyncResultsMerger>(opCtx.get(), _executor, std::move(armParams));
+ _arm->detachFromOperationContext();
+ opCtx.reset();
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -643,7 +653,6 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- _arm->detachFromOperationContext();
if (!scheduleStatus.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
return;
@@ -667,9 +676,10 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS
}
Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- _arm->reattachToOperationContext(opCtx);
+ // We expect this callback to execute in a thread from a TaskExecutor which will not have an
+ // OperationContext populated. We must make one ourselves.
+ auto opCtx = cc().makeOperationContext();
+ _arm->reattachToOperationContext(opCtx.get());
while (_arm->ready()) {
auto armResultStatus = _arm->nextReady();
if (!armResultStatus.getStatus().isOK()) {
@@ -690,8 +700,10 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
Status CollectionCloner::_scheduleNextARMResultsCallback(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- Client::initThreadIfNotAlready();
- _arm->reattachToOperationContext(cc().getOperationContext());
+ // We expect this callback to execute in a thread from a TaskExecutor which will not have an
+ // OperationContext populated. We must make one ourselves.
+ auto opCtx = cc().makeOperationContext();
+ _arm->reattachToOperationContext(opCtx.get());
auto nextEvent = _arm->nextEvent();
_arm->detachFromOperationContext();
if (!nextEvent.isOK()) {
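The collection cloner changes above all follow one pattern: a callback that runs on a TaskExecutor thread has a Client but no OperationContext, so it either reuses an attached OperationContext or creates a short-lived one, and brackets the AsyncResultsMerger work with reattach/detach. A condensed sketch of that pattern, with hypothetical names (not code from this patch):

    // Sketch only: reuse the Client's OperationContext if one is attached
    // (TaskRunner threads), otherwise make a short-lived one (TaskExecutor
    // threads), then bracket the ARM work with reattach/detach.
    void runWithArmAttached(AsyncResultsMerger* arm) {
        auto& client = cc();
        ServiceContext::UniqueOperationContext ownedOpCtx;
        OperationContext* opCtx = client.getOperationContext();
        if (!opCtx) {
            ownedOpCtx = client.makeOperationContext();
            opCtx = ownedOpCtx.get();
        }
        arm->reattachToOperationContext(opCtx);
        // ... drain ready results or schedule the next event here ...
        arm->detachFromOperationContext();  // detach before opCtx is destroyed
    }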
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 8322d29fdf3..9f7a31e6a93 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -1133,68 +1133,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc2 = BSON("_id" << 2);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(2, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBeforeStartup) {
collectionCloner->shutdown();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup());
@@ -1975,99 +1913,6 @@ TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDiffere
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
- collectionCloner->waitForDbWorker();
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- auto buffered = collectionCloner->getDocumentsToInsert_forTest();
- docs.insert(docs.end(), buffered.begin(), buffered.end());
- return exec->scheduleWork(workFn);
- });
-
- ASSERT_TRUE(collectionCloner->isActive());
-
- int numDocs = 6;
- std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0])));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1])));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2])));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3U, docs.size());
- for (int i = 0; i < 3; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5])));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(6U, docs.size());
- for (int i = 3; i < 6; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
-
- ASSERT_EQUALS(numDocs, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) {
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 3e41a36c089..f5268ac3408 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -335,6 +335,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
}
Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
+ invariant(_opCtx, "Cannot schedule a getMore without an OperationContext");
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -387,6 +388,32 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
return Status::OK();
}
+Status AsyncResultsMerger::scheduleGetMores() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _scheduleGetMores(lk);
+}
+
+Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) {
+ // Schedule remote work on hosts for which we need more results.
+ for (size_t i = 0; i < _remotes.size(); ++i) {
+ auto& remote = _remotes[i];
+
+ if (!remote.status.isOK()) {
+ return remote.status;
+ }
+
+ if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
+ // If this remote is not exhausted and there is no outstanding request for it, schedule
+ // work to retrieve the next batch.
+ auto nextBatchStatus = _askForNextBatch(lk, i);
+ if (!nextBatchStatus.isOK()) {
+ return nextBatchStatus;
+ }
+ }
+ }
+ return Status::OK();
+}
+
/*
* Note: When nextEvent() is called to do retries, only the remotes with retriable errors will
* be rescheduled because:
@@ -411,22 +438,9 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
"nextEvent() called before an outstanding event was signaled");
}
- // Schedule remote work on hosts for which we need more results.
- for (size_t i = 0; i < _remotes.size(); ++i) {
- auto& remote = _remotes[i];
-
- if (!remote.status.isOK()) {
- return remote.status;
- }
-
- if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
- // If this remote is not exhausted and there is no outstanding request for it, schedule
- // work to retrieve the next batch.
- auto nextBatchStatus = _askForNextBatch(lk, i);
- if (!nextBatchStatus.isOK()) {
- return nextBatchStatus;
- }
- }
+ auto getMoresStatus = _scheduleGetMores(lk);
+ if (!getMoresStatus.isOK()) {
+ return getMoresStatus;
}
auto eventStatus = _executor->makeEvent();
@@ -592,9 +606,11 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) {
invariant(_remotes.size() == 1);
_eofNext = true;
- } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
+ } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive && _opCtx) {
// If this is normal or tailable-awaitData cursor and we still don't have anything buffered
// after receiving this batch, we can schedule work to retrieve the next batch right away.
+ // Be careful only to do this when '_opCtx' is non-null, since it is illegal to schedule a
+ // remote command on a user's behalf without a non-null OperationContext.
remote.status = _askForNextBatch(lk, remoteIndex);
}
}
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 1653374b5bc..5f8a18194d2 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -192,6 +192,24 @@ public:
StatusWith<executor::TaskExecutor::EventHandle> nextEvent();
/**
+ * Schedules a getMore on any remote hosts which:
+ * - Do not have an error status set already.
+ * - Don't already have a request outstanding.
+ * - Do not currently have any results buffered.
+ * - Are not exhausted (have a non-zero cursor id).
+ * Returns an error if any of the remotes responded with an error, or if we encounter an error
+ * while scheduling the getMore requests.
+ *
+ * In most cases users should call nextEvent() instead of this method, but this can be necessary
+ * if the caller of nextEvent() calls detachFromOperationContext() before the event is signaled.
+ * In such cases, the ARM cannot schedule getMores itself, and will need to be manually prompted
+ * after calling reattachToOperationContext().
+ *
+ * It is illegal to call this method if the ARM is not attached to an OperationContext.
+ */
+ Status scheduleGetMores();
+
+ /**
* Adds the specified shard cursors to the set of cursors to be merged. The results from the
* new cursors will be returned as normal through nextReady().
*/
@@ -393,6 +411,12 @@ private:
*/
bool _haveOutstandingBatchRequests(WithLock);
+
+ /**
+ * Schedules a getMore on any remote hosts which we need another batch from.
+ */
+ Status _scheduleGetMores(WithLock);
+
/**
* Schedules a killCursors command to be run on all remote hosts that have open cursors.
*/
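The scheduleGetMores() contract documented above is easiest to see from the caller's side: a consumer that detached the ARM while waiting (for example, across an awaitData timeout on mongos) must reattach and then prompt the ARM explicitly. A hedged usage sketch, assuming an existing 'arm' and a fresh 'opCtx' (not code from this patch):

    // Sketch only: manually prompting the ARM after a detach/reattach cycle.
    arm->reattachToOperationContext(opCtx);
    Status scheduled = arm->scheduleGetMores();  // illegal while detached
    if (!scheduled.isOK()) {
        return scheduled;  // a remote error or a scheduling failure
    }
    // Now wait on the previously obtained event handle as before.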
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 586b387ad96..662d78e6dd0 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -172,11 +172,12 @@ protected:
/**
* Schedules a list of cursor responses to be returned by the mock network.
*/
- void scheduleNetworkResponses(std::vector<CursorResponse> responses,
- CursorResponse::ResponseType responseType) {
+ void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
std::vector<BSONObj> objs;
for (const auto& cursorResponse : responses) {
- objs.push_back(cursorResponse.toBSON(responseType));
+ // For tests of the AsyncResultsMerger, all CursorResponses scheduled by the tests are
+ // subsequent responses, since the AsyncResultsMerger will only ever run getMores.
+ objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse));
}
scheduleNetworkResponseObjs(objs);
}
@@ -282,8 +283,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(kTestNss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -327,8 +327,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")};
responses.emplace_back(kTestNss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -374,8 +373,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(kTestNss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return first result.
ASSERT_TRUE(arm->ready());
@@ -402,8 +400,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(kTestNss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return remaining results.
ASSERT_TRUE(arm->ready());
@@ -448,8 +445,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"),
fromjson("{$sortKey: {'': 6}}")};
responses.emplace_back(kTestNss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is not ready to return results until receiving responses from all remotes.
ASSERT_FALSE(arm->ready());
@@ -462,8 +458,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"),
fromjson("{$sortKey: {'': 9}}")};
responses.emplace_back(kTestNss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that all remotes have responded, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -509,8 +504,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(kTestNss, CursorId(5), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return first result.
ASSERT_TRUE(arm->ready());
@@ -538,8 +532,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(kTestNss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return second shard's results.
ASSERT_TRUE(arm->ready());
@@ -566,8 +559,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<BSONObj> batch3 = {
fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")};
responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return remaining results.
ASSERT_TRUE(arm->ready());
@@ -614,8 +606,7 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"),
fromjson("{$sortKey: {'': 5, '': 9}}")};
responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
// ARM returns all results in sorted order.
@@ -660,8 +651,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
responses.emplace_back(kTestNss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -709,8 +699,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(kTestNss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -769,8 +758,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(kTestNss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -810,8 +798,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.emplace_back(kTestNss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(kTestNss, CursorId(2), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -832,8 +819,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(kTestNss, CursorId(1), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
blackHoleNextRequest();
executor()->waitForEvent(readyEvent);
@@ -850,8 +836,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -881,8 +866,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(kTestNss, CursorId(456), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -945,8 +929,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
responses.emplace_back(kTestNss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(kTestNss, CursorId(2), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
executor()->waitForEvent(readyEvent);
@@ -977,8 +960,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1065,8 +1047,7 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
responses.emplace_back(kTestNss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
auto killedEvent = arm->kill(operationContext());
@@ -1100,8 +1081,7 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
// Cursor 3 is not exhausted.
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(kTestNss, CursorId(123), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
auto killedEvent = arm->kill(operationContext());
@@ -1130,8 +1110,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Kill event will only be signalled once the callbacks for the pending batches have run.
auto killedEvent = arm->kill(operationContext());
@@ -1161,8 +1140,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
auto killedEvent = arm->kill(operationContext());
@@ -1199,8 +1177,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1220,8 +1197,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
responses.emplace_back(kTestNss, CursorId(123), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1250,8 +1226,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
responses.emplace_back(kTestNss, CursorId(123), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
// After receiving an empty batch, the ARM should return boost::none, but remotes should not be
@@ -1279,8 +1254,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
responses.emplace_back(kTestNss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
// Afterwards, the ARM should return boost::none and remote cursors should be marked as
@@ -1305,8 +1279,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1326,8 +1299,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
ASSERT_OK(request.getStatus());
ASSERT_EQ(*request.getValue().batchSize, 1LL);
ASSERT_EQ(request.getValue().cursorid, 1LL);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1362,8 +1334,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.emplace_back(kTestNss, CursorId(98), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(99), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1383,8 +1354,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
responses.emplace_back(kTestNss, CursorId(99), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1398,8 +1368,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1420,8 +1389,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(98), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1457,8 +1425,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
responses.emplace_back(kTestNss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// From the second host we get a network (retriable) error.
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
@@ -1516,8 +1483,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(kTestNss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1538,8 +1504,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
responses.emplace_back(kTestNss, CursorId(123), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1559,8 +1524,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) {
@@ -1588,8 +1552,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Still shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1601,8 +1564,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1621,13 +1583,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
responses.clear();
std::vector<BSONObj> batch3 = {};
responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest,
@@ -1662,8 +1622,7 @@ TEST_F(AsyncResultsMergerTest,
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch3 = {};
responses.emplace_back(kTestNss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(unittest::assertGet(arm->nextEvent()));
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1678,8 +1637,7 @@ TEST_F(AsyncResultsMergerTest,
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) {
@@ -1713,8 +1671,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
// Clean up the cursors.
std::vector<CursorResponse> responses;
responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{});
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -1742,8 +1699,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Should be ready now.
ASSERT_TRUE(arm->ready());
@@ -1765,8 +1721,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1785,13 +1740,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
responses.clear();
std::vector<BSONObj> batch3 = {};
responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) {
@@ -1817,8 +1770,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Should be ready now.
ASSERT_TRUE(arm->ready());
@@ -1842,8 +1794,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// from it.
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1862,13 +1813,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
responses.clear();
std::vector<BSONObj> batch3 = {};
responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
@@ -2126,5 +2075,66 @@ DEATH_TEST_F(AsyncResultsMergerTest,
stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
}
+DEATH_TEST_F(AsyncResultsMergerTest,
+ ShouldFailIfAskedToPerformGetMoresWithoutAnOpCtx,
+ "Cannot schedule a getMore without an OperationContext") {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+
+ ASSERT_FALSE(arm->ready());
+ arm->detachFromOperationContext();
+ arm->scheduleGetMores().ignore(); // Should crash.
+}
+
+TEST_F(AsyncResultsMergerTest, ShouldNotScheduleGetMoresWithoutAnOperationContext) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // While detached from the OperationContext, schedule an empty batch response. Because the
+ // response is empty and this is a tailable cursor, the ARM will need to run another getMore on
+ // that host, but it should not schedule this without a non-null OperationContext.
+ arm->detachFromOperationContext();
+ {
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> emptyBatch;
+ responses.emplace_back(kTestNss, CursorId(123), emptyBatch);
+ scheduleNetworkResponses(std::move(responses));
+ }
+
+ ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(networkHasReadyRequests()); // Tests that we haven't asked for the next batch yet.
+
+ // After manually requesting the next getMore, the ARM should be ready.
+ arm->reattachToOperationContext(operationContext());
+ ASSERT_OK(arm->scheduleGetMores());
+
+ // Schedule the next getMore response.
+ {
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> nonEmptyBatch = {fromjson("{_id: 1}")};
+ responses.emplace_back(kTestNss, CursorId(123), nonEmptyBatch);
+ scheduleNetworkResponses(std::move(responses));
+ }
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ auto killedEvent = arm->kill(operationContext());
+ executor()->waitForEvent(killedEvent);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 48abb1452ec..967c9f60b35 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -89,6 +89,18 @@ StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
// If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
if (_leftoverEventFromLastTimeout) {
invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ // If we have an outstanding event from last time, then we might have to manually schedule
+ // some getMores for the cursors. If a remote response came back while we were between
+ // getMores (from the user to mongos), the response may have been an empty batch, and the
+ // ARM would not be able to ask for the next batch immediately since it is not attached to
+ // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores
+ // ourselves.
+ Status getMoreStatus = _arm.scheduleGetMores();
+ if (!getMoreStatus.isOK()) {
+ return getMoreStatus;
+ }
+
+ // Return the leftover event and clear '_leftoverEventFromLastTimeout'.
auto event = _leftoverEventFromLastTimeout;
_leftoverEventFromLastTimeout = EventHandle();
return event;