summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2018-09-19 17:05:12 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2018-09-19 17:05:22 -0400
commit4917206219237841b61b09a22848a3d1e7733adc (patch)
treeda10e99a03def408409205cac585148c1a9b792f
parentfffa22c25442b291995adfca886e86f5ff7bb9eb (diff)
downloadmongo-4917206219237841b61b09a22848a3d1e7733adc.tar.gz
SERVER-36096 Convert CollectionCloner to use DBClientConnection.
-rw-r--r--src/mongo/client/dbclient_base.cpp11
-rw-r--r--src/mongo/client/dbclient_base.h12
-rw-r--r--src/mongo/client/dbclient_connection.cpp8
-rw-r--r--src/mongo/client/dbclient_connection.h9
-rw-r--r--src/mongo/client/dbclient_cursor.h22
-rw-r--r--src/mongo/client/dbclient_mockcursor.h34
-rw-r--r--src/mongo/db/repl/SConscript4
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp14
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp341
-rw-r--r--src/mongo/db/repl/collection_cloner.h104
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp779
-rw-r--r--src/mongo/db/repl/database_cloner.cpp15
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp188
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp9
-rw-r--r--src/mongo/db/repl/databases_cloner.h10
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp68
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp9
-rw-r--r--src/mongo/db/repl/initial_syncer.h12
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp87
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_connection.cpp15
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_connection.h13
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_cursor.cpp49
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_cursor.h58
-rw-r--r--src/mongo/dbtests/mock/mock_remote_db_server.cpp8
-rw-r--r--src/mongo/dbtests/mock/mock_remote_db_server.h12
26 files changed, 832 insertions, 1060 deletions
diff --git a/src/mongo/client/dbclient_base.cpp b/src/mongo/client/dbclient_base.cpp
index 5acd20e6289..1ea3b8e6b5b 100644
--- a/src/mongo/client/dbclient_base.cpp
+++ b/src/mongo/client/dbclient_base.cpp
@@ -720,22 +720,25 @@ unsigned long long DBClientBase::query(stdx::function<void(const BSONObj&)> f,
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn,
- int queryOptions) {
+ int queryOptions,
+ int batchSize) {
DBClientFunConvertor fun;
fun._f = f;
stdx::function<void(DBClientCursorBatchIterator&)> ptr(fun);
- return this->query(ptr, nsOrUuid, query, fieldsToReturn, queryOptions);
+ return this->query(ptr, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize);
}
unsigned long long DBClientBase::query(stdx::function<void(DBClientCursorBatchIterator&)> f,
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn,
- int queryOptions) {
+ int queryOptions,
+ int batchSize) {
// mask options
queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk);
- unique_ptr<DBClientCursor> c(this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions));
+ unique_ptr<DBClientCursor> c(
+ this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize));
uassert(16090, "socket error for mapping query", c.get());
unsigned long long n = 0;
diff --git a/src/mongo/client/dbclient_base.h b/src/mongo/client/dbclient_base.h
index 13c8de941ef..c6a8ebf2f0e 100644
--- a/src/mongo/client/dbclient_base.h
+++ b/src/mongo/client/dbclient_base.h
@@ -83,13 +83,15 @@ class DBClientQueryInterface {
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn = 0,
- int queryOptions = 0) = 0;
+ int queryOptions = 0,
+ int batchSize = 0) = 0;
virtual unsigned long long query(stdx::function<void(DBClientCursorBatchIterator&)> f,
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn = 0,
- int queryOptions = 0) = 0;
+ int queryOptions = 0,
+ int batchSize = 0) = 0;
};
/**
@@ -604,13 +606,15 @@ public:
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn = 0,
- int queryOptions = 0) final;
+ int queryOptions = 0,
+ int batchSize = 0) final;
unsigned long long query(stdx::function<void(DBClientCursorBatchIterator&)> f,
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn = 0,
- int queryOptions = 0) override;
+ int queryOptions = 0,
+ int batchSize = 0) override;
/** don't use this - called automatically by DBClientCursor for you
diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp
index 71fb7ca9a98..f623c91c325 100644
--- a/src/mongo/client/dbclient_connection.cpp
+++ b/src/mongo/client/dbclient_connection.cpp
@@ -508,16 +508,18 @@ unsigned long long DBClientConnection::query(stdx::function<void(DBClientCursorB
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn,
- int queryOptions) {
+ int queryOptions,
+ int batchSize) {
if (!(availableOptions() & QueryOption_Exhaust)) {
- return DBClientBase::query(f, nsOrUuid, query, fieldsToReturn, queryOptions);
+ return DBClientBase::query(f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize);
}
// mask options
queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk);
queryOptions |= (int)QueryOption_Exhaust;
- unique_ptr<DBClientCursor> c(this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions));
+ unique_ptr<DBClientCursor> c(
+ this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize));
uassert(13386, "socket error for mapping query", c.get());
unsigned long long n = 0;
diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h
index 3977d03e279..9b56bebc608 100644
--- a/src/mongo/client/dbclient_connection.h
+++ b/src/mongo/client/dbclient_connection.h
@@ -108,9 +108,7 @@ public:
* @param errmsg any relevant error message will appended to the string
* @return false if fails to connect.
*/
- virtual bool connect(const HostAndPort& server,
- StringData applicationName,
- std::string& errmsg);
+ bool connect(const HostAndPort& server, StringData applicationName, std::string& errmsg);
/**
* Semantically equivalent to the previous connect method, but returns a Status
@@ -121,7 +119,7 @@ public:
* @param a hook to validate the 'isMaster' reply received during connection. If the hook
* fails, the connection will be terminated and a non-OK status will be returned.
*/
- Status connect(const HostAndPort& server, StringData applicationName);
+ virtual Status connect(const HostAndPort& server, StringData applicationName);
/**
* This version of connect does not run 'isMaster' after creating a TCP connection to the
@@ -167,7 +165,8 @@ public:
const NamespaceStringOrUUID& nsOrUuid,
Query query,
const BSONObj* fieldsToReturn,
- int queryOptions) override;
+ int queryOptions,
+ int batchSize = 0) override;
using DBClientBase::runCommandWithTarget;
std::pair<rpc::UniqueReply, DBClientBase*> runCommandWithTarget(OpMsgRequest request) override;
diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h
index bf0c520552c..7ce30e33633 100644
--- a/src/mongo/client/dbclient_cursor.h
+++ b/src/mongo/client/dbclient_cursor.h
@@ -221,6 +221,18 @@ public:
return _connectionHasPendingReplies;
}
+protected:
+ struct Batch {
+ // TODO remove constructors after c++17 toolchain upgrade
+ Batch() = default;
+ Batch(std::vector<BSONObj> initial, size_t initialPos = 0)
+ : objs(std::move(initial)), pos(initialPos) {}
+ std::vector<BSONObj> objs;
+ size_t pos = 0;
+ };
+
+ Batch batch;
+
private:
DBClientCursor(DBClientBase* client,
const NamespaceStringOrUUID& nsOrUuid,
@@ -235,16 +247,6 @@ private:
int nextBatchSize();
- struct Batch {
- // TODO remove constructors after c++17 toolchain upgrade
- Batch() = default;
- Batch(std::vector<BSONObj> initial, size_t initialPos = 0)
- : objs(std::move(initial)), pos(initialPos) {}
- std::vector<BSONObj> objs;
- size_t pos = 0;
- };
-
- Batch batch;
DBClientBase* _client;
std::string _originalHost;
NamespaceStringOrUUID _nsOrUuid;
diff --git a/src/mongo/client/dbclient_mockcursor.h b/src/mongo/client/dbclient_mockcursor.h
index 6ebd87f52ef..873805f0cae 100644
--- a/src/mongo/client/dbclient_mockcursor.h
+++ b/src/mongo/client/dbclient_mockcursor.h
@@ -32,28 +32,46 @@
#include "mongo/client/dbclient_cursor.h"
namespace mongo {
-
+// DBClientMockCursor supports only a small subset of DBClientCursor operations.
+// It supports only iteration, including use of DBClientCursorBatchIterator. If a batchsize
+// is given, iteration is broken up into multiple batches at batchSize boundaries.
class DBClientMockCursor : public DBClientCursor {
public:
- DBClientMockCursor(mongo::DBClientBase* client, const BSONArray& mockCollection)
+ DBClientMockCursor(mongo::DBClientBase* client,
+ const BSONArray& mockCollection,
+ unsigned long batchSize = 0)
: mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0),
_collectionArray(mockCollection),
- _iter(_collectionArray) {}
+ _iter(_collectionArray),
+ _batchSize(batchSize) {
+ if (_batchSize)
+ setBatchSize(_batchSize);
+ fillNextBatch();
+ }
virtual ~DBClientMockCursor() {}
- bool more() {
- return _iter.more();
- }
- BSONObj next() {
- return _iter.next().Obj();
+ bool more() override {
+ if (_batchSize && batch.pos == _batchSize) {
+ fillNextBatch();
+ }
+ return batch.pos < batch.objs.size();
}
private:
+ void fillNextBatch() {
+ int leftInBatch = _batchSize;
+ batch.objs.clear();
+ while (_iter.more() && (!_batchSize || leftInBatch--)) {
+ batch.objs.emplace_back(_iter.next().Obj());
+ }
+ batch.pos = 0;
+ }
// The BSONObjIterator expects the underlying BSONObj to stay in scope while the
// iterator is in use, so we store it here.
BSONArray _collectionArray;
BSONObjIterator _iter;
+ unsigned long _batchSize;
// non-copyable , non-assignable
DBClientMockCursor(const DBClientMockCursor&);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 98d1cee071f..a82e03769a9 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1246,6 +1246,7 @@ env.Library(
],
LIBDEPS=[
'task_runner',
+ 'oplogreader',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/fetcher',
'$BUILD_DIR/mongo/client/remote_command_retry_scheduler',
@@ -1266,6 +1267,7 @@ env.CppUnitTest(
'base_cloner_test_fixture',
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
+ '$BUILD_DIR/mongo/dbtests/mocklib',
'$BUILD_DIR/mongo/unittest/task_executor_proxy',
],
)
@@ -1292,6 +1294,7 @@ env.CppUnitTest(
'base_cloner_test_fixture',
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/commands/list_collections_filter',
+ '$BUILD_DIR/mongo/dbtests/mocklib',
'$BUILD_DIR/mongo/unittest/task_executor_proxy',
],
)
@@ -1304,6 +1307,7 @@ env.Library(
LIBDEPS=[
'database_cloner',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/dbtests/mocklib',
],
)
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index 88d0a84d996..930d901d86a 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -214,13 +214,13 @@ void BaseClonerTest::finishProcessingNetworkResponse() {
}
void BaseClonerTest::testLifeCycle() {
- // IsActiveAfterStart
+ log() << "Testing IsActiveAfterStart";
ASSERT_FALSE(getCloner()->isActive());
ASSERT_OK(getCloner()->startup());
ASSERT_TRUE(getCloner()->isActive());
tearDown();
- // StartWhenActive
+ log() << "Testing StartWhenActive";
setUp();
ASSERT_OK(getCloner()->startup());
ASSERT_TRUE(getCloner()->isActive());
@@ -228,28 +228,28 @@ void BaseClonerTest::testLifeCycle() {
ASSERT_TRUE(getCloner()->isActive());
tearDown();
- // CancelWithoutStart
+ log() << "Testing CancelWithoutStart";
setUp();
ASSERT_FALSE(getCloner()->isActive());
getCloner()->shutdown();
ASSERT_FALSE(getCloner()->isActive());
tearDown();
- // WaitWithoutStart
+ log() << "Testing WaitWithoutStart";
setUp();
ASSERT_FALSE(getCloner()->isActive());
getCloner()->join();
ASSERT_FALSE(getCloner()->isActive());
tearDown();
- // ShutdownBeforeStart
+ log() << "Testing ShutdownBeforeStart";
setUp();
getExecutor().shutdown();
ASSERT_NOT_OK(getCloner()->startup());
ASSERT_FALSE(getCloner()->isActive());
tearDown();
- // StartAndCancel
+ log() << "Testing StartAndCancel";
setUp();
ASSERT_OK(getCloner()->startup());
getCloner()->shutdown();
@@ -261,7 +261,7 @@ void BaseClonerTest::testLifeCycle() {
ASSERT_FALSE(getCloner()->isActive());
tearDown();
- // StartButShutdown
+ log() << "Testing StartButShutdown";
setUp();
ASSERT_OK(getCloner()->startup());
{
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 041c7eaf113..fac7cec4235 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -36,9 +36,12 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/dbclient_connection.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_parameters.h"
@@ -79,7 +82,7 @@ MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCollectionClone);
MONGO_FAIL_POINT_DEFINE(initialSyncHangDuringCollectionClone);
// Failpoint which causes initial sync to hang after handling the next batch of results from the
-// 'AsyncResultsMerger', optionally limited to a specific collection.
+// DBClientConnection, optionally limited to a specific collection.
MONGO_FAIL_POINT_DEFINE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
// Failpoint which causes initial sync to hang before establishing the cursors (but after
@@ -161,20 +164,26 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_dbWorkTaskRunner.schedule(task);
return executor::TaskExecutor::CallbackHandle();
}),
+ _createClientFn([] { return stdx::make_unique<DBClientConnection>(); }),
_progressMeter(1U, // total will be replaced with count command result.
kProgressMeterSecondsBetween,
kProgressMeterCheckInterval,
"documents copied",
str::stream() << _sourceNss.toString() << " collection clone progress"),
- _collectionCloningBatchSize(batchSize) {
+ _collectionClonerBatchSize(batchSize) {
// Fetcher throws an exception on null executor.
invariant(executor);
uassert(ErrorCodes::BadValue,
"invalid collection namespace: " + sourceNss.ns(),
sourceNss.isValid());
uassertStatusOK(options.validateForStorage());
+ uassert(50953,
+ "Missing collection UUID in CollectionCloner, collection name: " + sourceNss.ns(),
+ _options.uuid);
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface);
+ uassert(
+ 50954, "collectionClonerBatchSize must be non-negative.", _collectionClonerBatchSize >= 0);
_stats.ns = _sourceNss.ns();
}
@@ -245,19 +254,6 @@ void CollectionCloner::shutdown() {
}
void CollectionCloner::_cancelRemainingWork_inlock() {
- if (_arm) {
- // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The
- // TaskExecutor should never have an OperationContext attached to the Client, and the
- // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know
- // which situation we're in, so have to handle both.
- auto& client = cc();
- if (auto opCtx = client.getOperationContext()) {
- _killArmHandle = _arm->kill(opCtx);
- } else {
- auto newOpCtx = client.makeOperationContext();
- _killArmHandle = _arm->kill(newOpCtx.get());
- }
- }
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
if (_establishCollectionCursorsScheduler) {
@@ -266,6 +262,8 @@ void CollectionCloner::_cancelRemainingWork_inlock() {
if (_verifyCollectionDroppedScheduler) {
_verifyCollectionDroppedScheduler->shutdown();
}
+ _queryState =
+ _queryState == QueryState::kRunning ? QueryState::kCanceling : QueryState::kFinished;
_dbWorkTaskRunner.cancel();
}
@@ -276,10 +274,10 @@ CollectionCloner::Stats CollectionCloner::getStats() const {
void CollectionCloner::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_killArmHandle) {
- _executor->waitForEvent(_killArmHandle);
- }
- _condition.wait(lk, [this]() { return !_isActive_inlock(); });
+ _condition.wait(lk, [this]() {
+ return (_queryState == QueryState::kNotStarted || _queryState == QueryState::kFinished) &&
+ !_isActive_inlock();
+ });
}
void CollectionCloner::waitForDbWorker() {
@@ -294,6 +292,11 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched
_scheduleDbWorkFn = scheduleDbWorkFn;
}
+void CollectionCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _createClientFn = createClientFn;
+}
+
std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() {
LockGuard lk(_mutex);
return _documentsToInsert;
@@ -317,7 +320,7 @@ void CollectionCloner::_countCallback(
long long count = 0;
Status commandStatus = getStatusFromCommandResult(args.response.data);
- if (commandStatus == ErrorCodes::NamespaceNotFound && _options.uuid) {
+ if (commandStatus == ErrorCodes::NamespaceNotFound) {
// Querying by a non-existing collection by UUID returns an error. Treat same as
// behavior of find by namespace and use count == 0.
} else if (!commandStatus.isOK()) {
@@ -475,15 +478,44 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca
_collLoader = std::move(collectionBulkLoader.getValue());
- BSONObjBuilder cmdObj;
-
- cmdObj.appendElements(makeCommandWithUUIDorCollectionName("find", _options.uuid, _sourceNss));
- cmdObj.append("noCursorTimeout", true);
- // Set batchSize to be 0 to establish the cursor without fetching any documents,
- cmdObj.append("batchSize", 0);
+ // The query cannot run on the database work thread, because it needs to be able to
+ // schedule work on that thread while still running.
+ auto runQueryCallback =
+ _executor->scheduleWork([this](const executor::TaskExecutor::CallbackArgs& callbackData) {
+ ON_BLOCK_EXIT([this] {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _queryState = QueryState::kFinished;
+ }
+ _condition.notify_all();
+ });
+ _runQuery(callbackData);
+ });
+ if (!runQueryCallback.isOK()) {
+ _finishCallback(runQueryCallback.getStatus());
+ return;
+ }
+}
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
+void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) {
+ if (!callbackData.status.isOK()) {
+ _finishCallback(callbackData.status);
+ return;
+ }
+ bool queryStateOK = false;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ queryStateOK = _queryState == QueryState::kNotStarted;
+ if (queryStateOK) {
+ _queryState = QueryState::kRunning;
+ }
+ }
+ if (!queryStateOK) {
+ // _finishCallback must not called with _mutex locked. If the queryState changes
+ // after the mutex is released, we will do the query and cancel after the first batch.
+ _finishCallback({ErrorCodes::CallbackCanceled, "Collection cloning cancelled."});
+ return;
+ }
MONGO_FAIL_POINT_BLOCK(initialSyncHangBeforeCollectionClone, options) {
const BSONObj& data = options.getData();
@@ -496,205 +528,81 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca
}
}
- _establishCollectionCursorsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
- _executor,
- RemoteCommandRequest(_source,
- _sourceNss.db().toString(),
- cmdObj.obj(),
- ReadPreferenceSetting::secondaryPreferredMetadata(),
- opCtx,
- RemoteCommandRequest::kNoTimeout),
- [=](const RemoteCommandCallbackArgs& rcbd) { _establishCollectionCursorsCallback(rcbd); },
- RemoteCommandRetryScheduler::makeRetryPolicy(
- numInitialSyncCollectionFindAttempts.load(),
- executor::RemoteCommandRequest::kNoTimeout,
- RemoteCommandRetryScheduler::kAllRetriableErrors));
- auto scheduleStatus = _establishCollectionCursorsScheduler->startup();
-
- if (!scheduleStatus.isOK()) {
- _establishCollectionCursorsScheduler.reset();
- _finishCallback(scheduleStatus);
- return;
- }
-}
-
-Status CollectionCloner::_parseCursorResponse(BSONObj response,
- std::vector<CursorResponse>* cursors) {
- StatusWith<CursorResponse> findResponse = CursorResponse::parseFromBSON(response);
- if (!findResponse.isOK()) {
- return findResponse.getStatus().withContext(
- str::stream() << "Error parsing the 'find' query against collection '"
- << _sourceNss.ns()
- << "'");
- }
- cursors->push_back(std::move(findResponse.getValue()));
- return Status::OK();
-}
-
-void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd) {
- if (_state == State::kShuttingDown) {
- Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."};
- _finishCallback(shuttingDownStatus);
- return;
- }
- auto response = rcbd.response;
- if (!response.isOK()) {
- _finishCallback(response.status);
- return;
- }
- Status commandStatus = getStatusFromCommandResult(response.data);
- if (commandStatus == ErrorCodes::NamespaceNotFound) {
- _finishCallback(Status::OK());
+ auto conn = _createClientFn();
+ Status clientConnectionStatus = conn->connect(_source, StringData());
+ if (!clientConnectionStatus.isOK()) {
+ _finishCallback(clientConnectionStatus);
return;
}
- if (!commandStatus.isOK()) {
- _finishCallback(commandStatus.withContext(
- str::stream() << "Error querying collection '" << _sourceNss.ns() << "'"));
+ if (!replAuthenticate(conn.get())) {
+ _finishCallback({ErrorCodes::AuthenticationFailed,
+ str::stream() << "Failed to authenticate to " << _source});
return;
}
- std::vector<CursorResponse> cursorResponses;
- Status parseResponseStatus = _parseCursorResponse(response.data, &cursorResponses);
- if (!parseResponseStatus.isOK()) {
- _finishCallback(parseResponseStatus);
- return;
- }
- LOG(1) << "Collection cloner running with " << cursorResponses.size()
- << " cursors established.";
-
- // Initialize the 'AsyncResultsMerger'(ARM).
- std::vector<RemoteCursor> remoteCursors;
- for (auto&& cursorResponse : cursorResponses) {
- // A placeholder 'ShardId' is used until the ARM is made less sharding specific.
- remoteCursors.emplace_back();
- auto& newCursor = remoteCursors.back();
- newCursor.setShardId("CollectionClonerSyncSource");
- newCursor.setHostAndPort(_source);
- newCursor.setCursorResponse(std::move(cursorResponse));
- }
-
- AsyncResultsMergerParams armParams;
- armParams.setNss(_sourceNss);
- armParams.setRemotes(std::move(remoteCursors));
- if (_collectionCloningBatchSize > 0) {
- armParams.setBatchSize(_collectionCloningBatchSize);
- }
- auto opCtx = cc().makeOperationContext();
- _arm = stdx::make_unique<AsyncResultsMerger>(opCtx.get(), _executor, std::move(armParams));
- _arm->detachFromOperationContext();
- opCtx.reset();
-
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
auto finishCallbackFn = [this](const Status& status) { _finishCallback(status); };
auto onCompletionGuard =
std::make_shared<OnCompletionGuard>(cancelRemainingWorkInLock, finishCallbackFn);
- // Lock guard must be declared after completion guard. If there is an error in this function
- // that will cause the destructor of the completion guard to run, the destructor must be run
- // outside the mutex. This is a necessary condition to invoke _finishCallback.
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- if (!scheduleStatus.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
- return;
- }
-}
-
-Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
- // We expect this callback to execute in a thread from a TaskExecutor which will not have an
- // OperationContext populated. We must make one ourselves.
- auto opCtx = cc().makeOperationContext();
- _arm->reattachToOperationContext(opCtx.get());
- while (_arm->ready()) {
- auto armResultStatus = _arm->nextReady();
- if (!armResultStatus.getStatus().isOK()) {
- return armResultStatus.getStatus();
- }
- if (armResultStatus.getValue().isEOF()) {
- // We have reached the end of the batch.
- break;
- } else {
- auto queryResult = armResultStatus.getValue().getResult();
- _documentsToInsert.push_back(std::move(*queryResult));
+ try {
+ conn->query(
+ [this, onCompletionGuard](DBClientCursorBatchIterator& iter) {
+ _handleNextBatch(onCompletionGuard, iter);
+ },
+ NamespaceStringOrUUID(_sourceNss.db().toString(), *_options.uuid),
+ Query(),
+ nullptr /* fieldsToReturn */,
+ QueryOption_NoCursorTimeout | QueryOption_SlaveOk,
+ _collectionClonerBatchSize);
+ } catch (const DBException& e) {
+ auto queryStatus = e.toStatus().withContext(str::stream() << "Error querying collection '"
+ << _sourceNss.ns());
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ if (queryStatus.code() == ErrorCodes::OperationFailed ||
+ queryStatus.code() == ErrorCodes::CursorNotFound) {
+ // With these errors, it's possible the collection was dropped while we were
+ // cloning. If so, we'll execute the drop during oplog application, so it's OK to
+ // just stop cloning.
+ _verifyCollectionWasDropped(lock, queryStatus, onCompletionGuard);
+ return;
+ } else if (queryStatus.code() != ErrorCodes::NamespaceNotFound) {
+ // NamespaceNotFound means the collection was dropped before we started cloning, so
+ // we're OK to ignore the error. Any other error we must report.
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, queryStatus);
+ return;
}
}
- _arm->detachFromOperationContext();
-
- return Status::OK();
-}
-
-Status CollectionCloner::_scheduleNextARMResultsCallback(
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- // We expect this callback to execute in a thread from a TaskExecutor which will not have an
- // OperationContext populated. We must make one ourselves.
- auto opCtx = cc().makeOperationContext();
- _arm->reattachToOperationContext(opCtx.get());
- auto nextEvent = _arm->nextEvent();
- _arm->detachFromOperationContext();
- if (!nextEvent.isOK()) {
- return nextEvent.getStatus();
- }
- auto event = nextEvent.getValue();
- auto handleARMResultsOnNextEvent =
- _executor->onEvent(event, [=](const executor::TaskExecutor::CallbackArgs& cbd) {
- _handleARMResultsCallback(cbd, onCompletionGuard);
- });
- return handleARMResultsOnNextEvent.getStatus();
+ waitForDbWorker();
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, Status::OK());
}
-void CollectionCloner::_handleARMResultsCallback(
- const executor::TaskExecutor::CallbackArgs& cbd,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard,
- Status status) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- guard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- };
-
- if (!cbd.status.isOK()) {
- // Wait for active inserts to complete.
- waitForDbWorker();
- Status newStatus = cbd.status.withContext(str::stream() << "Error querying collection '"
- << _sourceNss.ns());
- setResultAndCancelRemainingWork(onCompletionGuard, cbd.status);
- return;
- }
-
- // Pull the documents from the ARM into a buffer until the entire batch has been processed.
- bool lastBatch;
+void CollectionCloner::_handleNextBatch(std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+ DBClientCursorBatchIterator& iter) {
+ _stats.receivedBatches++;
{
- UniqueLock lk(_mutex);
- auto nextBatchStatus = _bufferNextBatchFromArm(lk);
- if (!nextBatchStatus.isOK()) {
- if (_options.uuid && (nextBatchStatus.code() == ErrorCodes::OperationFailed ||
- nextBatchStatus.code() == ErrorCodes::CursorNotFound)) {
- // With these errors, it's possible the collection was dropped while we were
- // cloning. If so, we'll execute the drop during oplog application, so it's OK to
- // just stop cloning. This is only safe if cloning by UUID; if we are cloning by
- // name, we have no way to detect if the collection was dropped and another
- // collection with the same name created in the interim.
- _verifyCollectionWasDropped(lk, nextBatchStatus, onCompletionGuard, cbd.opCtx);
- } else {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, nextBatchStatus);
- }
- return;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ uassert(ErrorCodes::CallbackCanceled,
+ "Collection cloning cancelled.",
+ _queryState != QueryState::kCanceling);
+ while (iter.moreInCurrentBatch()) {
+ BSONObj o = iter.nextSafe();
+ _documentsToInsert.emplace_back(std::move(o));
}
-
- // Check if this is the last batch of documents to clone.
- lastBatch = _arm->remotesExhausted();
}
// Schedule the next document batch insertion.
auto&& scheduleResult = _scheduleDbWorkFn([=](const executor::TaskExecutor::CallbackArgs& cbd) {
- _insertDocumentsCallback(cbd, lastBatch, onCompletionGuard);
+ _insertDocumentsCallback(cbd, onCompletionGuard);
});
+
if (!scheduleResult.isOK()) {
Status newStatus = scheduleResult.getStatus().withContext(
str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'");
- setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus());
- return;
+ // We must throw an exception to terminate query.
+ uassertStatusOK(newStatus);
}
MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
@@ -711,23 +619,12 @@ void CollectionCloner::_handleARMResultsCallback(
}
}
}
-
- // If the remote cursors are not exhausted, schedule this callback again to handle
- // the impending cursor response.
- if (!lastBatch) {
- Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- if (!scheduleStatus.isOK()) {
- setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus);
- return;
- }
- }
}
void CollectionCloner::_verifyCollectionWasDropped(
const stdx::unique_lock<stdx::mutex>& lk,
Status batchStatus,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard,
- OperationContext* opCtx) {
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
// If we already have a _verifyCollectionDroppedScheduler, just return; the existing
// scheduler will take care of cleaning up.
if (_verifyCollectionDroppedScheduler) {
@@ -742,7 +639,7 @@ void CollectionCloner::_verifyCollectionWasDropped(
_sourceNss.db().toString(),
cmdObj.obj(),
ReadPreferenceSetting::secondaryPreferredMetadata(),
- opCtx,
+ nullptr /* No OperationContext require for replication commands */,
RemoteCommandRequest::kNoTimeout),
[this, batchStatus, onCompletionGuard](const RemoteCommandCallbackArgs& args) {
// If the attempt to determine if the collection was dropped fails for any reason other
@@ -786,7 +683,6 @@ void CollectionCloner::_verifyCollectionWasDropped(
void CollectionCloner::_insertDocumentsCallback(
const executor::TaskExecutor::CallbackArgs& cbd,
- bool lastBatch,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
if (!cbd.status.isOK()) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -798,14 +694,11 @@ void CollectionCloner::_insertDocumentsCallback(
std::vector<BSONObj> docs;
if (_documentsToInsert.size() == 0) {
warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss;
- if (lastBatch) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
- }
return;
}
_documentsToInsert.swap(docs);
_stats.documentsCopied += docs.size();
- ++_stats.fetchBatches;
+ ++_stats.fetchedBatches;
_progressMeter.hit(int(docs.size()));
invariant(_collLoader);
const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend());
@@ -827,11 +720,6 @@ void CollectionCloner::_insertDocumentsCallback(
lk.lock();
}
}
-
- if (lastBatch) {
- // Clean up resources once the last batch has been copied over and set the status to OK.
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
- }
}
void CollectionCloner::_finishCallback(const Status& status) {
@@ -898,7 +786,7 @@ void CollectionCloner::Stats::append(BSONObjBuilder* builder) const {
builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy);
builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied);
builder->appendNumber("indexes", indexes);
- builder->appendNumber("fetchedBatches", fetchBatches);
+ builder->appendNumber("fetchedBatches", fetchedBatches);
if (start != Date_t()) {
builder->appendDate("start", start);
if (end != Date_t()) {
@@ -908,6 +796,7 @@ void CollectionCloner::Stats::append(BSONObjBuilder* builder) const {
builder->appendNumber("elapsedMillis", elapsedMillis);
}
}
+ builder->appendNumber("receivedBatches", receivedBatches);
}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 038a2a41b64..bdcc3bd05cd 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -36,6 +36,7 @@
#include "mongo/base/status.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/client/dbclient_connection.h"
#include "mongo/client/fetcher.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/catalog/collection_options.h"
@@ -46,7 +47,6 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/task_runner.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
@@ -79,7 +79,8 @@ public:
size_t documentToCopy{0};
size_t documentsCopied{0};
size_t indexes{0};
- size_t fetchBatches{0};
+ size_t fetchedBatches{0}; // This is actually inserted batches.
+ size_t receivedBatches{0};
std::string toString() const;
BSONObj toBSON() const;
@@ -94,6 +95,13 @@ public:
const executor::TaskExecutor::CallbackFn&)>;
/**
+ * Type of function to create a database client
+ *
+ * Used for testing only.
+ */
+ using CreateClientFn = stdx::function<std::unique_ptr<DBClientConnection>()>;
+
+ /**
* Creates CollectionCloner task in inactive state. Use start() to activate cloner.
*
* The cloner calls 'onCompletion' when the collection cloning has completed or failed.
@@ -145,6 +153,22 @@ public:
void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
/**
+ * Allows a different client class to be injected.
+ *
+ * For testing only.
+ */
+ void setCreateClientFn_forTest(const CreateClientFn& createClientFn);
+
+ /**
+ * Allows batch size to be changed after construction.
+ *
+ * For testing only.
+ */
+ void setBatchSize_forTest(int batchSize) {
+ const_cast<int&>(_collectionClonerBatchSize) = batchSize;
+ }
+
+ /**
* Returns the documents currently stored in the '_documents' buffer that is intended
* to be inserted through the collection loader.
*
@@ -183,62 +207,39 @@ private:
*
* Called multiple times if there are more than one batch of responses from listIndexes
* cursor.
- *
- * 'nextAction' is an in/out arg indicating the next action planned and to be taken
- * by the fetcher.
*/
void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData);
/**
- * Parses the cursor responses from the 'find' command and passes them into the
- * 'AsyncResultsMerger'.
- */
- void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd);
-
- /**
- * Takes a cursors buffer and parses the 'find' response into cursor
- * responses that are pushed onto the buffer.
- */
- Status _parseCursorResponse(BSONObj response, std::vector<CursorResponse>* cursors);
-
- /**
- * Calls to get the next event from the 'AsyncResultsMerger'. This schedules
- * '_handleAsyncResultsCallback' to be run when the event is signaled successfully.
- */
- Status _scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard);
-
- /**
- * Runs for each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'.
- * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback'
- * to insert the contents of the buffer.
+ * Using a DBClientConnection, executes a query to retrieve all documents in the collection.
+ * For each batch returned by the upstream node, _handleNextBatch will be called with the data.
+ * This method will return when the entire query is finished or failed.
*/
- void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+ void _runQuery(const executor::TaskExecutor::CallbackArgs& callbackData);
/**
- * Pull all ready results from the ARM into a buffer to be inserted.
+ * Put all results from a query batch into a buffer to be inserted, and schedule
+ * it to be inserted.
*/
- Status _bufferNextBatchFromArm(WithLock lock);
+ void _handleNextBatch(std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+ DBClientCursorBatchIterator& iter);
/**
- * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'.
- * On the last batch, 'lastBatch' will be true.
+ * Called whenever there is a new batch of documents ready from the DBClientConnection.
*
* Each document returned will be inserted via the storage interfaceRequest storage
* interface.
*/
void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
- bool lastBatch,
std::shared_ptr<OnCompletionGuard> onCompletionGuard);
/**
- * Verifies that an error from the ARM was the result of a collection drop. If
+ * Verifies that an error from the query was the result of a collection drop. If
* so, cloning is stopped with no error. Otherwise it is stopped with the given error.
*/
void _verifyCollectionWasDropped(const stdx::unique_lock<stdx::mutex>& lk,
Status batchStatus,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard,
- OperationContext* opCtx);
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
/**
* Reports completion status.
@@ -271,21 +272,15 @@ private:
Fetcher _listIndexesFetcher; // (S)
std::vector<BSONObj> _indexSpecs; // (M)
BSONObj _idIndexSpec; // (M)
- std::vector<BSONObj>
- _documentsToInsert; // (M) Documents read from 'AsyncResultsMerger' to insert.
- TaskRunner _dbWorkTaskRunner; // (R)
+ std::vector<BSONObj> _documentsToInsert; // (M) Documents read from source to insert.
+ TaskRunner _dbWorkTaskRunner; // (R)
ScheduleDbWorkFn
- _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
- Stats _stats; // (M) stats for this instance.
- ProgressMeter _progressMeter; // (M) progress meter for this instance.
- const int _collectionCloningBatchSize; // (R) The size of the batches of documents returned in
- // collection cloning.
-
- // (M) Component responsible for fetching the documents from the collection cloner cursor(s).
- std::unique_ptr<AsyncResultsMerger> _arm;
-
- // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'.
- executor::TaskExecutor::EventHandle _killArmHandle;
+ _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
+ CreateClientFn _createClientFn; // (RT) Function for creating a database client.
+ Stats _stats; // (M) stats for this instance.
+ ProgressMeter _progressMeter; // (M) progress meter for this instance.
+ const int _collectionClonerBatchSize; // (R) The size of the batches of documents returned in
+ // collection cloning.
// (M) Scheduler used to establish the initial cursor or set of cursors.
std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler;
@@ -293,6 +288,15 @@ private:
// (M) Scheduler used to determine if a cursor was closed because the collection was dropped.
std::unique_ptr<RemoteCommandRetryScheduler> _verifyCollectionDroppedScheduler;
+ // (M) State of query. Set to kCanceling to cause query to stop. If the query is kRunning
+ // or kCanceling, wait for query to reach kFinished using _condition
+ enum class QueryState {
+ kNotStarted,
+ kRunning,
+ kCanceling,
+ kFinished
+ } _queryState = QueryState::kNotStarted;
+
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
// It is possible to skip intermediate states. For example,
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 726a2b6a2cf..d302ed0492d 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
@@ -58,6 +59,134 @@ public:
}
};
+class FailableMockDBClientConnection : public MockDBClientConnection {
+public:
+ FailableMockDBClientConnection(MockRemoteDBServer* remote, executor::NetworkInterfaceMock* net)
+ : MockDBClientConnection(remote), _net(net) {}
+
+ virtual ~FailableMockDBClientConnection() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _paused = false;
+ _cond.notify_all();
+ _cond.wait(lk, [this] { return !_resuming; });
+ }
+
+ Status connect(const HostAndPort& host, StringData applicationName) override {
+ if (!_failureForConnect.isOK())
+ return _failureForConnect;
+ return MockDBClientConnection::connect(host, applicationName);
+ }
+
+ using MockDBClientConnection::query; // This avoids warnings from -Woverloaded-virtual
+ unsigned long long query(stdx::function<void(mongo::DBClientCursorBatchIterator&)> f,
+ const NamespaceStringOrUUID& nsOrUuid,
+ mongo::Query query,
+ const mongo::BSONObj* fieldsToReturn,
+ int queryOptions,
+ int batchSize) override {
+ ON_BLOCK_EXIT([this]() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _queryCount++;
+ }
+ _cond.notify_all();
+ });
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _waiting = _paused;
+ _cond.notify_all();
+ while (_paused) {
+ lk.unlock();
+ _net->waitForWork();
+ lk.lock();
+ }
+ _waiting = false;
+ }
+ auto result = MockDBClientConnection::query(
+ f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize);
+ uassertStatusOK(_failureForQuery);
+ return result;
+ }
+
+ void setFailureForConnect(Status failure) {
+ _failureForConnect = failure;
+ }
+
+ void setFailureForQuery(Status failure) {
+ _failureForQuery = failure;
+ }
+
+ void pause() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _paused = true;
+ }
+ _cond.notify_all();
+ }
+ void resume() {
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _resuming = true;
+ _paused = false;
+ _resumedQueryCount = _queryCount;
+ while (_waiting) {
+ lk.unlock();
+ _net->signalWorkAvailable();
+ mongo::sleepmillis(10);
+ lk.lock();
+ }
+ _resuming = false;
+ _cond.notify_all();
+ }
+ }
+
+ // Waits for the next query after pause() is called to start.
+ void waitForPausedQuery() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cond.wait(lk, [this] { return _waiting; });
+ }
+
+ // Waits for the next query to run after resume() is called to complete.
+ void waitForResumedQuery() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cond.wait(lk, [this] { return _resumedQueryCount != _queryCount; });
+ }
+
+private:
+ executor::NetworkInterfaceMock* _net;
+ stdx::mutex _mutex;
+ stdx::condition_variable _cond;
+ bool _paused = false;
+ bool _waiting = false;
+ bool _resuming = false;
+ int _queryCount = 0;
+ int _resumedQueryCount = 0;
+ Status _failureForConnect = Status::OK();
+ Status _failureForQuery = Status::OK();
+};
+
+// RAII class to pause the client; since tests are very exception-heavy this prevents them
+// from hanging on failure.
+class MockClientPauser {
+ MONGO_DISALLOW_COPYING(MockClientPauser);
+
+public:
+ MockClientPauser(FailableMockDBClientConnection* client) : _client(client) {
+ _client->pause();
+ };
+ ~MockClientPauser() {
+ resume();
+ }
+ void resume() {
+ if (_client)
+ _client->resume();
+ _client = nullptr;
+ }
+
+private:
+ FailableMockDBClientConnection* _client;
+};
+
class CollectionClonerTest : public BaseClonerTest {
public:
BaseCloner* getCloner() const override;
@@ -70,6 +199,17 @@ protected:
void setUp() override;
void tearDown() override;
+ virtual CollectionOptions getCollectionOptions() const {
+ CollectionOptions options;
+ options.uuid = UUID::gen();
+ return options;
+ }
+
+ virtual const NamespaceString& getStartupNss() const {
+ return nss;
+ };
+
+
std::vector<BSONObj> makeSecondaryIndexSpecs(const NamespaceString& nss);
// A simple arbitrary value to use as the default batch size.
@@ -79,16 +219,19 @@ protected:
std::unique_ptr<CollectionCloner> collectionCloner;
CollectionMockStats collectionStats; // Used by the _loader.
CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner.
+ bool _clientCreated = false;
+ FailableMockDBClientConnection* _client; // owned by the CollectionCloner once created.
+ std::unique_ptr<MockRemoteDBServer> _server;
};
void CollectionClonerTest::setUp() {
BaseClonerTest::setUp();
- options = {};
+ options = getCollectionOptions();
collectionCloner.reset(nullptr);
collectionCloner = stdx::make_unique<CollectionCloner>(&getExecutor(),
dbWorkThreadPool.get(),
target,
- nss,
+ getStartupNss(),
options,
setStatusCallback(),
storageInterface.get(),
@@ -106,6 +249,13 @@ void CollectionClonerTest::setUp() {
return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
std::unique_ptr<CollectionBulkLoader>(_loader));
};
+ _server = stdx::make_unique<MockRemoteDBServer>(target.toString());
+ _server->assignCollectionUuid(nss.ns(), *options.uuid);
+ _client = new FailableMockDBClientConnection(_server.get(), getNet());
+ collectionCloner->setCreateClientFn_forTest([this]() {
+ _clientCreated = true;
+ return std::unique_ptr<DBClientConnection>(_client);
+ });
}
// Return index specs to use for secondary indexes.
@@ -123,7 +273,11 @@ std::vector<BSONObj> CollectionClonerTest::makeSecondaryIndexSpecs(const Namespa
void CollectionClonerTest::tearDown() {
BaseClonerTest::tearDown();
// Executor may still invoke collection cloner's callback before shutting down.
- collectionCloner.reset(nullptr);
+ collectionCloner.reset();
+ if (!_clientCreated)
+ delete _client;
+ _clientCreated = false;
+ _server.reset();
options = {};
}
@@ -180,6 +334,19 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
"'storageEngine.storageEngine1' has to be an embedded document.");
}
+ // UUID must be present.
+ {
+ CollectionOptions invalidOptions = options;
+ invalidOptions.uuid = boost::none;
+ StorageInterface* si = storageInterface.get();
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(
+ &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize),
+ AssertionException,
+ 50953,
+ "Missing collection UUID in CollectionCloner, collection name: db.coll");
+ }
+
// Callback function cannot be null.
{
CollectionCloner::CallbackFn nullCb;
@@ -190,6 +357,17 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
ErrorCodes::BadValue,
"callback function cannot be null");
}
+
+ // Batch size must be non-negative.
+ {
+ StorageInterface* si = storageInterface.get();
+ constexpr int kInvalidBatchSize = -1;
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(&executor, pool, target, nss, options, cb, si, kInvalidBatchSize),
+ AssertionException,
+ 50954,
+ "collectionClonerBatchSize must be non-negative.");
+ }
}
TEST_F(CollectionClonerTest, ClonerLifeCycle) {
@@ -206,7 +384,9 @@ TEST_F(CollectionClonerTest, FirstRemoteCommand) {
auto&& noiRequest = noi->getRequest();
ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
+ auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement()));
+ ASSERT_EQUALS(options.uuid.get(), requestUUID);
+
ASSERT_FALSE(net->hasReadyRequests());
ASSERT_TRUE(collectionCloner->isActive());
}
@@ -347,18 +527,16 @@ TEST_F(CollectionClonerTest,
ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
}
-TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
- options = {};
- options.autoIndexId = CollectionOptions::NO;
- collectionCloner.reset(new CollectionCloner(&getExecutor(),
- dbWorkThreadPool.get(),
- target,
- nss,
- options,
- setStatusCallback(),
- storageInterface.get(),
- defaultBatchSize));
+class CollectionClonerNoAutoIndexTest : public CollectionClonerTest {
+protected:
+ CollectionOptions getCollectionOptions() const override {
+ CollectionOptions options = CollectionClonerTest::getCollectionOptions();
+ options.autoIndexId = CollectionOptions::NO;
+ return options;
+ }
+};
+TEST_F(CollectionClonerNoAutoIndexTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
NamespaceString collNss;
CollectionOptions collOptions;
std::vector<BSONObj> collIndexSpecs{BSON("fakeindexkeys" << 1)}; // init with one doc.
@@ -376,32 +554,24 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
return std::unique_ptr<CollectionBulkLoader>(loader);
};
+ const BSONObj doc = BSON("_id" << 1);
+ _server->insert(nss.ns(), doc);
+ // Pause the CollectionCloner before executing the query so we can verify events which are
+ // supposed to happen before the query.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSONArray()));
}
ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
+ pauser.resume();
collectionCloner->join();
ASSERT_EQUALS(1, collectionStats.insertCount);
ASSERT_TRUE(collectionStats.commitCalled);
@@ -595,6 +765,9 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) {
}
TEST_F(CollectionClonerTest, BeginCollection) {
+ // Pause the CollectionCloner before executing the query so we can verify state after
+ // the listIndexes call.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
CollectionMockStats stats;
@@ -682,7 +855,10 @@ TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) {
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
+TEST_F(CollectionClonerTest, QueryAfterCreateCollection) {
+ // Pause the CollectionCloner before executing the query so we can verify the collection is
+ // created before the query.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
CollectionMockStats stats;
@@ -705,21 +881,15 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
collectionCloner->waitForDbWorker();
ASSERT_TRUE(collectionCreated);
-
- // Fetcher should be scheduled after cloner creates collection.
- auto net = getNet();
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkOperationIterator noi = net->getNextReadyRequest();
- auto&& noiRequest = noi->getRequest();
- ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_EQUALS("find", std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
- ASSERT_TRUE(noiRequest.cmdObj.getField("noCursorTimeout").trueValue());
- ASSERT_FALSE(net->hasReadyRequests());
+ // Make sure the query starts.
+ _client->waitForPausedQuery();
}
-TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) {
+TEST_F(CollectionClonerTest, QueryFailed) {
+ // For this test to work properly, the error cannot be one of the special codes
+ // (OperationFailed or CursorNotFound) which trigger an attempt to see if the collection
+ // was deleted.
+ _client->setFailureForQuery({ErrorCodes::UnknownError, "QueryFailedTest UnknownError"});
ASSERT_OK(collectionCloner->startup());
{
@@ -727,98 +897,22 @@ TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) {
processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("ok" << 0 << "errmsg"
- << ""
- << "code"
- << ErrorCodes::CursorNotFound));
- }
-
- ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError) {
- ASSERT_OK(collectionCloner->startup());
-
- auto net = getNet();
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
-
- // CollectionCollection sends listIndexes request irrespective of collection size in a
- // successful count response.
- assertRemoteCommandNameEquals("count", net->scheduleSuccessfulResponse(createCountResponse(0)));
- net->runReadyNetworkOperations();
-
- // CollectionCloner requires a successful listIndexes response in order to send the find request
- // for the documents in the collection.
- assertRemoteCommandNameEquals(
- "listIndexes",
- net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))));
- net->runReadyNetworkOperations();
-
- // Respond to the find request with a retriable error.
- assertRemoteCommandNameEquals("find",
- net->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, "")));
- net->runReadyNetworkOperations();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // This check exists to ensure that the command used to establish the cursors is retried,
- // regardless of the command format.
- auto noi = net->getNextReadyRequest();
- assertRemoteCommandNameEquals("find", noi->getRequest());
- net->blackHole(noi);
-}
-
-TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) {
- ASSERT_OK(collectionCloner->startup());
-
- ASSERT_TRUE(collectionCloner->isActive());
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- auto net = getNet();
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- net->runReadyNetworkOperations();
- }
-
- collectionCloner->waitForDbWorker();
-
- ASSERT_TRUE(collectionCloner->isActive());
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- scheduleNetworkResponse(BSON("ok" << 1));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->shutdown();
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- getNet()->logQueues();
- net->runReadyNetworkOperations();
- }
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
+ // Pause the client so we can set up the failure.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -831,31 +925,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
});
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
+ pauser.resume();
+ collectionCloner->join();
ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
+ // Pause the client so we can set up the failure.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -874,37 +961,30 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
});
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
- }
+ pauser.resume();
collectionCloner->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
+ // Pause the client so we can set up the failure.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
getNet()->logQueues();
- collectionCloner->waitForDbWorker();
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
@@ -913,20 +993,8 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
const std::vector<BSONObj>::const_iterator end) {
return Status(ErrorCodes::OperationFailed, "");
};
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
- }
+ pauser.resume();
collectionCloner->join();
ASSERT_FALSE(collectionCloner->isActive());
@@ -936,39 +1004,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
}
TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+ _server->insert(nss.ns(), BSON("_id" << 2));
+
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(2));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
-
collectionCloner->join();
// TODO: record the documents during insert and compare them
// -- maybe better done using a real storage engine, like ephemeral for test.
- ASSERT_EQUALS(1, collectionStats.insertCount);
+ ASSERT_EQUALS(2, collectionStats.insertCount);
+ auto stats = collectionCloner->getStats();
+ ASSERT_EQUALS(1u, stats.receivedBatches);
ASSERT_TRUE(collectionStats.commitCalled);
ASSERT_OK(getStatus());
@@ -976,115 +1029,27 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
}
TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+ _server->insert(nss.ns(), BSON("_id" << 2));
+ _server->insert(nss.ns(), BSON("_id" << 3));
+
+ collectionCloner->setBatchSize_forTest(2);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(3));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- }
-
- collectionCloner->waitForDbWorker();
- // TODO: record the documents during insert and compare them
- // -- maybe better done using a real storage engine, like ephemeral for test.
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc2 = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
- }
-
collectionCloner->join();
// TODO: record the documents during insert and compare them
// -- maybe better done using a real storage engine, like ephemeral for test.
- ASSERT_EQUALS(2, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
-TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc2 = BSON("_id" << 2);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc2), "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(2, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(emptyArray));
- }
-
- collectionCloner->join();
- ASSERT_EQUALS(2, collectionStats.insertCount);
+ ASSERT_EQUALS(3, collectionStats.insertCount);
ASSERT_TRUE(collectionStats.commitCalled);
+ auto stats = collectionCloner->getStats();
+ ASSERT_EQUALS(2u, stats.receivedBatches);
ASSERT_OK(getStatus());
ASSERT_FALSE(collectionCloner->isActive());
@@ -1101,53 +1066,24 @@ TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBefo
* Restarting cloning should fail with ErrorCodes::ShutdownInProgress error.
*/
TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) {
+ // Set up document to return from upstream.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
// First cloning attempt - fails while reading documents from source collection.
unittest::log() << "Starting first collection cloning attempt";
+ _client->setFailureForQuery(
+ {ErrorCodes::UnknownError, "failed to read remaining documents from source collection"});
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1))));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- // Check that the status hasn't changed from the initial value.
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(ErrorCodes::OperationFailed,
- "failed to read remaining documents from source collection");
- }
-
collectionCloner->join();
- ASSERT_EQUALS(1, collectionStats.insertCount);
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus());
ASSERT_FALSE(collectionCloner->isActive());
// Second cloning attempt - run to completion.
@@ -1209,6 +1145,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA
TEST_F(CollectionClonerTest,
CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) {
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
@@ -1227,20 +1164,11 @@ TEST_F(CollectionClonerTest,
}
ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // At this point, the CollectionCloner has sent the find request to establish the cursor.
+ // At this point, the CollectionCloner is waiting for the query to complete.
// We want to return the first batch of documents for the collection from the network so that
// the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for
// the next batch of documents.
@@ -1257,26 +1185,20 @@ TEST_F(CollectionClonerTest,
// Return first batch of collection documents from remote server for the getMore request.
const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- assertRemoteCommandNameEquals(
- "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
- net->runReadyNetworkOperations();
- }
-
- // Confirm that CollectionCloner attempted to schedule _insertDocuments task.
- ASSERT_TRUE(insertDocumentsFn);
-
- // Return an error for the getMore request for the next batch of collection documents.
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- assertRemoteCommandNameEquals(
- "getMore",
- net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed")));
- net->runReadyNetworkOperations();
- }
+ _server->insert(nss.ns(), doc);
+ _client->setFailureForQuery({ErrorCodes::UnknownError, "getMore failed"});
+ // Wait for the _runQuery method to exit. We can't get at it directly but we can wait
+ // for a task scheduled after it to complete.
+ auto& executor = getExecutor();
+ auto& event = executor.makeEvent().getValue();
+ auto nextTask =
+ executor
+ .scheduleWork([&executor, event](const executor::TaskExecutor::CallbackArgs&) {
+ executor.signalEvent(event);
+ })
+ .getValue();
+ pauser.resume();
+ executor.waitForEvent(event);
// CollectionCloner should still be active because we have not finished processing the
// insertDocuments task.
@@ -1287,90 +1209,39 @@ TEST_F(CollectionClonerTest,
// error passed to the completion guard (ie. from the failed getMore request).
executor::TaskExecutor::CallbackArgs callbackArgs(
&getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, ""));
+ ASSERT_TRUE(insertDocumentsFn);
insertDocumentsFn(callbackArgs);
// Reset 'insertDocumentsFn' to release last reference count on completion guard.
insertDocumentsFn = {};
- // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback
- // synchronously.
+ collectionCloner->join();
ASSERT_FALSE(collectionCloner->isActive());
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus());
}
-class CollectionClonerUUIDTest : public CollectionClonerTest {
+class CollectionClonerRenamedBeforeStartTest : public CollectionClonerTest {
protected:
- // The UUID tests should deal gracefully with renamed collections, so start the cloner with
- // an alternate name.
+ // The CollectionCloner should deal gracefully with collections renamed before the cloner
+ // was started, so start it with an alternate name.
const NamespaceString alternateNss{"db", "alternateCollName"};
- void startupWithUUID(int maxNumCloningCursors = 1) {
- collectionCloner.reset();
- options.uuid = UUID::gen();
- collectionCloner = stdx::make_unique<CollectionCloner>(&getExecutor(),
- dbWorkThreadPool.get(),
- target,
- alternateNss,
- options,
- setStatusCallback(),
- storageInterface.get(),
- defaultBatchSize);
-
- ASSERT_OK(collectionCloner->startup());
- }
-
- void testWithMaxNumCloningCursors(int maxNumCloningCursors, StringData cmdName) {
- startupWithUUID(maxNumCloningCursors);
-
- CollectionOptions actualOptions;
- CollectionMockStats stats;
- CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats);
- bool collectionCreated = false;
- storageInterface->createCollectionForBulkFn = [&](const NamespaceString& theNss,
- const CollectionOptions& theOptions,
- const BSONObj idIndexSpec,
- const std::vector<BSONObj>& theIndexSpecs)
- -> StatusWith<std::unique_ptr<CollectionBulkLoader>> {
- collectionCreated = true;
- actualOptions = theOptions;
- return std::unique_ptr<CollectionBulkLoader>(loader);
- };
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCreated);
-
- // Fetcher should be scheduled after cloner creates collection.
- auto net = getNet();
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkOperationIterator noi = net->getNextReadyRequest();
- ASSERT_FALSE(net->hasReadyRequests());
- auto&& noiRequest = noi->getRequest();
- ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_BSONOBJ_EQ(actualOptions.toBSON(), options.toBSON());
-
- ASSERT_EQUALS(cmdName, std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(cmdName == "find", noiRequest.cmdObj.getField("noCursorTimeout").trueValue());
- auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement()));
- ASSERT_EQUALS(options.uuid.get(), requestUUID);
- }
+ const NamespaceString& getStartupNss() const override {
+ return alternateNss;
+ };
/**
* Sets up a test for the CollectionCloner that simulates the collection being dropped while
* copying the documents.
- * The ARM returns CursorNotFound error to indicate a collection drop. Subsequently, the
- * CollectionCloner should run a find command on the collection by UUID. This should be the next
- * ready request on in the network interface.
+ * The DBClientConnection returns a CursorNotFound error to indicate a collection drop.
*/
void setUpVerifyCollectionWasDroppedTest() {
- startupWithUUID();
-
+ // Pause the query so we can reliably wait for it to complete.
+ MockClientPauser pauser(_client);
+ // Return error response from the query.
+ _client->setFailureForQuery(
+ {ErrorCodes::CursorNotFound, "collection dropped while copying documents"});
+ ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createCountResponse(0));
@@ -1378,24 +1249,10 @@ protected:
}
ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionStats.initCalled);
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSONArray()));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // Return error response to getMore command.
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(ErrorCodes::CursorNotFound,
- "collection dropped while copying documents");
- }
+ pauser.resume();
+ _client->waitForResumedQuery();
}
/**
@@ -1416,8 +1273,8 @@ protected:
}
};
-TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) {
- startupWithUUID();
+TEST_F(CollectionClonerRenamedBeforeStartTest, FirstRemoteCommandWithRenamedCollection) {
+ ASSERT_OK(collectionCloner->startup());
auto net = getNet();
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -1433,9 +1290,7 @@ TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) {
ASSERT_TRUE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) {
- startupWithUUID();
-
+TEST_F(CollectionClonerRenamedBeforeStartTest, BeginCollectionWithUUID) {
CollectionMockStats stats;
CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats);
NamespaceString collNss;
@@ -1454,6 +1309,11 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) {
return std::unique_ptr<CollectionBulkLoader>(loader);
};
+ // Pause the client so the cloner stops in the fetcher.
+ MockClientPauser pauser(_client);
+
+ ASSERT_OK(collectionCloner->startup());
+
// Split listIndexes response into 2 batches: first batch contains idIndexSpec and
// second batch contains specs. We expect the collection cloner to fix up the collection names
// (here from 'nss' to 'alternateNss') in the index specs, as the collection with the given UUID
@@ -1506,20 +1366,16 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) {
ASSERT_TRUE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerUUIDTest, SingleCloningCursorWithUUIDUsesFindCommand) {
- // With a single cloning cursor, expect a find command.
- testWithMaxNumCloningCursors(1, "find");
-}
-
/**
* Start cloning.
- * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound
- * error.
+ * While copying collection, simulate a collection drop by having the DBClientConnection return a
+ * CursorNotFound error.
* The CollectionCloner should run a find command on the collection by UUID.
* Simulate successful find command with a drop-pending namespace in the response.
* The CollectionCloner should complete with a successful final status.
*/
-TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) {
+TEST_F(CollectionClonerRenamedBeforeStartTest,
+ CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) {
setUpVerifyCollectionWasDroppedTest();
// CollectionCloner should send a find command with the collection's UUID.
@@ -1544,14 +1400,13 @@ TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileC
/**
* Start cloning.
- * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound
- * error.
+ * While copying collection, simulate a collection drop by having the DBClientConnection return a
+ * CursorNotFound error.
* The CollectionCloner should run a find command on the collection by UUID.
* Shut the CollectionCloner down.
- * The CollectionCloner should return a CursorNotFound final status which is the last error from the
- * ARM.
+ * The CollectionCloner should return a CursorNotFound final status.
*/
-TEST_F(CollectionClonerUUIDTest,
+TEST_F(CollectionClonerRenamedBeforeStartTest,
ShuttingDownCollectionClonerDuringCollectionDropVerificationReturnsCallbackCanceled) {
setUpVerifyCollectionWasDroppedTest();
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index 1d5223f54fb..f439e0061b9 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -65,9 +65,18 @@ const char* kOptionsFieldName = "options";
const char* kInfoFieldName = "info";
const char* kUUIDFieldName = "uuid";
-// The batchSize to use for the find/getMore queries called by the CollectionCloner
-constexpr int kUseARMDefaultBatchSize = -1;
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, kUseARMDefaultBatchSize);
+// The batch size (number of documents) to use for the queries in the CollectionCloner. Default of
+// 0 means the limit is the number of documents which fit in a single BSON object.
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, 0)
+ ->withValidator([](const int& batchSize) {
+ return (batchSize >= 0)
+ ? Status::OK()
+ : Status(ErrorCodes::Error(50952),
+ str::stream()
+ << "collectionClonerBatchSize must be greater than or equal to 0. '"
+ << batchSize
+ << "' is an invalid setting.");
+ });
// The number of attempts for the listCollections commands.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3);
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index 1238a794c76..adc479b7967 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/database_cloner.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"
@@ -59,6 +60,11 @@ struct CollectionCloneInfo {
class DatabaseClonerTest : public BaseClonerTest {
public:
+ DatabaseClonerTest() {
+ _options1.uuid = UUID::gen();
+ _options2.uuid = UUID::gen();
+ _options3.uuid = UUID::gen();
+ }
void clear() override;
BaseCloner* getCloner() const override;
@@ -77,6 +83,11 @@ protected:
std::map<NamespaceString, CollectionCloneInfo> _collections;
std::unique_ptr<DatabaseCloner> _databaseCloner;
+ CollectionOptions _options1;
+ CollectionOptions _options2;
+ CollectionOptions _options3;
+ DatabaseCloner::StartCollectionClonerFn _startCollectionCloner;
+ std::unique_ptr<MockRemoteDBServer> _mockServer;
};
void DatabaseClonerTest::setUp() {
@@ -96,6 +107,19 @@ void DatabaseClonerTest::setUp() {
return getExecutor().scheduleWork(work);
});
+ _mockServer = stdx::make_unique<MockRemoteDBServer>(target.toString());
+ _mockServer->assignCollectionUuid("db.a", *_options1.uuid);
+ _mockServer->assignCollectionUuid("db.b", *_options2.uuid);
+ _mockServer->assignCollectionUuid("db.c", *_options3.uuid);
+ _startCollectionCloner = [this](CollectionCloner& cloner) {
+ cloner.setCreateClientFn_forTest([&cloner, this]() {
+ return std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get()));
+ });
+ return cloner.startup();
+ };
+ _databaseCloner->setStartCollectionClonerFn(_startCollectionCloner);
+
storageInterface->createCollectionForBulkFn =
[this](const NamespaceString& nss,
const CollectionOptions& options,
@@ -366,15 +390,15 @@ TEST_F(DatabaseClonerTest, ListCollectionsPredicate) {
const std::vector<BSONObj> sourceInfos = {BSON("name"
<< "a"
<< "options"
- << BSONObj()),
+ << _options1.toBSON()),
BSON("name"
<< "b"
<< "options"
- << BSONObj()),
+ << _options2.toBSON()),
BSON("name"
<< "c"
<< "options"
- << BSONObj())};
+ << _options3.toBSON())};
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createListCollectionsResponse(
@@ -400,11 +424,11 @@ TEST_F(DatabaseClonerTest, ListCollectionsMultipleBatches) {
const std::vector<BSONObj> sourceInfos = {BSON("name"
<< "a"
<< "options"
- << BSONObj()),
+ << _options1.toBSON()),
BSON("name"
<< "b"
<< "options"
- << BSONObj())};
+ << _options2.toBSON())};
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createListCollectionsResponse(1, BSON_ARRAY(sourceInfos[0])));
@@ -447,7 +471,7 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameFieldMissing) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(
- createListCollectionsResponse(0, BSON_ARRAY(BSON("options" << BSONObj()))));
+ createListCollectionsResponse(0, BSON_ARRAY(BSON("options" << _options1.toBSON()))));
}
ASSERT_EQUALS(ErrorCodes::FailedToParse, getStatus().code());
@@ -465,7 +489,7 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameNotAString) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createListCollectionsResponse(
- 0, BSON_ARRAY(BSON("name" << 123 << "options" << BSONObj()))));
+ 0, BSON_ARRAY(BSON("name" << 123 << "options" << _options1.toBSON()))));
}
ASSERT_EQUALS(ErrorCodes::TypeMismatch, getStatus().code());
@@ -482,11 +506,12 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameEmpty) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createListCollectionsResponse(0,
- BSON_ARRAY(BSON("name"
- << ""
- << "options"
- << BSONObj()))));
+ processNetworkResponse(
+ createListCollectionsResponse(0,
+ BSON_ARRAY(BSON("name"
+ << ""
+ << "options"
+ << _options1.toBSON()))));
}
ASSERT_EQUALS(ErrorCodes::BadValue, getStatus().code());
@@ -503,15 +528,16 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameDuplicate) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createListCollectionsResponse(0,
- BSON_ARRAY(BSON("name"
- << "a"
- << "options"
- << BSONObj())
- << BSON("name"
- << "a"
- << "options"
- << BSONObj()))));
+ processNetworkResponse(
+ createListCollectionsResponse(0,
+ BSON_ARRAY(BSON("name"
+ << "a"
+ << "options"
+ << _options1.toBSON())
+ << BSON("name"
+ << "a"
+ << "options"
+ << _options2.toBSON()))));
}
ASSERT_EQUALS(ErrorCodes::DuplicateKey, getStatus().code());
@@ -581,6 +607,26 @@ TEST_F(DatabaseClonerTest, InvalidCollectionOptions) {
ASSERT_EQUALS(DatabaseCloner::State::kComplete, _databaseCloner->getState_forTest());
}
+TEST_F(DatabaseClonerTest, InvalidMissingUUID) {
+ ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest());
+
+ ASSERT_OK(_databaseCloner->startup());
+ ASSERT_EQUALS(DatabaseCloner::State::kRunning, _databaseCloner->getState_forTest());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createListCollectionsResponse(0,
+ BSON_ARRAY(BSON("name"
+ << "a"
+ << "options"
+ << BSONObj()))));
+ }
+
+ ASSERT_EQUALS(50953, getStatus().code());
+ ASSERT_FALSE(_databaseCloner->isActive());
+ ASSERT_EQUALS(DatabaseCloner::State::kComplete, _databaseCloner->getState_forTest());
+}
+
TEST_F(DatabaseClonerTest, DatabaseClonerResendsListCollectionsRequestOnRetriableError) {
ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest());
@@ -636,37 +682,6 @@ TEST_F(DatabaseClonerTest, ListCollectionsReturnsEmptyCollectionName) {
ASSERT_EQUALS(DatabaseCloner::State::kComplete, _databaseCloner->getState_forTest());
}
-TEST_F(DatabaseClonerTest, DatabaseClonerAcceptsCollectionOptionsContainUuid) {
- ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest());
-
- ASSERT_OK(_databaseCloner->startup());
- ASSERT_EQUALS(DatabaseCloner::State::kRunning, _databaseCloner->getState_forTest());
-
- bool collectionClonerStarted = false;
- _databaseCloner->setStartCollectionClonerFn(
- [&collectionClonerStarted](CollectionCloner& cloner) {
- collectionClonerStarted = true;
- return cloner.startup();
- });
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- CollectionOptions options;
- options.uuid = UUID::gen();
- processNetworkResponse(
- createListCollectionsResponse(0,
- BSON_ARRAY(BSON("name"
- << "a"
- << "options"
- << options.toBSON()))));
- }
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionClonerStarted);
- ASSERT_TRUE(_databaseCloner->isActive());
- ASSERT_EQUALS(DatabaseCloner::State::kRunning, _databaseCloner->getState_forTest());
-}
-
TEST_F(DatabaseClonerTest, StartFirstCollectionClonerFailed) {
ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest());
@@ -680,11 +695,12 @@ TEST_F(DatabaseClonerTest, StartFirstCollectionClonerFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createListCollectionsResponse(0,
- BSON_ARRAY(BSON("name"
- << "a"
- << "options"
- << BSONObj()))));
+ processNetworkResponse(
+ createListCollectionsResponse(0,
+ BSON_ARRAY(BSON("name"
+ << "a"
+ << "options"
+ << _options1.toBSON()))));
}
ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code());
@@ -701,28 +717,29 @@ TEST_F(DatabaseClonerTest, StartSecondCollectionClonerFailed) {
const Status errStatus{ErrorCodes::OperationFailed,
"StartSecondCollectionClonerFailed injected failure."};
- _databaseCloner->setStartCollectionClonerFn([errStatus](CollectionCloner& cloner) -> Status {
- if (cloner.getSourceNamespace().coll() == "b") {
- return errStatus;
- }
- return cloner.startup();
- });
+ _databaseCloner->setStartCollectionClonerFn(
+ [errStatus, this](CollectionCloner& cloner) -> Status {
+ if (cloner.getSourceNamespace().coll() == "b") {
+ return errStatus;
+ }
+ return _startCollectionCloner(cloner);
+ });
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createListCollectionsResponse(0,
- BSON_ARRAY(BSON("name"
- << "a"
- << "options"
- << BSONObj())
- << BSON("name"
- << "b"
- << "options"
- << BSONObj()))));
+ processNetworkResponse(
+ createListCollectionsResponse(0,
+ BSON_ARRAY(BSON("name"
+ << "a"
+ << "options"
+ << _options1.toBSON())
+ << BSON("name"
+ << "b"
+ << "options"
+ << _options2.toBSON()))));
processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- processNetworkResponse(createCursorResponse(0, BSONArray()));
}
_databaseCloner->join();
ASSERT_FALSE(_databaseCloner->isActive());
@@ -746,18 +763,18 @@ TEST_F(DatabaseClonerTest, ShutdownCancelsCollectionCloning) {
BSON_ARRAY(BSON("name"
<< "a"
<< "options"
- << BSONObj())
+ << _options1.toBSON())
<< BSON("name"
<< "b"
<< "options"
- << BSONObj())))));
+ << _options2.toBSON())))));
net->runReadyNetworkOperations();
// CollectionCloner sends collection count request on startup.
// Blackhole count request to leave collection cloner active.
auto noi = net->getNextReadyRequest();
assertRemoteCommandNameEquals("count", noi->getRequest());
- ASSERT_EQUALS("a", noi->getRequest().cmdObj.firstElement().String());
+ ASSERT_EQUALS(*_options1.uuid, UUID::parse(noi->getRequest().cmdObj.firstElement()));
net->blackHole(noi);
}
@@ -785,11 +802,11 @@ TEST_F(DatabaseClonerTest, FirstCollectionListIndexesFailed) {
const std::vector<BSONObj> sourceInfos = {BSON("name"
<< "a"
<< "options"
- << BSONObj()),
+ << _options1.toBSON()),
BSON("name"
<< "b"
<< "options"
- << BSONObj())};
+ << _options2.toBSON())};
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(
@@ -810,7 +827,6 @@ TEST_F(DatabaseClonerTest, FirstCollectionListIndexesFailed) {
processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- processNetworkResponse(createCursorResponse(0, BSONArray()));
}
_databaseCloner->join();
ASSERT_EQ(getStatus().code(), ErrorCodes::InitialSyncFailure);
@@ -841,11 +857,11 @@ TEST_F(DatabaseClonerTest, CreateCollections) {
const std::vector<BSONObj> sourceInfos = {BSON("name"
<< "a"
<< "options"
- << BSONObj()),
+ << _options1.toBSON()),
BSON("name"
<< "b"
<< "options"
- << BSONObj())};
+ << _options2.toBSON())};
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(
@@ -862,22 +878,12 @@ TEST_F(DatabaseClonerTest, CreateCollections) {
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(_databaseCloner->isActive());
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSONArray()));
- }
- ASSERT_TRUE(_databaseCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(_databaseCloner->isActive());
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSONArray()));
- }
_databaseCloner->join();
ASSERT_OK(getStatus());
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index e4cad653e4f..649c51ee7e5 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -230,6 +230,12 @@ void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::Schedu
_scheduleDbWorkFn = work;
}
+void DatabasesCloner::setStartCollectionClonerFn(
+ const StartCollectionClonerFn& startCollectionCloner) {
+ LockGuard lk(_mutex);
+ _startCollectionClonerFn = startCollectionCloner;
+}
+
StatusWith<std::vector<BSONElement>> DatabasesCloner::parseListDatabasesResponse_forTest(
BSONObj dbResponse) {
return _parseListDatabasesResponse(dbResponse);
@@ -360,6 +366,9 @@ void DatabasesCloner::_onListDatabaseFinish(
if (_scheduleDbWorkFn) {
dbCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn);
}
+ if (_startCollectionClonerFn) {
+ dbCloner->setStartCollectionClonerFn(_startCollectionClonerFn);
+ }
// Start first database cloner.
if (_databaseCloners.empty()) {
startStatus = dbCloner->startup();
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index afb69c4ade9..dd189dc7fda 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -66,6 +66,8 @@ public:
using IncludeDbFilterFn = stdx::function<bool(const BSONObj& dbInfo)>;
using OnFinishFn = stdx::function<void(const Status&)>;
+ using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn;
+
DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
ThreadPool* dbWorkThreadPool,
@@ -97,6 +99,13 @@ public:
void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
/**
+ * Overrides how executor starts a collection cloner.
+ *
+ * For testing only.
+ */
+ void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner);
+
+ /**
* Calls DatabasesCloner::_setAdminAsFirst.
* For testing only.
*/
@@ -170,6 +179,7 @@ private:
ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
const HostAndPort _source; // (R) The source to use.
CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
+ StartCollectionClonerFn _startCollectionClonerFn; // (M)
const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned.
OnFinishFn _finishFn; // (M) function called when finished.
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 66f2138ceaa..11dad10fe9e 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/mutex.h"
@@ -188,6 +189,8 @@ protected:
};
_dbWorkThreadPool.startup();
+ _target = HostAndPort{"local:1234"};
+ _mockServer = stdx::make_unique<MockRemoteDBServer>(_target.toString());
}
void tearDown() override {
@@ -287,7 +290,7 @@ protected:
DatabasesCloner cloner{&getStorage(),
&getExecutor(),
&getDbWorkThreadPool(),
- HostAndPort{"local:1234"},
+ _target,
[](const BSONObj&) { return true; },
[&](const Status& status) {
UniqueLock lk(mutex);
@@ -300,6 +303,13 @@ protected:
return getExecutor().scheduleWork(work);
});
+ cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) {
+ cloner.setCreateClientFn_forTest([&cloner, this]() {
+ return std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get()));
+ });
+ return cloner.startup();
+ });
ASSERT_OK(cloner.startup());
ASSERT_TRUE(cloner.isActive());
@@ -327,6 +337,8 @@ private:
protected:
StorageInterfaceMock _storageInterface;
+ HostAndPort _target;
+ std::unique_ptr<MockRemoteDBServer> _mockServer;
private:
ThreadPool _dbWorkThreadPool;
@@ -900,15 +912,22 @@ TEST_F(DBsClonerTest, AdminDbValidationErrorShouldAbortTheCloner) {
}
TEST_F(DBsClonerTest, SingleDatabaseCopiesCompletely) {
+ CollectionOptions options;
+ options.uuid = UUID::gen();
+ _mockServer->assignCollectionUuid("a.a", *options.uuid);
const Responses resps = {
// Clone Start
// listDatabases
{"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
// listCollections for "a"
{"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
+ BSON("ok" << 1 << "cursor" << BSON("id" << 0ll << "ns"
+ << "a.$cmd.listCollections"
+ << "firstBatch"
+ << BSON_ARRAY(BSON("name"
+ << "a"
+ << "options"
+ << options.toBSON()))))},
// count:a
{"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
@@ -919,17 +938,18 @@ TEST_F(DBsClonerTest, SingleDatabaseCopiesCompletely) {
"{v:"
<< OplogEntry::kOplogVersion
<< ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
// Clone Done
};
runCompleteClone(resps);
}
TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
+ CollectionOptions options1;
+ CollectionOptions options2;
+ options1.uuid = UUID::gen();
+ options2.uuid = UUID::gen();
+ _mockServer->assignCollectionUuid("a.a", *options1.uuid);
+    _mockServer->assignCollectionUuid("b.b", *options2.uuid);
const Responses resps =
{
// Clone Start
@@ -937,9 +957,13 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
{"listDatabases", fromjson("{ok:1, databases:[{name:'a'}, {name:'b'}]}")},
// listCollections for "a"
{"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
+ BSON("ok" << 1 << "cursor" << BSON("id" << 0ll << "ns"
+ << "a.$cmd.listCollections"
+ << "firstBatch"
+ << BSON_ARRAY(BSON("name"
+ << "a"
+ << "options"
+ << options1.toBSON()))))},
// count:a
{"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
@@ -949,16 +973,15 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
"{v:"
<< OplogEntry::kOplogVersion
<< ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
// listCollections for "b"
{"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'b.$cmd.listCollections', firstBatch:["
- "{name:'b', options:{}} "
- "]}}")},
+ BSON("ok" << 1 << "cursor" << BSON("id" << 0ll << "ns"
+ << "b.$cmd.listCollections"
+ << "firstBatch"
+ << BSON_ARRAY(BSON("name"
+ << "b"
+ << "options"
+ << options2.toBSON()))))},
// count:b
{"count", BSON("n" << 2 << "ok" << 1)},
// listIndexes:b
@@ -968,11 +991,6 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
"{v:"
<< OplogEntry::kOplogVersion
<< ", key:{_id:1}, name:'_id_', ns:'b.b'}]}}")},
- // find:b
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'b.b', firstBatch:["
- "{_id:2, a:1},{_id:3, b:1}"
- "]}}")},
};
runCompleteClone(resps);
}
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index e577b273e5c..1665a83aba2 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -381,6 +381,12 @@ void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::Schedule
_scheduleDbWorkFn = work;
}
+void InitialSyncer::setStartCollectionClonerFn(
+ const StartCollectionClonerFn& startCollectionCloner) {
+ LockGuard lk(_mutex);
+ _startCollectionClonerFn = startCollectionCloner;
+}
+
void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) {
// 'opCtx' is passed through from startup().
_replicationProcess->getConsistencyMarkers()->setInitialSyncFlag(opCtx);
@@ -837,6 +843,9 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
// facilitate testing.
_initialSyncState->dbsCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn);
}
+ if (_startCollectionClonerFn) {
+ _initialSyncState->dbsCloner->setStartCollectionClonerFn(_startCollectionClonerFn);
+ }
LOG(2) << "Starting DatabasesCloner: " << _initialSyncState->dbsCloner->toString();
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index d9d1c1944f1..039b01d2056 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -41,6 +41,7 @@
#include "mongo/db/repl/callback_completion_guard.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/data_replicator_external_state.h"
+#include "mongo/db/repl/database_cloner.h"
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/oplog_buffer.h"
@@ -48,6 +49,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/rollback_checker.h"
#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
@@ -149,6 +151,8 @@ public:
*/
using OnCompletionGuard = CallbackCompletionGuard<StatusWith<OpTimeWithHash>>;
+ using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn;
+
struct InitialSyncAttemptInfo {
int durationMillis;
Status status;
@@ -218,6 +222,13 @@ public:
*/
void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
+ /**
+ * Overrides how executor starts a collection cloner.
+ *
+ * For testing only.
+ */
+ void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner);
+
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
// It is possible to skip intermediate states. For example, calling shutdown() when the data
@@ -621,6 +632,7 @@ private:
// Passed to CollectionCloner via DatabasesCloner.
CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
+ StartCollectionClonerFn _startCollectionClonerFn; // (M)
// Contains stats on the current initial sync request (includes all attempts).
// To access these stats in a user-readable format, use getInitialSyncProgress().
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 5d65c8616ff..eed9e149a80 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -66,6 +66,7 @@
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
@@ -324,6 +325,10 @@ protected:
_dbWorkThreadPool = stdx::make_unique<ThreadPool>(ThreadPool::Options());
_dbWorkThreadPool->startup();
+ _target = HostAndPort{"localhost:12346"};
+ _mockServer = stdx::make_unique<MockRemoteDBServer>(_target.toString());
+ _options1.uuid = UUID::gen();
+
Client::initThreadIfNotAlready();
reset();
@@ -401,7 +406,13 @@ protected:
[this](const executor::TaskExecutor::CallbackFn& work) {
return getExecutor().scheduleWork(work);
});
-
+ _initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) {
+ cloner.setCreateClientFn_forTest([&cloner, this]() {
+ return std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get()));
+ });
+ return cloner.startup();
+ });
} catch (...) {
ASSERT_OK(exceptionToStatus());
}
@@ -454,6 +465,9 @@ protected:
OpTime _myLastOpTime;
std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
std::unique_ptr<StorageInterfaceMock> _storageInterface;
+ HostAndPort _target;
+ std::unique_ptr<MockRemoteDBServer> _mockServer;
+ CollectionOptions _options1;
std::unique_ptr<ReplicationProcess> _replicationProcess;
std::unique_ptr<ThreadPool> _dbWorkThreadPool;
std::map<NamespaceString, CollectionMockStats> _collectionStats;
@@ -2875,15 +2889,19 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
+ // Set up data for "a"
+ _mockServer->assignCollectionUuid(nss.ns(), *_options1.uuid);
+ _mockServer->insert(nss.ns(), BSON("_id" << 1 << "a" << 1));
+
// listCollections for "a"
- request = net->scheduleSuccessfulResponse(
- makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL, nss, {BSON("name" << nss.coll() << "options" << _options1.toBSON())}));
assertRemoteCommandNameEquals("listCollections", request);
// count:a
request = assertRemoteCommandNameEquals(
"count", net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1)));
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement()));
ASSERT_EQUALS(nss.db(), request.dbname);
// listIndexes:a
@@ -2896,14 +2914,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
<< "_id_"
<< "ns"
<< nss.ns())})));
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
- ASSERT_EQUALS(nss.db(), request.dbname);
-
- // find:a
- request = assertRemoteCommandNameEquals("find",
- net->scheduleSuccessfulResponse(makeCursorResponse(
- 0LL, nss, {BSON("_id" << 1 << "a" << 1)})));
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement()));
ASSERT_EQUALS(nss.db(), request.dbname);
// Second last oplog entry fetcher.
@@ -3537,9 +3548,13 @@ TEST_F(InitialSyncerTest,
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
+ // Set up data for "a"
+ _mockServer->assignCollectionUuid(nss.ns(), *_options1.uuid);
+ _mockServer->insert(nss.ns(), BSON("_id" << 1 << "a" << 1));
+
// listCollections for "a"
- request = net->scheduleSuccessfulResponse(
- makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL, nss, {BSON("name" << nss.coll() << "options" << _options1.toBSON())}));
assertRemoteCommandNameEquals("listCollections", request);
// Black hole OplogFetcher's getMore request.
@@ -3551,7 +3566,7 @@ TEST_F(InitialSyncerTest,
// count:a
request = net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1));
assertRemoteCommandNameEquals("count", request);
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement()));
ASSERT_EQUALS(nss.db(), request.dbname);
// listIndexes:a
@@ -3563,14 +3578,7 @@ TEST_F(InitialSyncerTest,
<< "ns"
<< nss.ns())}));
assertRemoteCommandNameEquals("listIndexes", request);
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
- ASSERT_EQUALS(nss.db(), request.dbname);
-
- // find:a
- request = net->scheduleSuccessfulResponse(
- makeCursorResponse(0LL, nss, {BSON("_id" << 1 << "a" << 1)}));
- assertRemoteCommandNameEquals("find", request);
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement()));
ASSERT_EQUALS(nss.db(), request.dbname);
// Second last oplog entry fetcher.
@@ -3761,6 +3769,10 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
+ ASSERT_OK(ServerParameterSet::getGlobal()
+ ->getMap()
+ .find("collectionClonerBatchSize")
+ ->second->setFromString("1"));
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));
@@ -3882,9 +3894,15 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
+ // Set up data for "a"
+ _mockServer->assignCollectionUuid(nss.ns(), *_options1.uuid);
+ for (int i = 1; i <= 5; ++i) {
+ _mockServer->insert(nss.ns(), BSON("_id" << i << "a" << i));
+ }
+
// listCollections for "a"
- request = net->scheduleSuccessfulResponse(
- makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL, nss, {BSON("name" << nss.coll() << "options" << _options1.toBSON())}));
assertRemoteCommandNameEquals("listCollections", request);
auto noi = net->getNextReadyRequest();
@@ -3895,7 +3913,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
// count:a
request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1));
assertRemoteCommandNameEquals("count", request);
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement()));
ASSERT_EQUALS(nss.db(), request.dbname);
// listIndexes:a
@@ -3907,18 +3925,9 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
<< "ns"
<< nss.ns())}));
assertRemoteCommandNameEquals("listIndexes", request);
- ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement()));
ASSERT_EQUALS(nss.db(), request.dbname);
- // find:a - 5 batches
- for (int i = 1; i <= 5; ++i) {
- request = net->scheduleSuccessfulResponse(
- makeCursorResponse(i < 5 ? 2LL : 0LL, nss, {BSON("_id" << i << "a" << i)}, i == 1));
- ASSERT_EQUALS(i == 1 ? "find" : "getMore",
- request.cmdObj.firstElement().fieldNameStringData());
- net->runReadyNetworkOperations();
- }
-
// Second last oplog entry fetcher.
// Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
// applying the first batch.
@@ -3950,7 +3959,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName))
<< collectionProgress;
ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
- ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress;
+ ASSERT_EQUALS(5, collectionProgress.getIntField("receivedBatches")) << collectionProgress;
attempts = progress["initialSyncAttempts"].Obj();
ASSERT_EQUALS(attempts.nFields(), 1) << progress;
@@ -4061,8 +4070,10 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc
// listCollections for "a"
std::vector<BSONObj> collectionInfos;
for (std::size_t i = 0; i < numCollections; ++i) {
+ CollectionOptions options;
const std::string collName = str::stream() << "coll-" << i;
- collectionInfos.push_back(BSON("name" << collName << "options" << BSONObj()));
+ options.uuid = UUID::gen();
+ collectionInfos.push_back(BSON("name" << collName << "options" << options.toBSON()));
}
request = net->scheduleSuccessfulResponse(
makeCursorResponse(0LL, nss.getCommandNS(), collectionInfos));
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index f604a3310c3..77fff17d2d3 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -39,7 +39,6 @@ env.Library(
source=[
'mock/mock_conn_registry.cpp',
'mock/mock_dbclient_connection.cpp',
- 'mock/mock_dbclient_cursor.cpp',
'mock/mock_remote_db_server.cpp',
'mock/mock_replica_set.cpp'
],
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
index b7c477227f9..dfd3332d978 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
@@ -29,7 +29,7 @@
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
-#include "mongo/dbtests/mock/mock_dbclient_cursor.h"
+#include "mongo/client/dbclient_mockcursor.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/time_support.h"
@@ -82,14 +82,11 @@ std::unique_ptr<mongo::DBClientCursor> MockDBClientConnection::query(
const BSONObj* fieldsToReturn,
int queryOptions,
int batchSize) {
- // The mock client does not support UUIDs.
- invariant(nsOrUuid.nss());
-
checkConnection();
try {
mongo::BSONArray result(_remoteServer->query(_remoteServerInstanceID,
- nsOrUuid.nss()->ns(),
+ nsOrUuid,
query,
nToReturn,
nToSkip,
@@ -98,7 +95,7 @@ std::unique_ptr<mongo::DBClientCursor> MockDBClientConnection::query(
batchSize));
std::unique_ptr<mongo::DBClientCursor> cursor;
- cursor.reset(new MockDBClientCursor(this, result));
+ cursor.reset(new DBClientMockCursor(this, BSONArray(result.copy()), batchSize));
return cursor;
} catch (const mongo::DBException&) {
_isFailed = true;
@@ -130,9 +127,9 @@ unsigned long long MockDBClientConnection::query(
const NamespaceStringOrUUID& nsOrUuid,
mongo::Query query,
const mongo::BSONObj* fieldsToReturn,
- int queryOptions) {
- verify(false);
- return 0;
+ int queryOptions,
+ int batchSize) {
+ return DBClientBase::query(f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize);
}
uint64_t MockDBClientConnection::getSockCreationMicroSec() const {
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h
index 0871c82fcfc..a5186df803f 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.h
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h
@@ -61,10 +61,12 @@ public:
bool connect(const char* hostName, StringData applicationName, std::string& errmsg);
- inline bool connect(const HostAndPort& host,
- StringData applicationName,
- std::string& errmsg) override {
- return connect(host.toString().c_str(), applicationName, errmsg);
+ Status connect(const HostAndPort& host, StringData applicationName) override {
+ std::string errmsg;
+ if (!connect(host.toString().c_str(), applicationName, errmsg)) {
+ return {ErrorCodes::HostNotFound, errmsg};
+ }
+ return Status::OK();
}
using DBClientBase::runCommandWithTarget;
@@ -104,7 +106,8 @@ public:
const NamespaceStringOrUUID& nsOrUuid,
mongo::Query query,
const mongo::BSONObj* fieldsToReturn = 0,
- int queryOptions = 0) override;
+ int queryOptions = 0,
+ int batchSize = 0) override;
//
// Unsupported methods (these are pure virtuals in the base class)
diff --git a/src/mongo/dbtests/mock/mock_dbclient_cursor.cpp b/src/mongo/dbtests/mock/mock_dbclient_cursor.cpp
deleted file mode 100644
index f0fb2499adb..00000000000
--- a/src/mongo/dbtests/mock/mock_dbclient_cursor.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-//@file dbclientmockcursor.h
-
-/* Copyright 2012 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/dbtests/mock/mock_dbclient_cursor.h"
-
-namespace mongo {
-MockDBClientCursor::MockDBClientCursor(mongo::DBClientBase* client,
- const mongo::BSONArray& resultSet)
- : mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0) {
- _resultSet = resultSet.copy();
- _cursor.reset(new mongo::DBClientMockCursor(client, BSONArray(_resultSet)));
-}
-
-bool MockDBClientCursor::more() {
- return _cursor->more();
-}
-
-mongo::BSONObj MockDBClientCursor::next() {
- return _cursor->next();
-}
-}
diff --git a/src/mongo/dbtests/mock/mock_dbclient_cursor.h b/src/mongo/dbtests/mock/mock_dbclient_cursor.h
deleted file mode 100644
index 2a0561c781b..00000000000
--- a/src/mongo/dbtests/mock/mock_dbclient_cursor.h
+++ /dev/null
@@ -1,58 +0,0 @@
-//@file dbclientmockcursor.h
-
-/* Copyright 2012 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-
-#include "mongo/client/dbclient_cursor.h"
-#include "mongo/client/dbclient_mockcursor.h"
-
-namespace mongo {
-
-/**
- * Simple adapter class for mongo::DBClientMockCursor to mongo::DBClientCursor.
- * Only supports more and next, the behavior of other operations are undefined.
- */
-class MockDBClientCursor : public mongo::DBClientCursor {
-public:
- MockDBClientCursor(mongo::DBClientBase* client, const mongo::BSONArray& mockCollection);
-
- bool more() override;
-
- /**
- * Note: has the same contract as DBClientCursor - returned BSONObj will
- * become invalid when this cursor is destroyed.
- */
- mongo::BSONObj next() override;
-
-private:
- std::unique_ptr<mongo::DBClientMockCursor> _cursor;
- mongo::BSONObj _resultSet;
-};
-}
diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.cpp b/src/mongo/dbtests/mock/mock_remote_db_server.cpp
index baa35e0f8f6..23021cf9fd8 100644
--- a/src/mongo/dbtests/mock/mock_remote_db_server.cpp
+++ b/src/mongo/dbtests/mock/mock_remote_db_server.cpp
@@ -135,6 +135,11 @@ void MockRemoteDBServer::remove(const string& ns, Query query, int flags) {
_dataMgr.erase(ns);
}
+void MockRemoteDBServer::assignCollectionUuid(const std::string& ns, const mongo::UUID& uuid) {
+ scoped_spinlock sLock(_lock);
+ _uuidToNs[uuid] = ns;
+}
+
rpc::UniqueReply MockRemoteDBServer::runCommand(InstanceID id, const OpMsgRequest& request) {
checkIfUp(id);
std::string cmdName = request.getCommandName().toString();
@@ -169,7 +174,7 @@ rpc::UniqueReply MockRemoteDBServer::runCommand(InstanceID id, const OpMsgReques
}
mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id,
- const string& ns,
+ const NamespaceStringOrUUID& nsOrUuid,
mongo::Query query,
int nToReturn,
int nToSkip,
@@ -187,6 +192,7 @@ mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id,
scoped_spinlock sLock(_lock);
_queryCount++;
+ auto ns = nsOrUuid.uuid() ? _uuidToNs[*nsOrUuid.uuid()] : nsOrUuid.nss()->ns();
const vector<BSONObj>& coll = _dataMgr[ns];
BSONArrayBuilder result;
for (vector<BSONObj>::const_iterator iter = coll.begin(); iter != coll.end(); ++iter) {
diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.h b/src/mongo/dbtests/mock/mock_remote_db_server.h
index 3c147220668..9c1ab72184c 100644
--- a/src/mongo/dbtests/mock/mock_remote_db_server.h
+++ b/src/mongo/dbtests/mock/mock_remote_db_server.h
@@ -146,13 +146,21 @@ public:
*/
void remove(const std::string& ns, Query query, int flags = 0);
+ /**
+ * Assign a UUID to a collection
+ *
+ * @param ns the namespace to be associated with the uuid.
+ * @param uuid the uuid to associate with the namespace.
+ */
+ void assignCollectionUuid(const std::string& ns, const mongo::UUID& uuid);
+
//
// DBClientBase methods
//
rpc::UniqueReply runCommand(InstanceID id, const OpMsgRequest& request);
mongo::BSONArray query(InstanceID id,
- const std::string& ns,
+ const NamespaceStringOrUUID& nsOrUuid,
mongo::Query query = mongo::Query(),
int nToReturn = 0,
int nToSkip = 0,
@@ -210,6 +218,7 @@ private:
typedef stdx::unordered_map<std::string, std::shared_ptr<CircularBSONIterator>> CmdToReplyObj;
typedef stdx::unordered_map<std::string, std::vector<BSONObj>> MockDataMgr;
+ typedef stdx::unordered_map<mongo::UUID, std::string, UUID::Hash> UUIDMap;
bool _isRunning;
@@ -221,6 +230,7 @@ private:
//
CmdToReplyObj _cmdMap;
MockDataMgr _dataMgr;
+ UUIDMap _uuidToNs;
//
// Op Counters