summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2015-09-22 16:40:14 -0400
committerDavid Storch <david.storch@10gen.com>2015-09-24 09:19:32 -0400
commit1a8711db875fc1b0e855c838dc0241c381a19dbc (patch)
tree73d661c88294fb3977317c9032436e24daaac4dd /src/mongo/s/query
parent6d62d7f7bc0841ab48ae6b3f6fc69fa11682e2e9 (diff)
downloadmongo-1a8711db875fc1b0e855c838dc0241c381a19dbc.tar.gz
SERVER-20537 fix leak of remote cursor in AsyncResultsMerger
If ARM::kill() was called before the find command response was received, the callback for the outstanding find command would be cancelled. As a result, the cursor id in need of a kill was never received, and the cursor was left alive.
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp56
-rw-r--r--src/mongo/s/query/async_results_merger.h10
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp70
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp9
4 files changed, 106 insertions, 39 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 1116eec129c..ef039eadb2f 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -298,6 +298,27 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
return eventToReturn;
}
+StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj& responseObj,
+ const RemoteCursorData& remote) {
+ auto getMoreParseStatus = CursorResponse::parseFromBSON(responseObj);
+ if (!getMoreParseStatus.isOK()) {
+ return getMoreParseStatus.getStatus();
+ }
+
+ auto cursorResponse = getMoreParseStatus.getValue();
+
+ // If we have a cursor established, and we get a non-zero cursor id that is not equal to the
+ // established cursor id, we will fail the operation.
+ if (remote.cursorId && cursorResponse.cursorId != 0 &&
+ *remote.cursorId != cursorResponse.cursorId) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "Expected cursorid " << *remote.cursorId << " but received "
+ << cursorResponse.cursorId);
+ }
+
+ return cursorResponse;
+}
+
void AsyncResultsMerger::handleBatchResponse(
const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -315,6 +336,15 @@ void AsyncResultsMerger::handleBatchResponse(
// Make sure to wake up anyone waiting on '_currentEvent' if we're shutting down.
signalCurrentEventIfReady_inlock();
+ // Make a best effort to parse the response and retrieve the cursor id. We need the cursor
+ // id in order to issue a killCursors command against it.
+ if (cbData.response.isOK()) {
+ auto cursorResponse = parseCursorResponse(cbData.response.getValue().data, remote);
+ if (cursorResponse.isOK()) {
+ remote.cursorId = cursorResponse.getValue().cursorId;
+ }
+ }
+
// If we're killed and we're not waiting on any more batches to come back, then we are ready
// to kill the cursors on the remote hosts and clean up this cursor. Schedule the
// killCursors command and signal that this cursor is safe now safe to destroy. We have to
@@ -356,24 +386,13 @@ void AsyncResultsMerger::handleBatchResponse(
return;
}
- auto getMoreParseStatus = CursorResponse::parseFromBSON(cbData.response.getValue().data);
- if (!getMoreParseStatus.isOK()) {
- remote.status = getMoreParseStatus.getStatus();
- return;
- }
-
- auto cursorResponse = getMoreParseStatus.getValue();
-
- // If we have a cursor established, and we get a non-zero cursorid that is not equal to the
- // established cursorid, we will fail the operation.
- if (remote.cursorId && cursorResponse.cursorId != 0 &&
- *remote.cursorId != cursorResponse.cursorId) {
- remote.status = Status(ErrorCodes::BadValue,
- str::stream() << "Expected cursorid " << *remote.cursorId
- << " but received " << cursorResponse.cursorId);
+ auto cursorResponseStatus = parseCursorResponse(cbData.response.getValue().data, remote);
+ if (!cursorResponseStatus.isOK()) {
+ remote.status = cursorResponseStatus.getStatus();
return;
}
+ auto cursorResponse = cursorResponseStatus.getValue();
remote.cursorId = cursorResponse.cursorId;
remote.cmdObj = boost::none;
@@ -476,13 +495,6 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() {
_lifecycleState = kKillStarted;
- // Cancel callbacks.
- for (const auto& remote : _remotes) {
- if (remote.cbHandle.isValid()) {
- _executor->cancel(remote.cbHandle);
- }
- }
-
// Make '_killCursorsScheduledEvent', which we will signal as soon as we have scheduled a
// killCursors command to run on all the remote shards.
auto statusWithEvent = _executor->makeEvent();
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index a6d75df1db0..6b00f94f8aa 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -43,6 +43,8 @@
namespace mongo {
+struct CursorResponse;
+
/**
* AsyncResultsMerger is used to generate results from cursor-generating commands on one or more
* remote hosts. A cursor-generating command (e.g. the find command) is one that establishes a
@@ -211,6 +213,14 @@ private:
const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData);
/**
+ * Parses the find or getMore command response object to a CursorResponse.
+ *
+ * Returns a non-OK response if the response fails to parse or if there is a cursor id mismatch.
+ */
+ static StatusWith<CursorResponse> parseCursorResponse(const BSONObj& responseObj,
+ const RemoteCursorData& remote);
+
+ /**
* Helper to schedule a command asking the remote node for another batch of results.
*
* The 'remoteIndex' gives the position of the remote node from which we are retrieving the
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 0924d2e74ec..7daaa34d03b 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -167,13 +167,6 @@ protected:
net->exitNetwork();
}
- void runReadyNetworkOperations() {
- executor::NetworkInterfaceMock* net = getNet();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
const NamespaceString _nss;
const std::vector<HostAndPort> _remotes;
@@ -822,8 +815,67 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
// Kill event will only be signalled once the pending batches are received.
auto killedEvent = arm->kill();
- // Ensures that callbacks run with a cancelled status.
- runReadyNetworkOperations();
+ // After the kill, the ARM waits for outstanding batches to come back. This ensures that we
+ // receive cursor ids for any established remote cursors, and can clean them up by issuing
+ // killCursors commands.
+ responses.clear();
+ std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
+ responses.emplace_back(_nss, CursorId(123), batch2);
+ std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
+ responses.emplace_back(_nss, CursorId(0), batch2);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+
+ // Only one of the responses has a non-zero cursor id. The ARM should have issued a killCursors
+ // command against this id.
+ BSONObj expectedCmdObj = BSON("killCursors"
+ << "testcoll"
+ << "cursors" << BSON_ARRAY(CursorId(123)));
+ ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);
+
+ // Ensure that we properly signal both those waiting for the kill, and those waiting for more
+ // results to be ready.
+ executor->waitForEvent(readyEvent);
+ executor->waitForEvent(killedEvent);
+}
+
+TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
+ makeCursorFromFindCmd(findCmd, {_remotes[0]});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
+ responses.emplace_back(_nss, CursorId(123), batch1);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+
+ // First batch received.
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+
+ // This will schedule a getMore on cursor id 123.
+ ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ auto killedEvent = arm->kill();
+
+ // The kill can't complete until the getMore response is received.
+ responses.clear();
+ std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
+ responses.emplace_back(_nss, CursorId(123), batch2);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
+
+ // While processing the getMore response, a killCursors against id 123 should have been
+ // scheduled.
+ BSONObj expectedCmdObj = BSON("killCursors"
+ << "testcoll"
+ << "cursors" << BSON_ARRAY(CursorId(123)));
+ ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);
// Ensure that we properly signal both those waiting for the kill, and those waiting for more
// results to be ready.
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 69dbefaccd2..b861e999385 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -41,9 +41,6 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor,
: _executor(executor), _arm(executor, std::move(params)) {}
StatusWith<boost::optional<BSONObj>> RouterStageMerge::next() {
- // On error, kill the underlying async results merger.
- auto killer = MakeGuard(&RouterStageMerge::kill, this);
-
while (!_arm.ready()) {
auto nextEventStatus = _arm.nextEvent();
if (!nextEventStatus.isOK()) {
@@ -55,11 +52,7 @@ StatusWith<boost::optional<BSONObj>> RouterStageMerge::next() {
_executor->waitForEvent(event);
}
- auto statusWithNext = _arm.nextReady();
- if (statusWithNext.isOK()) {
- killer.Dismiss();
- }
- return statusWithNext;
+ return _arm.nextReady();
}
void RouterStageMerge::kill() {