summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-04-13 15:13:36 -0400
committerBernard Gorman <bernard.gorman@gmail.com>2018-08-22 06:10:33 +0100
commite1f4285c2f918c70e52ac99e5285d1179c04d2a0 (patch)
tree83d67c507d5570cae5611c5489463e6d9853f75a
parenteca2a99916c7fd7c74f8efbb836d5d2458d7a450 (diff)
downloadmongo-e1f4285c2f918c70e52ac99e5285d1179c04d2a0.tar.gz
SERVER-34204 Always pass non-null opCtx when scheduling getMores in ARM
(cherry picked from commit a43fe9ae73752fbd98107cef5421341fe291ab32)
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml72
-rw-r--r--etc/evergreen.yml20
-rw-r--r--jstests/change_streams/only_wake_getmore_for_relevant_changes.js65
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp24
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp34
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp155
-rw-r--r--src/mongo/s/query/async_results_merger.cpp50
-rw-r--r--src/mongo/s/query/async_results_merger.h24
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp229
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp12
-rw-r--r--src/mongo/util/concurrency/old_thread_pool.cpp2
-rw-r--r--src/mongo/util/concurrency/old_thread_pool.h1
12 files changed, 360 insertions, 328 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
new file mode 100644
index 00000000000..10d8f35cca4
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -0,0 +1,72 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # This test exercises an internal detail of mongos<->mongod communication and is not expected to
+ # work against a mongos.
+ - jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data.
+ - jstests/change_streams/change_stream_shell_helper.js
+ - jstests/change_streams/lookup_post_image.js
+ exclude_with_any_tags:
+ ##
+ # The next three tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override read concern of command: ..."
+ - assumes_read_concern_unchanged
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # "Cowardly refusing to run test with overridden write concern when it uses a command that can
+ # only perform w=1 writes: ..."
+ - requires_eval_command
+ # Transactions not supported on sharded clusters.
+ - uses_transactions
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: majority
+ enableMajorityReadConcern: ''
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/enable_sessions.js');
+ readMode: commands
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ # Use two shards to make sure we will only talk to the primary shard for the database and will
+ # not delay changes to wait for notifications or a clock advancement from other shards.
+ num_shards: 2
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ nopreallocj: ''
+ set_parameters:
+ enableTestCommands: 1
+ numInitialSyncAttempts: 1
+ periodicNoopIntervalSecs: 1
+ writePeriodicNoops: true
+ num_rs_nodes_per_shard: 1
+ # This test suite doesn't actually shard any collections, but enabling sharding will prevent
+ # read commands against non-existent databases from unconditionally returning a CursorId of 0.
+ enable_sharding:
+ - test
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 543c2415862..d9b4f9bb51e 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -3410,6 +3410,17 @@ tasks:
run_multiple_jobs: true
- <<: *task_template
+ name: change_streams_mongos_sessions_passthrough_WT
+ depends_on:
+ - name: change_streams_WT
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_mongos_sessions_passthrough --storageEngine=wiredTiger
+ run_multiple_jobs: true
+
+- <<: *task_template
name: change_streams_mongos_passthrough_WT
depends_on:
- name: change_streams_WT
@@ -5502,6 +5513,7 @@ buildvariants:
- name: auth_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_secondary_reads_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: dbtest_WT
@@ -5628,6 +5640,7 @@ buildvariants:
- name: causally_consistent_jscore_passthrough_auth_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: concurrency_WT
- name: concurrency_replication_WT
@@ -6447,6 +6460,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: dbtest_WT
- name: disk_WT
@@ -7002,6 +7016,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: concurrency_WT
- name: concurrency_replication_WT
@@ -7212,6 +7227,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_secondary_reads_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: concurrency_WT
@@ -7576,6 +7592,7 @@ buildvariants:
- name: bulk_gle_passthrough_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_secondary_reads_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: concurrency_WT
@@ -9018,6 +9035,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: concurrency
distros:
@@ -9399,6 +9417,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_secondary_reads_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: concurrency_WT
@@ -9557,6 +9576,7 @@ buildvariants:
- name: sharded_causally_consistent_jscore_passthrough_WT
- name: change_streams_WT
- name: change_streams_mongos_passthrough_WT
+ - name: change_streams_mongos_sessions_passthrough_WT
- name: change_streams_secondary_reads_WT
- name: change_streams_sharded_collections_passthrough_WT
- name: concurrency_WT
diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
index 84f4d0ae979..8a42a176628 100644
--- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
@@ -21,38 +21,41 @@
// the getMore to run. To prevent this from happening, the main thread waits for an insert
// into "sentinel", to signal that the parallel shell has started and is waiting for the
// getMore to appear in currentOp.
- const shellSentinelCollection = assertDropAndRecreateCollection(db, "shell_sentinel");
-
const port =
(collection.stats().sharded ? collection.getMongo().port
: FixtureHelpers.getPrimaryForNodeHostingDatabase(db).port);
- const awaitShellDoingEventDuringGetMore = startParallelShell(`
-// Signal that the parallel shell has started.
-assert.writeOK(db.getCollection("${ shellSentinelCollection.getName() }").insert({}));
-
-// Wait for the getMore to appear in currentOp.
-assert.soon(function() {
- return db.currentOp({
- op: "getmore",
- "command.collection": "${collection.getName()}",
- "originatingCommand.comment": "${identifyingComment}",
- }).inprog.length > 0;
-});
+ const sentinelCountBefore = shellSentinelCollection.find().itcount();
-const eventFn = ${ event.toString() };
-eventFn();`,
+ const awaitShellDoingEventDuringGetMore = startParallelShell(`
+ // Wait for the getMore to appear in currentOp.
+ assert.soon(function() {
+ return db.currentOp({
+ op: "getmore",
+ "originatingCommand.comment": "${identifyingComment}",
+ }).inprog.length > 0;
+ });
+ const eventFn = ${event.toString()};
+ eventFn();
+ //Signal that the parallel shell has completed its event function.
+ assert.writeOK(db.getCollection("${shellSentinelCollection.getName()}").insert({}));`,
port);
- // Wait for the shell to start.
- assert.soon(() => shellSentinelCollection.findOne() != null);
-
// Run and time the getMore.
- const startTime = (new Date()).getTime();
- const result = assert.commandWorked(db.runCommand(
- {getMore: awaitDataCursorId, collection: collection.getName(), maxTimeMS: maxTimeMS}));
+ let startTime, result, elapsedMs;
+ assert.soon(function() {
+ startTime = (new Date()).getTime();
+ result = assert.commandWorked(db.runCommand({
+ getMore: awaitDataCursorId,
+ collection: collection.getName(),
+ maxTimeMS: maxTimeMS
+ }));
+ elapsedMs = (new Date()).getTime() - startTime;
+ return result.cursor.nextBatch.length > 0 ||
+ shellSentinelCollection.find().itcount() > sentinelCountBefore;
+ });
awaitShellDoingEventDuringGetMore();
- return {result: result, elapsedMs: (new Date()).getTime() - startTime};
+ return {result: result, elapsedMs: elapsedMs};
}
/**
@@ -107,7 +110,10 @@ eventFn();`,
return result;
}
+ // Refresh all collections which will be required in the course of this test.
+ const shellSentinelCollection = assertDropAndRecreateCollection(db, "shell_sentinel");
const changesCollection = assertDropAndRecreateCollection(db, "changes");
+ const unrelatedCollection = assertDropCollection(db, "unrelated_collection");
// Start a change stream cursor.
const wholeCollectionStreamComment = "change stream on entire collection";
@@ -131,16 +137,15 @@ eventFn();`,
event: () => assert.writeOK(db.changes.insert({_id: "wake up"}))
});
assert.eq(getMoreResponse.cursor.nextBatch.length, 1);
- assert.docEq(getMoreResponse.cursor.nextBatch[0], {
- documentKey: {_id: "wake up"},
- fullDocument: {_id: "wake up"},
- ns: {db: db.getName(), coll: changesCollection.getName()},
- operationType: "insert"
- });
+ assert.eq(getMoreResponse.cursor.nextBatch[0].operationType,
+ "insert",
+ tojson(getMoreResponse.cursor.nextBatch[0]));
+ assert.eq(getMoreResponse.cursor.nextBatch[0].fullDocument,
+ {_id: "wake up"},
+ tojson(getMoreResponse.cursor.nextBatch[0]));
// Test that an insert to an unrelated collection will not cause the change stream to wake up
// and return an empty batch before reaching the maxTimeMS.
- assertDropCollection(db, "unrelated_collection");
assertEventDoesNotWakeCursor({
collection: changesCollection,
awaitDataCursorId: changeCursorId,
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index fa418d338fb..2ae3df0e8f5 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -104,14 +104,33 @@ BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONA
return createListIndexesResponse(cursorId, specs, "firstBatch");
}
+namespace {
+struct EnsureClientHasBeenInitialized : public executor::ThreadPoolMock::Options {
+ EnsureClientHasBeenInitialized() : executor::ThreadPoolMock::Options() {
+ onCreateThread = []() { Client::initThread("CollectionClonerTestThread"); };
+ }
+};
+} // namespace
+
BaseClonerTest::BaseClonerTest()
- : _mutex(), _setStatusCondition(), _status(getDetectableErrorStatus()) {}
+ : ThreadPoolExecutorTest(EnsureClientHasBeenInitialized()),
+ _mutex(),
+ _setStatusCondition(),
+ _status(getDetectableErrorStatus()) {}
void BaseClonerTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
clear();
launchExecutorThread();
- dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1);
+
+ Client::initThread("CollectionClonerTest");
+ ThreadPool::Options options;
+ options.minThreads = 1U;
+ options.maxThreads = 1U;
+ options.onCreateThread = [](StringData threadName) { Client::initThread(threadName); };
+ dbWorkThreadPool = stdx::make_unique<OldThreadPool>(options);
+ dbWorkThreadPool->startThreads();
+
storageInterface.reset(new StorageInterfaceMock());
}
@@ -122,6 +141,7 @@ void BaseClonerTest::tearDown() {
storageInterface.reset();
dbWorkThreadPool->join();
dbWorkThreadPool.reset();
+ Client::releaseCurrent();
}
void BaseClonerTest::clear() {
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index ffcbf7fafb0..1d26394e113 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -246,8 +246,17 @@ void CollectionCloner::shutdown() {
void CollectionCloner::_cancelRemainingWork_inlock() {
if (_arm) {
- Client::initThreadIfNotAlready();
- _killArmHandle = _arm->kill(cc().getOperationContext());
+ // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The
+ // TaskExecutor should never have an OperationContext attached to the Client, and the
+ // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know
+ // which situation we're in, so have to handle both.
+ auto& client = cc();
+ if (auto opCtx = client.getOperationContext()) {
+ _killArmHandle = _arm->kill(opCtx);
+ } else {
+ auto newOpCtx = client.makeOperationContext();
+ _killArmHandle = _arm->kill(newOpCtx.get());
+ }
}
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
@@ -650,9 +659,12 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
_clusterClientCursorParams->remotes = std::move(remoteCursors);
if (_collectionCloningBatchSize > 0)
_clusterClientCursorParams->batchSize = _collectionCloningBatchSize;
- Client::initThreadIfNotAlready();
+ // Client::initThreadIfNotAlready();
+ auto opCtx = cc().makeOperationContext();
_arm = stdx::make_unique<AsyncResultsMerger>(
- cc().getOperationContext(), _executor, _clusterClientCursorParams.get());
+ opCtx.get(), _executor, _clusterClientCursorParams.get());
+ _arm->detachFromOperationContext();
+ opCtx.reset();
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -665,7 +677,6 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- _arm->detachFromOperationContext();
if (!scheduleStatus.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
return;
@@ -689,9 +700,10 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS
}
Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- _arm->reattachToOperationContext(opCtx);
+ // We expect this callback to execute in a thread from a TaskExecutor which will not have an
+ // OperationContext populated. We must make one ourselves.
+ auto opCtx = cc().makeOperationContext();
+ _arm->reattachToOperationContext(opCtx.get());
while (_arm->ready()) {
auto armResultStatus = _arm->nextReady();
if (!armResultStatus.getStatus().isOK()) {
@@ -712,8 +724,10 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
Status CollectionCloner::_scheduleNextARMResultsCallback(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- Client::initThreadIfNotAlready();
- _arm->reattachToOperationContext(cc().getOperationContext());
+ // We expect this callback to execute in a thread from a TaskExecutor which will not have an
+ // OperationContext populated. We must make one ourselves.
+ auto opCtx = cc().makeOperationContext();
+ _arm->reattachToOperationContext(opCtx.get());
auto nextEvent = _arm->nextEvent();
_arm->detachFromOperationContext();
if (!nextEvent.isOK()) {
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 32c8ea3e568..7d9666d2907 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -1129,68 +1129,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc2 = BSON("_id" << 2);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(2, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBeforeStartup) {
collectionCloner->shutdown();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup());
@@ -1969,99 +1907,6 @@ TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDiffere
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
- collectionCloner->waitForDbWorker();
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- auto buffered = collectionCloner->getDocumentsToInsert_forTest();
- docs.insert(docs.end(), buffered.begin(), buffered.end());
- return exec->scheduleWork(workFn);
- });
-
- ASSERT_TRUE(collectionCloner->isActive());
-
- int numDocs = 6;
- std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0])));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1])));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2])));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3U, docs.size());
- for (int i = 0; i < 3; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5])));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(6U, docs.size());
- for (int i = 3; i < 6; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
-
- ASSERT_EQUALS(numDocs, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) {
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 241f09f209b..b9ef1290fa6 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -326,6 +326,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
}
Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
+ invariant(_opCtx, "Cannot schedule a getMore without an OperationContext");
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -364,6 +365,32 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
return Status::OK();
}
+Status AsyncResultsMerger::scheduleGetMores() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _scheduleGetMores(lk);
+}
+
+Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) {
+ // Schedule remote work on hosts for which we need more results.
+ for (size_t i = 0; i < _remotes.size(); ++i) {
+ auto& remote = _remotes[i];
+
+ if (!remote.status.isOK()) {
+ return remote.status;
+ }
+
+ if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
+ // If this remote is not exhausted and there is no outstanding request for it, schedule
+ // work to retrieve the next batch.
+ auto nextBatchStatus = _askForNextBatch(lk, i);
+ if (!nextBatchStatus.isOK()) {
+ return nextBatchStatus;
+ }
+ }
+ }
+ return Status::OK();
+}
+
/*
* Note: When nextEvent() is called to do retries, only the remotes with retriable errors will
* be rescheduled because:
@@ -388,22 +415,9 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
"nextEvent() called before an outstanding event was signaled");
}
- // Schedule remote work on hosts for which we need more results.
- for (size_t i = 0; i < _remotes.size(); ++i) {
- auto& remote = _remotes[i];
-
- if (!remote.status.isOK()) {
- return remote.status;
- }
-
- if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
- // If this remote is not exhausted and there is no outstanding request for it, schedule
- // work to retrieve the next batch.
- auto nextBatchStatus = _askForNextBatch(lk, i);
- if (!nextBatchStatus.isOK()) {
- return nextBatchStatus;
- }
- }
+ auto getMoresStatus = _scheduleGetMores(lk);
+ if (!getMoresStatus.isOK()) {
+ return getMoresStatus;
}
auto eventStatus = _executor->makeEvent();
@@ -570,9 +584,11 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) {
invariant(_remotes.size() == 1);
_eofNext = true;
- } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
+ } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive && _opCtx) {
// If this is normal or tailable-awaitData cursor and we still don't have anything buffered
// after receiving this batch, we can schedule work to retrieve the next batch right away.
+ // Be careful only to do this when '_opCtx' is non-null, since it is illegal to schedule a
+ // remote command on a user's behalf without a non-null OperationContext.
remote.status = _askForNextBatch(lk, remoteIndex);
}
}
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index da21593486c..b48e5004a56 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -179,6 +179,24 @@ public:
StatusWith<executor::TaskExecutor::EventHandle> nextEvent();
/**
+ * Schedules a getMore on any remote hosts which:
+ * - Do not have an error status set already.
+ * - Don't already have a request outstanding.
+ * - We don't currently have any results buffered.
+ * - Are not exhausted (have a non-zero cursor id).
+ * Returns an error if any of the remotes responded with an error, or if we encounter an error
+ * while scheduling the getMore requests..
+ *
+ * In most cases users should call nextEvent() instead of this method, but this can be necessary
+ * if the caller of nextEvent() calls detachFromOperationContext() before the event is signaled.
+ * In such cases, the ARM cannot schedule getMores itself, and will need to be manually prompted
+ * after calling reattachToOperationContext().
+ *
+ * It is illegal to call this method if the ARM is not attached to an OperationContext.
+ */
+ Status scheduleGetMores();
+
+ /**
* Adds the specified shard cursors to the set of cursors to be merged. The results from the
* new cursors will be returned as normal through nextReady().
*/
@@ -368,6 +386,12 @@ private:
*/
bool _haveOutstandingBatchRequests(WithLock);
+
+ /**
+ * Schedules a getMore on any remote hosts which we need another batch from.
+ */
+ Status _scheduleGetMores(WithLock);
+
/**
* Schedules a killCursors command to be run on all remote hosts that have open cursors.
*/
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 8155c50237c..8c82fbdbb0c 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_test_fixture.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -62,6 +63,10 @@ const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host",
HostAndPort("FakeShard2Host", 12345),
HostAndPort("FakeShard3Host", 12345)};
+LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) {
+ return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj());
+}
+
class AsyncResultsMergerTest : public ShardingTestFixture {
public:
AsyncResultsMergerTest() : _nss("testdb.testcoll") {}
@@ -155,11 +160,12 @@ protected:
/**
* Schedules a list of cursor responses to be returned by the mock network.
*/
- void scheduleNetworkResponses(std::vector<CursorResponse> responses,
- CursorResponse::ResponseType responseType) {
+ void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
std::vector<BSONObj> objs;
for (const auto& cursorResponse : responses) {
- objs.push_back(cursorResponse.toBSON(responseType));
+ // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are
+ // subsequent responses, since the AsyncResultsMerger will only ever run getMores.
+ objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse));
}
scheduleNetworkResponseObjs(objs);
}
@@ -191,6 +197,11 @@ protected:
return retRequest;
}
+ bool networkHasReadyRequests() {
+ NetworkInterfaceMock::InNetworkGuard guard(network());
+ return guard->hasReadyRequests();
+ }
+
void scheduleErrorResponse(ResponseStatus rs) {
invariant(!rs.isOK());
rs.elapsedMillis = Milliseconds(0);
@@ -243,8 +254,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -287,8 +297,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -332,8 +341,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return first result.
ASSERT_TRUE(arm->ready());
@@ -360,8 +368,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return remaining results.
ASSERT_TRUE(arm->ready());
@@ -404,8 +411,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"),
fromjson("{$sortKey: {'': 6}}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is not ready to return results until receiving responses from all remotes.
ASSERT_FALSE(arm->ready());
@@ -418,8 +424,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"),
fromjson("{$sortKey: {'': 9}}")};
responses.emplace_back(_nss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that all remotes have responded, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -463,8 +468,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(5), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return first result.
ASSERT_TRUE(arm->ready());
@@ -492,8 +496,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(0), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return second shard's results.
ASSERT_TRUE(arm->ready());
@@ -520,8 +523,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
std::vector<BSONObj> batch3 = {
fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// ARM is ready to return remaining results.
ASSERT_TRUE(arm->ready());
@@ -565,8 +567,7 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"),
fromjson("{$sortKey: {'': 5, '': 9}}")};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
// ARM returns all results in sorted order.
@@ -610,8 +611,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -659,8 +659,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -718,8 +717,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Now that the responses have been delivered, ARM is ready to return results.
ASSERT_TRUE(arm->ready());
@@ -757,8 +755,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -779,8 +776,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(1), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
blackHoleNextRequest();
executor()->waitForEvent(readyEvent);
@@ -797,8 +793,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -825,8 +820,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(456), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -883,8 +877,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
executor()->waitForEvent(readyEvent);
@@ -914,8 +907,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -992,8 +984,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(123), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Kill should be able to return right away if there are no pending batches.
auto killedEvent = arm->kill(operationContext());
@@ -1016,8 +1007,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Kill event will only be signalled once the callbacks for the pending batches have run.
auto killedEvent = arm->kill(operationContext());
@@ -1026,13 +1016,11 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(3), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Ensure that we properly signal those waiting for more results to be ready.
executor()->waitForEvent(readyEvent);
@@ -1051,8 +1039,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
auto killedEvent = arm->kill(operationContext());
@@ -1087,8 +1074,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1108,8 +1094,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(123), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1137,8 +1122,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
responses.emplace_back(_nss, CursorId(123), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
// After receiving an empty batch, the ARM should return boost::none, but remotes should not be
@@ -1165,8 +1149,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
// Afterwards, the ARM should return boost::none and remote cursors should be marked as
@@ -1190,8 +1173,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1211,8 +1193,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
ASSERT_OK(request.getStatus());
ASSERT_EQ(*request.getValue().batchSize, 1LL);
ASSERT_EQ(request.getValue().cursorid, 1LL);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1241,8 +1222,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1274,8 +1254,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.emplace_back(_nss, CursorId(98), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(99), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1295,8 +1274,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(99), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1310,8 +1288,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1331,8 +1308,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(98), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1366,8 +1342,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// From the second host we get a network (retriable) error.
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
@@ -1422,8 +1397,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1444,8 +1418,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(123), batch2);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1465,8 +1438,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) {
@@ -1491,8 +1463,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Still shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
@@ -1504,8 +1475,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1524,13 +1494,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas
responses.clear();
std::vector<BSONObj> batch3 = {};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest,
@@ -1563,8 +1531,7 @@ TEST_F(AsyncResultsMergerTest,
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch3 = {};
responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(unittest::assertGet(arm->nextEvent()));
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1579,8 +1546,7 @@ TEST_F(AsyncResultsMergerTest,
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) {
@@ -1612,8 +1578,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp
// Clean up the cursors.
std::vector<CursorResponse> responses;
responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{});
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -1639,8 +1604,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Should be ready now.
ASSERT_TRUE(arm->ready());
@@ -1661,8 +1625,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
"$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1681,13 +1644,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
responses.clear();
std::vector<BSONObj> batch3 = {};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) {
@@ -1711,8 +1672,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
"$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
// Should be ready now.
ASSERT_TRUE(arm->ready());
@@ -1735,8 +1695,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
// from it.
const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(
@@ -1755,13 +1714,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
responses.clear();
std::vector<BSONObj> batch3 = {};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
responses.clear();
std::vector<BSONObj> batch4 = {};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
@@ -1831,14 +1788,58 @@ TEST_F(AsyncResultsMergerTest, KillShouldWaitForRemoteCommandsBeforeSchedulingKi
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(1), batch);
- scheduleNetworkResponses(std::move(responses),
- CursorResponse::ResponseType::SubsequentResponse);
+ scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
// Now the kill cursors command should be scheduled.
executor()->waitForEvent(killEvent);
}
-} // namespace
+TEST_F(AsyncResultsMergerTest, ShouldNotScheduleGetMoresWithoutAnOperationContext) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.push_back({kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})});
+ makeCursorFromExistingCursors(std::move(cursors), findCmd);
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // While detached from the OperationContext, schedule an empty batch response. Because the
+ // response is empty and this is a tailable cursor, the ARM will need to run another getMore on
+ // that host, but it should not schedule this without a non-null OperationContext.
+ arm->detachFromOperationContext();
+ {
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> emptyBatch;
+ responses.emplace_back(_nss, CursorId(123), emptyBatch);
+ scheduleNetworkResponses(std::move(responses));
+ }
+
+ ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(networkHasReadyRequests()); // Tests that we haven't asked for the next batch yet.
+
+ // After manually requesting the next getMore, the ARM should be ready.
+ arm->reattachToOperationContext(operationContext());
+ ASSERT_OK(arm->scheduleGetMores());
+
+ // Schedule the next getMore response.
+ {
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> nonEmptyBatch = {fromjson("{_id: 1}")};
+ responses.emplace_back(_nss, CursorId(123), nonEmptyBatch);
+ scheduleNetworkResponses(std::move(responses));
+ }
+ ASSERT_TRUE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
+
+ auto killedEvent = arm->kill(operationContext());
+ executor()->waitForEvent(killedEvent);
+}
+
+} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 31c200bb004..f105c87a5da 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -102,6 +102,18 @@ StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
// If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
if (_leftoverEventFromLastTimeout) {
invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData);
+ // If we have an outstanding event from last time, then we might have to manually schedule
+ // some getMores for the cursors. If a remote response came back while we were between
+ // getMores (from the user to mongos), the response may have been an empty batch, and the
+ // ARM would not be able to ask for the next batch immediately since it is not attached to
+ // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores
+ // ourselves.
+ Status getMoreStatus = _arm.scheduleGetMores();
+ if (!getMoreStatus.isOK()) {
+ return getMoreStatus;
+ }
+
+ // Return the leftover event and clear '_leftoverEventFromLastTimeout'.
auto event = _leftoverEventFromLastTimeout;
_leftoverEventFromLastTimeout = EventHandle();
return event;
diff --git a/src/mongo/util/concurrency/old_thread_pool.cpp b/src/mongo/util/concurrency/old_thread_pool.cpp
index f253d1f4b05..5ff9b61ebb9 100644
--- a/src/mongo/util/concurrency/old_thread_pool.cpp
+++ b/src/mongo/util/concurrency/old_thread_pool.cpp
@@ -60,6 +60,8 @@ OldThreadPool::OldThreadPool(const DoNotStartThreadsTag&,
const std::string& threadNamePrefix)
: _pool(makeOptions(nThreads, threadNamePrefix)) {}
+OldThreadPool::OldThreadPool(ThreadPool::Options options) : _pool(std::move(options)) {}
+
std::size_t OldThreadPool::getNumThreads() const {
return _pool.getStats().numThreads;
}
diff --git a/src/mongo/util/concurrency/old_thread_pool.h b/src/mongo/util/concurrency/old_thread_pool.h
index 0a1f7e9887f..eed66d65f14 100644
--- a/src/mongo/util/concurrency/old_thread_pool.h
+++ b/src/mongo/util/concurrency/old_thread_pool.h
@@ -53,6 +53,7 @@ public:
explicit OldThreadPool(const DoNotStartThreadsTag&,
int nThreads = 8,
const std::string& threadNamePrefix = "");
+ explicit OldThreadPool(ThreadPool::Options options);
std::size_t getNumThreads() const;
ThreadPool::Stats getStats() const;