diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-13 15:13:36 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-30 14:09:01 -0400 |
commit | a43fe9ae73752fbd98107cef5421341fe291ab32 (patch) | |
tree | 8972aabe0b36655e67c8b6c52fa2b9a2916d6eed | |
parent | 7a2217a54d59c5d97e9e79cc40639c2589a18deb (diff) | |
download | mongo-a43fe9ae73752fbd98107cef5421341fe291ab32.tar.gz |
SERVER-34204 Always pass non-null opCtx when scheduling getMores in ARM
-rw-r--r-- | buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml | 73 | ||||
-rw-r--r-- | etc/evergreen.yml | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 155 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 50 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 24 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 224 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_merge.cpp | 12 |
9 files changed, 317 insertions, 291 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml new file mode 100644 index 00000000000..fd05367ac35 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -0,0 +1,73 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # This test exercises an internal detail of mongos<->mongod communication and is not expected to + # work against a mongos. + - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data. + - jstests/change_streams/change_stream_shell_helper.js + - jstests/change_streams/include_cluster_time.js + - jstests/change_streams/lookup_post_image.js + exclude_with_any_tags: + ## + # The next three tags correspond to the special errors thrown by the + # set_read_and_write_concerns.js override when it refuses to replace the readConcern or + # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be + # warranted. + ## + # "Cowardly refusing to override read concern of command: ..." + - assumes_read_concern_unchanged + # "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # "Cowardly refusing to run test with overridden write concern when it uses a command that can + # only perform w=1 writes: ..." + - requires_eval_command + # Transactions not supported on sharded clusters. 
+ - uses_transactions + +executor: + archive: + hooks: + - CheckReplDBHash + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + defaultReadConcernLevel: majority + enableMajorityReadConcern: '' + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/set_read_and_write_concerns.js'); + load('jstests/libs/override_methods/enable_sessions.js'); + readMode: commands + hooks: + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + # Use two shards to make sure we will only talk to the primary shard for the database and will + # not delay changes to wait for notifications or a clock advancement from other shards. + num_shards: 2 + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + nopreallocj: '' + set_parameters: + enableTestCommands: 1 + numInitialSyncAttempts: 1 + periodicNoopIntervalSecs: 1 + writePeriodicNoops: true + num_rs_nodes_per_shard: 1 + # This test suite doesn't actually shard any collections, but enabling sharding will prevent + # read commands against non-existent databases from unconditionally returning a CursorId of 0. 
+ enable_sharding: + - test diff --git a/etc/evergreen.yml b/etc/evergreen.yml index e65a88a99f1..ed1e3c18735 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -3603,6 +3603,17 @@ tasks: run_multiple_jobs: true - <<: *task_template + name: change_streams_mongos_sessions_passthrough + depends_on: + - name: change_streams + commands: + - func: "do setup" + - func: "run tests" + vars: + resmoke_args: --suites=change_streams_mongos_sessions_passthrough --storageEngine=wiredTiger + run_multiple_jobs: true + +- <<: *task_template name: change_streams_mongos_passthrough depends_on: - name: change_streams @@ -6529,6 +6540,7 @@ buildvariants: - name: auth - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_secondary_reads - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough @@ -6719,6 +6731,7 @@ buildvariants: - name: causally_consistent_jscore_passthrough_auth - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough - name: change_streams_whole_db_mongos_passthrough @@ -7810,6 +7823,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough - name: change_streams_whole_db_mongos_passthrough @@ -8625,6 +8639,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough - name: change_streams_whole_db_mongos_passthrough @@ 
-9248,6 +9263,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_secondary_reads - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough @@ -9784,6 +9800,7 @@ buildvariants: - name: bulk_gle_passthrough - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_secondary_reads - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough @@ -11343,6 +11360,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough - name: change_streams_whole_db_mongos_passthrough @@ -11974,6 +11992,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_secondary_reads - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough @@ -12147,6 +12166,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough - name: change_streams - name: change_streams_mongos_passthrough + - name: change_streams_mongos_sessions_passthrough - name: change_streams_secondary_reads - name: change_streams_sharded_collections_passthrough - name: change_streams_whole_db_passthrough diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index c38663a54cf..8fb69472329 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -104,17 
+104,30 @@ BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONA return createListIndexesResponse(cursorId, specs, "firstBatch"); } +namespace { +struct EnsureClientHasBeenInitialized : public executor::ThreadPoolMock::Options { + EnsureClientHasBeenInitialized() : executor::ThreadPoolMock::Options() { + onCreateThread = []() { Client::initThread("CollectionClonerTestThread"); }; + } +}; +} // namespace + BaseClonerTest::BaseClonerTest() - : _mutex(), _setStatusCondition(), _status(getDetectableErrorStatus()) {} + : ThreadPoolExecutorTest(EnsureClientHasBeenInitialized()), + _mutex(), + _setStatusCondition(), + _status(getDetectableErrorStatus()) {} void BaseClonerTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); clear(); launchExecutorThread(); + Client::initThread("CollectionClonerTest"); ThreadPool::Options options; options.minThreads = 1U; options.maxThreads = 1U; + options.onCreateThread = [](StringData threadName) { Client::initThread(threadName); }; dbWorkThreadPool = stdx::make_unique<ThreadPool>(options); dbWorkThreadPool->startup(); @@ -128,6 +141,7 @@ void BaseClonerTest::tearDown() { storageInterface.reset(); dbWorkThreadPool.reset(); + Client::releaseCurrent(); } void BaseClonerTest::clear() { diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 8aee68a4c58..553cc1d20bf 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -248,8 +248,17 @@ void CollectionCloner::shutdown() { void CollectionCloner::_cancelRemainingWork_inlock() { if (_arm) { - Client::initThreadIfNotAlready(); - _killArmHandle = _arm->kill(cc().getOperationContext()); + // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The + // TaskExecutor should never have an OperationContext attached to the Client, and the + // TaskRunner should always have an OperationContext attached. 
Unfortunately, we don't know + // which situation we're in, so have to handle both. + auto& client = cc(); + if (auto opCtx = client.getOperationContext()) { + _killArmHandle = _arm->kill(opCtx); + } else { + auto newOpCtx = client.makeOperationContext(); + _killArmHandle = _arm->kill(newOpCtx.get()); + } } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); @@ -628,9 +637,10 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa if (_collectionCloningBatchSize > 0) { armParams.setBatchSize(_collectionCloningBatchSize); } - Client::initThreadIfNotAlready(); - _arm = stdx::make_unique<AsyncResultsMerger>( - cc().getOperationContext(), _executor, std::move(armParams)); + auto opCtx = cc().makeOperationContext(); + _arm = stdx::make_unique<AsyncResultsMerger>(opCtx.get(), _executor, std::move(armParams)); + _arm->detachFromOperationContext(); + opCtx.reset(); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -643,7 +653,6 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard<stdx::mutex> lock(_mutex); Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); - _arm->detachFromOperationContext(); if (!scheduleStatus.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); return; @@ -667,9 +676,10 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS } Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _arm->reattachToOperationContext(opCtx); + // We expect this callback to execute in a thread from a TaskExecutor which will not have an + // OperationContext populated. We must make one ourselves. 
+ auto opCtx = cc().makeOperationContext(); + _arm->reattachToOperationContext(opCtx.get()); while (_arm->ready()) { auto armResultStatus = _arm->nextReady(); if (!armResultStatus.getStatus().isOK()) { @@ -690,8 +700,10 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { Status CollectionCloner::_scheduleNextARMResultsCallback( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - Client::initThreadIfNotAlready(); - _arm->reattachToOperationContext(cc().getOperationContext()); + // We expect this callback to execute in a thread from a TaskExecutor which will not have an + // OperationContext populated. We must make one ourselves. + auto opCtx = cc().makeOperationContext(); + _arm->reattachToOperationContext(opCtx.get()); auto nextEvent = _arm->nextEvent(); _arm->detachFromOperationContext(); if (!nextEvent.isOK()) { diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 8322d29fdf3..9f7a31e6a93 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -1133,68 +1133,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - 
ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc2 = BSON("_id" << 2); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(2, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBeforeStartup) { collectionCloner->shutdown(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup()); @@ -1975,99 +1913,6 @@ TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDiffere ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - 
ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - collectionCloner->waitForDbWorker(); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - auto buffered = collectionCloner->getDocumentsToInsert_forTest(); - docs.insert(docs.end(), buffered.begin(), buffered.end()); - return exec->scheduleWork(workFn); - }); - - ASSERT_TRUE(collectionCloner->isActive()); - - int numDocs = 6; - std::vector<BSONObj> generatedDocs = generateDocs(numDocs); - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]))); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]))); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3U, docs.size()); - for (int i = 0; i < 3; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - 
ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5]))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(6U, docs.size()); - for (int i = 3; i < 6; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - - ASSERT_EQUALS(numDocs, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) { ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 3e41a36c089..f5268ac3408 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -335,6 +335,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) { } Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { + invariant(_opCtx, "Cannot schedule a getMore without an OperationContext"); auto& remote = _remotes[remoteIndex]; invariant(!remote.cbHandle.isValid()); @@ -387,6 +388,32 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { return Status::OK(); } +Status AsyncResultsMerger::scheduleGetMores() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _scheduleGetMores(lk); +} + +Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) { + // Schedule remote work on hosts for which we need more results. 
+ for (size_t i = 0; i < _remotes.size(); ++i) { + auto& remote = _remotes[i]; + + if (!remote.status.isOK()) { + return remote.status; + } + + if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { + // If this remote is not exhausted and there is no outstanding request for it, schedule + // work to retrieve the next batch. + auto nextBatchStatus = _askForNextBatch(lk, i); + if (!nextBatchStatus.isOK()) { + return nextBatchStatus; + } + } + } + return Status::OK(); +} + /* * Note: When nextEvent() is called to do retries, only the remotes with retriable errors will * be rescheduled because: @@ -411,22 +438,9 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() "nextEvent() called before an outstanding event was signaled"); } - // Schedule remote work on hosts for which we need more results. - for (size_t i = 0; i < _remotes.size(); ++i) { - auto& remote = _remotes[i]; - - if (!remote.status.isOK()) { - return remote.status; - } - - if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { - // If this remote is not exhausted and there is no outstanding request for it, schedule - // work to retrieve the next batch. 
- auto nextBatchStatus = _askForNextBatch(lk, i); - if (!nextBatchStatus.isOK()) { - return nextBatchStatus; - } - } + auto getMoresStatus = _scheduleGetMores(lk); + if (!getMoresStatus.isOK()) { + return getMoresStatus; } auto eventStatus = _executor->makeEvent(); @@ -592,9 +606,11 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) { invariant(_remotes.size() == 1); _eofNext = true; - } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) { + } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive && _opCtx) { // If this is normal or tailable-awaitData cursor and we still don't have anything buffered // after receiving this batch, we can schedule work to retrieve the next batch right away. + // Be careful only to do this when '_opCtx' is non-null, since it is illegal to schedule a + // remote command on a user's behalf without a non-null OperationContext. remote.status = _askForNextBatch(lk, remoteIndex); } } diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 1653374b5bc..5f8a18194d2 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -192,6 +192,24 @@ public: StatusWith<executor::TaskExecutor::EventHandle> nextEvent(); /** + * Schedules a getMore on any remote hosts which: + * - Do not have an error status set already. + * - Don't already have a request outstanding. + * - We don't currently have any results buffered. + * - Are not exhausted (have a non-zero cursor id). + * Returns an error if any of the remotes responded with an error, or if we encounter an error + * while scheduling the getMore requests. + * + * In most cases users should call nextEvent() instead of this method, but this can be necessary + * if the caller of nextEvent() calls detachFromOperationContext() before the event is signaled. 
+ * In such cases, the ARM cannot schedule getMores itself, and will need to be manually prompted + * after calling reattachToOperationContext(). + * + * It is illegal to call this method if the ARM is not attached to an OperationContext. + */ + Status scheduleGetMores(); + + /** * Adds the specified shard cursors to the set of cursors to be merged. The results from the * new cursors will be returned as normal through nextReady(). */ @@ -393,6 +411,12 @@ private: */ bool _haveOutstandingBatchRequests(WithLock); + + /** + * Schedules a getMore on any remote hosts which we need another batch from. + */ + Status _scheduleGetMores(WithLock); + /** * Schedules a killCursors command to be run on all remote hosts that have open cursors. */ diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 586b387ad96..662d78e6dd0 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -172,11 +172,12 @@ protected: /** * Schedules a list of cursor responses to be returned by the mock network. */ - void scheduleNetworkResponses(std::vector<CursorResponse> responses, - CursorResponse::ResponseType responseType) { + void scheduleNetworkResponses(std::vector<CursorResponse> responses) { std::vector<BSONObj> objs; for (const auto& cursorResponse : responses) { - objs.push_back(cursorResponse.toBSON(responseType)); + // For tests of the AsyncResultsMerger, all CursorResponses scheduled by the tests are + // subsequent responses, since the AsyncResultsMerger will only ever run getMores. 
+ objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse)); } scheduleNetworkResponseObjs(objs); } @@ -282,8 +283,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(kTestNss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -327,8 +327,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; responses.emplace_back(kTestNss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -374,8 +373,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(kTestNss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return first result. 
ASSERT_TRUE(arm->ready()); @@ -402,8 +400,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(kTestNss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return remaining results. ASSERT_TRUE(arm->ready()); @@ -448,8 +445,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; responses.emplace_back(kTestNss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is not ready to return results until receiving responses from all remotes. ASSERT_FALSE(arm->ready()); @@ -462,8 +458,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"), fromjson("{$sortKey: {'': 9}}")}; responses.emplace_back(kTestNss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that all remotes have responded, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -509,8 +504,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(kTestNss, CursorId(5), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return first result. 
ASSERT_TRUE(arm->ready()); @@ -538,8 +532,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(kTestNss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return second shard's results. ASSERT_TRUE(arm->ready()); @@ -566,8 +559,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<BSONObj> batch3 = { fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")}; responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return remaining results. ASSERT_TRUE(arm->ready()); @@ -614,8 +606,7 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) { std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"), fromjson("{$sortKey: {'': 5, '': 9}}")}; responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); // ARM returns all results in sorted order. 
@@ -660,8 +651,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")}; responses.emplace_back(kTestNss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -709,8 +699,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(kTestNss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -769,8 +758,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(kTestNss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. 
ASSERT_TRUE(arm->ready()); @@ -810,8 +798,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.emplace_back(kTestNss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(kTestNss, CursorId(2), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -832,8 +819,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(kTestNss, CursorId(1), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); blackHoleNextRequest(); executor()->waitForEvent(readyEvent); @@ -850,8 +836,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.clear(); std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -881,8 +866,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(kTestNss, CursorId(456), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -945,8 +929,7 @@ 
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { responses.emplace_back(kTestNss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(kTestNss, CursorId(2), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"}); executor()->waitForEvent(readyEvent); @@ -977,8 +960,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1065,8 +1047,7 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { responses.emplace_back(kTestNss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); auto killedEvent = arm->kill(operationContext()); @@ -1100,8 +1081,7 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { // Cursor 3 is not exhausted. 
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(kTestNss, CursorId(123), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); auto killedEvent = arm->kill(operationContext()); @@ -1130,8 +1110,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Kill event will only be signalled once the callbacks for the pending batches have run. auto killedEvent = arm->kill(operationContext()); @@ -1161,8 +1140,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); auto killedEvent = arm->kill(operationContext()); @@ -1199,8 +1177,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1220,8 +1197,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; responses.emplace_back(kTestNss, CursorId(123), batch2); - 
scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1250,8 +1226,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch; responses.emplace_back(kTestNss, CursorId(123), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); // After receiving an empty batch, the ARM should return boost::none, but remotes should not be @@ -1279,8 +1254,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch; responses.emplace_back(kTestNss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); // Afterwards, the ARM should return boost::none and remote cursors should be marked as @@ -1305,8 +1279,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1326,8 +1299,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { ASSERT_OK(request.getStatus()); ASSERT_EQ(*request.getValue().batchSize, 1LL); ASSERT_EQ(request.getValue().cursorid, 1LL); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); 
ASSERT_TRUE(arm->ready()); @@ -1362,8 +1334,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.emplace_back(kTestNss, CursorId(98), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(99), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1383,8 +1354,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; responses.emplace_back(kTestNss, CursorId(99), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1398,8 +1368,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1420,8 +1389,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(98), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1457,8 +1425,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; 
responses.emplace_back(kTestNss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // From the second host we get a network (retriable) error. scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); @@ -1516,8 +1483,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(kTestNss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1538,8 +1504,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; responses.emplace_back(kTestNss, CursorId(123), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1559,8 +1524,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) { @@ -1588,8 +1552,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, 
lastObservedFirstCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Still shouldn't be ready, we don't have a guarantee from each shard. ASSERT_FALSE(arm->ready()); @@ -1601,8 +1564,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1621,13 +1583,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas responses.clear(); std::vector<BSONObj> batch3 = {}; responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, @@ -1662,8 +1622,7 @@ TEST_F(AsyncResultsMergerTest, std::vector<CursorResponse> responses; std::vector<BSONObj> batch3 = {}; responses.emplace_back(kTestNss, CursorId(0), batch3, boost::none, Timestamp(1, 8)); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(unittest::assertGet(arm->nextEvent())); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1678,8 +1637,7 @@ 
TEST_F(AsyncResultsMergerTest, responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) { @@ -1713,8 +1671,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp // Clean up the cursors. std::vector<CursorResponse> responses; responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{}); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -1742,8 +1699,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Should be ready now. 
ASSERT_TRUE(arm->ready()); @@ -1765,8 +1721,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1785,13 +1740,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) responses.clear(); std::vector<BSONObj> batch3 = {}; responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) { @@ -1817,8 +1770,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Should be ready now. ASSERT_TRUE(arm->ready()); @@ -1842,8 +1794,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting // from it. 
const Timestamp lastObservedSecondCursor = Timestamp(1, 5); responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1862,13 +1813,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting responses.clear(); std::vector<BSONObj> batch3 = {}; responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { @@ -2126,5 +2075,66 @@ DEATH_TEST_F(AsyncResultsMergerTest, stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); } +DEATH_TEST_F(AsyncResultsMergerTest, + ShouldFailIfAskedToPerformGetMoresWithoutAnOpCtx, + "Cannot schedule a getMore without an OperationContext") { + BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}"); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + + ASSERT_FALSE(arm->ready()); + arm->detachFromOperationContext(); + arm->scheduleGetMores().ignore(); // Should crash. 
+} + +TEST_F(AsyncResultsMergerTest, ShouldNotScheduleGetMoresWithoutAnOperationContext) { + BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}"); + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // While detached from the OperationContext, schedule an empty batch response. Because the + // response is empty and this is a tailable cursor, the ARM will need to run another getMore on + // that host, but it should not schedule this without a non-null OperationContext. + arm->detachFromOperationContext(); + { + std::vector<CursorResponse> responses; + std::vector<BSONObj> emptyBatch; + responses.emplace_back(kTestNss, CursorId(123), emptyBatch); + scheduleNetworkResponses(std::move(responses)); + } + + ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(networkHasReadyRequests()); // Tests that we haven't asked for the next batch yet. + + // After manually requesting the next getMore, the ARM should be ready. + arm->reattachToOperationContext(operationContext()); + ASSERT_OK(arm->scheduleGetMores()); + + // Schedule the next getMore response. 
+ { + std::vector<CursorResponse> responses; + std::vector<BSONObj> nonEmptyBatch = {fromjson("{_id: 1}")}; + responses.emplace_back(kTestNss, CursorId(123), nonEmptyBatch); + scheduleNetworkResponses(std::move(responses)); + } + + ASSERT_TRUE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + + auto killedEvent = arm->kill(operationContext()); + executor()->waitForEvent(killedEvent); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 48abb1452ec..967c9f60b35 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -89,6 +89,18 @@ StatusWith<EventHandle> RouterStageMerge::getNextEvent() { // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. if (_leftoverEventFromLastTimeout) { invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); + // If we have an outstanding event from last time, then we might have to manually schedule + // some getMores for the cursors. If a remote response came back while we were between + // getMores (from the user to mongos), the response may have been an empty batch, and the + // ARM would not be able to ask for the next batch immediately since it is not attached to + // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores + // ourselves. + Status getMoreStatus = _arm.scheduleGetMores(); + if (!getMoreStatus.isOK()) { + return getMoreStatus; + } + + // Return the leftover event and clear '_leftoverEventFromLastTimeout'. auto event = _leftoverEventFromLastTimeout; _leftoverEventFromLastTimeout = EventHandle(); return event; |