summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <cheahuychou.mao@mongodb.com>2020-02-24 13:11:16 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-02 19:46:59 +0000
commitb8583f28abf62cf624cd039c6c0aecc8dce653e6 (patch)
treebba98e8733b3a79527e6aa2dbc9a978bb03f5b64
parentf44bfe5d2fa808b299c5bc83979138b5eb6df6be (diff)
downloadmongo-b8583f28abf62cf624cd039c6c0aecc8dce653e6.tar.gz
SERVER-45726 Allow empty firstBatch with TaskExecutorCursor
(cherry picked from commit 727e3a51192152607de7ab4ac3cc8d909b1f5df1)
-rw-r--r--src/mongo/executor/task_executor_cursor.cpp19
-rw-r--r--src/mongo/executor/task_executor_cursor.h15
-rw-r--r--src/mongo/executor/task_executor_cursor_test.cpp164
3 files changed, 149 insertions, 49 deletions
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp
index 316561d2987..368a76f1abf 100644
--- a/src/mongo/executor/task_executor_cursor.cpp
+++ b/src/mongo/executor/task_executor_cursor.cpp
@@ -61,7 +61,7 @@ TaskExecutorCursor::~TaskExecutorCursor() {
_executor->cancel(*_cbHandle);
}
- if (_cursorId > 0) {
+ if (_cursorId >= kMinLegalCursorId) {
// We deliberately ignore failures to kill the cursor. This "best effort" is acceptable
// because some timeout mechanism on the remote host can be expected to reap it later.
//
@@ -78,7 +78,7 @@ TaskExecutorCursor::~TaskExecutorCursor() {
}
boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) {
- if (_batchIter == _batch.end()) {
+ while (_batchIter == _batch.end() && _cursorId != kClosedCursorId) {
_getNextBatch(opCtx);
}
@@ -130,9 +130,8 @@ void TaskExecutorCursor::_runRemoteCommand(const RemoteCommandRequest& rcr) {
}
void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
- if (_cursorId == 0) {
- return;
- }
+ invariant(_cbHandle, "_getNextBatch() requires an async request to have already been sent.");
+ invariant(_cursorId != kClosedCursorId);
auto clock = opCtx->getServiceContext()->getPreciseClockSource();
auto dateStart = clock->now();
@@ -143,10 +142,10 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
_millisecondsWaiting += std::max(Milliseconds(0), dateEnd - dateStart);
uassertStatusOK(out);
- // If we had a cursor id, set it to 0 so that we don't attempt to kill the cursor if there was
- // an error
- if (_cursorId > 0) {
- _cursorId = 0;
+ // If we had a cursor id, set it to kClosedCursorId so that we don't attempt to kill the cursor
+ // if there was an error.
+ if (_cursorId >= kMinLegalCursorId) {
+ _cursorId = kClosedCursorId;
}
// if we've received a response from our last request (initial or getmore), our remote operation
@@ -156,7 +155,7 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
auto cr = uassertStatusOK(CursorResponse::parseFromBSON(out.getValue()));
// If this was our first batch
- if (_cursorId == -1) {
+ if (_cursorId == kUnitializedCursorId) {
_ns = cr.getNSS();
_rcr.dbname = _ns.db().toString();
}
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
index 2e650fcee23..020d4986039 100644
--- a/src/mongo/executor/task_executor_cursor.h
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -57,6 +57,14 @@ namespace executor {
*/
class TaskExecutorCursor {
public:
+ // Cursor id has 1 of 3 states.
+ // <0 - We haven't yet received a response for our initial request
+ // 0 - Cursor is done (errored or consumed)
+ // >=1 - Cursor is live on the remote
+ constexpr static CursorId kUnitializedCursorId = -1;
+ constexpr static CursorId kClosedCursorId = 0;
+ constexpr static CursorId kMinLegalCursorId = 1;
+
struct Options {
boost::optional<int64_t> batchSize;
};
@@ -128,12 +136,7 @@ private:
// Stash the callbackhandle for the current outstanding operation
boost::optional<TaskExecutor::CallbackHandle> _cbHandle;
- // cursor id has 1 of 3 states.
- //
- // <1 - We haven't yet received a response for our initial request
- // 0 - Cursor is done (errored or consumed)
- // >1 - Cursor is live on the remote
- CursorId _cursorId = -1;
+ CursorId _cursorId = kUnitializedCursorId;
// This is a sum of the time spent waiting on remote calls.
Milliseconds _millisecondsWaiting = Milliseconds(0);
diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp
index 57719c44a2c..c27eb038179 100644
--- a/src/mongo/executor/task_executor_cursor_test.cpp
+++ b/src/mongo/executor/task_executor_cursor_test.cpp
@@ -118,15 +118,16 @@ public:
* Ensure we work for a single simple batch
*/
TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) {
- auto findCmd = BSON("find"
- << "test"
- << "batchSize" << 2);
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ const CursorId cursorId = 0;
RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
TaskExecutorCursor tec(&getExecutor(), rcr);
- ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 0));
+ ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId));
ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
@@ -141,12 +142,11 @@ TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) {
* Ensure we work if find fails (and that we receive the error code it failed with)
*/
TEST_F(TaskExecutorCursorFixture, FailureInFind) {
- RemoteCommandRequest rcr(HostAndPort("localhost"),
- "test",
- BSON("find"
- << "test"
- << "batchSize" << 2),
- opCtx.get());
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
TaskExecutorCursor tec(&getExecutor(), rcr);
@@ -165,17 +165,17 @@ TEST_F(TaskExecutorCursorFixture, FailureInFind) {
* Ensure early termination of the cursor calls killCursor (if we know about the cursor id)
*/
TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) {
- RemoteCommandRequest rcr(HostAndPort("localhost"),
- "test",
- BSON("find"
- << "test"
- << "batchSize" << 2),
- opCtx.get());
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ const CursorId cursorId = 1;
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
{
TaskExecutorCursor tec(&getExecutor(), rcr);
- scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1);
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId);
ASSERT(tec.getNext(opCtx.get()));
}
@@ -190,12 +190,12 @@ TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) {
* Ensure multiple batches works correctly
*/
TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) {
- RemoteCommandRequest rcr(HostAndPort("localhost"),
- "test",
- BSON("find"
- << "test"
- << "batchSize" << 2),
- opCtx.get());
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ CursorId cursorId = 1;
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
TaskExecutorCursor tec(&getExecutor(), rcr, [] {
TaskExecutorCursor::Options opts;
@@ -203,7 +203,7 @@ TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) {
return opts;
}());
- scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1);
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId);
ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
@@ -219,16 +219,17 @@ TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) {
ErrorCodes::ExceededTimeLimit);
// We can pick up after that interruption though
- ASSERT_BSONOBJ_EQ(BSON("getMore" << (long long)(1) << "collection"
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
<< "test"
<< "batchSize" << 3),
- scheduleSuccessfulCursorResponse("nextBatch", 3, 5, 1));
+ scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorId));
ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3);
ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4);
ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5);
- scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0);
+ cursorId = 0;
+ scheduleSuccessfulCursorResponse("nextBatch", 6, 6, cursorId);
// We don't issue extra getmores after returning a 0 cursor id
ASSERT_FALSE(hasReadyRequests());
@@ -239,15 +240,112 @@ TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) {
}
/**
+ * Ensure we allow empty firstBatch.
+ */
+TEST_F(TaskExecutorCursorFixture, EmptyFirstBatch) {
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ const auto getMoreCmd = BSON("getMore" << 1LL << "collection"
+ << "test"
+ << "batchSize" << 3);
+ const CursorId cursorId = 1;
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+
+ TaskExecutorCursor tec(&getExecutor(), rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 3;
+ return opts;
+ }());
+
+ // Schedule a cursor response with an empty "firstBatch". Use end < start so we don't
+ // append any doc to "firstBatch".
+ ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 0, cursorId));
+
+ stdx::thread th([&] {
+ // Wait for the getMore run by the getNext() below to be ready, and schedule a
+ // cursor response with a non-empty "nextBatch".
+ while (!hasReadyRequests()) {
+ sleepmillis(10);
+ }
+
+ ASSERT_BSONOBJ_EQ(getMoreCmd,
+ scheduleSuccessfulCursorResponse("nextBatch", 1, 1, cursorId));
+ });
+
+ // Verify that the first doc is the doc from the second batch.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+
+ th.join();
+}
+
+/**
+ * Ensure we allow any empty non-initial batch.
+ */
+TEST_F(TaskExecutorCursorFixture, EmptyNonInitialBatch) {
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ const auto getMoreCmd = BSON("getMore" << 1LL << "collection"
+ << "test"
+ << "batchSize" << 3);
+ const CursorId cursorId = 1;
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+
+ TaskExecutorCursor tec(&getExecutor(), rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 3;
+ return opts;
+ }());
+
+ // Schedule a cursor response with a non-empty "firstBatch".
+ ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+
+ // Schedule two consecutive cursor responses with empty "nextBatch". Use end < start so
+ // we don't append any doc to "nextBatch".
+ ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId));
+
+ stdx::thread th([&] {
+ // Wait for the first getMore run by the getNext() below to be ready, and schedule a
+ // cursor response with a non-empty "nextBatch".
+ while (!hasReadyRequests()) {
+ sleepmillis(10);
+ }
+
+ ASSERT_BSONOBJ_EQ(getMoreCmd,
+ scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId));
+
+ // Wait for the second getMore run by the getNext() below to be ready, and schedule a
+ // cursor response with a non-empty "nextBatch".
+ while (!hasReadyRequests()) {
+ sleepmillis(10);
+ }
+
+ ASSERT_BSONOBJ_EQ(getMoreCmd,
+ scheduleSuccessfulCursorResponse("nextBatch", 2, 2, cursorId));
+ });
+
+ // Verify that the next doc is the doc from the fourth batch.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+
+ th.join();
+}
+
+/**
* Ensure lsid is passed in all stages of querying
*/
TEST_F(TaskExecutorCursorFixture, LsidIsPassed) {
auto lsid = makeLogicalSessionIdForTest();
opCtx->setLogicalSessionId(lsid);
- auto findCmd = BSON("find"
- << "test"
- << "batchSize" << 1);
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 1);
+ const CursorId cursorId = 1;
RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
@@ -262,15 +360,15 @@ TEST_F(TaskExecutorCursorFixture, LsidIsPassed) {
ASSERT_BSONOBJ_EQ(BSON("find"
<< "test"
<< "batchSize" << 1 << "lsid" << lsid.toBSON()),
- scheduleSuccessfulCursorResponse("firstBatch", 1, 1, 1));
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId));
ASSERT_EQUALS(tec->getNext(opCtx.get()).get()["x"].Int(), 1);
// lsid in the getmore
- ASSERT_BSONOBJ_EQ(BSON("getMore" << (long long)(1) << "collection"
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
<< "test"
<< "batchSize" << 1 << "lsid" << lsid.toBSON()),
- scheduleSuccessfulCursorResponse("nextBatch", 2, 2, 1));
+ scheduleSuccessfulCursorResponse("nextBatch", 2, 2, cursorId));
tec.reset();