diff options
author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2020-02-24 13:11:16 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-02 19:46:59 +0000 |
commit | b8583f28abf62cf624cd039c6c0aecc8dce653e6 (patch) | |
tree | bba98e8733b3a79527e6aa2dbc9a978bb03f5b64 | |
parent | f44bfe5d2fa808b299c5bc83979138b5eb6df6be (diff) | |
download | mongo-b8583f28abf62cf624cd039c6c0aecc8dce653e6.tar.gz |
SERVER-45726 Allow empty firstBatch with TaskExecutorCursor
(cherry picked from commit 727e3a51192152607de7ab4ac3cc8d909b1f5df1)
-rw-r--r-- | src/mongo/executor/task_executor_cursor.cpp | 19 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.h | 15 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor_test.cpp | 164 |
3 files changed, 149 insertions, 49 deletions
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp index 316561d2987..368a76f1abf 100644 --- a/src/mongo/executor/task_executor_cursor.cpp +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -61,7 +61,7 @@ TaskExecutorCursor::~TaskExecutorCursor() { _executor->cancel(*_cbHandle); } - if (_cursorId > 0) { + if (_cursorId >= kMinLegalCursorId) { // We deliberately ignore failures to kill the cursor. This "best effort" is acceptable // because some timeout mechanism on the remote host can be expected to reap it later. // @@ -78,7 +78,7 @@ TaskExecutorCursor::~TaskExecutorCursor() { } boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) { - if (_batchIter == _batch.end()) { + while (_batchIter == _batch.end() && _cursorId != kClosedCursorId) { _getNextBatch(opCtx); } @@ -130,9 +130,8 @@ void TaskExecutorCursor::_runRemoteCommand(const RemoteCommandRequest& rcr) { } void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { - if (_cursorId == 0) { - return; - } + invariant(_cbHandle, "_getNextBatch() requires an async request to have already been sent."); + invariant(_cursorId != kClosedCursorId); auto clock = opCtx->getServiceContext()->getPreciseClockSource(); auto dateStart = clock->now(); @@ -143,10 +142,10 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { _millisecondsWaiting += std::max(Milliseconds(0), dateEnd - dateStart); uassertStatusOK(out); - // If we had a cursor id, set it to 0 so that we don't attempt to kill the cursor if there was - // an error - if (_cursorId > 0) { - _cursorId = 0; + // If we had a cursor id, set it to kClosedCursorId so that we don't attempt to kill the cursor + // if there was an error. + if (_cursorId >= kMinLegalCursorId) { + _cursorId = kClosedCursorId; } // if we've received a response from our last request (initial or getmore), our remote operation @@ -156,7 +155,7 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { auto cr = uassertStatusOK(CursorResponse::parseFromBSON(out.getValue())); // If this was our first batch - if (_cursorId == -1) { + if (_cursorId == kUnitializedCursorId) { _ns = cr.getNSS(); _rcr.dbname = _ns.db().toString(); } diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h index 2e650fcee23..020d4986039 100644 --- a/src/mongo/executor/task_executor_cursor.h +++ b/src/mongo/executor/task_executor_cursor.h @@ -57,6 +57,14 @@ namespace executor { */ class TaskExecutorCursor { public: + // Cursor id has 1 of 3 states. + // <0 - We haven't yet received a response for our initial request + // 0 - Cursor is done (errored or consumed) + // >=1 - Cursor is live on the remote + constexpr static CursorId kUnitializedCursorId = -1; + constexpr static CursorId kClosedCursorId = 0; + constexpr static CursorId kMinLegalCursorId = 1; + struct Options { boost::optional<int64_t> batchSize; }; @@ -128,12 +136,7 @@ private: // Stash the callbackhandle for the current outstanding operation boost::optional<TaskExecutor::CallbackHandle> _cbHandle; - // cursor id has 1 of 3 states. - // - // <1 - We haven't yet received a response for our initial request - // 0 - Cursor is done (errored or consumed) - // >1 - Cursor is live on the remote - CursorId _cursorId = -1; + CursorId _cursorId = kUnitializedCursorId; // This is a sum of the time spent waiting on remote calls. Milliseconds _millisecondsWaiting = Milliseconds(0); diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp index 57719c44a2c..c27eb038179 100644 --- a/src/mongo/executor/task_executor_cursor_test.cpp +++ b/src/mongo/executor/task_executor_cursor_test.cpp @@ -118,15 +118,16 @@ public: * Ensure we work for a single simple batch */ TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) { - auto findCmd = BSON("find" - << "test" - << "batchSize" << 2); + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + const CursorId cursorId = 0; RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); TaskExecutorCursor tec(&getExecutor(), rcr); - ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 0)); + ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId)); ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); @@ -141,12 +142,11 @@ TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) { * Ensure we work if find fails (and that we receive the error code it failed with) */ TEST_F(TaskExecutorCursorFixture, FailureInFind) { - RemoteCommandRequest rcr(HostAndPort("localhost"), - "test", - BSON("find" - << "test" - << "batchSize" << 2), - opCtx.get()); + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); TaskExecutorCursor tec(&getExecutor(), rcr); @@ -165,17 +165,17 @@ TEST_F(TaskExecutorCursorFixture, FailureInFind) { * Ensure early termination of the cursor calls killCursor (if we know about the cursor id) */ TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) { - RemoteCommandRequest rcr(HostAndPort("localhost"), - "test", - BSON("find" - << "test" - << "batchSize" << 2), - opCtx.get()); + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + const CursorId cursorId = 1; + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); { TaskExecutorCursor tec(&getExecutor(), rcr); - scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1); + scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId); ASSERT(tec.getNext(opCtx.get())); } @@ -190,12 +190,12 @@ TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) { * Ensure multiple batches works correctly */ TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) { - RemoteCommandRequest rcr(HostAndPort("localhost"), - "test", - BSON("find" - << "test" - << "batchSize" << 2), - opCtx.get()); + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + CursorId cursorId = 1; + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); TaskExecutorCursor tec(&getExecutor(), rcr, [] { TaskExecutorCursor::Options opts; @@ -203,7 +203,7 @@ TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) { return opts; }()); - scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1); + scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId); ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); @@ -219,16 +219,17 @@ TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) { ErrorCodes::ExceededTimeLimit); // We can pick up after that interruption though - ASSERT_BSONOBJ_EQ(BSON("getMore" << (long long)(1) << "collection" + ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" << "test" << "batchSize" << 3), - scheduleSuccessfulCursorResponse("nextBatch", 3, 5, 1)); + scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorId)); ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3); ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4); ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5); - scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0); + cursorId = 0; + scheduleSuccessfulCursorResponse("nextBatch", 6, 6, cursorId); // We don't issue extra getmores after returning a 0 cursor id ASSERT_FALSE(hasReadyRequests()); @@ -239,15 +240,112 @@ TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) { } /** + * Ensure we allow empty firstBatch. + */ +TEST_F(TaskExecutorCursorFixture, EmptyFirstBatch) { + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + const auto getMoreCmd = BSON("getMore" << 1LL << "collection" + << "test" + << "batchSize" << 3); + const CursorId cursorId = 1; + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + + TaskExecutorCursor tec(&getExecutor(), rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 3; + return opts; + }()); + + // Schedule a cursor response with an empty "firstBatch". Use end < start so we don't + // append any doc to "firstBatch". + ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 0, cursorId)); + + stdx::thread th([&] { + // Wait for the getMore run by the getNext() below to be ready, and schedule a + // cursor response with a non-empty "nextBatch". + while (!hasReadyRequests()) { + sleepmillis(10); + } + + ASSERT_BSONOBJ_EQ(getMoreCmd, + scheduleSuccessfulCursorResponse("nextBatch", 1, 1, cursorId)); + }); + + // Verify that the first doc is the doc from the second batch. + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); + + th.join(); +} + +/** + * Ensure we allow any empty non-initial batch. + */ +TEST_F(TaskExecutorCursorFixture, EmptyNonInitialBatch) { + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + const auto getMoreCmd = BSON("getMore" << 1LL << "collection" + << "test" + << "batchSize" << 3); + const CursorId cursorId = 1; + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + + TaskExecutorCursor tec(&getExecutor(), rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 3; + return opts; + }()); + + // Schedule a cursor response with a non-empty "firstBatch". + ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId)); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); + + // Schedule two consecutive cursor responses with empty "nextBatch". Use end < start so + // we don't append any doc to "nextBatch". + ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId)); + + stdx::thread th([&] { + // Wait for the first getMore run by the getNext() below to be ready, and schedule a + // cursor response with a non-empty "nextBatch". + while (!hasReadyRequests()) { + sleepmillis(10); + } + + ASSERT_BSONOBJ_EQ(getMoreCmd, + scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId)); + + // Wait for the second getMore run by the getNext() below to be ready, and schedule a + // cursor response with a non-empty "nextBatch". + while (!hasReadyRequests()) { + sleepmillis(10); + } + + ASSERT_BSONOBJ_EQ(getMoreCmd, + scheduleSuccessfulCursorResponse("nextBatch", 2, 2, cursorId)); + }); + + // Verify that the next doc is the doc from the fourth batch. + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2); + + th.join(); +} + +/** * Ensure lsid is passed in all stages of querying */ TEST_F(TaskExecutorCursorFixture, LsidIsPassed) { auto lsid = makeLogicalSessionIdForTest(); opCtx->setLogicalSessionId(lsid); - auto findCmd = BSON("find" - << "test" - << "batchSize" << 1); + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 1); + const CursorId cursorId = 1; RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); @@ -262,15 +360,15 @@ TEST_F(TaskExecutorCursorFixture, LsidIsPassed) { ASSERT_BSONOBJ_EQ(BSON("find" << "test" << "batchSize" << 1 << "lsid" << lsid.toBSON()), - scheduleSuccessfulCursorResponse("firstBatch", 1, 1, 1)); + scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId)); ASSERT_EQUALS(tec->getNext(opCtx.get()).get()["x"].Int(), 1); // lsid in the getmore - ASSERT_BSONOBJ_EQ(BSON("getMore" << (long long)(1) << "collection" + ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" << "test" << "batchSize" << 1 << "lsid" << lsid.toBSON()), - scheduleSuccessfulCursorResponse("nextBatch", 2, 2, 1)); + scheduleSuccessfulCursorResponse("nextBatch", 2, 2, cursorId)); tec.reset(); |