/**
* Copyright 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include
#include "mongo/client/fetcher.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
namespace {
using namespace mongo;
using namespace mongo::repl;
using executor::NetworkInterfaceMock;
using executor::TaskExecutor;
const HostAndPort target("localhost", -1);
const BSONObj findCmdObj = BSON("find"
<< "coll");
class FetcherTest : public ReplicationExecutorTest {
public:
FetcherTest();
void clear();
void scheduleNetworkResponse(const BSONObj& obj);
void scheduleNetworkResponse(const BSONObj& obj, Milliseconds millis);
void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason);
void scheduleNetworkResponseFor(const BSONObj& filter, const BSONObj& obj);
// Calls scheduleNetworkResponse + finishProcessingNetworkResponse
void processNetworkResponse(const BSONObj& obj);
// Calls scheduleNetworkResponse + finishProcessingNetworkResponse
void processNetworkResponse(ErrorCodes::Error code, const std::string& reason);
void finishProcessingNetworkResponse();
protected:
void setUp() override;
void tearDown() override;
Status status;
CursorId cursorId;
NamespaceString nss;
Fetcher::Documents documents;
Milliseconds elapsedMillis;
bool first;
Fetcher::NextAction nextAction;
std::unique_ptr fetcher;
// Called at end of _callback
Fetcher::CallbackFn callbackHook;
private:
void _callback(const StatusWith& result,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob);
};
FetcherTest::FetcherTest()
: status(getDetectableErrorStatus()), cursorId(-1), nextAction(Fetcher::NextAction::kInvalid) {}
void FetcherTest::setUp() {
ReplicationExecutorTest::setUp();
clear();
fetcher.reset(new Fetcher(&getExecutor(),
target,
"db",
findCmdObj,
stdx::bind(&FetcherTest::_callback,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3)));
launchExecutorThread();
}
void FetcherTest::tearDown() {
ReplicationExecutorTest::tearDown();
// Executor may still invoke fetcher's callback before shutting down.
fetcher.reset();
}
void FetcherTest::clear() {
status = getDetectableErrorStatus();
cursorId = -1;
nss = NamespaceString();
documents.clear();
elapsedMillis = Milliseconds(0);
first = false;
nextAction = Fetcher::NextAction::kInvalid;
callbackHook = Fetcher::CallbackFn();
}
void FetcherTest::scheduleNetworkResponse(const BSONObj& obj) {
scheduleNetworkResponse(obj, Milliseconds(0));
}
void FetcherTest::scheduleNetworkResponse(const BSONObj& obj, Milliseconds millis) {
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
executor::RemoteCommandResponse response(obj, BSONObj(), millis);
TaskExecutor::ResponseStatus responseStatus(response);
net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
}
void FetcherTest::scheduleNetworkResponseFor(const BSONObj& filter, const BSONObj& obj) {
ASSERT_TRUE(filter[1].eoo()); // The filter should only have one field, to match the cmd name
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
Milliseconds millis(0);
executor::RemoteCommandResponse response(obj, BSONObj(), millis);
TaskExecutor::ResponseStatus responseStatus(response);
auto req = net->getNextReadyRequest();
ASSERT_EQ(req->getRequest().cmdObj[0], filter[0]);
net->scheduleResponse(req, net->now(), responseStatus);
}
void FetcherTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
TaskExecutor::ResponseStatus responseStatus(code, reason);
net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
}
void FetcherTest::processNetworkResponse(const BSONObj& obj) {
scheduleNetworkResponse(obj);
finishProcessingNetworkResponse();
}
void FetcherTest::processNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
scheduleNetworkResponse(code, reason);
finishProcessingNetworkResponse();
}
void FetcherTest::finishProcessingNetworkResponse() {
clear();
ASSERT_TRUE(fetcher->isActive());
getNet()->runReadyNetworkOperations();
ASSERT_FALSE(getNet()->hasReadyRequests());
ASSERT_FALSE(fetcher->isActive());
}
void FetcherTest::_callback(const StatusWith& result,
Fetcher::NextAction* nextActionFromFetcher,
BSONObjBuilder* getMoreBob) {
status = result.getStatus();
if (result.isOK()) {
const Fetcher::QueryResponse& batchData = result.getValue();
cursorId = batchData.cursorId;
nss = batchData.nss;
documents = batchData.documents;
elapsedMillis = batchData.elapsedMillis;
first = batchData.first;
}
if (callbackHook) {
callbackHook(result, nextActionFromFetcher, getMoreBob);
}
if (nextActionFromFetcher) {
nextAction = *nextActionFromFetcher;
}
}
void unusedFetcherCallback(const StatusWith& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
FAIL("should not reach here");
}
TEST_F(FetcherTest, InvalidConstruction) {
TaskExecutor& executor = getExecutor();
// Null executor.
ASSERT_THROWS(Fetcher(nullptr, target, "db", findCmdObj, unusedFetcherCallback), UserException);
// Empty database name.
ASSERT_THROWS(Fetcher(&executor, target, "", findCmdObj, unusedFetcherCallback), UserException);
// Empty command object.
ASSERT_THROWS(Fetcher(&executor, target, "db", BSONObj(), unusedFetcherCallback),
UserException);
// Callback function cannot be null.
ASSERT_THROWS(Fetcher(&executor, target, "db", findCmdObj, Fetcher::CallbackFn()),
UserException);
}
// Command object can refer to any command that returns a cursor. This
// includes listIndexes and listCollections.
TEST_F(FetcherTest, NonFindCommand) {
TaskExecutor& executor = getExecutor();
Fetcher(&executor,
target,
"db",
BSON("listIndexes"
<< "coll"),
unusedFetcherCallback);
Fetcher(&executor, target, "db", BSON("listCollections" << 1), unusedFetcherCallback);
Fetcher(&executor, target, "db", BSON("a" << 1), unusedFetcherCallback);
}
TEST_F(FetcherTest, GetDiagnosticString) {
Fetcher fetcher(&getExecutor(), target, "db", findCmdObj, unusedFetcherCallback);
ASSERT_FALSE(fetcher.getDiagnosticString().empty());
}
TEST_F(FetcherTest, IsActiveAfterSchedule) {
ASSERT_FALSE(fetcher->isActive());
ASSERT_OK(fetcher->schedule());
ASSERT_TRUE(fetcher->isActive());
}
TEST_F(FetcherTest, ScheduleWhenActive) {
ASSERT_OK(fetcher->schedule());
ASSERT_TRUE(fetcher->isActive());
ASSERT_NOT_OK(fetcher->schedule());
}
TEST_F(FetcherTest, CancelWithoutSchedule) {
ASSERT_FALSE(fetcher->isActive());
fetcher->cancel();
}
TEST_F(FetcherTest, WaitWithoutSchedule) {
ASSERT_FALSE(fetcher->isActive());
fetcher->wait();
}
TEST_F(FetcherTest, ShutdownBeforeSchedule) {
getExecutor().shutdown();
ASSERT_NOT_OK(fetcher->schedule());
ASSERT_FALSE(fetcher->isActive());
}
TEST_F(FetcherTest, ScheduleAndCancel) {
ASSERT_OK(fetcher->schedule());
scheduleNetworkResponse(BSON("ok" << 1));
fetcher->cancel();
finishProcessingNetworkResponse();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
}
TEST_F(FetcherTest, ScheduleButShutdown) {
ASSERT_OK(fetcher->schedule());
scheduleNetworkResponse(BSON("ok" << 1));
getExecutor().shutdown();
// Network interface should not deliver mock response to callback.
finishProcessingNetworkResponse();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
}
TEST_F(FetcherTest, FindCommandFailed1) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(ErrorCodes::BadValue, "bad hint");
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_EQUALS("bad hint", status.reason());
}
TEST_F(FetcherTest, FindCommandFailed2) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("ok" << 0 << "errmsg"
<< "bad hint"
<< "code" << int(ErrorCodes::BadValue)));
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_EQUALS("bad hint", status.reason());
}
TEST_F(FetcherTest, CursorFieldMissing) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor' field");
}
TEST_F(FetcherTest, CursorNotAnObject) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << 123 << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor' field must be an object");
}
TEST_F(FetcherTest, CursorIdFieldMissing) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("ns"
<< "db.coll"
<< "firstBatch" << BSONArray()) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.id' field");
}
TEST_F(FetcherTest, CursorIdNotLongNumber) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(
BSON("cursor" << BSON("id" << 123.1 << "ns"
<< "db.coll"
<< "firstBatch" << BSONArray()) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.id' field must be");
ASSERT_EQ((int)Fetcher::NextAction::kInvalid, (int)nextAction);
}
TEST_F(FetcherTest, NamespaceFieldMissing) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(
BSON("cursor" << BSON("id" << 123LL << "firstBatch" << BSONArray()) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.ns' field");
}
TEST_F(FetcherTest, NamespaceNotAString) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << "ns" << 123 << "firstBatch"
<< BSONArray()) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' field must be a string");
}
TEST_F(FetcherTest, NamespaceEmpty) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(
BSON("cursor" << BSON("id" << 123LL << "ns"
<< ""
<< "firstBatch" << BSONArray()) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace");
}
TEST_F(FetcherTest, NamespaceMissingCollectionName) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(
BSON("cursor" << BSON("id" << 123LL << "ns"
<< "db."
<< "firstBatch" << BSONArray()) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace");
}
TEST_F(FetcherTest, FirstBatchFieldMissing) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll") << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.firstBatch' field");
}
TEST_F(FetcherTest, FirstBatchNotAnArray) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
<< "firstBatch" << 123) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.firstBatch' field must be an array");
}
TEST_F(FetcherTest, FirstBatchArrayContainsNonObject) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(8)) << "ok" << 1));
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "found non-object");
ASSERT_STRING_CONTAINS(status.reason(), "in 'cursor.firstBatch' field");
}
TEST_F(FetcherTest, FirstBatchEmptyArray) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
<< "firstBatch" << BSONArray()) << "ok" << 1));
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_TRUE(documents.empty());
}
TEST_F(FetcherTest, FetchOneDocument) {
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
}
TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) {
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
callbackHook = [](const StatusWith& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
ASSERT_OK(fetchResult.getStatus());
Fetcher::QueryResponse batchData{fetchResult.getValue()};
ASSERT(nextAction);
*nextAction = Fetcher::NextAction::kGetMore;
ASSERT_FALSE(getMoreBob);
};
processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
}
void appendGetMoreRequest(const StatusWith& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
if (!getMoreBob) {
return;
}
const auto& batchData = fetchResult.getValue();
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
}
TEST_F(FetcherTest, FetchMultipleBatches) {
callbackHook = appendGetMoreRequest;
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
Milliseconds(100));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_EQUALS(elapsedMillis, Milliseconds(100));
ASSERT_TRUE(first);
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
ASSERT_TRUE(getNet()->hasReadyRequests());
const BSONObj doc2 = BSON("_id" << 2);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1),
Milliseconds(200));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_EQUALS(elapsedMillis, Milliseconds(200));
ASSERT_FALSE(first);
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
ASSERT_TRUE(getNet()->hasReadyRequests());
const BSONObj doc3 = BSON("_id" << 3);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc3)) << "ok" << 1),
Milliseconds(300));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc3, documents.front());
ASSERT_EQUALS(elapsedMillis, Milliseconds(300));
ASSERT_FALSE(first);
ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
ASSERT_FALSE(fetcher->isActive());
ASSERT_FALSE(getNet()->hasReadyRequests());
}
TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {
callbackHook = appendGetMoreRequest;
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
ASSERT_TRUE(getNet()->hasReadyRequests());
const BSONObj doc2 = BSON("_id" << 2);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
fetcher->cancel();
finishProcessingNetworkResponse();
ASSERT_NOT_OK(status);
}
TEST_F(FetcherTest, ScheduleGetMoreButShutdown) {
callbackHook = appendGetMoreRequest;
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
ASSERT_TRUE(getNet()->hasReadyRequests());
const BSONObj doc2 = BSON("_id" << 2);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
getExecutor().shutdown();
finishProcessingNetworkResponse();
ASSERT_NOT_OK(status);
}
void setNextActionToNoAction(const StatusWith& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
*nextAction = Fetcher::NextAction::kNoAction;
}
TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
callbackHook = appendGetMoreRequest;
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
ASSERT_TRUE(getNet()->hasReadyRequests());
const BSONObj doc2 = BSON("_id" << 2);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
callbackHook = setNextActionToNoAction;
getNet()->runReadyNetworkOperations();
scheduleNetworkResponseFor(BSON("killCursors" << nss.coll()), BSON("ok" << false));
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
ASSERT_FALSE(fetcher->isActive());
}
/**
* This will be invoked twice before the fetcher returns control to the replication executor.
*/
void shutdownDuringSecondBatch(const StatusWith& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob,
const BSONObj& doc2,
TaskExecutor* executor,
bool* isShutdownCalled) {
if (*isShutdownCalled) {
return;
}
// First time during second batch
ASSERT_OK(fetchResult.getStatus());
Fetcher::QueryResponse batchData{fetchResult.getValue()};
ASSERT_EQUALS(1U, batchData.documents.size());
ASSERT_EQUALS(doc2, batchData.documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == *nextAction);
ASSERT(getMoreBob);
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
executor->shutdown();
*isShutdownCalled = true;
}
TEST_F(FetcherTest, ShutdownDuringSecondBatch) {
callbackHook = appendGetMoreRequest;
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
ASSERT_TRUE(fetcher->isActive());
ASSERT_TRUE(getNet()->hasReadyRequests());
const BSONObj doc2 = BSON("_id" << 2);
scheduleNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
bool isShutdownCalled = false;
callbackHook = stdx::bind(shutdownDuringSecondBatch,
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3,
doc2,
&getExecutor(),
&isShutdownCalled);
getNet()->runReadyNetworkOperations();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
ASSERT_FALSE(fetcher->isActive());
}
} // namespace