summary refs log tree commit diff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r-- src/mongo/s/query/SConscript 13
-rw-r--r-- src/mongo/s/query/async_results_merger.cpp 39
-rw-r--r-- src/mongo/s/query/async_results_merger.h 17
-rw-r--r-- src/mongo/s/query/async_results_merger_test.cpp 280
-rw-r--r-- src/mongo/s/query/blocking_results_merger.cpp 140
-rw-r--r-- src/mongo/s/query/blocking_results_merger.h 113
-rw-r--r-- src/mongo/s/query/blocking_results_merger_test.cpp 119
-rw-r--r-- src/mongo/s/query/cluster_client_cursor_impl.cpp 88
-rw-r--r-- src/mongo/s/query/cluster_client_cursor_impl.h 14
-rw-r--r-- src/mongo/s/query/cluster_client_cursor_params.h 9
-rw-r--r-- src/mongo/s/query/document_source_router_adapter.cpp 83
-rw-r--r-- src/mongo/s/query/document_source_router_adapter.h 79
-rw-r--r-- src/mongo/s/query/results_merger_test_fixture.cpp 76
-rw-r--r-- src/mongo/s/query/results_merger_test_fixture.h 228
-rw-r--r-- src/mongo/s/query/router_stage_merge.cpp 132
-rw-r--r-- src/mongo/s/query/router_stage_merge.h 64
-rw-r--r-- src/mongo/s/query/router_stage_pipeline.cpp 31
-rw-r--r-- src/mongo/s/query/router_stage_pipeline.h 11
-rw-r--r-- src/mongo/s/query/router_stage_update_on_add_shard.cpp 122
-rw-r--r-- src/mongo/s/query/router_stage_update_on_add_shard.h 68
20 files changed, 755 insertions, 971 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index f153381d12c..ae0c24ad21e 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -40,7 +40,6 @@ env.Library(
"cluster_client_cursor_impl.cpp",
],
LIBDEPS=[
- "$BUILD_DIR/mongo/db/pipeline/pipeline",
"router_exec_stage",
],
)
@@ -48,20 +47,14 @@ env.Library(
env.Library(
target="router_exec_stage",
source=[
- "document_source_router_adapter.cpp",
"router_stage_limit.cpp",
- "router_stage_merge.cpp",
"router_stage_mock.cpp",
"router_stage_pipeline.cpp",
"router_stage_remove_metadata_fields.cpp",
"router_stage_skip.cpp",
- "router_stage_update_on_add_shard.cpp",
],
LIBDEPS=[
- "$BUILD_DIR/mongo/db/query/query_common",
"async_results_merger",
- ],
- LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/db/pipeline/pipeline",
],
)
@@ -83,11 +76,13 @@ env.Library(
target="async_results_merger",
source=[
"async_results_merger.cpp",
+ "blocking_results_merger.cpp",
"establish_cursors.cpp",
env.Idlc('async_results_merger_params.idl')[0],
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/query/command_request_response",
+ "$BUILD_DIR/mongo/db/query/query_common",
"$BUILD_DIR/mongo/executor/task_executor_interface",
"$BUILD_DIR/mongo/s/async_requests_sender",
"$BUILD_DIR/mongo/s/client/sharding_client",
@@ -106,9 +101,11 @@ env.Library(
)
env.CppUnitTest(
- target="async_results_merger_test",
+ target="results_merger_test",
source=[
+ "blocking_results_merger_test.cpp",
"async_results_merger_test.cpp",
+ "results_merger_test_fixture.cpp",
],
LIBDEPS=[
'async_results_merger',
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index f5268ac3408..3cc6756c843 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -88,8 +88,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
// This strange initialization is to work around the fact that the IDL does not currently
// support a default value for an enum. The default tailable mode should be 'kNormal', but
// since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
- _tailableMode(params.getTailableMode() ? *params.getTailableMode()
- : TailableModeEnum::kNormal),
+ _tailableMode(params.getTailableMode().value_or(TailableModeEnum::kNormal)),
_params(std::move(params)),
_mergeQueue(MergingComparator(_remotes,
_params.getSort() ? *_params.getSort() : BSONObj(),
@@ -116,12 +115,12 @@ AsyncResultsMerger::~AsyncResultsMerger() {
invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete);
}
-bool AsyncResultsMerger::remotesExhausted() {
+bool AsyncResultsMerger::remotesExhausted() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _remotesExhausted(lk);
}
-bool AsyncResultsMerger::_remotesExhausted(WithLock) {
+bool AsyncResultsMerger::_remotesExhausted(WithLock) const {
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
return false;
@@ -769,36 +768,4 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
_sort) > 0;
}
-void AsyncResultsMerger::blockingKill(OperationContext* opCtx) {
- auto killEvent = kill(opCtx);
- if (!killEvent) {
- // We are shutting down.
- return;
- }
- _executor->waitForEvent(killEvent);
-}
-
-StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() {
- while (!ready()) {
- auto nextEventStatus = nextEvent();
- if (!nextEventStatus.isOK()) {
- return nextEventStatus.getStatus();
- }
- auto event = nextEventStatus.getValue();
-
- // Block until there are further results to return.
- auto status = _executor->waitForEvent(_opCtx, event);
-
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- // We have not provided a deadline, so if the wait returns without interruption, we do not
- // expect to have timed out.
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- }
-
- return nextReady();
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 5f8a18194d2..488e03d2ee5 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -109,7 +109,7 @@ public:
/**
* Returns true if all of the remote cursors are exhausted.
*/
- bool remotesExhausted();
+ bool remotesExhausted() const;
/**
* Sets the maxTimeMS value that the ARM should forward with any internally issued getMore
@@ -167,12 +167,6 @@ public:
StatusWith<ClusterQueryResult> nextReady();
/**
- * Blocks until the next result is ready, all remote cursors are exhausted, or there is an
- * error.
- */
- StatusWith<ClusterQueryResult> blockingNext();
-
- /**
* Schedules remote work as required in order to make further results available. If there is an
* error in scheduling this work, returns a non-ok status. On success, returns an event handle.
* The caller can pass this event handle to 'executor' in order to be blocked until further
@@ -238,11 +232,6 @@ public:
*/
executor::TaskExecutor::EventHandle kill(OperationContext* opCtx);
- /**
- * A blocking version of kill() that will not return until this is safe to destroy.
- */
- void blockingKill(OperationContext*);
-
private:
/**
* We instantiate one of these per remote host. It contains the buffer of results we've
@@ -346,7 +335,7 @@ private:
/**
* Checks whether or not the remote cursors are all exhausted.
*/
- bool _remotesExhausted(WithLock);
+ bool _remotesExhausted(WithLock) const;
//
// Helpers for ready().
@@ -433,7 +422,7 @@ private:
AsyncResultsMergerParams _params;
// Must be acquired before accessing any data members (other than _params, which is read-only).
- stdx::mutex _mutex;
+ mutable stdx::mutex _mutex;
// Data tracking the state of our communication with each of the remote nodes.
std::vector<RemoteCursorData> _remotes;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 7960d22f018..b852cf33f79 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -30,18 +30,13 @@
#include "mongo/s/query/async_results_merger.h"
-#include "mongo/client/remote_command_targeter_factory_mock.h"
-#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/json.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
-#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/s/query/results_merger_test_fixture.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -50,212 +45,11 @@ namespace mongo {
namespace {
-using executor::NetworkInterfaceMock;
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-
-using ResponseStatus = executor::TaskExecutor::ResponseStatus;
-
-const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
-const std::vector<ShardId> kTestShardIds = {
- ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
-const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
- HostAndPort("FakeShard2Host", 12345),
- HostAndPort("FakeShard3Host", 12345)};
-
-const NamespaceString kTestNss("testdb.testcoll");
-
LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) {
return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj());
}
-class AsyncResultsMergerTest : public ShardingTestFixture {
-public:
- AsyncResultsMergerTest() {}
-
- void setUp() override {
- setRemote(HostAndPort("ClientHost", 12345));
-
- configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
-
- std::vector<ShardType> shards;
-
- for (size_t i = 0; i < kTestShardIds.size(); i++) {
- ShardType shardType;
- shardType.setName(kTestShardIds[i].toString());
- shardType.setHost(kTestShardHosts[i].toString());
-
- shards.push_back(shardType);
-
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
- stdx::make_unique<RemoteCommandTargeterMock>());
- targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
- targeter->setFindHostReturnValue(kTestShardHosts[i]);
-
- targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
- std::move(targeter));
- }
-
- setupShards(shards);
- }
-
-protected:
- /**
- * Constructs an ARM with the given vector of existing cursors.
- *
- * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
- * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
- *
- * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
- * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
- */
- std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
- std::vector<RemoteCursor> remoteCursors,
- boost::optional<BSONObj> findCmd = boost::none,
- boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- params.setRemotes(std::move(remoteCursors));
-
-
- if (findCmd) {
- const auto qr = unittest::assertGet(
- QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
- if (!qr->getSort().isEmpty()) {
- params.setSort(qr->getSort().getOwned());
- }
-
- if (getMoreBatchSize) {
- params.setBatchSize(getMoreBatchSize);
- } else {
- params.setBatchSize(qr->getBatchSize()
- ? boost::optional<std::int64_t>(
- static_cast<std::int64_t>(*qr->getBatchSize()))
- : boost::none);
- }
- params.setTailableMode(qr->getTailableMode());
- params.setAllowPartialResults(qr->isAllowPartialResults());
- }
-
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(operationContext()->getLogicalSessionId());
- sessionInfo.setTxnNumber(operationContext()->getTxnNumber());
- params.setOperationSessionInfo(sessionInfo);
-
- return stdx::make_unique<AsyncResultsMerger>(
- operationContext(), executor(), std::move(params));
- }
-
- /**
- * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition.
- */
- void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) {
- BSONObjBuilder viewDefBob;
- viewDefBob.append("ns", ns);
- viewDefBob.append("pipeline", fromjson(pipelineJsonArr));
-
- BSONObjBuilder bob;
- bob.append("resolvedView", viewDefBob.obj());
- bob.append("ok", 0.0);
- bob.append("errmsg", "Command on view must be executed by mongos");
- bob.append("code", 169);
-
- std::vector<BSONObj> batch = {bob.obj()};
- scheduleNetworkResponseObjs(batch);
- }
-
- /**
- * Schedules a list of cursor responses to be returned by the mock network.
- */
- void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
- std::vector<BSONObj> objs;
- for (const auto& cursorResponse : responses) {
- // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are
- // subsequent responses, since the AsyncResultsMerger will only ever run getMores.
- objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse));
- }
- scheduleNetworkResponseObjs(objs);
- }
-
- /**
- * Schedules a list of raw BSON command responses to be returned by the mock network.
- */
- void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- for (const auto& obj : objs) {
- ASSERT_TRUE(net->hasReadyRequests());
- Milliseconds millis(0);
- RemoteCommandResponse response(obj, millis);
- executor::TaskExecutor::ResponseStatus responseStatus(response);
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
- }
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- RemoteCommandRequest getNthPendingRequest(size_t n) {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkInterfaceMock::NetworkOperationIterator noi = net->getNthUnscheduledRequest(n);
- RemoteCommandRequest retRequest = noi->getRequest();
- net->exitNetwork();
- return retRequest;
- }
-
- bool networkHasReadyRequests() {
- NetworkInterfaceMock::InNetworkGuard guard(network());
- return guard->hasReadyRequests();
- }
-
- void scheduleErrorResponse(ResponseStatus rs) {
- invariant(!rs.isOK());
- rs.elapsedMillis = Milliseconds(0);
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs);
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- void runReadyCallbacks() {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- void blackHoleNextRequest() {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- net->blackHole(net->getNextReadyRequest());
- net->exitNetwork();
- }
-};
-
-void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
- ASSERT_TRUE(killCmd.hasElement("killCursors"));
- ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array);
-
- size_t numCursors = 0;
- for (auto&& cursor : killCmd["cursors"].Obj()) {
- ASSERT_EQ(cursor.type(), BSONType::NumberLong);
- ASSERT_EQ(cursor.numberLong(), cursorId);
- ++numCursors;
- }
- ASSERT_EQ(numCursors, 1u);
-}
-
-RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(std::move(shardId));
- remoteCursor.setHostAndPort(std::move(host));
- remoteCursor.setCursorResponse(std::move(response));
- return remoteCursor;
-}
+using AsyncResultsMergerTest = ResultsMergerTestFixture;
TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
std::vector<RemoteCursor> cursors;
@@ -1888,76 +1682,6 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
executor()->waitForEvent(killEvent);
}
-TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Before any requests are scheduled, ARM is not ready to return results.
- ASSERT_FALSE(arm->ready());
- ASSERT_FALSE(arm->remotesExhausted());
-
- // Issue a blocking wait for the next result asynchronously on a different thread.
- auto future = launchAsync([&]() {
- auto next = unittest::assertGet(arm->blockingNext());
- ASSERT_FALSE(next.isEOF());
- ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
- next = unittest::assertGet(arm->blockingNext());
- ASSERT_TRUE(next.isEOF());
- });
-
- // Schedule the response to the getMore which will return the next result and mark the cursor as
- // exhausted.
- onCommand([&](const auto& request) {
- ASSERT(request.cmdObj["getMore"]);
- return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
- .toBSON(CursorResponse::ResponseType::SubsequentResponse);
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Issue a blocking wait for the next result asynchronously on a different thread.
- auto future = launchAsync([&]() {
- auto nextStatus = arm->blockingNext();
- ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
- });
-
- // Now mark the OperationContext as killed from this thread.
- {
- stdx::lock_guard<Client> lk(*operationContext()->getClient());
- operationContext()->markKilled(ErrorCodes::Interrupted);
- }
- future.timed_get(kFutureTimeout);
- // Be careful not to use a blocking kill here, since the main thread is in charge of running the
- // callbacks, and we'd block on ourselves.
- auto killEvent = arm->kill(operationContext());
-
- assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
- runReadyCallbacks();
- executor()->waitForEvent(killEvent);
-}
-
-TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Before any requests are scheduled, ARM is not ready to return results.
- ASSERT_FALSE(arm->ready());
- ASSERT_FALSE(arm->remotesExhausted());
-
- arm->blockingKill(operationContext());
-}
-
TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp
new file mode 100644
index 00000000000..f5ba2af0bf6
--- /dev/null
+++ b/src/mongo/s/query/blocking_results_merger.cpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/query/find_common.h"
+#include "mongo/s/query/blocking_results_merger.h"
+
+namespace mongo {
+
+BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx,
+ AsyncResultsMergerParams&& armParams,
+ executor::TaskExecutor* executor)
+ : _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)),
+ _executor(executor),
+ _arm(opCtx, executor, std::move(armParams)) {}
+
+StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout(
+ OperationContext* opCtx, RouterExecStage::ExecContext execCtx) {
+ invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not
+ // ready, we don't block. Fall straight through to the return statement.
+ while (!_arm.ready() && execCtx == RouterExecStage::ExecContext::kGetMoreNoResultsYet) {
+ auto nextEventStatus = getNextEvent();
+ if (!nextEventStatus.isOK()) {
+ return nextEventStatus.getStatus();
+ }
+ auto event = nextEventStatus.getValue();
+
+ // Block until there are further results to return, or our time limit is exceeded.
+ auto waitStatus =
+ _executor->waitForEvent(opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline);
+
+ if (!waitStatus.isOK()) {
+ return waitStatus.getStatus();
+ }
+ // Swallow timeout errors for tailable awaitData cursors, stash the event that we were
+ // waiting on, and return EOF.
+ if (waitStatus == stdx::cv_status::timeout) {
+ _leftoverEventFromLastTimeout = std::move(event);
+ return ClusterQueryResult{};
+ }
+ }
+
+ // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in
+ // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we
+ // return EOF immediately rather than blocking for further results.
+ return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{};
+}
+
+StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationContext* opCtx) {
+ while (!_arm.ready()) {
+ auto nextEventStatus = _arm.nextEvent();
+ if (!nextEventStatus.isOK()) {
+ return nextEventStatus.getStatus();
+ }
+ auto event = nextEventStatus.getValue();
+
+ // Block until there are further results to return.
+ auto status = _executor->waitForEvent(opCtx, event);
+
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ // We have not provided a deadline, so if the wait returns without interruption, we do not
+ // expect to have timed out.
+ invariant(status.getValue() == stdx::cv_status::no_timeout);
+ }
+
+ return _arm.nextReady();
+}
+StatusWith<ClusterQueryResult> BlockingResultsMerger::next(OperationContext* opCtx,
+ RouterExecStage::ExecContext execCtx) {
+ // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData
+ // cursors wait for ready() only until a specified time limit is exceeded.
+ return (_tailableMode == TailableModeEnum::kTailableAndAwaitData
+ ? awaitNextWithTimeout(opCtx, execCtx)
+ : blockUntilNext(opCtx));
+}
+
+StatusWith<executor::TaskExecutor::EventHandle> BlockingResultsMerger::getNextEvent() {
+ // If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
+ if (_leftoverEventFromLastTimeout) {
+ invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ // If we have an outstanding event from last time, then we might have to manually schedule
+ // some getMores for the cursors. If a remote response came back while we were between
+ // getMores (from the user to mongos), the response may have been an empty batch, and the
+ // ARM would not be able to ask for the next batch immediately since it is not attached to
+ // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores
+ // ourselves.
+ Status getMoreStatus = _arm.scheduleGetMores();
+ if (!getMoreStatus.isOK()) {
+ return getMoreStatus;
+ }
+
+ // Return the leftover event and clear '_leftoverEventFromLastTimeout'.
+ auto event = _leftoverEventFromLastTimeout;
+ _leftoverEventFromLastTimeout = executor::TaskExecutor::EventHandle();
+ return event;
+ }
+
+ return _arm.nextEvent();
+}
+
+void BlockingResultsMerger::kill(OperationContext* opCtx) {
+ auto killEvent = _arm.kill(opCtx);
+ if (!killEvent) {
+ // We are shutting down.
+ return;
+ }
+ _executor->waitForEvent(killEvent);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
new file mode 100644
index 00000000000..cbc96cbbfc0
--- /dev/null
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/query/async_results_merger.h"
+#include "mongo/s/query/router_exec_stage.h"
+
+namespace mongo {
+
+/**
+ * Layers a simpler blocking interface on top of an AsyncResultsMerger, which this
+ * BlockingResultsMerger owns and constructs from the given AsyncResultsMergerParams.
+ */
+class BlockingResultsMerger {
+public:
+ BlockingResultsMerger(OperationContext* opCtx,
+ AsyncResultsMergerParams&& arm,
+ executor::TaskExecutor*);
+
+ /**
+ * Blocks until the next result or an error; awaitData cursors may instead return EOF.
+ */
+ StatusWith<ClusterQueryResult> next(OperationContext*, RouterExecStage::ExecContext);
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return _arm.setAwaitDataTimeout(awaitDataTimeout);
+ }
+
+ void reattachToOperationContext(OperationContext* opCtx) {
+ _arm.reattachToOperationContext(opCtx);
+ }
+
+ void detachFromOperationContext() {
+ _arm.detachFromOperationContext();
+ }
+
+ bool remotesExhausted() const {
+ return _arm.remotesExhausted();
+ }
+
+ std::size_t getNumRemotes() const {
+ return _arm.getNumRemotes();
+ }
+
+ void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
+ _arm.addNewShardCursors(std::move(newCursors));
+ }
+
+ /**
+ * Blocks until '_arm' has been killed, which involves cleaning up any remote cursors managed
+ * by this results merger.
+ */
+ void kill(OperationContext* opCtx);
+
+private:
+ /**
+ * Awaits the next result from the ARM with no time limit.
+ */
+ StatusWith<ClusterQueryResult> blockUntilNext(OperationContext* opCtx);
+
+ /**
+ * Awaits the next result from the ARM up to the time limit specified on 'opCtx'. If this is the
+ * user's initial find or we have already obtained at least one result for this batch, this
+ * method returns EOF immediately rather than blocking.
+ */
+ StatusWith<ClusterQueryResult> awaitNextWithTimeout(OperationContext* opCtx,
+ RouterExecStage::ExecContext execCtx);
+
+ /**
+ * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding
+ * event which we scheduled during the previous call to next().
+ */
+ StatusWith<executor::TaskExecutor::EventHandle> getNextEvent();
+
+ TailableModeEnum _tailableMode;
+ executor::TaskExecutor* _executor;
+
+ // In a case where we have a tailable, awaitData cursor, a call to 'next()' will block waiting
+ // for an event generated by '_arm', but may time out waiting for this event to be triggered.
+ // While it's waiting, the time limit for the 'awaitData' piece of the cursor may have been
+ // exceeded. When this happens, we use '_leftoverEventFromLastTimeout' to remember the old event
+ // and pick back up waiting for it on the next call to 'next()'.
+ executor::TaskExecutor::EventHandle _leftoverEventFromLastTimeout;
+ AsyncResultsMerger _arm;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp
new file mode 100644
index 00000000000..821eda4d8ad
--- /dev/null
+++ b/src/mongo/s/query/blocking_results_merger_test.cpp
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/blocking_results_merger.h"
+#include "mongo/s/query/results_merger_test_fixture.h"
+
+namespace mongo {
+
+namespace {
+
+using BlockingResultsMergerTest = ResultsMergerTestFixture;
+
+TEST_F(BlockingResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
+    std::vector<RemoteCursor> cursors;
+    cursors.emplace_back(
+        makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+    BlockingResultsMerger blockingMerger(
+        operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+    // Nothing in this test answers any network request, so kill() has to return on its own.
+    blockingMerger.kill(operationContext());
+}
+
+TEST_F(BlockingResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
+    std::vector<RemoteCursor> cursors;
+    cursors.emplace_back(
+        makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+    BlockingResultsMerger blockingMerger(
+        operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+
+    // Issue a blocking wait for the next result asynchronously on a different thread.
+    auto future = launchAsync([&]() {
+        auto next = unittest::assertGet(
+            blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+        ASSERT_FALSE(next.isEOF());
+        ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
+        next = unittest::assertGet(
+            blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+        ASSERT_TRUE(next.isEOF());
+    });
+
+    // Answer the getMore with a single document and a cursor id of zero, which delivers the
+    // awaited result and marks the remote cursor as exhausted.
+    onCommand([&](const auto& request) {
+        ASSERT(request.cmdObj["getMore"]);
+        return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+            .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+    });
+
+    future.timed_get(kFutureTimeout);
+}
+
+TEST_F(BlockingResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
+    std::vector<RemoteCursor> cursors;
+    cursors.emplace_back(
+        makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+    auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+    params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+    BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor());
+
+    // Issue a blocking wait for the next result asynchronously on a different thread.
+    auto future = launchAsync([&]() {
+        auto nextStatus = blockingMerger.next(operationContext(),
+                                              RouterExecStage::ExecContext::kGetMoreNoResultsYet);
+        ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
+    });
+
+    // Now mark the OperationContext as killed from this thread.
+    {
+        stdx::lock_guard<Client> lk(*operationContext()->getClient());
+        operationContext()->markKilled(ErrorCodes::Interrupted);
+    }
+    // Wait for the merger to be interrupted.
+    future.timed_get(kFutureTimeout);
+
+    // Now that we've seen it interrupted, kill it. We have to do this in another thread because
+    // killing a BlockingResultsMerger involves running a killCursors, and this main thread is in
+    // charge of scheduling the response to that request.
+    future = launchAsync([&]() { blockingMerger.kill(operationContext()); });
+    while (!networkHasReadyRequests() || !getNthPendingRequest(0u).cmdObj["killCursors"]) {
+        // Wait for the kill to schedule its killCursors. It may schedule a getMore first before
+        // cancelling it, so wait until the pending request is actually a killCursors.
+    }
+    assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
+
+    // Run the callback for the killCursors. We don't actually inspect the value so we don't have to
+    // schedule a response.
+    runReadyCallbacks();
+    future.timed_get(kFutureTimeout);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1b3a665df5e..acda45f66f0 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -30,14 +30,8 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
-#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_skip.h"
-#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_merge.h"
-#include "mongo/s/query/router_stage_mock.h"
-#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/stdx/memory.h"
@@ -70,6 +64,14 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
return ClusterClientCursorGuard(opCtx, std::move(cursor));
}
+ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> root,
+ ClusterClientCursorParams&& params) {
+ std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl(
+ opCtx, std::move(root), std::move(params), opCtx->getLogicalSessionId()));
+ return ClusterClientCursorGuard(opCtx, std::move(cursor));
+}
+
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params,
@@ -84,7 +86,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
}
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
- std::unique_ptr<RouterStageMock> root,
+ std::unique_ptr<RouterExecStage> root,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
: _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) {
@@ -183,81 +185,13 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
return _params.readPreference;
}
-namespace {
-
-bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
- return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
- dynamic_cast<DocumentSourceSkip*>(stage.get()));
-}
-
-bool isAllLimitsAndSkips(Pipeline* pipeline) {
- const auto stages = pipeline->getSources();
- return std::all_of(
- stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); });
-}
-
-/**
- * Creates the initial stage to feed data into the execution plan. By default, a RouterExecMerge
- * stage, or a custom stage if specified in 'params->creatCustomMerge'.
- */
-std::unique_ptr<RouterExecStage> createInitialStage(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- if (params->createCustomCursorSource) {
- return params->createCustomCursorSource(opCtx, executor, params);
- } else {
- return stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
- }
-}
-
-std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- invariant(params->mergePipeline);
- invariant(!params->skip);
- invariant(!params->limit);
- auto* pipeline = params->mergePipeline.get();
- auto* opCtx = pipeline->getContext()->opCtx;
-
- std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
- if (!isAllLimitsAndSkips(pipeline)) {
- return stdx::make_unique<RouterStagePipeline>(std::move(root),
- std::move(params->mergePipeline));
- }
-
- // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and
- // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive
- // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree
- // instead.
- while (!pipeline->getSources().empty()) {
- invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
- root = stdx::make_unique<RouterStageSkip>(
- opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
- root = stdx::make_unique<RouterStageLimit>(
- opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
- }
- }
- // We are executing the pipeline without using an actual Pipeline, so we need to strip out any
- // Document metadata ourselves.
- return stdx::make_unique<RouterStageRemoveMetadataFields>(
- opCtx, std::move(root), Document::allMetadataFieldNames);
-}
-} // namespace
-
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
- if (params->mergePipeline) {
- if (auto sort =
- cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) {
- params->sort = *sort;
- }
- return buildPipelinePlan(executor, params);
- }
- std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
+ std::unique_ptr<RouterExecStage> root =
+ std::make_unique<RouterStageMerge>(opCtx, executor, params->extractARMParams());
if (skip) {
root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip);
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 36f9d3995c8..04e97cad3d9 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -83,12 +83,21 @@ class ClusterClientCursorImpl final : public ClusterClientCursor {
public:
/**
- * Constructs a CCC whose safe cleanup is ensured by an RAII object.
+ * Constructs a cluster query plan and CCC from the given parameters whose safe cleanup is
+ * ensured by an RAII object.
*/
static ClusterClientCursorGuard make(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params);
+ /**
+ * Constructs a CCC from the given execution tree 'root'. The CCC's safe cleanup is ensured by
+ * an RAII object.
+ */
+ static ClusterClientCursorGuard make(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> root,
+ ClusterClientCursorParams&& params);
+
StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext) final;
void kill(OperationContext* opCtx) final;
@@ -122,12 +131,11 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
public:
- /** private for tests */
/**
* Constructs a CCC whose result set is generated by a mock execution stage.
*/
ClusterClientCursorImpl(OperationContext* opCtx,
- std::unique_ptr<RouterStageMock> root,
+ std::unique_ptr<RouterExecStage> root,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid);
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index c2d300ee19e..a853d26a99f 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -123,9 +123,6 @@ struct ClusterClientCursorParams {
// Should be forwarded to the remote hosts in 'cmdObj'.
boost::optional<long long> limit;
- // If set, we use this pipeline to merge the output of aggregations on each remote.
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
-
// Whether this cursor is tailing a capped collection, and whether it has the awaitData option
// set.
TailableModeEnum tailableMode = TailableModeEnum::kNormal;
@@ -133,12 +130,6 @@ struct ClusterClientCursorParams {
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
- // If valid, is called to return the RouterExecStage which becomes the initial source in this
- // cursor's execution plan. Otherwise, a RouterStageMerge is used.
- stdx::function<std::unique_ptr<RouterExecStage>(
- OperationContext*, executor::TaskExecutor*, ClusterClientCursorParams*)>
- createCustomCursorSource;
-
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp
deleted file mode 100644
index 26a944ed5cc..00000000000
--- a/src/mongo/s/query/document_source_router_adapter.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/document_source_router_adapter.h"
-
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/expression_context.h"
-
-namespace mongo {
-
-boost::intrusive_ptr<DocumentSourceRouterAdapter> DocumentSourceRouterAdapter::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage) {
- return new DocumentSourceRouterAdapter(expCtx, std::move(childStage));
-}
-
-DocumentSource::GetNextResult DocumentSourceRouterAdapter::getNext() {
- auto next = uassertStatusOK(_child->next(_execContext));
- if (auto nextObj = next.getResult()) {
- return Document::fromBsonWithMetaData(*nextObj);
- }
- return GetNextResult::makeEOF();
-}
-
-void DocumentSourceRouterAdapter::doDispose() {
- _child->kill(pExpCtx->opCtx);
-}
-
-void DocumentSourceRouterAdapter::reattachToOperationContext(OperationContext* opCtx) {
- _child->reattachToOperationContext(opCtx);
-}
-
-void DocumentSourceRouterAdapter::detachFromOperationContext() {
- _child->detachFromOperationContext();
-}
-
-Value DocumentSourceRouterAdapter::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- invariant(explain); // We shouldn't need to serialize this stage to send it anywhere.
- return Value(); // Return the empty value to hide this stage from explain output.
-}
-
-std::size_t DocumentSourceRouterAdapter::getNumRemotes() const {
- return _child->getNumRemotes();
-}
-
-bool DocumentSourceRouterAdapter::remotesExhausted() {
- return _child->remotesExhausted();
-}
-
-DocumentSourceRouterAdapter::DocumentSourceRouterAdapter(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage)
- : DocumentSource(expCtx), _child(std::move(childStage)) {}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h
deleted file mode 100644
index a7db7734539..00000000000
--- a/src/mongo/s/query/document_source_router_adapter.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/s/query/router_exec_stage.h"
-
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/pipeline.h"
-
-namespace mongo {
-/**
- * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces,
- * translating results from an input RouterExecStage into DocumentSource::GetNextResults.
- */
-class DocumentSourceRouterAdapter final : public DocumentSource {
-public:
- static boost::intrusive_ptr<DocumentSourceRouterAdapter> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage);
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- return {StreamType::kStreaming,
- PositionRequirement::kFirst,
- HostTypeRequirement::kMongoS,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kAllowed};
- }
-
- GetNextResult getNext() final;
- void doDispose() final;
- void reattachToOperationContext(OperationContext* opCtx) final;
- void detachFromOperationContext() final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
- bool remotesExhausted();
- std::size_t getNumRemotes() const;
-
- void setExecContext(RouterExecStage::ExecContext execContext) {
- _execContext = execContext;
- }
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const {
- return _child->setAwaitDataTimeout(awaitDataTimeout);
- }
-
-private:
- DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage);
-
- std::unique_ptr<RouterExecStage> _child;
- RouterExecStage::ExecContext _execContext;
-};
-} // namespace mongo
diff --git a/src/mongo/s/query/results_merger_test_fixture.cpp b/src/mongo/s/query/results_merger_test_fixture.cpp
new file mode 100644
index 00000000000..57033523c68
--- /dev/null
+++ b/src/mongo/s/query/results_merger_test_fixture.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_factory_mock.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/query/results_merger_test_fixture.h"
+
+namespace mongo {
+const HostAndPort ResultsMergerTestFixture::kTestConfigShardHost =
+ HostAndPort("FakeConfigHost", 12345);
+const std::vector<ShardId> ResultsMergerTestFixture::kTestShardIds = {
+ ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
+const std::vector<HostAndPort> ResultsMergerTestFixture::kTestShardHosts = {
+ HostAndPort("FakeShard1Host", 12345),
+ HostAndPort("FakeShard2Host", 12345),
+ HostAndPort("FakeShard3Host", 12345)};
+
+const NamespaceString ResultsMergerTestFixture::kTestNss = NamespaceString{"testdb.testcoll"};
+
+void ResultsMergerTestFixture::setUp() {
+    setRemote(HostAndPort("ClientHost", 12345));
+
+    configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
+
+    // Register one shard, together with a mock targeter for its host, per test shard id.
+    std::vector<ShardType> shards;
+    for (size_t idx = 0; idx < kTestShardIds.size(); ++idx) {
+        const HostAndPort& shardHost = kTestShardHosts[idx];
+
+        ShardType shard;
+        shard.setName(kTestShardIds[idx].toString());
+        shard.setHost(shardHost.toString());
+        shards.push_back(shard);
+
+        // The mock targeter reports the shard's host as its connection string and found host.
+        auto targeter = stdx::make_unique<RemoteCommandTargeterMock>();
+        targeter->setConnectionStringReturnValue(ConnectionString(shardHost));
+        targeter->setFindHostReturnValue(shardHost);
+
+        targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter));
+    }
+
+    setupShards(shards);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h
new file mode 100644
index 00000000000..1252f22b793
--- /dev/null
+++ b/src/mongo/s/query/results_merger_test_fixture.h
@@ -0,0 +1,228 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/query/async_results_merger.h"
+#include "mongo/s/sharding_router_test_fixture.h"
+
+namespace mongo {
+
+/**
+ * Test fixture which is useful to both the tests for AsyncResultsMerger and BlockingResultsMerger.
+ */
+class ResultsMergerTestFixture : public ShardingTestFixture {
+public:
+ static const HostAndPort kTestConfigShardHost;
+ static const std::vector<ShardId> kTestShardIds;
+ static const std::vector<HostAndPort> kTestShardHosts;
+
+ static const NamespaceString kTestNss;
+
+ ResultsMergerTestFixture() {}
+
+ void setUp() override;
+
+protected:
+ /**
+ * Constructs an AsyncResultsMergerParams object with the given vector of existing cursors.
+ *
+ * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. Otherwise, the
+ * 'findCmd' is used to construct the AsyncResultsMergerParams.
+ *
+ * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
+ * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
+ */
+ AsyncResultsMergerParams makeARMParamsFromExistingCursors(
+ std::vector<RemoteCursor> remoteCursors,
+ boost::optional<BSONObj> findCmd = boost::none,
+ boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ params.setRemotes(std::move(remoteCursors));
+
+
+ if (findCmd) {
+ const auto qr = unittest::assertGet(
+ QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
+ if (!qr->getSort().isEmpty()) {
+ params.setSort(qr->getSort().getOwned());
+ }
+
+ if (getMoreBatchSize) {
+ params.setBatchSize(getMoreBatchSize);
+ } else {
+ params.setBatchSize(qr->getBatchSize()
+ ? boost::optional<std::int64_t>(
+ static_cast<std::int64_t>(*qr->getBatchSize()))
+ : boost::none);
+ }
+ params.setTailableMode(qr->getTailableMode());
+ params.setAllowPartialResults(qr->isAllowPartialResults());
+ }
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(operationContext()->getLogicalSessionId());
+ sessionInfo.setTxnNumber(operationContext()->getTxnNumber());
+ params.setOperationSessionInfo(sessionInfo);
+ return params;
+ }
+ /**
+ * Constructs an ARM with the given vector of existing cursors.
+ *
+ * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
+ * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
+ *
+ * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
+ * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
+ */
+ std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
+ std::vector<RemoteCursor> remoteCursors,
+ boost::optional<BSONObj> findCmd = boost::none,
+ boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
+
+ return stdx::make_unique<AsyncResultsMerger>(
+ operationContext(),
+ executor(),
+ makeARMParamsFromExistingCursors(std::move(remoteCursors), findCmd, getMoreBatchSize));
+ }
+
+ /**
+ * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition.
+ */
+    void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) {
+        // The view definition: the namespace plus the pipeline parsed from its JSON text.
+        const BSONObj viewDef = BSON("ns" << ns << "pipeline" << fromjson(pipelineJsonArr));
+
+        // The failed command reply which embeds that definition.
+        BSONObjBuilder reply;
+        reply.append("resolvedView", viewDef);
+        reply.append("ok", 0.0);
+        reply.append("errmsg", "Command on view must be executed by mongos");
+        reply.append("code", 169);
+
+        // Hand the reply to the mock network as the only scheduled response.
+        scheduleNetworkResponseObjs({reply.obj()});
+    }
+
+ /**
+ * Schedules a list of cursor responses to be returned by the mock network.
+ */
+ void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
+ std::vector<BSONObj> objs;
+ for (const auto& cursorResponse : responses) {
+ // For tests of the AsyncResultsMerger, all CursorResponses scheduled by the tests are
+ // subsequent responses, since the AsyncResultsMerger will only ever run getMores.
+ objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse));
+ }
+ scheduleNetworkResponseObjs(objs);
+ }
+
+ /**
+ * Schedules a list of raw BSON command responses to be returned by the mock network.
+ */
+    void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
+        // Hold the mock network through a scoped guard (as networkHasReadyRequests() does) so that
+        // it is exited on every path out of this function, including a failed assertion.
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        for (const auto& obj : objs) {
+            ASSERT_TRUE(guard->hasReadyRequests());
+            // Each object answers the next ready request, with zero elapsed time.
+            executor::RemoteCommandResponse response(obj, Milliseconds(0));
+            executor::TaskExecutor::ResponseStatus responseStatus(response);
+            guard->scheduleResponse(guard->getNextReadyRequest(), guard->now(), responseStatus);
+        }
+        guard->runReadyNetworkOperations();
+    }
+
+    executor::RemoteCommandRequest getNthPendingRequest(size_t n) {
+        // Returns a copy of the n-th unscheduled request on the mock network. The network is held
+        // through a scoped guard (as networkHasReadyRequests() does) so that it is exited on every
+        // path out of this function, including a failed assertion.
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        ASSERT_TRUE(guard->hasReadyRequests());
+        executor::NetworkInterfaceMock::NetworkOperationIterator noi =
+            guard->getNthUnscheduledRequest(n);
+        return noi->getRequest();
+    }
+
+ bool networkHasReadyRequests() {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ return guard->hasReadyRequests();
+ }
+
+    void scheduleErrorResponse(executor::ResponseStatus rs) {
+        invariant(!rs.isOK());
+        rs.elapsedMillis = Milliseconds(0);
+        // Answer the next ready request with the error 'rs'. The scoped guard exits the mock
+        // network on every path out of this function, including a failed assertion.
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        ASSERT_TRUE(guard->hasReadyRequests());
+        guard->scheduleResponse(guard->getNextReadyRequest(), guard->now(), rs);
+        guard->runReadyNetworkOperations();
+    }
+
+    void runReadyCallbacks() {
+        // Run whatever network operations are ready. The mock network is held through a scoped
+        // guard, as networkHasReadyRequests() does, rather than a manual enter/exit pair.
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        guard->runReadyNetworkOperations();
+    }
+
+    void blackHoleNextRequest() {
+        // Black-holes the next ready request. The scoped guard (as in networkHasReadyRequests())
+        // exits the mock network on every path out of this function, including a failed assertion.
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        ASSERT_TRUE(guard->hasReadyRequests());
+        guard->blackHole(guard->getNextReadyRequest());
+    }
+
+    void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
+        // Checks that 'killCmd' is a killCursors command listing exactly one cursor, 'cursorId'.
+        ASSERT_TRUE(killCmd.hasElement("killCursors"));
+        ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array);
+
+        size_t numCursors = 0;
+        for (auto&& cursor : killCmd["cursors"].Obj()) {
+            ASSERT_EQ(cursor.type(), BSONType::NumberLong);
+            ASSERT_EQ(cursor.numberLong(), cursorId);
+            ++numCursors;
+        }
+        ASSERT_EQ(numCursors, 1u);
+    }
+
+ RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
+ RemoteCursor remoteCursor;
+ remoteCursor.setShardId(std::move(shardId));
+ remoteCursor.setHostAndPort(std::move(host));
+ remoteCursor.setCursorResponse(std::move(response));
+ return remoteCursor;
+ }
+};
+
+} // namespace mongo
\ No newline at end of file
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
deleted file mode 100644
index 967c9f60b35..00000000000
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_merge.h"
-
-#include "mongo/db/query/find_common.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-
-RouterStageMerge::RouterStageMerge(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params)
- : RouterExecStage(opCtx),
- _executor(executor),
- _params(params),
- _arm(opCtx, executor, params->extractARMParams()) {}
-
-StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) {
- // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData
- // cursors wait for ready() only until a specified time limit is exceeded.
- return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData
- ? awaitNextWithTimeout(execCtx)
- : _arm.blockingNext());
-}
-
-StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not
- // ready, we don't block. Fall straight through to the return statement.
- while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) {
- auto nextEventStatus = getNextEvent();
- if (!nextEventStatus.isOK()) {
- return nextEventStatus.getStatus();
- }
- auto event = nextEventStatus.getValue();
-
- // Block until there are further results to return, or our time limit is exceeded.
- auto waitStatus = _executor->waitForEvent(
- getOpCtx(), event, awaitDataState(getOpCtx()).waitForInsertsDeadline);
-
- if (!waitStatus.isOK()) {
- return waitStatus.getStatus();
- }
- // Swallow timeout errors for tailable awaitData cursors, stash the event that we were
- // waiting on, and return EOF.
- if (waitStatus == stdx::cv_status::timeout) {
- _leftoverEventFromLastTimeout = std::move(event);
- return ClusterQueryResult{};
- }
- }
-
- // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in
- // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we
- // return EOF immediately rather than blocking for further results.
- return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{};
-}
-
-StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
- // If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
- if (_leftoverEventFromLastTimeout) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- // If we have an outstanding event from last time, then we might have to manually schedule
- // some getMores for the cursors. If a remote response came back while we were between
- // getMores (from the user to mongos), the response may have been an empty batch, and the
- // ARM would not be able to ask for the next batch immediately since it is not attached to
- // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores
- // ourselves.
- Status getMoreStatus = _arm.scheduleGetMores();
- if (!getMoreStatus.isOK()) {
- return getMoreStatus;
- }
-
- // Return the leftover event and clear '_leftoverEventFromLastTimeout'.
- auto event = _leftoverEventFromLastTimeout;
- _leftoverEventFromLastTimeout = EventHandle();
- return event;
- }
-
- return _arm.nextEvent();
-}
-
-void RouterStageMerge::kill(OperationContext* opCtx) {
- _arm.blockingKill(opCtx);
-}
-
-bool RouterStageMerge::remotesExhausted() {
- return _arm.remotesExhausted();
-}
-
-std::size_t RouterStageMerge::getNumRemotes() const {
- return _arm.getNumRemotes();
-}
-
-Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return _arm.setAwaitDataTimeout(awaitDataTimeout);
-}
-
-void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) {
- _arm.addNewShardCursors(std::move(newShards));
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index b6bfee146b6..c0a847f7bd2 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -29,74 +29,56 @@
#pragma once
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger.h"
+#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
-namespace {
-using EventHandle = executor::TaskExecutor::EventHandle;
-} // namespace
-
/**
- * Draws results from the AsyncResultsMerger, which is the underlying source of the stream of merged
- * documents manipulated by the RouterExecStage pipeline. Used to present a stream of documents
- * merged from the shards to the stages later in the pipeline.
+ * Serves as an adapter between the RouterExecStage interface and the BlockingResultsMerger
+ * interface, providing a single stream of results populated from many remote streams.
*/
class RouterStageMerge final : public RouterExecStage {
public:
+ // Builds the underlying BlockingResultsMerger from 'opCtx', 'armParams' (moved in) and
+ // 'executor'.
RouterStageMerge(OperationContext* opCtx,
executor::TaskExecutor* executor,
- ClusterClientCursorParams* params);
-
- StatusWith<ClusterQueryResult> next(ExecContext) final;
+ AsyncResultsMergerParams&& armParams)
+ : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor) {}
- void kill(OperationContext* opCtx) final;
+ // Returns whatever the results merger returns for 'execCtx', passing it the OperationContext
+ // obtained from getOpCtx().
+ StatusWith<ClusterQueryResult> next(ExecContext execCtx) final {
+ return _resultsMerger.next(getOpCtx(), execCtx);
+ }
- bool remotesExhausted() final;
+ // Forwards the kill request to the results merger.
+ void kill(OperationContext* opCtx) final {
+ _resultsMerger.kill(opCtx);
+ }
- std::size_t getNumRemotes() const final;
+ // Forwards to the results merger's remotesExhausted().
+ bool remotesExhausted() final {
+ return _resultsMerger.remotesExhausted();
+ }
- /**
- * Adds the cursors in 'newShards' to those being merged by the ARM.
- */
- void addNewShardCursors(std::vector<RemoteCursor>&& newShards);
+ // Forwards to the results merger's getNumRemotes().
+ std::size_t getNumRemotes() const final {
+ return _resultsMerger.getNumRemotes();
+ }
protected:
- Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ // Forwards 'awaitDataTimeout' to the results merger and returns its Status.
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final {
+ return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout);
+ }
+ // Reattaches the results merger to the OperationContext obtained from getOpCtx().
void doReattachToOperationContext() override {
- _arm.reattachToOperationContext(getOpCtx());
+ _resultsMerger.reattachToOperationContext(getOpCtx());
}
+ // Detaches the results merger from its OperationContext.
virtual void doDetachFromOperationContext() {
- _arm.detachFromOperationContext();
+ _resultsMerger.detachFromOperationContext();
}
private:
- /**
- * Awaits the next result from the ARM up to a specified time limit. If this is the user's
- * initial find or we have already obtained at least one result for this batch, this method
- * returns EOF immediately rather than blocking.
- */
- StatusWith<ClusterQueryResult> awaitNextWithTimeout(ExecContext execCtx);
-
- /**
- * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding
- * event which we scheduled during the previous call to next().
- */
- StatusWith<EventHandle> getNextEvent();
-
- // Not owned here.
- executor::TaskExecutor* _executor;
- EventHandle _leftoverEventFromLastTimeout;
-
- ClusterClientCursorParams* _params;
-
// Schedules remote work and merges results from 'remotes'.
- AsyncResultsMerger _arm;
+ BlockingResultsMerger _resultsMerger;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 5e94274b9ac..a5a97bdbdbc 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -35,26 +35,20 @@
#include "mongo/db/pipeline/document_source_list_local_sessions.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/s/query/document_source_router_adapter.h"
namespace mongo {
-RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline)
+// Takes ownership of 'mergePipeline', which must contain at least one stage (checked by
+// invariant). The stage's OperationContext is the one reachable through the pipeline's context.
+RouterStagePipeline::RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline)
: RouterExecStage(mergePipeline->getContext()->opCtx),
- _mergePipeline(std::move(mergePipeline)),
- _mongosOnlyPipeline(!_mergePipeline->isSplitForMerge()) {
- if (!_mongosOnlyPipeline) {
- // Add an adapter to the front of the pipeline to draw results from 'child'.
- _routerAdapter =
- DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)),
- _mergePipeline->addInitialSource(_routerAdapter);
- }
+ _mergePipeline(std::move(mergePipeline)) {
+ invariant(!_mergePipeline->getSources().empty());
+ // The dynamic_cast leaves '_mergeCursorsStage' null when the first stage of the pipeline is
+ // not a DocumentSourceMergeCursors.
+ _mergeCursorsStage =
+ dynamic_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get());
}
StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecContext execContext) {
- if (_routerAdapter) {
- _routerAdapter->setExecContext(execContext);
+ if (_mergeCursorsStage) {
+ _mergeCursorsStage->setExecContext(execContext);
}
// Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
@@ -85,15 +79,20 @@ void RouterStagePipeline::kill(OperationContext* opCtx) {
}
+// Returns the number of remotes reported by the merge-cursors stage, or 0 when this pipeline has
+// no such stage.
std::size_t RouterStagePipeline::getNumRemotes() const {
- return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes();
+ if (_mergeCursorsStage) {
+ return _mergeCursorsStage->getNumRemotes();
+ }
+ return 0;
}
+// Returns true when there is no merge-cursors stage; otherwise returns that stage's answer.
bool RouterStagePipeline::remotesExhausted() {
- return _mongosOnlyPipeline || _routerAdapter->remotesExhausted();
+ return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted();
}
+// Forwards 'awaitDataTimeout' to the merge-cursors stage and returns its Status. A merge-cursors
+// stage must be present; this is enforced by invariant rather than reported as an error Status.
Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return _routerAdapter->setAwaitDataTimeout(awaitDataTimeout);
+ invariant(_mergeCursorsStage,
+ "The only cursors which should be tailable are those with remote cursors.");
+ return _mergeCursorsStage->setAwaitDataTimeout(awaitDataTimeout);
}
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index c14ddf9f80b..43706b42cd9 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -31,8 +31,8 @@
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/pipeline.h"
-#include "mongo/s/query/document_source_router_adapter.h"
namespace mongo {
@@ -42,8 +42,7 @@ namespace mongo {
*/
class RouterStagePipeline final : public RouterExecStage {
public:
- RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline);
+ RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline);
StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext execContext) final;
@@ -61,8 +60,10 @@ protected:
void doDetachFromOperationContext() final;
private:
- boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter;
std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline;
- bool _mongosOnlyPipeline;
+
+ // May be null if this pipeline is executing exclusively on mongos and will not contact the
+ // shards at all.
+ boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
deleted file mode 100644
index 61fa2a9176d..00000000000
--- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-#include "mongo/s/query/router_stage_update_on_add_shard.h"
-
-#include <algorithm>
-
-#include "mongo/base/checked_cast.h"
-#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/query/establish_cursors.h"
-#include "mongo/s/query/router_stage_merge.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace {
-
-// Returns true if the change stream document has an 'operationType' of 'newShardDetected'.
-bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) {
- if (!childResult.isOK() || childResult.getValue().isEOF()) {
- return false;
- }
- return ((*childResult.getValue().getResult())[DocumentSourceChangeStream::kOperationTypeField]
- .str() == DocumentSourceChangeStream::kNewShardDetectedOpType);
-}
-}
-
-RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
- BSONObj cmdToRunOnNewShards)
- : RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)),
- _params(params),
- _shardIds(std::move(shardIds)),
- _cmdToRunOnNewShards(cmdToRunOnNewShards) {}
-
-StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
- RouterExecStage::ExecContext execContext) {
- auto childStage = getChildStage();
- auto childResult = childStage->next(execContext);
- while (needsUpdate(childResult)) {
- addNewShardCursors(*childResult.getValue().getResult());
- childResult = childStage->next(execContext);
- }
- return childResult;
-}
-
-void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) {
- checked_cast<RouterStageMerge*>(getChildStage())
- ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
-}
-
-std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(
- const BSONObj& newShardDetectedObj) {
- auto* opCtx = getOpCtx();
- // Reload the shard registry. We need to ensure a reload initiated after calling this method
- // caused the reload, otherwise we aren't guaranteed to get all the new shards.
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- // A 'false' return from shardRegistry.reload() means a reload was already in progress and
- // it completed before reload() returned. So another reload(), regardless of return
- // value, will ensure a reload started after the first call to reload().
- shardRegistry->reload(opCtx);
- }
-
- std::vector<ShardId> shardIds, newShardIds;
- shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardIds.begin(), _shardIds.end());
- std::sort(shardIds.begin(), shardIds.end());
- std::set_difference(shardIds.begin(),
- shardIds.end(),
- _shardIds.begin(),
- _shardIds.end(),
- std::back_inserter(newShardIds));
-
- auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
- _cmdToRunOnNewShards,
- newShardDetectedObj[DocumentSourceChangeStream::kIdField].embeddedObject());
- std::vector<std::pair<ShardId, BSONObj>> requests;
- for (const auto& shardId : newShardIds) {
- requests.emplace_back(shardId, cmdObj);
- _shardIds.push_back(shardId);
- }
- const bool allowPartialResults = false; // partial results are not allowed
- return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- _params->nsString,
- _params->readPreference.value_or(ReadPreferenceSetting()),
- requests,
- allowPartialResults);
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h
deleted file mode 100644
index 00ee921e2af..00000000000
--- a/src/mongo/s/query/router_stage_update_on_add_shard.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-#pragma once
-
-#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
-#include "mongo/s/query/router_exec_stage.h"
-
-namespace mongo {
-/**
- * Uses a RouterStageMerge to merge results, and monitors the merged stream for special
- * sentinel documents which indicate the the set of cursors needs to be updated. When the
- * sentinel is detected, removes it from the stream and updates the set of cursors.
- *
- * cmdToRunOnNewShards: Command to execute on the new shard to open the cursor.
- */
-class RouterStageUpdateOnAddShard final : public RouterExecStage {
-public:
- RouterStageUpdateOnAddShard(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
- BSONObj cmdToRunOnNewShards);
-
- StatusWith<ClusterQueryResult> next(ExecContext) final;
-
-private:
- /**
- * Establish the new cursors and tell the RouterStageMerge about them.
- * obj: The BSONObj which triggered the establishment of the new cursors
- */
- void addNewShardCursors(BSONObj obj);
-
- /**
- * Open the cursors on the new shards.
- */
- std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj);
-
- ClusterClientCursorParams* _params;
- std::vector<ShardId> _shardIds;
- BSONObj _cmdToRunOnNewShards;
-};
-}