diff options
author | David Storch <david.storch@10gen.com> | 2015-09-22 16:40:14 -0400 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-09-24 09:19:32 -0400 |
commit | 1a8711db875fc1b0e855c838dc0241c381a19dbc (patch) | |
tree | 73d661c88294fb3977317c9032436e24daaac4dd /src/mongo/s/query | |
parent | 6d62d7f7bc0841ab48ae6b3f6fc69fa11682e2e9 (diff) | |
download | mongo-1a8711db875fc1b0e855c838dc0241c381a19dbc.tar.gz |
SERVER-20537 fix leak of remote cursor in AsyncResultsMerger
If ARM::kill() was called before the find command response was received, the callback for the
outstanding find command would be cancelled. As a result, the cursor id in need of a kill was never
received, and the cursor was left alive.
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 56 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 10 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 70 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_merge.cpp | 9 |
4 files changed, 106 insertions, 39 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 1116eec129c..ef039eadb2f 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -298,6 +298,27 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() return eventToReturn; } +StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj& responseObj, + const RemoteCursorData& remote) { + auto getMoreParseStatus = CursorResponse::parseFromBSON(responseObj); + if (!getMoreParseStatus.isOK()) { + return getMoreParseStatus.getStatus(); + } + + auto cursorResponse = getMoreParseStatus.getValue(); + + // If we have a cursor established, and we get a non-zero cursor id that is not equal to the + // established cursor id, we will fail the operation. + if (remote.cursorId && cursorResponse.cursorId != 0 && + *remote.cursorId != cursorResponse.cursorId) { + return Status(ErrorCodes::BadValue, + str::stream() << "Expected cursorid " << *remote.cursorId << " but received " + << cursorResponse.cursorId); + } + + return cursorResponse; +} + void AsyncResultsMerger::handleBatchResponse( const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -315,6 +336,15 @@ void AsyncResultsMerger::handleBatchResponse( // Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down. signalCurrentEventIfReady_inlock(); + // Make a best effort to parse the response and retrieve the cursor id. We need the cursor + // id in order to issue a killCursors command against it. + if (cbData.response.isOK()) { + auto cursorResponse = parseCursorResponse(cbData.response.getValue().data, remote); + if (cursorResponse.isOK()) { + remote.cursorId = cursorResponse.getValue().cursorId; + } + } + // If we're killed and we're not waiting on any more batches to come back, then we are ready // to kill the cursors on the remote hosts and clean up this cursor. Schedule the // killCursors command and signal that this cursor is safe now safe to destroy. We have to @@ -356,24 +386,13 @@ void AsyncResultsMerger::handleBatchResponse( return; } - auto getMoreParseStatus = CursorResponse::parseFromBSON(cbData.response.getValue().data); - if (!getMoreParseStatus.isOK()) { - remote.status = getMoreParseStatus.getStatus(); - return; - } - - auto cursorResponse = getMoreParseStatus.getValue(); - - // If we have a cursor established, and we get a non-zero cursorid that is not equal to the - // established cursorid, we will fail the operation. - if (remote.cursorId && cursorResponse.cursorId != 0 && - *remote.cursorId != cursorResponse.cursorId) { - remote.status = Status(ErrorCodes::BadValue, - str::stream() << "Expected cursorid " << *remote.cursorId - << " but received " << cursorResponse.cursorId); + auto cursorResponseStatus = parseCursorResponse(cbData.response.getValue().data, remote); + if (!cursorResponseStatus.isOK()) { + remote.status = cursorResponseStatus.getStatus(); return; } + auto cursorResponse = cursorResponseStatus.getValue(); remote.cursorId = cursorResponse.cursorId; remote.cmdObj = boost::none; @@ -476,13 +495,6 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() { _lifecycleState = kKillStarted; - // Cancel callbacks. - for (const auto& remote : _remotes) { - if (remote.cbHandle.isValid()) { - _executor->cancel(remote.cbHandle); - } - } - // Make '_killCursorsScheduledEvent', which we will signal as soon as we have scheduled a // killCursors command to run on all the remote shards. auto statusWithEvent = _executor->makeEvent(); diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index a6d75df1db0..6b00f94f8aa 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -43,6 +43,8 @@ namespace mongo { +struct CursorResponse; + /** * AsyncResultsMerger is used to generate results from cursor-generating commands on one or more * remote hosts. A cursor-generating command (e.g. the find command) is one that establishes a @@ -211,6 +213,14 @@ private: const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData); /** + * Parses the find or getMore command response object to a CursorResponse. + * + * Returns a non-OK response if the response fails to parse or if there is a cursor id mismatch. + */ + static StatusWith<CursorResponse> parseCursorResponse(const BSONObj& responseObj, + const RemoteCursorData& remote); + + /** * Helper to schedule a command asking the remote node for another batch of results. * * The 'remoteIndex' gives the position of the remote node from which we are retrieving the diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 0924d2e74ec..7daaa34d03b 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -167,13 +167,6 @@ protected: net->exitNetwork(); } - void runReadyNetworkOperations() { - executor::NetworkInterfaceMock* net = getNet(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - const NamespaceString _nss; const std::vector<HostAndPort> _remotes; @@ -822,8 +815,67 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { // Kill event will only be signalled once the pending batches are received. auto killedEvent = arm->kill(); - // Ensures that callbacks run with a cancelled status. - runReadyNetworkOperations(); + // After the kill, the ARM waits for outstanding batches to come back. This ensures that we + // receive cursor ids for any established remote cursors, and can clean them up by issuing + // killCursors commands. + responses.clear(); + std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; + responses.emplace_back(_nss, CursorId(123), batch2); + std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")}; + responses.emplace_back(_nss, CursorId(0), batch2); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); + + // Only one of the responses has a non-zero cursor id. The ARM should have issued a killCursors + // command against this id. + BSONObj expectedCmdObj = BSON("killCursors" + << "testcoll" + << "cursors" << BSON_ARRAY(CursorId(123))); + ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj); + + // Ensure that we properly signal both those waiting for the kill, and those waiting for more + // results to be ready. + executor->waitForEvent(readyEvent); + executor->waitForEvent(killedEvent); +} + +TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) { + BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); + makeCursorFromFindCmd(findCmd, {_remotes[0]}); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; + responses.emplace_back(_nss, CursorId(123), batch1); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); + + // First batch received. + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + + // This will schedule a getMore on cursor id 123. + ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + auto killedEvent = arm->kill(); + + // The kill can't complete until the getMore response is received. + responses.clear(); + std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; + responses.emplace_back(_nss, CursorId(123), batch2); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); + + // While processing the getMore response, a killCursors against id 123 should have been + // scheduled. + BSONObj expectedCmdObj = BSON("killCursors" + << "testcoll" + << "cursors" << BSON_ARRAY(CursorId(123))); + ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj); // Ensure that we properly signal both those waiting for the kill, and those waiting for more // results to be ready. diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 69dbefaccd2..b861e999385 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -41,9 +41,6 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor, : _executor(executor), _arm(executor, std::move(params)) {} StatusWith<boost::optional<BSONObj>> RouterStageMerge::next() { - // On error, kill the underlying async results merger. - auto killer = MakeGuard(&RouterStageMerge::kill, this); - while (!_arm.ready()) { auto nextEventStatus = _arm.nextEvent(); if (!nextEventStatus.isOK()) { @@ -55,11 +52,7 @@ StatusWith<boost::optional<BSONObj>> RouterStageMerge::next() { _executor->waitForEvent(event); } - auto statusWithNext = _arm.nextReady(); - if (statusWithNext.isOK()) { - killer.Dismiss(); - } - return statusWithNext; + return _arm.nextReady(); } void RouterStageMerge::kill() { |