From 4917206219237841b61b09a22848a3d1e7733adc Mon Sep 17 00:00:00 2001 From: Matthew Russotto Date: Wed, 19 Sep 2018 17:05:12 -0400 Subject: SERVER-36096 Convert CollectionCloner to use DBClientConnection. --- src/mongo/client/dbclient_base.cpp | 11 +- src/mongo/client/dbclient_base.h | 12 +- src/mongo/client/dbclient_connection.cpp | 8 +- src/mongo/client/dbclient_connection.h | 9 +- src/mongo/client/dbclient_cursor.h | 22 +- src/mongo/client/dbclient_mockcursor.h | 34 +- src/mongo/db/repl/SConscript | 4 + src/mongo/db/repl/base_cloner_test_fixture.cpp | 14 +- src/mongo/db/repl/collection_cloner.cpp | 341 +++------ src/mongo/db/repl/collection_cloner.h | 104 +-- src/mongo/db/repl/collection_cloner_test.cpp | 779 +++++++++------------ src/mongo/db/repl/database_cloner.cpp | 15 +- src/mongo/db/repl/database_cloner_test.cpp | 188 ++--- src/mongo/db/repl/databases_cloner.cpp | 9 + src/mongo/db/repl/databases_cloner.h | 10 + src/mongo/db/repl/databases_cloner_test.cpp | 68 +- src/mongo/db/repl/initial_syncer.cpp | 9 + src/mongo/db/repl/initial_syncer.h | 12 + src/mongo/db/repl/initial_syncer_test.cpp | 87 ++- src/mongo/dbtests/SConscript | 1 - .../dbtests/mock/mock_dbclient_connection.cpp | 15 +- src/mongo/dbtests/mock/mock_dbclient_connection.h | 13 +- src/mongo/dbtests/mock/mock_dbclient_cursor.cpp | 49 -- src/mongo/dbtests/mock/mock_dbclient_cursor.h | 58 -- src/mongo/dbtests/mock/mock_remote_db_server.cpp | 8 +- src/mongo/dbtests/mock/mock_remote_db_server.h | 12 +- 26 files changed, 832 insertions(+), 1060 deletions(-) delete mode 100644 src/mongo/dbtests/mock/mock_dbclient_cursor.cpp delete mode 100644 src/mongo/dbtests/mock/mock_dbclient_cursor.h diff --git a/src/mongo/client/dbclient_base.cpp b/src/mongo/client/dbclient_base.cpp index 5acd20e6289..1ea3b8e6b5b 100644 --- a/src/mongo/client/dbclient_base.cpp +++ b/src/mongo/client/dbclient_base.cpp @@ -720,22 +720,25 @@ unsigned long long DBClientBase::query(stdx::function f, const NamespaceStringOrUUID& 
nsOrUuid, Query query, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + int batchSize) { DBClientFunConvertor fun; fun._f = f; stdx::function ptr(fun); - return this->query(ptr, nsOrUuid, query, fieldsToReturn, queryOptions); + return this->query(ptr, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); } unsigned long long DBClientBase::query(stdx::function f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn, - int queryOptions) { + int queryOptions, + int batchSize) { // mask options queryOptions &= (int)(QueryOption_NoCursorTimeout | QueryOption_SlaveOk); - unique_ptr c(this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions)); + unique_ptr c( + this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize)); uassert(16090, "socket error for mapping query", c.get()); unsigned long long n = 0; diff --git a/src/mongo/client/dbclient_base.h b/src/mongo/client/dbclient_base.h index 13c8de941ef..c6a8ebf2f0e 100644 --- a/src/mongo/client/dbclient_base.h +++ b/src/mongo/client/dbclient_base.h @@ -83,13 +83,15 @@ class DBClientQueryInterface { const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = 0, - int queryOptions = 0) = 0; + int queryOptions = 0, + int batchSize = 0) = 0; virtual unsigned long long query(stdx::function f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = 0, - int queryOptions = 0) = 0; + int queryOptions = 0, + int batchSize = 0) = 0; }; /** @@ -604,13 +606,15 @@ public: const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = 0, - int queryOptions = 0) final; + int queryOptions = 0, + int batchSize = 0) final; unsigned long long query(stdx::function f, const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn = 0, - int queryOptions = 0) override; + int queryOptions = 0, + int batchSize = 0) override; /** don't use this - called automatically by 
DBClientCursor for you diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp index 71fb7ca9a98..f623c91c325 100644 --- a/src/mongo/client/dbclient_connection.cpp +++ b/src/mongo/client/dbclient_connection.cpp @@ -508,16 +508,18 @@ unsigned long long DBClientConnection::query(stdx::function c(this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions)); + unique_ptr c( + this->query(nsOrUuid, query, 0, 0, fieldsToReturn, queryOptions, batchSize)); uassert(13386, "socket error for mapping query", c.get()); unsigned long long n = 0; diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h index 3977d03e279..9b56bebc608 100644 --- a/src/mongo/client/dbclient_connection.h +++ b/src/mongo/client/dbclient_connection.h @@ -108,9 +108,7 @@ public: * @param errmsg any relevant error message will appended to the string * @return false if fails to connect. */ - virtual bool connect(const HostAndPort& server, - StringData applicationName, - std::string& errmsg); + bool connect(const HostAndPort& server, StringData applicationName, std::string& errmsg); /** * Semantically equivalent to the previous connect method, but returns a Status @@ -121,7 +119,7 @@ public: * @param a hook to validate the 'isMaster' reply received during connection. If the hook * fails, the connection will be terminated and a non-OK status will be returned. 
*/ - Status connect(const HostAndPort& server, StringData applicationName); + virtual Status connect(const HostAndPort& server, StringData applicationName); /** * This version of connect does not run 'isMaster' after creating a TCP connection to the @@ -167,7 +165,8 @@ public: const NamespaceStringOrUUID& nsOrUuid, Query query, const BSONObj* fieldsToReturn, - int queryOptions) override; + int queryOptions, + int batchSize = 0) override; using DBClientBase::runCommandWithTarget; std::pair runCommandWithTarget(OpMsgRequest request) override; diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index bf0c520552c..7ce30e33633 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -221,6 +221,18 @@ public: return _connectionHasPendingReplies; } +protected: + struct Batch { + // TODO remove constructors after c++17 toolchain upgrade + Batch() = default; + Batch(std::vector initial, size_t initialPos = 0) + : objs(std::move(initial)), pos(initialPos) {} + std::vector objs; + size_t pos = 0; + }; + + Batch batch; + private: DBClientCursor(DBClientBase* client, const NamespaceStringOrUUID& nsOrUuid, @@ -235,16 +247,6 @@ private: int nextBatchSize(); - struct Batch { - // TODO remove constructors after c++17 toolchain upgrade - Batch() = default; - Batch(std::vector initial, size_t initialPos = 0) - : objs(std::move(initial)), pos(initialPos) {} - std::vector objs; - size_t pos = 0; - }; - - Batch batch; DBClientBase* _client; std::string _originalHost; NamespaceStringOrUUID _nsOrUuid; diff --git a/src/mongo/client/dbclient_mockcursor.h b/src/mongo/client/dbclient_mockcursor.h index 6ebd87f52ef..873805f0cae 100644 --- a/src/mongo/client/dbclient_mockcursor.h +++ b/src/mongo/client/dbclient_mockcursor.h @@ -32,28 +32,46 @@ #include "mongo/client/dbclient_cursor.h" namespace mongo { - +// DBClientMockCursor supports only a small subset of DBClientCursor operations. 
+// It supports only iteration, including use of DBClientCursorBatchIterator. If a batchsize +// is given, iteration is broken up into multiple batches at batchSize boundaries. class DBClientMockCursor : public DBClientCursor { public: - DBClientMockCursor(mongo::DBClientBase* client, const BSONArray& mockCollection) + DBClientMockCursor(mongo::DBClientBase* client, + const BSONArray& mockCollection, + unsigned long batchSize = 0) : mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0), _collectionArray(mockCollection), - _iter(_collectionArray) {} + _iter(_collectionArray), + _batchSize(batchSize) { + if (_batchSize) + setBatchSize(_batchSize); + fillNextBatch(); + } virtual ~DBClientMockCursor() {} - bool more() { - return _iter.more(); - } - BSONObj next() { - return _iter.next().Obj(); + bool more() override { + if (_batchSize && batch.pos == _batchSize) { + fillNextBatch(); + } + return batch.pos < batch.objs.size(); } private: + void fillNextBatch() { + int leftInBatch = _batchSize; + batch.objs.clear(); + while (_iter.more() && (!_batchSize || leftInBatch--)) { + batch.objs.emplace_back(_iter.next().Obj()); + } + batch.pos = 0; + } // The BSONObjIterator expects the underlying BSONObj to stay in scope while the // iterator is in use, so we store it here. 
BSONArray _collectionArray; BSONObjIterator _iter; + unsigned long _batchSize; // non-copyable , non-assignable DBClientMockCursor(const DBClientMockCursor&); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 98d1cee071f..a82e03769a9 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1246,6 +1246,7 @@ env.Library( ], LIBDEPS=[ 'task_runner', + 'oplogreader', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/client/fetcher', '$BUILD_DIR/mongo/client/remote_command_retry_scheduler', @@ -1266,6 +1267,7 @@ env.CppUnitTest( 'base_cloner_test_fixture', '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/auth/authorization_manager_global', + '$BUILD_DIR/mongo/dbtests/mocklib', '$BUILD_DIR/mongo/unittest/task_executor_proxy', ], ) @@ -1292,6 +1294,7 @@ env.CppUnitTest( 'base_cloner_test_fixture', '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/commands/list_collections_filter', + '$BUILD_DIR/mongo/dbtests/mocklib', '$BUILD_DIR/mongo/unittest/task_executor_proxy', ], ) @@ -1304,6 +1307,7 @@ env.Library( LIBDEPS=[ 'database_cloner', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/dbtests/mocklib', ], ) diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 88d0a84d996..930d901d86a 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -214,13 +214,13 @@ void BaseClonerTest::finishProcessingNetworkResponse() { } void BaseClonerTest::testLifeCycle() { - // IsActiveAfterStart + log() << "Testing IsActiveAfterStart"; ASSERT_FALSE(getCloner()->isActive()); ASSERT_OK(getCloner()->startup()); ASSERT_TRUE(getCloner()->isActive()); tearDown(); - // StartWhenActive + log() << "Testing StartWhenActive"; setUp(); ASSERT_OK(getCloner()->startup()); ASSERT_TRUE(getCloner()->isActive()); @@ -228,28 +228,28 @@ void BaseClonerTest::testLifeCycle() { ASSERT_TRUE(getCloner()->isActive()); 
tearDown(); - // CancelWithoutStart + log() << "Testing CancelWithoutStart"; setUp(); ASSERT_FALSE(getCloner()->isActive()); getCloner()->shutdown(); ASSERT_FALSE(getCloner()->isActive()); tearDown(); - // WaitWithoutStart + log() << "Testing WaitWithoutStart"; setUp(); ASSERT_FALSE(getCloner()->isActive()); getCloner()->join(); ASSERT_FALSE(getCloner()->isActive()); tearDown(); - // ShutdownBeforeStart + log() << "Testing ShutdownBeforeStart"; setUp(); getExecutor().shutdown(); ASSERT_NOT_OK(getCloner()->startup()); ASSERT_FALSE(getCloner()->isActive()); tearDown(); - // StartAndCancel + log() << "Testing StartAndCancel"; setUp(); ASSERT_OK(getCloner()->startup()); getCloner()->shutdown(); @@ -261,7 +261,7 @@ void BaseClonerTest::testLifeCycle() { ASSERT_FALSE(getCloner()->isActive()); tearDown(); - // StartButShutdown + log() << "Testing StartButShutdown"; setUp(); ASSERT_OK(getCloner()->startup()); { diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 041c7eaf113..fac7cec4235 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -36,9 +36,12 @@ #include "mongo/base/string_data.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/client/dbclient_connection.h" #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/server_parameters.h" @@ -79,7 +82,7 @@ MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCollectionClone); MONGO_FAIL_POINT_DEFINE(initialSyncHangDuringCollectionClone); // Failpoint which causes initial sync to hang after handling the next batch of results from the -// 'AsyncResultsMerger', optionally limited to a specific collection. 
+// DBClientConnection, optionally limited to a specific collection. MONGO_FAIL_POINT_DEFINE(initialSyncHangCollectionClonerAfterHandlingBatchResponse); // Failpoint which causes initial sync to hang before establishing the cursors (but after @@ -161,20 +164,26 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _dbWorkTaskRunner.schedule(task); return executor::TaskExecutor::CallbackHandle(); }), + _createClientFn([] { return stdx::make_unique(); }), _progressMeter(1U, // total will be replaced with count command result. kProgressMeterSecondsBetween, kProgressMeterCheckInterval, "documents copied", str::stream() << _sourceNss.toString() << " collection clone progress"), - _collectionCloningBatchSize(batchSize) { + _collectionClonerBatchSize(batchSize) { // Fetcher throws an exception on null executor. invariant(executor); uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(), sourceNss.isValid()); uassertStatusOK(options.validateForStorage()); + uassert(50953, + "Missing collection UUID in CollectionCloner, collection name: " + sourceNss.ns(), + _options.uuid); uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface); + uassert( + 50954, "collectionClonerBatchSize must be non-negative.", _collectionClonerBatchSize >= 0); _stats.ns = _sourceNss.ns(); } @@ -245,19 +254,6 @@ void CollectionCloner::shutdown() { } void CollectionCloner::_cancelRemainingWork_inlock() { - if (_arm) { - // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The - // TaskExecutor should never have an OperationContext attached to the Client, and the - // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know - // which situation we're in, so have to handle both. 
- auto& client = cc(); - if (auto opCtx = client.getOperationContext()) { - _killArmHandle = _arm->kill(opCtx); - } else { - auto newOpCtx = client.makeOperationContext(); - _killArmHandle = _arm->kill(newOpCtx.get()); - } - } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); if (_establishCollectionCursorsScheduler) { @@ -266,6 +262,8 @@ void CollectionCloner::_cancelRemainingWork_inlock() { if (_verifyCollectionDroppedScheduler) { _verifyCollectionDroppedScheduler->shutdown(); } + _queryState = + _queryState == QueryState::kRunning ? QueryState::kCanceling : QueryState::kFinished; _dbWorkTaskRunner.cancel(); } @@ -276,10 +274,10 @@ CollectionCloner::Stats CollectionCloner::getStats() const { void CollectionCloner::join() { stdx::unique_lock lk(_mutex); - if (_killArmHandle) { - _executor->waitForEvent(_killArmHandle); - } - _condition.wait(lk, [this]() { return !_isActive_inlock(); }); + _condition.wait(lk, [this]() { + return (_queryState == QueryState::kNotStarted || _queryState == QueryState::kFinished) && + !_isActive_inlock(); + }); } void CollectionCloner::waitForDbWorker() { @@ -294,6 +292,11 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched _scheduleDbWorkFn = scheduleDbWorkFn; } +void CollectionCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) { + stdx::lock_guard lk(_mutex); + _createClientFn = createClientFn; +} + std::vector CollectionCloner::getDocumentsToInsert_forTest() { LockGuard lk(_mutex); return _documentsToInsert; @@ -317,7 +320,7 @@ void CollectionCloner::_countCallback( long long count = 0; Status commandStatus = getStatusFromCommandResult(args.response.data); - if (commandStatus == ErrorCodes::NamespaceNotFound && _options.uuid) { + if (commandStatus == ErrorCodes::NamespaceNotFound) { // Querying by a non-existing collection by UUID returns an error. Treat same as // behavior of find by namespace and use count == 0. 
} else if (!commandStatus.isOK()) { @@ -475,15 +478,44 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca _collLoader = std::move(collectionBulkLoader.getValue()); - BSONObjBuilder cmdObj; - - cmdObj.appendElements(makeCommandWithUUIDorCollectionName("find", _options.uuid, _sourceNss)); - cmdObj.append("noCursorTimeout", true); - // Set batchSize to be 0 to establish the cursor without fetching any documents, - cmdObj.append("batchSize", 0); + // The query cannot run on the database work thread, because it needs to be able to + // schedule work on that thread while still running. + auto runQueryCallback = + _executor->scheduleWork([this](const executor::TaskExecutor::CallbackArgs& callbackData) { + ON_BLOCK_EXIT([this] { + { + stdx::lock_guard lock(_mutex); + _queryState = QueryState::kFinished; + } + _condition.notify_all(); + }); + _runQuery(callbackData); + }); + if (!runQueryCallback.isOK()) { + _finishCallback(runQueryCallback.getStatus()); + return; + } +} - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); +void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) { + if (!callbackData.status.isOK()) { + _finishCallback(callbackData.status); + return; + } + bool queryStateOK = false; + { + stdx::lock_guard lock(_mutex); + queryStateOK = _queryState == QueryState::kNotStarted; + if (queryStateOK) { + _queryState = QueryState::kRunning; + } + } + if (!queryStateOK) { + // _finishCallback must not called with _mutex locked. If the queryState changes + // after the mutex is released, we will do the query and cancel after the first batch. 
+ _finishCallback({ErrorCodes::CallbackCanceled, "Collection cloning cancelled."}); + return; + } MONGO_FAIL_POINT_BLOCK(initialSyncHangBeforeCollectionClone, options) { const BSONObj& data = options.getData(); @@ -496,205 +528,81 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca } } - _establishCollectionCursorsScheduler = stdx::make_unique( - _executor, - RemoteCommandRequest(_source, - _sourceNss.db().toString(), - cmdObj.obj(), - ReadPreferenceSetting::secondaryPreferredMetadata(), - opCtx, - RemoteCommandRequest::kNoTimeout), - [=](const RemoteCommandCallbackArgs& rcbd) { _establishCollectionCursorsCallback(rcbd); }, - RemoteCommandRetryScheduler::makeRetryPolicy( - numInitialSyncCollectionFindAttempts.load(), - executor::RemoteCommandRequest::kNoTimeout, - RemoteCommandRetryScheduler::kAllRetriableErrors)); - auto scheduleStatus = _establishCollectionCursorsScheduler->startup(); - - if (!scheduleStatus.isOK()) { - _establishCollectionCursorsScheduler.reset(); - _finishCallback(scheduleStatus); - return; - } -} - -Status CollectionCloner::_parseCursorResponse(BSONObj response, - std::vector* cursors) { - StatusWith findResponse = CursorResponse::parseFromBSON(response); - if (!findResponse.isOK()) { - return findResponse.getStatus().withContext( - str::stream() << "Error parsing the 'find' query against collection '" - << _sourceNss.ns() - << "'"); - } - cursors->push_back(std::move(findResponse.getValue())); - return Status::OK(); -} - -void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd) { - if (_state == State::kShuttingDown) { - Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."}; - _finishCallback(shuttingDownStatus); - return; - } - auto response = rcbd.response; - if (!response.isOK()) { - _finishCallback(response.status); - return; - } - Status commandStatus = getStatusFromCommandResult(response.data); - if (commandStatus == 
ErrorCodes::NamespaceNotFound) { - _finishCallback(Status::OK()); + auto conn = _createClientFn(); + Status clientConnectionStatus = conn->connect(_source, StringData()); + if (!clientConnectionStatus.isOK()) { + _finishCallback(clientConnectionStatus); return; } - if (!commandStatus.isOK()) { - _finishCallback(commandStatus.withContext( - str::stream() << "Error querying collection '" << _sourceNss.ns() << "'")); + if (!replAuthenticate(conn.get())) { + _finishCallback({ErrorCodes::AuthenticationFailed, + str::stream() << "Failed to authenticate to " << _source}); return; } - std::vector cursorResponses; - Status parseResponseStatus = _parseCursorResponse(response.data, &cursorResponses); - if (!parseResponseStatus.isOK()) { - _finishCallback(parseResponseStatus); - return; - } - LOG(1) << "Collection cloner running with " << cursorResponses.size() - << " cursors established."; - - // Initialize the 'AsyncResultsMerger'(ARM). - std::vector remoteCursors; - for (auto&& cursorResponse : cursorResponses) { - // A placeholder 'ShardId' is used until the ARM is made less sharding specific. - remoteCursors.emplace_back(); - auto& newCursor = remoteCursors.back(); - newCursor.setShardId("CollectionClonerSyncSource"); - newCursor.setHostAndPort(_source); - newCursor.setCursorResponse(std::move(cursorResponse)); - } - - AsyncResultsMergerParams armParams; - armParams.setNss(_sourceNss); - armParams.setRemotes(std::move(remoteCursors)); - if (_collectionCloningBatchSize > 0) { - armParams.setBatchSize(_collectionCloningBatchSize); - } - auto opCtx = cc().makeOperationContext(); - _arm = stdx::make_unique(opCtx.get(), _executor, std::move(armParams)); - _arm->detachFromOperationContext(); - opCtx.reset(); - // This completion guard invokes _finishCallback on destruction. 
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; auto finishCallbackFn = [this](const Status& status) { _finishCallback(status); }; auto onCompletionGuard = std::make_shared(cancelRemainingWorkInLock, finishCallbackFn); - // Lock guard must be declared after completion guard. If there is an error in this function - // that will cause the destructor of the completion guard to run, the destructor must be run - // outside the mutex. This is a necessary condition to invoke _finishCallback. - stdx::lock_guard lock(_mutex); - Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); - if (!scheduleStatus.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); - return; - } -} - -Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { - // We expect this callback to execute in a thread from a TaskExecutor which will not have an - // OperationContext populated. We must make one ourselves. - auto opCtx = cc().makeOperationContext(); - _arm->reattachToOperationContext(opCtx.get()); - while (_arm->ready()) { - auto armResultStatus = _arm->nextReady(); - if (!armResultStatus.getStatus().isOK()) { - return armResultStatus.getStatus(); - } - if (armResultStatus.getValue().isEOF()) { - // We have reached the end of the batch. 
- break; - } else { - auto queryResult = armResultStatus.getValue().getResult(); - _documentsToInsert.push_back(std::move(*queryResult)); + try { + conn->query( + [this, onCompletionGuard](DBClientCursorBatchIterator& iter) { + _handleNextBatch(onCompletionGuard, iter); + }, + NamespaceStringOrUUID(_sourceNss.db().toString(), *_options.uuid), + Query(), + nullptr /* fieldsToReturn */, + QueryOption_NoCursorTimeout | QueryOption_SlaveOk, + _collectionClonerBatchSize); + } catch (const DBException& e) { + auto queryStatus = e.toStatus().withContext(str::stream() << "Error querying collection '" + << _sourceNss.ns()); + stdx::unique_lock lock(_mutex); + if (queryStatus.code() == ErrorCodes::OperationFailed || + queryStatus.code() == ErrorCodes::CursorNotFound) { + // With these errors, it's possible the collection was dropped while we were + // cloning. If so, we'll execute the drop during oplog application, so it's OK to + // just stop cloning. + _verifyCollectionWasDropped(lock, queryStatus, onCompletionGuard); + return; + } else if (queryStatus.code() != ErrorCodes::NamespaceNotFound) { + // NamespaceNotFound means the collection was dropped before we started cloning, so + // we're OK to ignore the error. Any other error we must report. + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, queryStatus); + return; } } - _arm->detachFromOperationContext(); - - return Status::OK(); -} - -Status CollectionCloner::_scheduleNextARMResultsCallback( - std::shared_ptr onCompletionGuard) { - // We expect this callback to execute in a thread from a TaskExecutor which will not have an - // OperationContext populated. We must make one ourselves. 
- auto opCtx = cc().makeOperationContext(); - _arm->reattachToOperationContext(opCtx.get()); - auto nextEvent = _arm->nextEvent(); - _arm->detachFromOperationContext(); - if (!nextEvent.isOK()) { - return nextEvent.getStatus(); - } - auto event = nextEvent.getValue(); - auto handleARMResultsOnNextEvent = - _executor->onEvent(event, [=](const executor::TaskExecutor::CallbackArgs& cbd) { - _handleARMResultsCallback(cbd, onCompletionGuard); - }); - return handleARMResultsOnNextEvent.getStatus(); + waitForDbWorker(); + stdx::lock_guard lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, Status::OK()); } -void CollectionCloner::_handleARMResultsCallback( - const executor::TaskExecutor::CallbackArgs& cbd, - std::shared_ptr onCompletionGuard) { - auto setResultAndCancelRemainingWork = [this](std::shared_ptr guard, - Status status) { - stdx::lock_guard lock(_mutex); - guard->setResultAndCancelRemainingWork_inlock(lock, status); - return; - }; - - if (!cbd.status.isOK()) { - // Wait for active inserts to complete. - waitForDbWorker(); - Status newStatus = cbd.status.withContext(str::stream() << "Error querying collection '" - << _sourceNss.ns()); - setResultAndCancelRemainingWork(onCompletionGuard, cbd.status); - return; - } - - // Pull the documents from the ARM into a buffer until the entire batch has been processed. - bool lastBatch; +void CollectionCloner::_handleNextBatch(std::shared_ptr onCompletionGuard, + DBClientCursorBatchIterator& iter) { + _stats.receivedBatches++; { - UniqueLock lk(_mutex); - auto nextBatchStatus = _bufferNextBatchFromArm(lk); - if (!nextBatchStatus.isOK()) { - if (_options.uuid && (nextBatchStatus.code() == ErrorCodes::OperationFailed || - nextBatchStatus.code() == ErrorCodes::CursorNotFound)) { - // With these errors, it's possible the collection was dropped while we were - // cloning. If so, we'll execute the drop during oplog application, so it's OK to - // just stop cloning. 
This is only safe if cloning by UUID; if we are cloning by - // name, we have no way to detect if the collection was dropped and another - // collection with the same name created in the interim. - _verifyCollectionWasDropped(lk, nextBatchStatus, onCompletionGuard, cbd.opCtx); - } else { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, nextBatchStatus); - } - return; + stdx::lock_guard lk(_mutex); + uassert(ErrorCodes::CallbackCanceled, + "Collection cloning cancelled.", + _queryState != QueryState::kCanceling); + while (iter.moreInCurrentBatch()) { + BSONObj o = iter.nextSafe(); + _documentsToInsert.emplace_back(std::move(o)); } - - // Check if this is the last batch of documents to clone. - lastBatch = _arm->remotesExhausted(); } // Schedule the next document batch insertion. auto&& scheduleResult = _scheduleDbWorkFn([=](const executor::TaskExecutor::CallbackArgs& cbd) { - _insertDocumentsCallback(cbd, lastBatch, onCompletionGuard); + _insertDocumentsCallback(cbd, onCompletionGuard); }); + if (!scheduleResult.isOK()) { Status newStatus = scheduleResult.getStatus().withContext( str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'"); - setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus()); - return; + // We must throw an exception to terminate query. + uassertStatusOK(newStatus); } MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) { @@ -711,23 +619,12 @@ void CollectionCloner::_handleARMResultsCallback( } } } - - // If the remote cursors are not exhausted, schedule this callback again to handle - // the impending cursor response. 
- if (!lastBatch) { - Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); - if (!scheduleStatus.isOK()) { - setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus); - return; - } - } } void CollectionCloner::_verifyCollectionWasDropped( const stdx::unique_lock& lk, Status batchStatus, - std::shared_ptr onCompletionGuard, - OperationContext* opCtx) { + std::shared_ptr onCompletionGuard) { // If we already have a _verifyCollectionDroppedScheduler, just return; the existing // scheduler will take care of cleaning up. if (_verifyCollectionDroppedScheduler) { @@ -742,7 +639,7 @@ void CollectionCloner::_verifyCollectionWasDropped( _sourceNss.db().toString(), cmdObj.obj(), ReadPreferenceSetting::secondaryPreferredMetadata(), - opCtx, + nullptr /* No OperationContext require for replication commands */, RemoteCommandRequest::kNoTimeout), [this, batchStatus, onCompletionGuard](const RemoteCommandCallbackArgs& args) { // If the attempt to determine if the collection was dropped fails for any reason other @@ -786,7 +683,6 @@ void CollectionCloner::_verifyCollectionWasDropped( void CollectionCloner::_insertDocumentsCallback( const executor::TaskExecutor::CallbackArgs& cbd, - bool lastBatch, std::shared_ptr onCompletionGuard) { if (!cbd.status.isOK()) { stdx::lock_guard lock(_mutex); @@ -798,14 +694,11 @@ void CollectionCloner::_insertDocumentsCallback( std::vector docs; if (_documentsToInsert.size() == 0) { warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss; - if (lastBatch) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); - } return; } _documentsToInsert.swap(docs); _stats.documentsCopied += docs.size(); - ++_stats.fetchBatches; + ++_stats.fetchedBatches; _progressMeter.hit(int(docs.size())); invariant(_collLoader); const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend()); @@ -827,11 +720,6 @@ void CollectionCloner::_insertDocumentsCallback( lk.lock(); } 
} - - if (lastBatch) { - // Clean up resources once the last batch has been copied over and set the status to OK. - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); - } } void CollectionCloner::_finishCallback(const Status& status) { @@ -898,7 +786,7 @@ void CollectionCloner::Stats::append(BSONObjBuilder* builder) const { builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy); builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied); builder->appendNumber("indexes", indexes); - builder->appendNumber("fetchedBatches", fetchBatches); + builder->appendNumber("fetchedBatches", fetchedBatches); if (start != Date_t()) { builder->appendDate("start", start); if (end != Date_t()) { @@ -908,6 +796,7 @@ void CollectionCloner::Stats::append(BSONObjBuilder* builder) const { builder->appendNumber("elapsedMillis", elapsedMillis); } } + builder->appendNumber("receivedBatches", receivedBatches); } } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 038a2a41b64..bdcc3bd05cd 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -36,6 +36,7 @@ #include "mongo/base/status.h" #include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" +#include "mongo/client/dbclient_connection.h" #include "mongo/client/fetcher.h" #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/catalog/collection_options.h" @@ -46,7 +47,6 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/task_runner.h" #include "mongo/executor/task_executor.h" -#include "mongo/s/query/async_results_merger.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" @@ -79,7 +79,8 @@ public: size_t documentToCopy{0}; size_t documentsCopied{0}; size_t indexes{0}; - size_t fetchBatches{0}; + size_t fetchedBatches{0}; // This is actually inserted batches. 
+ size_t receivedBatches{0}; std::string toString() const; BSONObj toBSON() const; @@ -93,6 +94,13 @@ public: using ScheduleDbWorkFn = stdx::function( const executor::TaskExecutor::CallbackFn&)>; + /** + * Type of function to create a database client + * + * Used for testing only. + */ + using CreateClientFn = stdx::function()>; + /** * Creates CollectionCloner task in inactive state. Use start() to activate cloner. * @@ -144,6 +152,22 @@ public: */ void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); + /** + * Allows a different client class to be injected. + * + * For testing only. + */ + void setCreateClientFn_forTest(const CreateClientFn& createClientFn); + + /** + * Allows batch size to be changed after construction. + * + * For testing only. + */ + void setBatchSize_forTest(int batchSize) { + const_cast(_collectionClonerBatchSize) = batchSize; + } + /** * Returns the documents currently stored in the '_documents' buffer that is intended * to be inserted through the collection loader. @@ -183,62 +207,39 @@ private: * * Called multiple times if there are more than one batch of responses from listIndexes * cursor. - * - * 'nextAction' is an in/out arg indicating the next action planned and to be taken - * by the fetcher. */ void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData); /** - * Parses the cursor responses from the 'find' command and passes them into the - * 'AsyncResultsMerger'. - */ - void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd); - - /** - * Takes a cursors buffer and parses the 'find' response into cursor - * responses that are pushed onto the buffer. - */ - Status _parseCursorResponse(BSONObj response, std::vector* cursors); - - /** - * Calls to get the next event from the 'AsyncResultsMerger'. This schedules - * '_handleAsyncResultsCallback' to be run when the event is signaled successfully. 
- */ - Status _scheduleNextARMResultsCallback(std::shared_ptr onCompletionGuard); - - /** - * Runs for each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'. - * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback' - * to insert the contents of the buffer. + * Using a DBClientConnection, executes a query to retrieve all documents in the collection. + * For each batch returned by the upstream node, _handleNextBatch will be called with the data. + * This method will return when the entire query is finished or failed. */ - void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd, - std::shared_ptr onCompletionGuard); + void _runQuery(const executor::TaskExecutor::CallbackArgs& callbackData); /** - * Pull all ready results from the ARM into a buffer to be inserted. + * Put all results from a query batch into a buffer to be inserted, and schedule + * it to be inserted. */ - Status _bufferNextBatchFromArm(WithLock lock); + void _handleNextBatch(std::shared_ptr onCompletionGuard, + DBClientCursorBatchIterator& iter); /** - * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'. - * On the last batch, 'lastBatch' will be true. + * Called whenever there is a new batch of documents ready from the DBClientConnection. * * Each document returned will be inserted via the storage interfaceRequest storage * interface. */ void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd, - bool lastBatch, std::shared_ptr onCompletionGuard); /** - * Verifies that an error from the ARM was the result of a collection drop. If + * Verifies that an error from the query was the result of a collection drop. If * so, cloning is stopped with no error. Otherwise it is stopped with the given error. 
*/ void _verifyCollectionWasDropped(const stdx::unique_lock& lk, Status batchStatus, - std::shared_ptr onCompletionGuard, - OperationContext* opCtx); + std::shared_ptr onCompletionGuard); /** * Reports completion status. @@ -271,21 +272,15 @@ private: Fetcher _listIndexesFetcher; // (S) std::vector _indexSpecs; // (M) BSONObj _idIndexSpec; // (M) - std::vector - _documentsToInsert; // (M) Documents read from 'AsyncResultsMerger' to insert. - TaskRunner _dbWorkTaskRunner; // (R) + std::vector _documentsToInsert; // (M) Documents read from source to insert. + TaskRunner _dbWorkTaskRunner; // (R) ScheduleDbWorkFn - _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. - Stats _stats; // (M) stats for this instance. - ProgressMeter _progressMeter; // (M) progress meter for this instance. - const int _collectionCloningBatchSize; // (R) The size of the batches of documents returned in - // collection cloning. - - // (M) Component responsible for fetching the documents from the collection cloner cursor(s). - std::unique_ptr _arm; - - // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'. - executor::TaskExecutor::EventHandle _killArmHandle; + _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. + CreateClientFn _createClientFn; // (RT) Function for creating a database client. + Stats _stats; // (M) stats for this instance. + ProgressMeter _progressMeter; // (M) progress meter for this instance. + const int _collectionClonerBatchSize; // (R) The size of the batches of documents returned in + // collection cloning. // (M) Scheduler used to establish the initial cursor or set of cursors. std::unique_ptr _establishCollectionCursorsScheduler; @@ -293,6 +288,15 @@ private: // (M) Scheduler used to determine if a cursor was closed because the collection was dropped. std::unique_ptr _verifyCollectionDroppedScheduler; + // (M) State of query. Set to kCanceling to cause query to stop. 
If the query is kRunning + // or kCanceling, wait for query to reach kFinished using _condition + enum class QueryState { + kNotStarted, + kRunning, + kCanceling, + kFinished + } _queryState = QueryState::kNotStarted; + // State transitions: // PreStart --> Running --> ShuttingDown --> Complete // It is possible to skip intermediate states. For example, diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 726a2b6a2cf..d302ed0492d 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" @@ -58,6 +59,134 @@ public: } }; +class FailableMockDBClientConnection : public MockDBClientConnection { +public: + FailableMockDBClientConnection(MockRemoteDBServer* remote, executor::NetworkInterfaceMock* net) + : MockDBClientConnection(remote), _net(net) {} + + virtual ~FailableMockDBClientConnection() { + stdx::unique_lock lk(_mutex); + _paused = false; + _cond.notify_all(); + _cond.wait(lk, [this] { return !_resuming; }); + } + + Status connect(const HostAndPort& host, StringData applicationName) override { + if (!_failureForConnect.isOK()) + return _failureForConnect; + return MockDBClientConnection::connect(host, applicationName); + } + + using MockDBClientConnection::query; // This avoids warnings from -Woverloaded-virtual + unsigned long long query(stdx::function f, + const NamespaceStringOrUUID& nsOrUuid, + mongo::Query query, + const mongo::BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) override { + ON_BLOCK_EXIT([this]() { + { + stdx::lock_guard lk(_mutex); + _queryCount++; + } + _cond.notify_all(); + }); + { + 
stdx::unique_lock lk(_mutex); + _waiting = _paused; + _cond.notify_all(); + while (_paused) { + lk.unlock(); + _net->waitForWork(); + lk.lock(); + } + _waiting = false; + } + auto result = MockDBClientConnection::query( + f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); + uassertStatusOK(_failureForQuery); + return result; + } + + void setFailureForConnect(Status failure) { + _failureForConnect = failure; + } + + void setFailureForQuery(Status failure) { + _failureForQuery = failure; + } + + void pause() { + { + stdx::lock_guard lk(_mutex); + _paused = true; + } + _cond.notify_all(); + } + void resume() { + { + stdx::unique_lock lk(_mutex); + _resuming = true; + _paused = false; + _resumedQueryCount = _queryCount; + while (_waiting) { + lk.unlock(); + _net->signalWorkAvailable(); + mongo::sleepmillis(10); + lk.lock(); + } + _resuming = false; + _cond.notify_all(); + } + } + + // Waits for the next query after pause() is called to start. + void waitForPausedQuery() { + stdx::unique_lock lk(_mutex); + _cond.wait(lk, [this] { return _waiting; }); + } + + // Waits for the next query to run after resume() is called to complete. + void waitForResumedQuery() { + stdx::unique_lock lk(_mutex); + _cond.wait(lk, [this] { return _resumedQueryCount != _queryCount; }); + } + +private: + executor::NetworkInterfaceMock* _net; + stdx::mutex _mutex; + stdx::condition_variable _cond; + bool _paused = false; + bool _waiting = false; + bool _resuming = false; + int _queryCount = 0; + int _resumedQueryCount = 0; + Status _failureForConnect = Status::OK(); + Status _failureForQuery = Status::OK(); +}; + +// RAII class to pause the client; since tests are very exception-heavy this prevents them +// from hanging on failure. 
+class MockClientPauser { + MONGO_DISALLOW_COPYING(MockClientPauser); + +public: + MockClientPauser(FailableMockDBClientConnection* client) : _client(client) { + _client->pause(); + }; + ~MockClientPauser() { + resume(); + } + void resume() { + if (_client) + _client->resume(); + _client = nullptr; + } + +private: + FailableMockDBClientConnection* _client; +}; + class CollectionClonerTest : public BaseClonerTest { public: BaseCloner* getCloner() const override; @@ -70,6 +199,17 @@ protected: void setUp() override; void tearDown() override; + virtual CollectionOptions getCollectionOptions() const { + CollectionOptions options; + options.uuid = UUID::gen(); + return options; + } + + virtual const NamespaceString& getStartupNss() const { + return nss; + }; + + std::vector makeSecondaryIndexSpecs(const NamespaceString& nss); // A simple arbitrary value to use as the default batch size. @@ -79,16 +219,19 @@ protected: std::unique_ptr collectionCloner; CollectionMockStats collectionStats; // Used by the _loader. CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner. + bool _clientCreated = false; + FailableMockDBClientConnection* _client; // owned by the CollectionCloner once created. 
+ std::unique_ptr _server; }; void CollectionClonerTest::setUp() { BaseClonerTest::setUp(); - options = {}; + options = getCollectionOptions(); collectionCloner.reset(nullptr); collectionCloner = stdx::make_unique(&getExecutor(), dbWorkThreadPool.get(), target, - nss, + getStartupNss(), options, setStatusCallback(), storageInterface.get(), @@ -106,6 +249,13 @@ void CollectionClonerTest::setUp() { return StatusWith>( std::unique_ptr(_loader)); }; + _server = stdx::make_unique(target.toString()); + _server->assignCollectionUuid(nss.ns(), *options.uuid); + _client = new FailableMockDBClientConnection(_server.get(), getNet()); + collectionCloner->setCreateClientFn_forTest([this]() { + _clientCreated = true; + return std::unique_ptr(_client); + }); } // Return index specs to use for secondary indexes. @@ -123,7 +273,11 @@ std::vector CollectionClonerTest::makeSecondaryIndexSpecs(const Namespa void CollectionClonerTest::tearDown() { BaseClonerTest::tearDown(); // Executor may still invoke collection cloner's callback before shutting down. - collectionCloner.reset(nullptr); + collectionCloner.reset(); + if (!_clientCreated) + delete _client; + _clientCreated = false; + _server.reset(); options = {}; } @@ -180,6 +334,19 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { "'storageEngine.storageEngine1' has to be an embedded document."); } + // UUID must be present. + { + CollectionOptions invalidOptions = options; + invalidOptions.uuid = boost::none; + StorageInterface* si = storageInterface.get(); + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner( + &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize), + AssertionException, + 50953, + "Missing collection UUID in CollectionCloner, collection name: db.coll"); + } + // Callback function cannot be null. 
{ CollectionCloner::CallbackFn nullCb; @@ -190,6 +357,17 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { ErrorCodes::BadValue, "callback function cannot be null"); } + + // Batch size must be non-negative. + { + StorageInterface* si = storageInterface.get(); + constexpr int kInvalidBatchSize = -1; + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner(&executor, pool, target, nss, options, cb, si, kInvalidBatchSize), + AssertionException, + 50954, + "collectionClonerBatchSize must be non-negative."); + } } TEST_F(CollectionClonerTest, ClonerLifeCycle) { @@ -206,7 +384,9 @@ TEST_F(CollectionClonerTest, FirstRemoteCommand) { auto&& noiRequest = noi->getRequest(); ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName())); - ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe()); + auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement())); + ASSERT_EQUALS(options.uuid.get(), requestUUID); + ASSERT_FALSE(net->hasReadyRequests()); ASSERT_TRUE(collectionCloner->isActive()); } @@ -347,18 +527,16 @@ TEST_F(CollectionClonerTest, ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); } -TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { - options = {}; - options.autoIndexId = CollectionOptions::NO; - collectionCloner.reset(new CollectionCloner(&getExecutor(), - dbWorkThreadPool.get(), - target, - nss, - options, - setStatusCallback(), - storageInterface.get(), - defaultBatchSize)); +class CollectionClonerNoAutoIndexTest : public CollectionClonerTest { +protected: + CollectionOptions getCollectionOptions() const override { + CollectionOptions options = CollectionClonerTest::getCollectionOptions(); + options.autoIndexId = CollectionOptions::NO; + return options; + } +}; +TEST_F(CollectionClonerNoAutoIndexTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { NamespaceString collNss; CollectionOptions collOptions; std::vector 
collIndexSpecs{BSON("fakeindexkeys" << 1)}; // init with one doc. @@ -376,32 +554,24 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { return std::unique_ptr(loader); }; + const BSONObj doc = BSON("_id" << 1); + _server->insert(nss.ns(), doc); + // Pause the CollectionCloner before executing the query so we can verify events which are + // supposed to happen before the query. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSONArray())); } ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } + pauser.resume(); collectionCloner->join(); ASSERT_EQUALS(1, collectionStats.insertCount); ASSERT_TRUE(collectionStats.commitCalled); @@ -595,6 +765,9 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) { } TEST_F(CollectionClonerTest, BeginCollection) { + // Pause the CollectionCloner before executing the query so we can verify state after + // the listIndexes call. 
+ MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); CollectionMockStats stats; @@ -682,7 +855,10 @@ TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) { ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { +TEST_F(CollectionClonerTest, QueryAfterCreateCollection) { + // Pause the CollectionCloner before executing the query so we can verify the collection is + // created before the query. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); CollectionMockStats stats; @@ -705,21 +881,15 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { collectionCloner->waitForDbWorker(); ASSERT_TRUE(collectionCreated); - - // Fetcher should be scheduled after cloner creates collection. - auto net = getNet(); - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - ASSERT_TRUE(net->hasReadyRequests()); - NetworkOperationIterator noi = net->getNextReadyRequest(); - auto&& noiRequest = noi->getRequest(); - ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); - ASSERT_EQUALS("find", std::string(noiRequest.cmdObj.firstElementFieldName())); - ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe()); - ASSERT_TRUE(noiRequest.cmdObj.getField("noCursorTimeout").trueValue()); - ASSERT_FALSE(net->hasReadyRequests()); + // Make sure the query starts. + _client->waitForPausedQuery(); } -TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) { +TEST_F(CollectionClonerTest, QueryFailed) { + // For this test to work properly, the error cannot be one of the special codes + // (OperationFailed or CursorNotFound) which trigger an attempt to see if the collection + // was deleted. 
+ _client->setFailureForQuery({ErrorCodes::UnknownError, "QueryFailedTest UnknownError"}); ASSERT_OK(collectionCloner->startup()); { @@ -727,98 +897,22 @@ TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) { processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("ok" << 0 << "errmsg" - << "" - << "code" - << ErrorCodes::CursorNotFound)); - } - - ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError) { - ASSERT_OK(collectionCloner->startup()); - - auto net = getNet(); - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - - // CollectionCollection sends listIndexes request irrespective of collection size in a - // successful count response. - assertRemoteCommandNameEquals("count", net->scheduleSuccessfulResponse(createCountResponse(0))); - net->runReadyNetworkOperations(); - - // CollectionCloner requires a successful listIndexes response in order to send the find request - // for the documents in the collection. - assertRemoteCommandNameEquals( - "listIndexes", - net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)))); - net->runReadyNetworkOperations(); - - // Respond to the find request with a retriable error. - assertRemoteCommandNameEquals("find", - net->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, ""))); - net->runReadyNetworkOperations(); - ASSERT_TRUE(collectionCloner->isActive()); - - // This check exists to ensure that the command used to establish the cursors is retried, - // regardless of the command format. 
- auto noi = net->getNextReadyRequest(); - assertRemoteCommandNameEquals("find", noi->getRequest()); - net->blackHole(noi); -} - -TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) { - ASSERT_OK(collectionCloner->startup()); - - ASSERT_TRUE(collectionCloner->isActive()); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - auto net = getNet(); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - net->runReadyNetworkOperations(); - } - - collectionCloner->waitForDbWorker(); - - ASSERT_TRUE(collectionCloner->isActive()); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - scheduleNetworkResponse(BSON("ok" << 1)); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->shutdown(); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - getNet()->logQueues(); - net->runReadyNetworkOperations(); - } - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + + // Pause the client so we can set up the failure. 
+ MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -831,31 +925,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { return StatusWith(ErrorCodes::UnknownError, ""); }); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } + pauser.resume(); + collectionCloner->join(); ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + + // Pause the client so we can set up the failure. 
+ MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -874,37 +961,30 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { return StatusWith(handle); }); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); - } + pauser.resume(); collectionCloner->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsFailed) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + + // Pause the client so we can set up the failure. 
+ MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); getNet()->logQueues(); - collectionCloner->waitForDbWorker(); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); @@ -913,20 +993,8 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { const std::vector::const_iterator end) { return Status(ErrorCodes::OperationFailed, ""); }; - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); - } + pauser.resume(); collectionCloner->join(); ASSERT_FALSE(collectionCloner->isActive()); @@ -936,39 +1004,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { } TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { + // Set up documents to be returned from upstream node. 
+ _server->insert(nss.ns(), BSON("_id" << 1)); + _server->insert(nss.ns(), BSON("_id" << 2)); + ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(2)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } - collectionCloner->join(); // TODO: record the documents during insert and compare them // -- maybe better done using a real storage engine, like ephemeral for test. - ASSERT_EQUALS(1, collectionStats.insertCount); + ASSERT_EQUALS(2, collectionStats.insertCount); + auto stats = collectionCloner->getStats(); + ASSERT_EQUALS(1u, stats.receivedBatches); ASSERT_TRUE(collectionStats.commitCalled); ASSERT_OK(getStatus()); @@ -976,115 +1029,27 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { } TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { + // Set up documents to be returned from upstream node. 
+ _server->insert(nss.ns(), BSON("_id" << 1)); + _server->insert(nss.ns(), BSON("_id" << 2)); + _server->insert(nss.ns(), BSON("_id" << 3)); + + collectionCloner->setBatchSize_forTest(2); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(3)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); - } - - collectionCloner->waitForDbWorker(); - // TODO: record the documents during insert and compare them - // -- maybe better done using a real storage engine, like ephemeral for test. - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc2 = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); - } - collectionCloner->join(); // TODO: record the documents during insert and compare them // -- maybe better done using a real storage engine, like ephemeral for test. 
- ASSERT_EQUALS(2, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc2 = BSON("_id" << 2); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc2), "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(2, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(emptyArray)); - } - - collectionCloner->join(); - ASSERT_EQUALS(2, collectionStats.insertCount); + ASSERT_EQUALS(3, collectionStats.insertCount); 
ASSERT_TRUE(collectionStats.commitCalled); + auto stats = collectionCloner->getStats(); + ASSERT_EQUALS(2u, stats.receivedBatches); ASSERT_OK(getStatus()); ASSERT_FALSE(collectionCloner->isActive()); @@ -1101,53 +1066,24 @@ TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBefo * Restarting cloning should fail with ErrorCodes::ShutdownInProgress error. */ TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) { + // Set up document to return from upstream. + _server->insert(nss.ns(), BSON("_id" << 1)); + // First cloning attempt - fails while reading documents from source collection. unittest::log() << "Starting first collection cloning attempt"; + _client->setFailureForQuery( + {ErrorCodes::UnknownError, "failed to read remaining documents from source collection"}); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1)))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - // Check that the status hasn't changed from the initial value. 
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(ErrorCodes::OperationFailed, - "failed to read remaining documents from source collection"); - } - collectionCloner->join(); - ASSERT_EQUALS(1, collectionStats.insertCount); - ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus()); ASSERT_FALSE(collectionCloner->isActive()); // Second cloning attempt - run to completion. @@ -1209,6 +1145,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA TEST_F(CollectionClonerTest, CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) { + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); @@ -1227,20 +1164,11 @@ TEST_F(CollectionClonerTest, } ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - // At this point, the CollectionCloner has sent the find request to establish the cursor. + // At this point, the CollectionCloner is waiting for the query to complete. // We want to return the first batch of documents for the collection from the network so that // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for // the next batch of documents. @@ -1257,26 +1185,20 @@ TEST_F(CollectionClonerTest, // Return first batch of collection documents from remote server for the getMore request. 
const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - assertRemoteCommandNameEquals( - "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc)))); - net->runReadyNetworkOperations(); - } - - // Confirm that CollectionCloner attempted to schedule _insertDocuments task. - ASSERT_TRUE(insertDocumentsFn); - - // Return an error for the getMore request for the next batch of collection documents. - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - assertRemoteCommandNameEquals( - "getMore", - net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed"))); - net->runReadyNetworkOperations(); - } + _server->insert(nss.ns(), doc); + _client->setFailureForQuery({ErrorCodes::UnknownError, "getMore failed"}); + // Wait for the _runQuery method to exit. We can't get at it directly but we can wait + // for a task scheduled after it to complete. + auto& executor = getExecutor(); + auto event = executor.makeEvent().getValue(); + auto nextTask = + executor + .scheduleWork([&executor, event](const executor::TaskExecutor::CallbackArgs&) { + executor.signalEvent(event); + }) + .getValue(); + pauser.resume(); + executor.waitForEvent(event); // CollectionCloner should still be active because we have not finished processing the // insertDocuments task. @@ -1287,90 +1209,39 @@ TEST_F(CollectionClonerTest, // error passed to the completion guard (ie. from the failed getMore request). executor::TaskExecutor::CallbackArgs callbackArgs( &getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, "")); + ASSERT_TRUE(insertDocumentsFn); insertDocumentsFn(callbackArgs); // Reset 'insertDocumentsFn' to release last reference count on completion guard. insertDocumentsFn = {}; - // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback - synchronously.
+ collectionCloner->join(); ASSERT_FALSE(collectionCloner->isActive()); - ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus()); } -class CollectionClonerUUIDTest : public CollectionClonerTest { +class CollectionClonerRenamedBeforeStartTest : public CollectionClonerTest { protected: - // The UUID tests should deal gracefully with renamed collections, so start the cloner with - // an alternate name. + // The CollectionCloner should deal gracefully with collections renamed before the cloner + // was started, so start it with an alternate name. const NamespaceString alternateNss{"db", "alternateCollName"}; - void startupWithUUID(int maxNumCloningCursors = 1) { - collectionCloner.reset(); - options.uuid = UUID::gen(); - collectionCloner = stdx::make_unique(&getExecutor(), - dbWorkThreadPool.get(), - target, - alternateNss, - options, - setStatusCallback(), - storageInterface.get(), - defaultBatchSize); - - ASSERT_OK(collectionCloner->startup()); - } - - void testWithMaxNumCloningCursors(int maxNumCloningCursors, StringData cmdName) { - startupWithUUID(maxNumCloningCursors); - - CollectionOptions actualOptions; - CollectionMockStats stats; - CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats); - bool collectionCreated = false; - storageInterface->createCollectionForBulkFn = [&](const NamespaceString& theNss, - const CollectionOptions& theOptions, - const BSONObj idIndexSpec, - const std::vector& theIndexSpecs) - -> StatusWith> { - collectionCreated = true; - actualOptions = theOptions; - return std::unique_ptr(loader); - }; - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCreated); - - // Fetcher should be scheduled after cloner creates collection. 
- auto net = getNet(); - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - ASSERT_TRUE(net->hasReadyRequests()); - NetworkOperationIterator noi = net->getNextReadyRequest(); - ASSERT_FALSE(net->hasReadyRequests()); - auto&& noiRequest = noi->getRequest(); - ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); - ASSERT_BSONOBJ_EQ(actualOptions.toBSON(), options.toBSON()); - - ASSERT_EQUALS(cmdName, std::string(noiRequest.cmdObj.firstElementFieldName())); - ASSERT_EQUALS(cmdName == "find", noiRequest.cmdObj.getField("noCursorTimeout").trueValue()); - auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement())); - ASSERT_EQUALS(options.uuid.get(), requestUUID); - } + const NamespaceString& getStartupNss() const override { + return alternateNss; + }; /** * Sets up a test for the CollectionCloner that simulates the collection being dropped while * copying the documents. - * The ARM returns CursorNotFound error to indicate a collection drop. Subsequently, the - * CollectionCloner should run a find command on the collection by UUID. This should be the next - * ready request on in the network interface. + * The DBClientConnection returns a CursorNotFound error to indicate a collection drop. */ void setUpVerifyCollectionWasDroppedTest() { - startupWithUUID(); - + // Pause the query so we can reliably wait for it to complete. + MockClientPauser pauser(_client); + // Return error response from the query. 
+ _client->setFailureForQuery( + {ErrorCodes::CursorNotFound, "collection dropped while copying documents"}); + ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createCountResponse(0)); @@ -1378,24 +1249,10 @@ protected: } ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionStats.initCalled); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSONArray())); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - // Return error response to getMore command. - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(ErrorCodes::CursorNotFound, - "collection dropped while copying documents"); - } + pauser.resume(); + _client->waitForResumedQuery(); } /** @@ -1416,8 +1273,8 @@ protected: } }; -TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) { - startupWithUUID(); +TEST_F(CollectionClonerRenamedBeforeStartTest, FirstRemoteCommandWithRenamedCollection) { + ASSERT_OK(collectionCloner->startup()); auto net = getNet(); executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -1433,9 +1290,7 @@ TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) { ASSERT_TRUE(collectionCloner->isActive()); } -TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) { - startupWithUUID(); - +TEST_F(CollectionClonerRenamedBeforeStartTest, BeginCollectionWithUUID) { CollectionMockStats stats; CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats); NamespaceString collNss; @@ -1454,6 +1309,11 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) { return std::unique_ptr(loader); }; + // Pause the client so the cloner stops in the fetcher. 
+ MockClientPauser pauser(_client); + + ASSERT_OK(collectionCloner->startup()); + // Split listIndexes response into 2 batches: first batch contains idIndexSpec and // second batch contains specs. We expect the collection cloner to fix up the collection names // (here from 'nss' to 'alternateNss') in the index specs, as the collection with the given UUID @@ -1506,20 +1366,16 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) { ASSERT_TRUE(collectionCloner->isActive()); } -TEST_F(CollectionClonerUUIDTest, SingleCloningCursorWithUUIDUsesFindCommand) { - // With a single cloning cursor, expect a find command. - testWithMaxNumCloningCursors(1, "find"); -} - /** * Start cloning. - * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound - * error. + * While copying collection, simulate a collection drop by having the DBClientConnection return a + * CursorNotFound error. * The CollectionCloner should run a find command on the collection by UUID. * Simulate successful find command with a drop-pending namespace in the response. * The CollectionCloner should complete with a successful final status. */ -TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) { +TEST_F(CollectionClonerRenamedBeforeStartTest, + CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) { setUpVerifyCollectionWasDroppedTest(); // CollectionCloner should send a find command with the collection's UUID. @@ -1544,14 +1400,13 @@ TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileC /** * Start cloning. - * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound - * error. + * While copying collection, simulate a collection drop by having the DBClientConnection return a + * CursorNotFound error. * The CollectionCloner should run a find command on the collection by UUID. * Shut the CollectionCloner down. 
- * The CollectionCloner should return a CursorNotFound final status which is the last error from the - * ARM. + * The CollectionCloner should return a CursorNotFound final status. */ -TEST_F(CollectionClonerUUIDTest, +TEST_F(CollectionClonerRenamedBeforeStartTest, ShuttingDownCollectionClonerDuringCollectionDropVerificationReturnsCallbackCanceled) { setUpVerifyCollectionWasDroppedTest(); diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index 1d5223f54fb..f439e0061b9 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -65,9 +65,18 @@ const char* kOptionsFieldName = "options"; const char* kInfoFieldName = "info"; const char* kUUIDFieldName = "uuid"; -// The batchSize to use for the find/getMore queries called by the CollectionCloner -constexpr int kUseARMDefaultBatchSize = -1; -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, kUseARMDefaultBatchSize); +// The batch size (number of documents) to use for the queries in the CollectionCloner. Default of +// 0 means the limit is the number of documents which fit in a single BSON object. +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, 0) + ->withValidator([](const int& batchSize) { + return (batchSize >= 0) + ? Status::OK() + : Status(ErrorCodes::Error(50952), + str::stream() + << "collectionClonerBatchSize must be greater than or equal to 0. '" + << batchSize + << "' is an invalid setting."); + }); // The number of attempts for the listCollections commands. 
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3); diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index 1238a794c76..adc479b7967 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/database_cloner.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" @@ -59,6 +60,11 @@ struct CollectionCloneInfo { class DatabaseClonerTest : public BaseClonerTest { public: + DatabaseClonerTest() { + _options1.uuid = UUID::gen(); + _options2.uuid = UUID::gen(); + _options3.uuid = UUID::gen(); + } void clear() override; BaseCloner* getCloner() const override; @@ -77,6 +83,11 @@ protected: std::map _collections; std::unique_ptr _databaseCloner; + CollectionOptions _options1; + CollectionOptions _options2; + CollectionOptions _options3; + DatabaseCloner::StartCollectionClonerFn _startCollectionCloner; + std::unique_ptr _mockServer; }; void DatabaseClonerTest::setUp() { @@ -96,6 +107,19 @@ void DatabaseClonerTest::setUp() { return getExecutor().scheduleWork(work); }); + _mockServer = stdx::make_unique(target.toString()); + _mockServer->assignCollectionUuid("db.a", *_options1.uuid); + _mockServer->assignCollectionUuid("db.b", *_options2.uuid); + _mockServer->assignCollectionUuid("db.c", *_options3.uuid); + _startCollectionCloner = [this](CollectionCloner& cloner) { + cloner.setCreateClientFn_forTest([&cloner, this]() { + return std::unique_ptr( + new MockDBClientConnection(_mockServer.get())); + }); + return cloner.startup(); + }; + _databaseCloner->setStartCollectionClonerFn(_startCollectionCloner); + storageInterface->createCollectionForBulkFn = [this](const NamespaceString& nss, const 
CollectionOptions& options, @@ -366,15 +390,15 @@ TEST_F(DatabaseClonerTest, ListCollectionsPredicate) { const std::vector sourceInfos = {BSON("name" << "a" << "options" - << BSONObj()), + << _options1.toBSON()), BSON("name" << "b" << "options" - << BSONObj()), + << _options2.toBSON()), BSON("name" << "c" << "options" - << BSONObj())}; + << _options3.toBSON())}; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createListCollectionsResponse( @@ -400,11 +424,11 @@ TEST_F(DatabaseClonerTest, ListCollectionsMultipleBatches) { const std::vector sourceInfos = {BSON("name" << "a" << "options" - << BSONObj()), + << _options1.toBSON()), BSON("name" << "b" << "options" - << BSONObj())}; + << _options2.toBSON())}; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createListCollectionsResponse(1, BSON_ARRAY(sourceInfos[0]))); @@ -447,7 +471,7 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameFieldMissing) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse( - createListCollectionsResponse(0, BSON_ARRAY(BSON("options" << BSONObj())))); + createListCollectionsResponse(0, BSON_ARRAY(BSON("options" << _options1.toBSON())))); } ASSERT_EQUALS(ErrorCodes::FailedToParse, getStatus().code()); @@ -465,7 +489,7 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameNotAString) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createListCollectionsResponse( - 0, BSON_ARRAY(BSON("name" << 123 << "options" << BSONObj())))); + 0, BSON_ARRAY(BSON("name" << 123 << "options" << _options1.toBSON())))); } ASSERT_EQUALS(ErrorCodes::TypeMismatch, getStatus().code()); @@ -482,11 +506,12 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameEmpty) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createListCollectionsResponse(0, - BSON_ARRAY(BSON("name" - << "" - << "options" - << BSONObj())))); + 
processNetworkResponse( + createListCollectionsResponse(0, + BSON_ARRAY(BSON("name" + << "" + << "options" + << _options1.toBSON())))); } ASSERT_EQUALS(ErrorCodes::BadValue, getStatus().code()); @@ -503,15 +528,16 @@ TEST_F(DatabaseClonerTest, CollectionInfoNameDuplicate) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createListCollectionsResponse(0, - BSON_ARRAY(BSON("name" - << "a" - << "options" - << BSONObj()) - << BSON("name" - << "a" - << "options" - << BSONObj())))); + processNetworkResponse( + createListCollectionsResponse(0, + BSON_ARRAY(BSON("name" + << "a" + << "options" + << _options1.toBSON()) + << BSON("name" + << "a" + << "options" + << _options2.toBSON())))); } ASSERT_EQUALS(ErrorCodes::DuplicateKey, getStatus().code()); @@ -581,6 +607,26 @@ TEST_F(DatabaseClonerTest, InvalidCollectionOptions) { ASSERT_EQUALS(DatabaseCloner::State::kComplete, _databaseCloner->getState_forTest()); } +TEST_F(DatabaseClonerTest, InvalidMissingUUID) { + ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest()); + + ASSERT_OK(_databaseCloner->startup()); + ASSERT_EQUALS(DatabaseCloner::State::kRunning, _databaseCloner->getState_forTest()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createListCollectionsResponse(0, + BSON_ARRAY(BSON("name" + << "a" + << "options" + << BSONObj())))); + } + + ASSERT_EQUALS(50953, getStatus().code()); + ASSERT_FALSE(_databaseCloner->isActive()); + ASSERT_EQUALS(DatabaseCloner::State::kComplete, _databaseCloner->getState_forTest()); +} + TEST_F(DatabaseClonerTest, DatabaseClonerResendsListCollectionsRequestOnRetriableError) { ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest()); @@ -636,37 +682,6 @@ TEST_F(DatabaseClonerTest, ListCollectionsReturnsEmptyCollectionName) { ASSERT_EQUALS(DatabaseCloner::State::kComplete, _databaseCloner->getState_forTest()); } -TEST_F(DatabaseClonerTest, 
DatabaseClonerAcceptsCollectionOptionsContainUuid) { - ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest()); - - ASSERT_OK(_databaseCloner->startup()); - ASSERT_EQUALS(DatabaseCloner::State::kRunning, _databaseCloner->getState_forTest()); - - bool collectionClonerStarted = false; - _databaseCloner->setStartCollectionClonerFn( - [&collectionClonerStarted](CollectionCloner& cloner) { - collectionClonerStarted = true; - return cloner.startup(); - }); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - CollectionOptions options; - options.uuid = UUID::gen(); - processNetworkResponse( - createListCollectionsResponse(0, - BSON_ARRAY(BSON("name" - << "a" - << "options" - << options.toBSON())))); - } - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionClonerStarted); - ASSERT_TRUE(_databaseCloner->isActive()); - ASSERT_EQUALS(DatabaseCloner::State::kRunning, _databaseCloner->getState_forTest()); -} - TEST_F(DatabaseClonerTest, StartFirstCollectionClonerFailed) { ASSERT_EQUALS(DatabaseCloner::State::kPreStart, _databaseCloner->getState_forTest()); @@ -680,11 +695,12 @@ TEST_F(DatabaseClonerTest, StartFirstCollectionClonerFailed) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createListCollectionsResponse(0, - BSON_ARRAY(BSON("name" - << "a" - << "options" - << BSONObj())))); + processNetworkResponse( + createListCollectionsResponse(0, + BSON_ARRAY(BSON("name" + << "a" + << "options" + << _options1.toBSON())))); } ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code()); @@ -701,28 +717,29 @@ TEST_F(DatabaseClonerTest, StartSecondCollectionClonerFailed) { const Status errStatus{ErrorCodes::OperationFailed, "StartSecondCollectionClonerFailed injected failure."}; - _databaseCloner->setStartCollectionClonerFn([errStatus](CollectionCloner& cloner) -> Status { - if (cloner.getSourceNamespace().coll() == "b") { - return errStatus; - } - return 
cloner.startup(); - }); + _databaseCloner->setStartCollectionClonerFn( + [errStatus, this](CollectionCloner& cloner) -> Status { + if (cloner.getSourceNamespace().coll() == "b") { + return errStatus; + } + return _startCollectionCloner(cloner); + }); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createListCollectionsResponse(0, - BSON_ARRAY(BSON("name" - << "a" - << "options" - << BSONObj()) - << BSON("name" - << "b" - << "options" - << BSONObj())))); + processNetworkResponse( + createListCollectionsResponse(0, + BSON_ARRAY(BSON("name" + << "a" + << "options" + << _options1.toBSON()) + << BSON("name" + << "b" + << "options" + << _options2.toBSON())))); processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - processNetworkResponse(createCursorResponse(0, BSONArray())); } _databaseCloner->join(); ASSERT_FALSE(_databaseCloner->isActive()); @@ -746,18 +763,18 @@ TEST_F(DatabaseClonerTest, ShutdownCancelsCollectionCloning) { BSON_ARRAY(BSON("name" << "a" << "options" - << BSONObj()) + << _options1.toBSON()) << BSON("name" << "b" << "options" - << BSONObj()))))); + << _options2.toBSON()))))); net->runReadyNetworkOperations(); // CollectionCloner sends collection count request on startup. // Blackhole count request to leave collection cloner active. 
auto noi = net->getNextReadyRequest(); assertRemoteCommandNameEquals("count", noi->getRequest()); - ASSERT_EQUALS("a", noi->getRequest().cmdObj.firstElement().String()); + ASSERT_EQUALS(*_options1.uuid, UUID::parse(noi->getRequest().cmdObj.firstElement())); net->blackHole(noi); } @@ -785,11 +802,11 @@ TEST_F(DatabaseClonerTest, FirstCollectionListIndexesFailed) { const std::vector sourceInfos = {BSON("name" << "a" << "options" - << BSONObj()), + << _options1.toBSON()), BSON("name" << "b" << "options" - << BSONObj())}; + << _options2.toBSON())}; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse( @@ -810,7 +827,6 @@ TEST_F(DatabaseClonerTest, FirstCollectionListIndexesFailed) { processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - processNetworkResponse(createCursorResponse(0, BSONArray())); } _databaseCloner->join(); ASSERT_EQ(getStatus().code(), ErrorCodes::InitialSyncFailure); @@ -841,11 +857,11 @@ TEST_F(DatabaseClonerTest, CreateCollections) { const std::vector sourceInfos = {BSON("name" << "a" << "options" - << BSONObj()), + << _options1.toBSON()), BSON("name" << "b" << "options" - << BSONObj())}; + << _options2.toBSON())}; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse( @@ -862,22 +878,12 @@ TEST_F(DatabaseClonerTest, CreateCollections) { processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(_databaseCloner->isActive()); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSONArray())); - } - ASSERT_TRUE(_databaseCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(_databaseCloner->isActive()); - { - 
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSONArray())); - } _databaseCloner->join(); ASSERT_OK(getStatus()); diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp index e4cad653e4f..649c51ee7e5 100644 --- a/src/mongo/db/repl/databases_cloner.cpp +++ b/src/mongo/db/repl/databases_cloner.cpp @@ -230,6 +230,12 @@ void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::Schedu _scheduleDbWorkFn = work; } +void DatabasesCloner::setStartCollectionClonerFn( + const StartCollectionClonerFn& startCollectionCloner) { + LockGuard lk(_mutex); + _startCollectionClonerFn = startCollectionCloner; +} + StatusWith> DatabasesCloner::parseListDatabasesResponse_forTest( BSONObj dbResponse) { return _parseListDatabasesResponse(dbResponse); @@ -360,6 +366,9 @@ void DatabasesCloner::_onListDatabaseFinish( if (_scheduleDbWorkFn) { dbCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn); } + if (_startCollectionClonerFn) { + dbCloner->setStartCollectionClonerFn(_startCollectionClonerFn); + } // Start first database cloner. if (_databaseCloners.empty()) { startStatus = dbCloner->startup(); diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h index afb69c4ade9..dd189dc7fda 100644 --- a/src/mongo/db/repl/databases_cloner.h +++ b/src/mongo/db/repl/databases_cloner.h @@ -66,6 +66,8 @@ public: using IncludeDbFilterFn = stdx::function; using OnFinishFn = stdx::function; + using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn; + DatabasesCloner(StorageInterface* si, executor::TaskExecutor* exec, ThreadPool* dbWorkThreadPool, @@ -96,6 +98,13 @@ public: */ void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + /** + * Overrides how executor starts a collection cloner. + * + * For testing only. 
+ */ + void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner); + /** * Calls DatabasesCloner::_setAdminAsFirst. * For testing only. @@ -170,6 +179,7 @@ private: ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. const HostAndPort _source; // (R) The source to use. CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) + StartCollectionClonerFn _startCollectionClonerFn; // (M) const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned. OnFinishFn _finishFn; // (M) function called when finished. diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 66f2138ceaa..11dad10fe9e 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_test_fixture.h" +#include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/mutex.h" @@ -188,6 +189,8 @@ protected: }; _dbWorkThreadPool.startup(); + _target = HostAndPort{"local:1234"}; + _mockServer = stdx::make_unique(_target.toString()); } void tearDown() override { @@ -287,7 +290,7 @@ protected: DatabasesCloner cloner{&getStorage(), &getExecutor(), &getDbWorkThreadPool(), - HostAndPort{"local:1234"}, + _target, [](const BSONObj&) { return true; }, [&](const Status& status) { UniqueLock lk(mutex); @@ -300,6 +303,13 @@ protected: return getExecutor().scheduleWork(work); }); + cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) { + cloner.setCreateClientFn_forTest([&cloner, this]() { + return std::unique_ptr( + new MockDBClientConnection(_mockServer.get())); + }); + return cloner.startup(); + }); ASSERT_OK(cloner.startup()); 
ASSERT_TRUE(cloner.isActive()); @@ -327,6 +337,8 @@ private: protected: StorageInterfaceMock _storageInterface; + HostAndPort _target; + std::unique_ptr _mockServer; private: ThreadPool _dbWorkThreadPool; @@ -900,15 +912,22 @@ TEST_F(DBsClonerTest, AdminDbValidationErrorShouldAbortTheCloner) { } TEST_F(DBsClonerTest, SingleDatabaseCopiesCompletely) { + CollectionOptions options; + options.uuid = UUID::gen(); + _mockServer->assignCollectionUuid("a.a", *options.uuid); const Responses resps = { // Clone Start // listDatabases {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, // listCollections for "a" {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, + BSON("ok" << 1 << "cursor" << BSON("id" << 0ll << "ns" + << "a.$cmd.listCollections" + << "firstBatch" + << BSON_ARRAY(BSON("name" + << "a" + << "options" + << options.toBSON()))))}, // count:a {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a @@ -919,17 +938,18 @@ TEST_F(DBsClonerTest, SingleDatabaseCopiesCompletely) { "{v:" << OplogEntry::kOplogVersion << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, // Clone Done }; runCompleteClone(resps); } TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) { + CollectionOptions options1; + CollectionOptions options2; + options1.uuid = UUID::gen(); + options2.uuid = UUID::gen(); + _mockServer->assignCollectionUuid("a.a", *options1.uuid); + _mockServer->assignCollectionUuid("b.b", *options2.uuid); const Responses resps = { // Clone Start // listDatabases {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}, {name:'b'}]}")}, // listCollections for "a" {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " -
"]}}")}, + BSON("ok" << 1 << "cursor" << BSON("id" << 0ll << "ns" + << "a.$cmd.listCollections" + << "firstBatch" + << BSON_ARRAY(BSON("name" + << "a" + << "options" + << options1.toBSON()))))}, // count:a {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a @@ -949,16 +973,15 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) { "{v:" << OplogEntry::kOplogVersion << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, // listCollections for "b" {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'b.$cmd.listCollections', firstBatch:[" - "{name:'b', options:{}} " - "]}}")}, + BSON("ok" << 1 << "cursor" << BSON("id" << 0ll << "ns" + << "b.$cmd.listCollections" + << "firstBatch" + << BSON_ARRAY(BSON("name" + << "b" + << "options" + << options2.toBSON()))))}, // count:b {"count", BSON("n" << 2 << "ok" << 1)}, // listIndexes:b @@ -968,11 +991,6 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) { "{v:" << OplogEntry::kOplogVersion << ", key:{_id:1}, name:'_id_', ns:'b.b'}]}}")}, - // find:b - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'b.b', firstBatch:[" - "{_id:2, a:1},{_id:3, b:1}" - "]}}")}, }; runCompleteClone(resps); } diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index e577b273e5c..1665a83aba2 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -381,6 +381,12 @@ void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::Schedule _scheduleDbWorkFn = work; } +void InitialSyncer::setStartCollectionClonerFn( + const StartCollectionClonerFn& startCollectionCloner) { + LockGuard lk(_mutex); + _startCollectionClonerFn = startCollectionCloner; +} + void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) { // 'opCtx' is passed through from startup(). 
_replicationProcess->getConsistencyMarkers()->setInitialSyncFlag(opCtx); @@ -837,6 +843,9 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith // facilitate testing. _initialSyncState->dbsCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn); } + if (_startCollectionClonerFn) { + _initialSyncState->dbsCloner->setStartCollectionClonerFn(_startCollectionClonerFn); + } LOG(2) << "Starting DatabasesCloner: " << _initialSyncState->dbsCloner->toString(); diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index d9d1c1944f1..039b01d2056 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -41,6 +41,7 @@ #include "mongo/db/repl/callback_completion_guard.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/database_cloner.h" #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_buffer.h" @@ -48,6 +49,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/rollback_checker.h" #include "mongo/db/repl/sync_source_selector.h" +#include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" @@ -149,6 +151,8 @@ public: */ using OnCompletionGuard = CallbackCompletionGuard>; + using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn; + struct InitialSyncAttemptInfo { int durationMillis; Status status; @@ -218,6 +222,13 @@ public: */ void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); + /** + * Overrides how executor starts a collection cloner. + * + * For testing only. + */ + void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner); + // State transitions: // PreStart --> Running --> ShuttingDown --> Complete // It is possible to skip intermediate states. 
For example, calling shutdown() when the data @@ -621,6 +632,7 @@ private: // Passed to CollectionCloner via DatabasesCloner. CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) + StartCollectionClonerFn _startCollectionClonerFn; // (M) // Contains stats on the current initial sync request (includes all attempts). // To access these stats in a user-readable format, use getInitialSyncProgress(). diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 5d65c8616ff..eed9e149a80 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -66,6 +66,7 @@ #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" +#include "mongo/db/server_parameters.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" @@ -324,6 +325,10 @@ protected: _dbWorkThreadPool = stdx::make_unique(ThreadPool::Options()); _dbWorkThreadPool->startup(); + _target = HostAndPort{"localhost:12346"}; + _mockServer = stdx::make_unique(_target.toString()); + _options1.uuid = UUID::gen(); + Client::initThreadIfNotAlready(); reset(); @@ -401,7 +406,13 @@ protected: [this](const executor::TaskExecutor::CallbackFn& work) { return getExecutor().scheduleWork(work); }); - + _initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) { + cloner.setCreateClientFn_forTest([&cloner, this]() { + return std::unique_ptr( + new MockDBClientConnection(_mockServer.get())); + }); + return cloner.startup(); + }); } catch (...) 
{ ASSERT_OK(exceptionToStatus()); } @@ -454,6 +465,9 @@ protected: OpTime _myLastOpTime; std::unique_ptr _syncSourceSelector; std::unique_ptr _storageInterface; + HostAndPort _target; + std::unique_ptr _mockServer; + CollectionOptions _options1; std::unique_ptr _replicationProcess; std::unique_ptr _dbWorkThreadPool; std::map _collectionStats; @@ -2875,15 +2889,19 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); + // Set up data for "a" + _mockServer->assignCollectionUuid(nss.ns(), *_options1.uuid); + _mockServer->insert(nss.ns(), BSON("_id" << 1 << "a" << 1)); + // listCollections for "a" - request = net->scheduleSuccessfulResponse( - makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())})); + request = net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, nss, {BSON("name" << nss.coll() << "options" << _options1.toBSON())})); assertRemoteCommandNameEquals("listCollections", request); // count:a request = assertRemoteCommandNameEquals( "count", net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1))); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement())); ASSERT_EQUALS(nss.db(), request.dbname); // listIndexes:a @@ -2896,14 +2914,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter << "_id_" << "ns" << nss.ns())}))); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); - ASSERT_EQUALS(nss.db(), request.dbname); - - // find:a - request = assertRemoteCommandNameEquals("find", - net->scheduleSuccessfulResponse(makeCursorResponse( - 0LL, nss, {BSON("_id" << 1 << "a" << 1)}))); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement())); ASSERT_EQUALS(nss.db(), request.dbname); // Second last oplog 
entry fetcher. @@ -3537,9 +3548,13 @@ TEST_F(InitialSyncerTest, ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); net->runReadyNetworkOperations(); + // Set up data for "a" + _mockServer->assignCollectionUuid(nss.ns(), *_options1.uuid); + _mockServer->insert(nss.ns(), BSON("_id" << 1 << "a" << 1)); + // listCollections for "a" - request = net->scheduleSuccessfulResponse( - makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())})); + request = net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, nss, {BSON("name" << nss.coll() << "options" << _options1.toBSON())})); assertRemoteCommandNameEquals("listCollections", request); // Black hole OplogFetcher's getMore request. @@ -3551,7 +3566,7 @@ TEST_F(InitialSyncerTest, // count:a request = net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1)); assertRemoteCommandNameEquals("count", request); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement())); ASSERT_EQUALS(nss.db(), request.dbname); // listIndexes:a @@ -3563,14 +3578,7 @@ TEST_F(InitialSyncerTest, << "ns" << nss.ns())})); assertRemoteCommandNameEquals("listIndexes", request); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); - ASSERT_EQUALS(nss.db(), request.dbname); - - // find:a - request = net->scheduleSuccessfulResponse( - makeCursorResponse(0LL, nss, {BSON("_id" << 1 << "a" << 1)})); - assertRemoteCommandNameEquals("find", request); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement())); ASSERT_EQUALS(nss.db(), request.dbname); // Second last oplog entry fetcher. 
@@ -3761,6 +3769,10 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) { TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); + ASSERT_OK(ServerParameterSet::getGlobal() + ->getMap() + .find("collectionClonerBatchSize") + ->second->setFromString("1")); _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017)); ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U)); @@ -3882,9 +3894,15 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); net->runReadyNetworkOperations(); + // Set up data for "a" + _mockServer->assignCollectionUuid(nss.ns(), *_options1.uuid); + for (int i = 1; i <= 5; ++i) { + _mockServer->insert(nss.ns(), BSON("_id" << i << "a" << i)); + } + // listCollections for "a" - request = net->scheduleSuccessfulResponse( - makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())})); + request = net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, nss, {BSON("name" << nss.coll() << "options" << _options1.toBSON())})); assertRemoteCommandNameEquals("listCollections", request); auto noi = net->getNextReadyRequest(); @@ -3895,7 +3913,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { // count:a request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1)); assertRemoteCommandNameEquals("count", request); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(*_options1.uuid, UUID::parse(request.cmdObj.firstElement())); ASSERT_EQUALS(nss.db(), request.dbname); // listIndexes:a @@ -3907,18 +3925,9 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { << "ns" << nss.ns())})); assertRemoteCommandNameEquals("listIndexes", request); - ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String()); + ASSERT_EQUALS(*_options1.uuid, 
UUID::parse(request.cmdObj.firstElement())); ASSERT_EQUALS(nss.db(), request.dbname); - // find:a - 5 batches - for (int i = 1; i <= 5; ++i) { - request = net->scheduleSuccessfulResponse( - makeCursorResponse(i < 5 ? 2LL : 0LL, nss, {BSON("_id" << i << "a" << i)}, i == 1)); - ASSERT_EQUALS(i == 1 ? "find" : "getMore", - request.cmdObj.firstElement().fieldNameStringData()); - net->runReadyNetworkOperations(); - } - // Second last oplog entry fetcher. // Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after // applying the first batch. @@ -3950,7 +3959,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { 5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName)) << collectionProgress; ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress; - ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress; + ASSERT_EQUALS(5, collectionProgress.getIntField("receivedBatches")) << collectionProgress; attempts = progress["initialSyncAttempts"].Obj(); ASSERT_EQUALS(attempts.nFields(), 1) << progress; @@ -4061,8 +4070,10 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc // listCollections for "a" std::vector collectionInfos; for (std::size_t i = 0; i < numCollections; ++i) { + CollectionOptions options; const std::string collName = str::stream() << "coll-" << i; - collectionInfos.push_back(BSON("name" << collName << "options" << BSONObj())); + options.uuid = UUID::gen(); + collectionInfos.push_back(BSON("name" << collName << "options" << options.toBSON())); } request = net->scheduleSuccessfulResponse( makeCursorResponse(0LL, nss.getCommandNS(), collectionInfos)); diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index f604a3310c3..77fff17d2d3 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -39,7 +39,6 @@ env.Library( source=[ 
'mock/mock_conn_registry.cpp', 'mock/mock_dbclient_connection.cpp', - 'mock/mock_dbclient_cursor.cpp', 'mock/mock_remote_db_server.cpp', 'mock/mock_replica_set.cpp' ], diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp index b7c477227f9..dfd3332d978 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp @@ -29,7 +29,7 @@ #include "mongo/dbtests/mock/mock_dbclient_connection.h" -#include "mongo/dbtests/mock/mock_dbclient_cursor.h" +#include "mongo/client/dbclient_mockcursor.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/time_support.h" @@ -82,14 +82,11 @@ std::unique_ptr MockDBClientConnection::query( const BSONObj* fieldsToReturn, int queryOptions, int batchSize) { - // The mock client does not support UUIDs. - invariant(nsOrUuid.nss()); - checkConnection(); try { mongo::BSONArray result(_remoteServer->query(_remoteServerInstanceID, - nsOrUuid.nss()->ns(), + nsOrUuid, query, nToReturn, nToSkip, @@ -98,7 +95,7 @@ std::unique_ptr MockDBClientConnection::query( batchSize)); std::unique_ptr cursor; - cursor.reset(new MockDBClientCursor(this, result)); + cursor.reset(new DBClientMockCursor(this, BSONArray(result.copy()), batchSize)); return cursor; } catch (const mongo::DBException&) { _isFailed = true; @@ -130,9 +127,9 @@ unsigned long long MockDBClientConnection::query( const NamespaceStringOrUUID& nsOrUuid, mongo::Query query, const mongo::BSONObj* fieldsToReturn, - int queryOptions) { - verify(false); - return 0; + int queryOptions, + int batchSize) { + return DBClientBase::query(f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); } uint64_t MockDBClientConnection::getSockCreationMicroSec() const { diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h index 0871c82fcfc..a5186df803f 100644 --- 
a/src/mongo/dbtests/mock/mock_dbclient_connection.h +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h @@ -61,10 +61,12 @@ public: bool connect(const char* hostName, StringData applicationName, std::string& errmsg); - inline bool connect(const HostAndPort& host, - StringData applicationName, - std::string& errmsg) override { - return connect(host.toString().c_str(), applicationName, errmsg); + Status connect(const HostAndPort& host, StringData applicationName) override { + std::string errmsg; + if (!connect(host.toString().c_str(), applicationName, errmsg)) { + return {ErrorCodes::HostNotFound, errmsg}; + } + return Status::OK(); } using DBClientBase::runCommandWithTarget; @@ -104,7 +106,8 @@ public: const NamespaceStringOrUUID& nsOrUuid, mongo::Query query, const mongo::BSONObj* fieldsToReturn = 0, - int queryOptions = 0) override; + int queryOptions = 0, + int batchSize = 0) override; // // Unsupported methods (these are pure virtuals in the base class) diff --git a/src/mongo/dbtests/mock/mock_dbclient_cursor.cpp b/src/mongo/dbtests/mock/mock_dbclient_cursor.cpp deleted file mode 100644 index f0fb2499adb..00000000000 --- a/src/mongo/dbtests/mock/mock_dbclient_cursor.cpp +++ /dev/null @@ -1,49 +0,0 @@ -//@file dbclientmockcursor.h - -/* Copyright 2012 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/dbtests/mock/mock_dbclient_cursor.h" - -namespace mongo { -MockDBClientCursor::MockDBClientCursor(mongo::DBClientBase* client, - const mongo::BSONArray& resultSet) - : mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0) { - _resultSet = resultSet.copy(); - _cursor.reset(new mongo::DBClientMockCursor(client, BSONArray(_resultSet))); -} - -bool MockDBClientCursor::more() { - return _cursor->more(); -} - -mongo::BSONObj MockDBClientCursor::next() { - return _cursor->next(); -} -} diff --git a/src/mongo/dbtests/mock/mock_dbclient_cursor.h b/src/mongo/dbtests/mock/mock_dbclient_cursor.h deleted file mode 100644 index 2a0561c781b..00000000000 --- a/src/mongo/dbtests/mock/mock_dbclient_cursor.h +++ /dev/null @@ -1,58 +0,0 @@ -//@file dbclientmockcursor.h - -/* Copyright 2012 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - - -#include "mongo/client/dbclient_cursor.h" -#include "mongo/client/dbclient_mockcursor.h" - -namespace mongo { - -/** - * Simple adapter class for mongo::DBClientMockCursor to mongo::DBClientCursor. - * Only supports more and next, the behavior of other operations are undefined. - */ -class MockDBClientCursor : public mongo::DBClientCursor { -public: - MockDBClientCursor(mongo::DBClientBase* client, const mongo::BSONArray& mockCollection); - - bool more() override; - - /** - * Note: has the same contract as DBClientCursor - returned BSONObj will - * become invalid when this cursor is destroyed. 
- */ - mongo::BSONObj next() override; - -private: - std::unique_ptr _cursor; - mongo::BSONObj _resultSet; -}; -} diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.cpp b/src/mongo/dbtests/mock/mock_remote_db_server.cpp index baa35e0f8f6..23021cf9fd8 100644 --- a/src/mongo/dbtests/mock/mock_remote_db_server.cpp +++ b/src/mongo/dbtests/mock/mock_remote_db_server.cpp @@ -135,6 +135,11 @@ void MockRemoteDBServer::remove(const string& ns, Query query, int flags) { _dataMgr.erase(ns); } +void MockRemoteDBServer::assignCollectionUuid(const std::string& ns, const mongo::UUID& uuid) { + scoped_spinlock sLock(_lock); + _uuidToNs[uuid] = ns; +} + rpc::UniqueReply MockRemoteDBServer::runCommand(InstanceID id, const OpMsgRequest& request) { checkIfUp(id); std::string cmdName = request.getCommandName().toString(); @@ -169,7 +174,7 @@ rpc::UniqueReply MockRemoteDBServer::runCommand(InstanceID id, const OpMsgReques } mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id, - const string& ns, + const NamespaceStringOrUUID& nsOrUuid, mongo::Query query, int nToReturn, int nToSkip, @@ -187,6 +192,7 @@ mongo::BSONArray MockRemoteDBServer::query(MockRemoteDBServer::InstanceID id, scoped_spinlock sLock(_lock); _queryCount++; + auto ns = nsOrUuid.uuid() ? _uuidToNs[*nsOrUuid.uuid()] : nsOrUuid.nss()->ns(); const vector& coll = _dataMgr[ns]; BSONArrayBuilder result; for (vector::const_iterator iter = coll.begin(); iter != coll.end(); ++iter) { diff --git a/src/mongo/dbtests/mock/mock_remote_db_server.h b/src/mongo/dbtests/mock/mock_remote_db_server.h index 3c147220668..9c1ab72184c 100644 --- a/src/mongo/dbtests/mock/mock_remote_db_server.h +++ b/src/mongo/dbtests/mock/mock_remote_db_server.h @@ -146,13 +146,21 @@ public: */ void remove(const std::string& ns, Query query, int flags = 0); + /** + * Assign a UUID to a collection + * + * @param ns the namespace to be associated with the uuid. + * @param uuid the uuid to associate with the namespace. 
+ */ + void assignCollectionUuid(const std::string& ns, const mongo::UUID& uuid); + // // DBClientBase methods // rpc::UniqueReply runCommand(InstanceID id, const OpMsgRequest& request); mongo::BSONArray query(InstanceID id, - const std::string& ns, + const NamespaceStringOrUUID& nsOrUuid, mongo::Query query = mongo::Query(), int nToReturn = 0, int nToSkip = 0, @@ -210,6 +218,7 @@ private: typedef stdx::unordered_map> CmdToReplyObj; typedef stdx::unordered_map> MockDataMgr; + typedef stdx::unordered_map UUIDMap; bool _isRunning; @@ -221,6 +230,7 @@ private: // CmdToReplyObj _cmdMap; MockDataMgr _dataMgr; + UUIDMap _uuidToNs; // // Op Counters -- cgit v1.2.1