commit:    07430d1cb41b102670783afeb21361d61b564783
tree:      1ada85335bde8e729492287c31ef38d9a3de80a2
parent:    9b518b1c13000e6c0568ee43d520b6bd712e820a
author:    Jason Chan <jason.chan@10gen.com>                 2020-12-03 16:30:42 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-12-07 18:06:54 +0000
SERVER-47863 Add fields to initial sync progress metrics
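This commit surfaces data-size-based progress metrics in the initialSyncStatus
section of replSetGetStatus while an initial sync is running. As a rough
illustration (field values are hypothetical and the output is abridged; the
exact field set is asserted in the tests below), the augmented output looks like:

    syncingNode> rs.status().initialSyncStatus
    {
        "failedInitialSyncAttempts" : 0,
        "maxFailedInitialSyncAttempts" : 10,
        // Renamed from initialSyncElapsedMillis, now reported while the sync is in progress:
        "totalInitialSyncElapsedMillis" : 52417,
        // New fields:
        "remainingInitialSyncEstimatedMillis" : 104834,
        "approxTotalDataSize" : 3145728,        // summed from 'dbStats' on the sync source
        "approxTotalBytesCopied" : 1048576,     // summed per-collection estimates
        "databases" : {
            "databasesToClone" : 1,             // new counter, decremented as clones finish
            "databasesCloned" : 2,
            "test" : {
                "test.foo" : {
                    "documentsToCopy" : 4,
                    "documentsCopied" : 4,
                    "bytesToCopy" : 2048,       // from 'collStats' on the sync source
                    "approxBytesCopied" : 2048  // documentsCopied * avgObjSize
                }
            }
        }
    }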
-rw-r--r--   jstests/replsets/get_replication_info_helper.js   |  38
-rw-r--r--   jstests/replsets/initial_sync_replSetGetStatus.js | 108
-rw-r--r--   src/mongo/db/repl/collection_cloner.cpp           |  29
-rw-r--r--   src/mongo/db/repl/collection_cloner.h             |   3
-rw-r--r--   src/mongo/db/repl/databases_cloner.cpp            |  48
-rw-r--r--   src/mongo/db/repl/databases_cloner.h              |  19
-rw-r--r--   src/mongo/db/repl/databases_cloner_test.cpp       |  26
-rw-r--r--   src/mongo/db/repl/initial_syncer.cpp              |  40
-rw-r--r--   src/mongo/db/repl/initial_syncer.h                |  18
-rw-r--r--   src/mongo/db/repl/initial_syncer_test.cpp         |  68
-rw-r--r--   src/mongo/shell/db.js                             |  22
11 files changed, 385 insertions(+), 34 deletions(-)
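The new remainingInitialSyncEstimatedMillis value (see the initial_syncer.cpp
hunk below) extrapolates from the average copy rate observed so far. A minimal
JavaScript sketch of the same arithmetic, using illustrative names and numbers
that are not part of the server code:

    // Mirrors InitialSyncer::_appendInitialSyncProgressMinimal_inlock: compute the
    // milliseconds spent per byte copied so far, then scale by the bytes remaining.
    function estimateRemainingMillis(elapsedMillis, bytesCopied, totalDataSize) {
        if (bytesCopied <= 0)
            return undefined;  // the field is omitted until at least one byte is copied
        const millisPerByte = elapsedMillis / bytesCopied;
        return Math.trunc(millisPerByte * (totalDataSize - bytesCopied));
    }

    // E.g. 10,000 ms spent copying 40 MB of a 100 MB data set:
    // 10000 / (40 MB) ms/byte * (60 MB) = 15,000 ms remaining.
    estimateRemainingMillis(10 * 1000, 40 * 1024 * 1024, 100 * 1024 * 1024);  // 15000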
diff --git a/jstests/replsets/get_replication_info_helper.js b/jstests/replsets/get_replication_info_helper.js
index 39355e2ceda..0558d8ee5ee 100644
--- a/jstests/replsets/get_replication_info_helper.js
+++ b/jstests/replsets/get_replication_info_helper.js
@@ -4,12 +4,46 @@
 (function() {
 "use strict";
 var name = "getReplicationInfo";
-var replSet = new ReplSetTest({name: name, nodes: 3, oplogSize: 50});
-var nodes = replSet.nodeList();
+const replSet = new ReplSetTest({name: name, nodes: 2, oplogSize: 50});
+
 replSet.startSet();
 replSet.initiate();
 var primary = replSet.getPrimary();
+
+// Test that db.printSlaveReplicationInfo() and db.printSecondaryReplicationInfo() both print
+// out initial sync info when called during an initial sync.
+const syncTarget = replSet.add({
+    rsConfig: {votes: 0, priority: 0},
+    setParameter: {
+        'failpoint.forceSyncSourceCandidate':
+            tojson({mode: 'alwaysOn', data: {hostAndPort: primary.name}})
+    }
+});
+syncTarget.setSecondaryOk();
+assert.commandWorked(syncTarget.getDB('admin').runCommand(
+    {configureFailPoint: 'initialSyncHangBeforeFinish', mode: 'alwaysOn'}));
+replSet.reInitiate();
+
+// Wait for initial sync to pause right before it finishes.
+checkLog.contains(syncTarget, 'initial sync - initialSyncHangBeforeFinish fail point enabled');
+const callPrintSecondaryReplInfo = startParallelShell(
+    "db.getSiblingDB('admin').printSecondaryReplicationInfo();", syncTarget.port);
+callPrintSecondaryReplInfo();
+assert(rawMongoProgramOutput().match("InitialSyncSyncSource: " + primary.name));
+assert(rawMongoProgramOutput().match("InitialSyncRemainingEstimatedDuration: "));
+clearRawMongoProgramOutput();
+
+const callPrintSlaveReplInfo =
+    startParallelShell("db.getSiblingDB('admin').printSlaveReplicationInfo();", syncTarget.port);
+callPrintSlaveReplInfo();
+assert(rawMongoProgramOutput().match("InitialSyncSyncSource: " + primary.name));
+assert(rawMongoProgramOutput().match("InitialSyncRemainingEstimatedDuration: "));
+clearRawMongoProgramOutput();
+assert.commandWorked(syncTarget.getDB('admin').runCommand(
+    {configureFailPoint: 'initialSyncHangBeforeFinish', mode: 'off'}));
+replSet.awaitSecondaryNodes();
+
 for (var i = 0; i < 100; i++) {
     primary.getDB('test').foo.insert({a: i});
 }
diff --git a/jstests/replsets/initial_sync_replSetGetStatus.js b/jstests/replsets/initial_sync_replSetGetStatus.js
index 57289816d51..8b1d7364e9d 100644
--- a/jstests/replsets/initial_sync_replSetGetStatus.js
+++ b/jstests/replsets/initial_sync_replSetGetStatus.js
@@ -16,12 +16,24 @@ replSet.startSet();
 replSet.initiate();
 var primary = replSet.getPrimary();
 
+const barColl = primary.getDB('pretest').bar;
+assert.commandWorked(barColl.insert({a: 1}));
+assert.commandWorked(barColl.insert({a: 2}));
+assert.commandWorked(barColl.insert({a: 3}));
+
 var coll = primary.getDB('test').foo;
 assert.writeOK(coll.insert({a: 1}));
 assert.writeOK(coll.insert({a: 2}));
 
 // Add a secondary node but make it hang before copying databases.
-var secondary = replSet.add();
+let secondary = replSet.add({
+    rsConfig: {votes: 0, priority: 0},
+    setParameter: {
+        'collectionClonerBatchSize': 2,
+        'failpoint.initialSyncHangDuringCollectionClone':
+            tojson({mode: 'alwaysOn', data: {namespace: barColl.getFullName(), numDocsToClone: 2}})
+    }
+});
 secondary.setSlaveOk();
 
 assert.commandWorked(secondary.getDB('admin').runCommand(
@@ -53,23 +65,91 @@ assert.writeOK(coll.insert({a: 4}));
 assert.commandWorked(secondary.getDB('admin').runCommand(
     {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'}));
 
+// Wait for initial sync to pause halfway through cloning the 'pretest.bar' collection.
+checkLog.contains(secondary,
+                  "initial sync - initialSyncHangDuringCollectionClone fail point enabled.");
+
+const pretestDbRes = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+
+assert.gt(pretestDbRes.initialSyncStatus.totalInitialSyncElapsedMillis, 0);
+assert.gt(pretestDbRes.initialSyncStatus.remainingInitialSyncEstimatedMillis, 0);
+assert.gt(pretestDbRes.initialSyncStatus.approxTotalDataSize, 0);
+
+assert.eq(pretestDbRes.initialSyncStatus.databases.pretest.collections, 1);
+assert.eq(pretestDbRes.initialSyncStatus.databases.pretest.clonedCollections, 0);
+
+let barCollRes = pretestDbRes.initialSyncStatus.databases.pretest["pretest.bar"];
+assert.eq(barCollRes.documentsToCopy, 3);
+// Even though we set the collectionClonerBatchSize to 2, it is possible for a batch to actually
+// return only 1 document. This can lead to us hitting the failpoint in the next batch instead,
+// causing us to copy up to 3 documents.
+assert.lte(barCollRes.documentsCopied, 3);
+assert.gt(barCollRes.bytesToCopy, 0);
+assert.gt(barCollRes.approxBytesCopied, 0);
+assert.lte(barCollRes.approxBytesCopied, barCollRes.bytesToCopy);
+assert.lt(barCollRes.approxBytesCopied, pretestDbRes.initialSyncStatus.approxTotalDataSize);
+
+const bytesCopiedAdminDb =
+    pretestDbRes.initialSyncStatus.databases.admin["admin.system.version"].approxBytesCopied +
+    pretestDbRes.initialSyncStatus.databases.admin["admin.system.keys"].approxBytesCopied;
+assert.eq(pretestDbRes.initialSyncStatus.approxTotalBytesCopied,
+          bytesCopiedAdminDb + barCollRes.approxBytesCopied);
+assert.gt(pretestDbRes.initialSyncStatus.approxTotalBytesCopied, 0);
+
+// The server still has the 'pretest' and 'test' dbs to finish cloning.
+assert.eq(pretestDbRes.initialSyncStatus.databases.databasesCloned, 2);
+assert.eq(pretestDbRes.initialSyncStatus.databases.databasesToClone, 2);
+
+assert.commandWorked(secondary.adminCommand(
+    {configureFailPoint: 'initialSyncHangDuringCollectionClone', mode: "off"}));
+
 // Wait for initial sync to pause right before it finishes.
 checkLog.contains(secondary, 'initial sync - initialSyncHangBeforeFinish fail point enabled');
-res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-assert(res.initialSyncStatus,
+const endOfCloningRes = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+assert(endOfCloningRes.initialSyncStatus,
        () => "Response should have an 'initialSyncStatus' field: " + tojson(res));
-assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
-assert.eq(res.initialSyncStatus.appliedOps, 3);
-assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 0);
-assert.eq(res.initialSyncStatus.maxFailedInitialSyncAttempts, 10);
-assert.eq(res.initialSyncStatus.databases.databasesCloned, 3);
-assert.eq(res.initialSyncStatus.databases.test.collections, 1);
-assert.eq(res.initialSyncStatus.databases.test.clonedCollections, 1);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsToCopy, 4);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsCopied, 4);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].indexes, 1);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].fetchedBatches, 1);
+
+// Assert metrics have progressed in the right direction since the last time we checked the
+// metrics.
+assert.gt(endOfCloningRes.initialSyncStatus.totalInitialSyncElapsedMillis,
+          pretestDbRes.initialSyncStatus.totalInitialSyncElapsedMillis);
+assert.lt(endOfCloningRes.initialSyncStatus.remainingInitialSyncEstimatedMillis,
+          pretestDbRes.initialSyncStatus.remainingInitialSyncEstimatedMillis);
+assert.gt(endOfCloningRes.initialSyncStatus.approxTotalBytesCopied,
+          pretestDbRes.initialSyncStatus.approxTotalBytesCopied);
+assert.eq(endOfCloningRes.initialSyncStatus.approxTotalDataSize,
+          pretestDbRes.initialSyncStatus.approxTotalDataSize);
+
+assert.eq(endOfCloningRes.initialSyncStatus.failedInitialSyncAttempts, 0);
+assert.eq(endOfCloningRes.initialSyncStatus.maxFailedInitialSyncAttempts, 10);
+
+assert.eq(endOfCloningRes.initialSyncStatus.databases.databasesCloned, 4);
+assert.eq(endOfCloningRes.initialSyncStatus.databases.databasesToClone, 0);
+
+assert.eq(endOfCloningRes.initialSyncStatus.databases.pretest.collections, 1);
+assert.eq(endOfCloningRes.initialSyncStatus.databases.pretest.clonedCollections, 1);
+barCollRes = endOfCloningRes.initialSyncStatus.databases.pretest["pretest.bar"];
+assert.eq(barCollRes.documentsToCopy, 3);
+assert.eq(barCollRes.documentsCopied, 3);
+assert.eq(barCollRes.indexes, 1);
+assert.lte(barCollRes.fetchedBatches, 2);
+assert.gt(barCollRes.bytesToCopy, 0);
+assert.eq(barCollRes.approxBytesCopied, barCollRes.bytesToCopy);
+
+let fooCollRes = endOfCloningRes.initialSyncStatus.databases.test["test.foo"];
+assert.eq(endOfCloningRes.initialSyncStatus.databases.test.collections, 1);
+assert.eq(endOfCloningRes.initialSyncStatus.databases.test.clonedCollections, 1);
+assert.eq(fooCollRes.documentsToCopy, 4);
+assert.eq(fooCollRes.documentsCopied, 4);
+assert.eq(fooCollRes.indexes, 1);
+assert.lte(fooCollRes.fetchedBatches, 2);
+assert.gt(fooCollRes.bytesToCopy, 0);
+assert.eq(fooCollRes.approxBytesCopied, fooCollRes.bytesToCopy);
+
+assert.eq(endOfCloningRes.initialSyncStatus.approxTotalDataSize,
+          endOfCloningRes.initialSyncStatus.approxTotalBytesCopied);
+assert.eq(endOfCloningRes.initialSyncStatus.approxTotalBytesCopied,
+          fooCollRes.approxBytesCopied + barCollRes.approxBytesCopied + bytesCopiedAdminDb);
 
 // Let initial sync finish and get into secondary state.
 assert.commandWorked(secondary.getDB('admin').runCommand(
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 80efca4571e..09d405c72d9 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -544,6 +544,30 @@ void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& cal
     }
 
     try {
+        BSONObj res;
+        _clientConnection->runCommand(
+            _sourceNss.db().toString(), BSON("collStats" << _sourceNss.coll().toString()), res);
+        if (auto status = getStatusFromCommandResult(res); status.isOK()) {
+            stdx::lock_guard<Latch> lock(_mutex);
+            _stats.bytesToCopy = res.getField("size").safeNumberLong();
+            if (_stats.bytesToCopy > 0) {
+                // The 'avgObjSize' parameter is only available if 'collStats' returns a 'size'
+                // field greater than zero.
+                _stats.avgObjSize = res.getField("avgObjSize").safeNumberLong();
+            }
+        } else {
+            LOG(1) << "Skipping the recording of some initial sync metrics due to a bad status "
+                      "response from calling 'collStats' on source '"
+                   << _source.toString() << "': " << status;
+        }
+    } catch (const DBException& e) {
+        LOG(1)
+            << "Skipping the recording of some initial sync metrics due to an error when calling "
+               "'collStats' on source '"
+            << _source.toString() << "': " << e.toStatus();
+    }
+
+    try {
         _clientConnection->query(
             [this, onCompletionGuard](DBClientCursorBatchIterator& iter) {
                 _handleNextBatch(onCompletionGuard, iter);
@@ -700,6 +724,7 @@ void CollectionCloner::_insertDocumentsCallback(
     }
     _documentsToInsert.swap(docs);
     _stats.documentsCopied += docs.size();
+    _stats.approxBytesCopied = ((long)_stats.documentsCopied) * _stats.avgObjSize;
     ++_stats.fetchedBatches;
     _progressMeter.hit(int(docs.size()));
     invariant(_collLoader);
@@ -789,6 +814,10 @@ void CollectionCloner::Stats::append(BSONObjBuilder* builder) const {
     builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied);
     builder->appendNumber("indexes", indexes);
     builder->appendNumber("fetchedBatches", fetchedBatches);
+    builder->appendNumber("bytesToCopy", bytesToCopy);
+    if (bytesToCopy) {
+        builder->appendNumber("approxBytesCopied", approxBytesCopied);
+    }
     if (start != Date_t()) {
         builder->appendDate("start", start);
         if (end != Date_t()) {
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 1eb92679c9e..8c3769797e2 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -82,6 +82,9 @@ public:
         size_t indexes{0};
         size_t fetchedBatches{0};  // This is actually inserted batches.
         size_t receivedBatches{0};
+        long long bytesToCopy{0};
+        long long avgObjSize{0};
+        long long approxBytesCopied{0};
 
         std::string toString() const;
         BSONObj toBSON() const;
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 1a0857ce6b7..5737ef7e2a6 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -41,6 +41,7 @@
 #include "mongo/db/catalog/collection_options.h"
 #include "mongo/db/client.h"
 #include "mongo/db/repl/databases_cloner_gen.h"
+#include "mongo/db/repl/oplogreader.h"
 #include "mongo/db/repl/storage_interface.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/stdx/functional.h"
@@ -74,7 +75,8 @@ DatabasesCloner::DatabasesCloner(StorageInterface* si,
       _source(source),
       _includeDbFn(includeDbPred),
       _finishFn(finishFn),
-      _storage(si) {
+      _storage(si),
+      _createClientFn([] { return stdx::make_unique<DBClientConnection>(); }) {
     uassert(ErrorCodes::InvalidOptions, "storage interface must be provided.", si);
     uassert(ErrorCodes::InvalidOptions, "executor must be provided.", exec);
     uassert(
@@ -93,6 +95,7 @@ std::string DatabasesCloner::toString() const {
     return str::stream() << "initial sync --"
                          << " active:" << _isActive_inlock() << " status:" << _status.toString()
                          << " source:" << _source.toString()
+                         << " db cloners remaining:" << _stats.databasesToClone
                          << " db cloners completed:" << _stats.databasesCloned
                          << " db count:" << _databaseCloners.size();
 }
@@ -170,6 +173,7 @@ BSONObj DatabasesCloner::Stats::toBSON() const {
 }
 
 void DatabasesCloner::Stats::append(BSONObjBuilder* builder) const {
+    builder->appendNumber("databasesToClone", databasesToClone);
     builder->appendNumber("databasesCloned", databasesCloned);
     for (auto&& db : databaseStats) {
         BSONObjBuilder dbBuilder(builder->subobjStart(db.dbname));
@@ -227,6 +231,11 @@ void DatabasesCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work)
     _scheduleDbWorkFn = work;
 }
 
+void DatabasesCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
+    stdx::lock_guard<Latch> lk(_mutex);
+    _createClientFn = createClientFn;
+}
+
 void DatabasesCloner::setStartCollectionClonerFn(
     const StartCollectionClonerFn& startCollectionCloner) {
     LockGuard lk(_mutex);
@@ -303,6 +312,10 @@ void DatabasesCloner::_onListDatabaseFinish(
     // initial sync is occurring.
     _setAdminAsFirst(dbsArray);
 
+    // Create a client connection to call 'dbStats' to gather initial sync metrics.
+    auto clientConnection = _createClientFn();
+    const auto clientConnectionStatus = clientConnection->connect(_source, StringData());
+
     for (BSONElement arrayElement : dbsArray) {
         const BSONObj dbBSON = arrayElement.Obj();
 
@@ -384,6 +397,37 @@ void DatabasesCloner::_onListDatabaseFinish(
 
         // add cloner to list.
         _databaseCloners.push_back(dbCloner);
+
+        try {
+            if (!replAuthenticate(clientConnection.get())) {
+                LOG(1) << "Skipping the recording of initial sync data size metrics for the '"
+                       << dbName << "' database as we failed to authenticate to " << _source;
+                continue;
+            }
+            auto status = clientConnectionStatus;
+            if (status.isOK()) {
+                BSONObj res;
+                clientConnection->runCommand(dbName, BSON("dbStats" << 1), res);
+                auto respStatus = getStatusFromCommandResult(res);
+                if (respStatus.isOK()) {
+                    _stats.dataSize += res.getField("dataSize").safeNumberLong();
+                    continue;
+                }
+                // 'dbStats' returned a bad status.
+                status = respStatus;
+            }
+            // It is possible for the call to 'dbStats' to fail if the sync source contains
+            // invalid views. We should not fail initial sync in this case due to the
+            // situation where the replica set may have lost majority availability and
+            // therefore have no access to a primary to fix the view definitions. Instead,
+            // we simply skip recording the data size metrics.
+            LOG(1) << "Skipping the recording of initial sync data size metrics for the '" << dbName
+                   << "' database due to bad status response to 'dbStats' command: " << status;
+        } catch (const DBException& e) {
+            LOG(1) << "Skipping the recording of initial sync data size metrics for the '" << dbName
+                   << "' database due to an error when calling the 'dbStats' command: "
+                   << e.toStatus();
+        }
     }
     if (_databaseCloners.size() == 0) {
         if (_status.isOK()) {
@@ -392,6 +436,7 @@ void DatabasesCloner::_onListDatabaseFinish(
             _fail_inlock(&lk, _status);
         }
     }
+    _stats.databasesToClone = _databaseCloners.size();
 }
 
 std::vector<std::shared_ptr<DatabaseCloner>> DatabasesCloner::_getDatabaseCloners() const {
@@ -435,6 +480,7 @@ void DatabasesCloner::_onEachDBCloneFinish(const Status& status, const std::stri
     }
 
     _stats.databasesCloned++;
+    _stats.databasesToClone--;
 
     if (_stats.databasesCloned == _databaseCloners.size()) {
         _succeed_inlock(&lk);
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index db6f1129edf..f0d85907386 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -57,6 +57,8 @@ class DatabasesCloner {
 public:
     struct Stats {
         size_t databasesCloned{0};
+        size_t databasesToClone{0};
+        long long dataSize{0};
         std::vector<DatabaseCloner::Stats> databaseStats;
 
         std::string toString() const;
@@ -70,6 +72,13 @@ public:
     using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
         executor::TaskExecutor::CallbackFn)>;
 
+    /**
+     * Type of function to create a database client
+     *
+     * Used for testing only.
+     */
+    using CreateClientFn = stdx::function<std::unique_ptr<DBClientConnection>()>;
+
     DatabasesCloner(StorageInterface* si,
                     executor::TaskExecutor* exec,
                     ThreadPool* dbWorkThreadPool,
@@ -119,6 +128,13 @@ public:
      */
    StatusWith<std::vector<BSONElement>> parseListDatabasesResponse_forTest(BSONObj dbResponse);
 
+    /**
+     * Allows a different client class to be injected.
+     *
+     * For testing only.
+     */
+    void setCreateClientFn_forTest(const CreateClientFn& createClientFn);
+
 private:
     bool _isActive_inlock() const;
 
@@ -174,6 +190,7 @@ private:
     // (R)  Read-only in concurrent operation; no synchronization required.
     // (M)  Reads and writes guarded by _mutex
     // (S)  Self-synchronizing; access in any way from any context.
+    // (RT) Read-only in concurrent operation; synchronized externally by tests
     //
     mutable Mutex _mutex = MONGO_MAKE_LATCH("DatabasesCloner::_mutex");  // (S)
     Status _status{ErrorCodes::NotYetInitialized, ""};  // (M) If it is not OK, we stop everything.
@@ -191,6 +208,8 @@ private:
     std::vector<std::shared_ptr<DatabaseCloner>> _databaseCloners;  // (M) database cloners by name
     Stats _stats;                                                   // (M)
 
+    CreateClientFn _createClientFn;  // (RT) Function for creating a database client.
+
     // State transitions:
     // PreStart --> Running --> ShuttingDown --> Complete
     // It is possible to skip intermediate states. For example,
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index b09146240b3..25d5a2b2037 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -285,6 +285,15 @@ protected:
         return Status::OK();
     };
 
+    void startWithMockClient(DatabasesCloner& cloner) {
+        // The databases cloner uses DBDirectClient to call 'dbstats' for initial sync metrics.
+        cloner.setCreateClientFn_forTest([this]() {
+            return std::unique_ptr<DBClientConnection>(
+                new MockDBClientConnection(_mockServer.get()));
+        });
+        ASSERT_OK(cloner.startup());
+    }
+
     void runCompleteClone(Responses responses) {
         Status result{Status::OK()};
         bool done = false;
@@ -306,6 +315,11 @@ protected:
             return getExecutor().scheduleWork(std::move(work));
         });
 
+        cloner.setCreateClientFn_forTest([this]() {
+            return std::unique_ptr<DBClientConnection>(
+                new MockDBClientConnection(_mockServer.get()));
+        });
+
         cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) {
             cloner.setCreateClientFn_forTest([&cloner, this]() {
                 return std::unique_ptr<DBClientConnection>(
@@ -688,7 +702,7 @@ TEST_F(DBsClonerTest, DatabasesClonerResetsOnFinishCallbackFunctionAfterCompleti
                           result = status;
                       }};
 
-    ASSERT_OK(cloner.startup());
+    startWithMockClient(cloner);
     ASSERT_TRUE(cloner.isActive());
 
     sharedCallbackData.reset();
@@ -717,7 +731,7 @@ TEST_F(DBsClonerTest, FailsOnListCollectionsOnOnlyDatabase) {
                               result = status;
                           }};
 
-    ASSERT_OK(cloner.startup());
+    startWithMockClient(cloner);
     ASSERT_TRUE(cloner.isActive());
 
     auto net = getNet();
@@ -747,7 +761,7 @@ TEST_F(DBsClonerTest, FailsOnListCollectionsOnFirstOfTwoDatabases) {
                               result = status;
                           }};
 
-    ASSERT_OK(cloner.startup());
+    startWithMockClient(cloner);
     ASSERT_TRUE(cloner.isActive());
 
     auto net = getNet();
@@ -805,7 +819,7 @@ TEST_F(DBsClonerTest, FailingToScheduleSecondDatabaseClonerShouldCancelTheCloner
                               result = status;
                           }};
 
-    ASSERT_OK(cloner.startup());
+    startWithMockClient(cloner);
     ASSERT_TRUE(cloner.isActive());
 
     auto net = getNet();
@@ -850,7 +864,7 @@ TEST_F(DBsClonerTest, DatabaseClonerChecksAdminDbUsingStorageInterfaceAfterCopyi
                               result = status;
                           }};
 
-    ASSERT_OK(cloner.startup());
+    startWithMockClient(cloner);
     ASSERT_TRUE(cloner.isActive());
 
     auto net = getNet();
@@ -891,7 +905,7 @@ TEST_F(DBsClonerTest, AdminDbValidationErrorShouldAbortTheCloner) {
                               result = status;
                           }};
 
-    ASSERT_OK(cloner.startup());
+    startWithMockClient(cloner);
     ASSERT_TRUE(cloner.isActive());
 
     auto net = getNet();
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 589736ec2db..b8b7b2ecab0 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -340,6 +340,11 @@ InitialSyncer::State InitialSyncer::getState_forTest() const {
     return _state;
 }
 
+void InitialSyncer::setDbsClonerCreateClientFn_forTest(const CreateClientFn& createClientFn) {
+    stdx::lock_guard<Latch> lk(_mutex);
+    _dbsClonerCreateClientFn = createClientFn;
+}
+
 Date_t InitialSyncer::getWallClockTime_forTest() const {
     stdx::lock_guard<Latch> lk(_mutex);
     return _lastApplied.wallTime;
@@ -388,6 +393,29 @@ void InitialSyncer::_appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob
     if (!_initialSyncState) {
         return;
     }
+    if (_initialSyncState && _initialSyncState->dbsCloner) {
+        const auto dbsClonerStats = _initialSyncState->dbsCloner->getStats();
+        const auto approxTotalDataSize = dbsClonerStats.dataSize;
+        bob->appendNumber("approxTotalDataSize", approxTotalDataSize);
+        long long approxTotalBytesCopied = 0;
+        for (auto dbClonerStats : dbsClonerStats.databaseStats) {
+            for (auto collClonerStats : dbClonerStats.collectionStats) {
+                approxTotalBytesCopied += collClonerStats.approxBytesCopied;
+            }
+        }
+        bob->appendNumber("approxTotalBytesCopied", approxTotalBytesCopied);
+        if (approxTotalBytesCopied > 0) {
+            const auto statsObj = bob->asTempObj();
+            auto totalInitialSyncElapsedMillis =
+                statsObj.getField("totalInitialSyncElapsedMillis").safeNumberLong();
+            const auto downloadRate =
+                (double)totalInitialSyncElapsedMillis / (double)approxTotalBytesCopied;
+            const auto remainingInitialSyncEstimatedMillis =
+                downloadRate * (double)(approxTotalDataSize - approxTotalBytesCopied);
+            bob->appendNumber("remainingInitialSyncEstimatedMillis",
+                              (long long)remainingInitialSyncEstimatedMillis);
+        }
+    }
     bob->appendNumber("fetchedMissingDocs", _initialSyncState->fetchedMissingDocs);
     bob->appendNumber("appliedOps", _initialSyncState->appliedOps);
     if (!_initialSyncState->beginApplyingTimestamp.isNull()) {
@@ -1061,6 +1089,10 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
         // facilitate testing.
         _initialSyncState->dbsCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn);
     }
+    if (_dbsClonerCreateClientFn) {
+        // Used for testing to inject a different client class to be used in the DatabasesCloner.
+        _initialSyncState->dbsCloner->setCreateClientFn_forTest(_dbsClonerCreateClientFn);
+    }
     if (_startCollectionClonerFn) {
         _initialSyncState->dbsCloner->setStartCollectionClonerFn(_startCollectionClonerFn);
     }
@@ -1854,12 +1886,14 @@ void InitialSyncer::Stats::append(BSONObjBuilder* builder) const {
                           static_cast<long long>(maxFailedInitialSyncAttempts));
     if (initialSyncStart != Date_t()) {
         builder->appendDate("initialSyncStart", initialSyncStart);
+        auto elapsedDurationEnd = Date_t::now();
         if (initialSyncEnd != Date_t()) {
             builder->appendDate("initialSyncEnd", initialSyncEnd);
-            auto elapsed = initialSyncEnd - initialSyncStart;
-            long long elapsedMillis = duration_cast<Milliseconds>(elapsed).count();
-            builder->appendNumber("initialSyncElapsedMillis", elapsedMillis);
+            elapsedDurationEnd = initialSyncEnd;
         }
+        long long elapsedMillis =
+            duration_cast<Milliseconds>(elapsedDurationEnd - initialSyncStart).count();
+        builder->appendNumber("totalInitialSyncElapsedMillis", elapsedMillis);
     }
     BSONArrayBuilder arrBuilder(builder->subarrayStart("initialSyncAttempts"));
     for (unsigned int i = 0; i < initialSyncAttemptInfos.size(); ++i) {
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index d8ba0b9474c..4a34795fbc9 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -154,6 +154,13 @@ public:
 
     using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn;
 
+    /**
+     * Type of function to create a database client
+     *
+     * Used for testing only.
+     */
+    using CreateClientFn = stdx::function<std::unique_ptr<DBClientConnection>()>;
+
     struct InitialSyncAttemptInfo {
         int durationMillis;
         Status status;
@@ -230,6 +237,13 @@ public:
      */
     void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner);
 
+    /**
+     * Allows a different client class to be injected for the databases cloner.
+     *
+     * For testing only.
+     */
+    void setDbsClonerCreateClientFn_forTest(const CreateClientFn& createClientFn);
+
     // State transitions:
     // PreStart --> Running --> ShuttingDown --> Complete
     // It is possible to skip intermediate states. For example, calling shutdown() when the data
@@ -674,6 +688,10 @@ private:
     // Current initial syncer state. See comments for State enum class for details.
     State _state = State::kPreStart;  // (M)
 
+    // Passed to DatabasesCloner.
+    CreateClientFn _dbsClonerCreateClientFn;  // (M)
+
+    // Passed to CollectionCloner via DatabasesCloner.
     DatabaseCloner::ScheduleDbWorkFn _scheduleDbWorkFn;  // (M)
     StartCollectionClonerFn _startCollectionClonerFn;    // (M)
 
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 2c32e736d88..7cf799c5bb0 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -407,6 +407,10 @@ protected:
             [this](executor::TaskExecutor::CallbackFn work) {
                 return getExecutor().scheduleWork(std::move(work));
             });
+        _initialSyncer->setDbsClonerCreateClientFn_forTest([this]() {
+            return std::unique_ptr<DBClientConnection>(
+                new MockDBClientConnection(_mockServer.get()));
+        });
         _initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) {
             cloner.setCreateClientFn_forTest([&cloner, this]() {
                 return std::unique_ptr<DBClientConnection>(
@@ -4485,15 +4489,20 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
 
     auto progress = initialSyncer->getInitialSyncProgress();
     log() << "Progress after first failed response: " << progress;
-    ASSERT_EQUALS(progress.nFields(), 8) << progress;
+    ASSERT_EQUALS(progress.nFields(), 11) << progress;
+    ASSERT_FALSE(progress.hasField("remainingInitialSyncEstimatedMillis"));
+    ASSERT_FALSE(progress.hasField("InitialSyncEnd"));
     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 0) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 0) << progress;
     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
     ASSERT_BSONOBJ_EQ(progress.getObjectField("initialSyncAttempts"), BSONObj());
     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
     ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
-    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
+    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"),
+                      BSON("databasesToClone" << 0 << "databasesCloned" << 0));
 
     // Play rest of the failed round of responses.
     {
@@ -4544,14 +4553,19 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
 
     progress = initialSyncer->getInitialSyncProgress();
     log() << "Progress after failure: " << progress;
-    ASSERT_EQUALS(progress.nFields(), 8) << progress;
+    ASSERT_EQUALS(progress.nFields(), 11) << progress;
+    ASSERT_FALSE(progress.hasField("remainingInitialSyncEstimatedMillis"));
+    ASSERT_FALSE(progress.hasField("InitialSyncEnd"));
     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 0) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 0) << progress;
     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
     ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
-    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
+    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"),
+                      BSON("databasesToClone" << 0 << "databasesCloned" << 0));
 
     BSONObj attempts = progress["initialSyncAttempts"].Obj();
     ASSERT_EQUALS(attempts.nFields(), 1) << attempts;
@@ -4565,15 +4579,23 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
     ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
         << attempt0;
 
+    const auto bytesToCopy = 10;
+    const auto avgObjSize = 2;
     // Play all but last of the successful round of responses.
     {
         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
 
+        // Advance the clock by 10 seconds
+        net->advanceTime(net->now() + Seconds(10));
+
         // listDatabases
         NamespaceString nss("a.a");
         auto request =
             net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
         assertRemoteCommandNameEquals("listDatabases", request);
+
+        // The databases cloner calls 'dbStats' to record initial sync progress metrics.
+        _mockServer->setCommandReply("dbStats", BSON("dataSize" << 10));
         net->runReadyNetworkOperations();
 
         // Ignore oplog tailing query.
@@ -4606,6 +4628,10 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
         assertRemoteCommandNameEquals("getMore", request);
         net->blackHole(noi);
 
+        // The collection cloner calls 'collStats' to record initial sync progress metrics.
+        _mockServer->setCommandReply("collStats",
+                                     BSON("size" << bytesToCopy << "avgObjSize" << avgObjSize));
+
         // count:a
         request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1));
         assertRemoteCommandNameEquals("count", request);
@@ -4632,17 +4658,22 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
 
     progress = initialSyncer->getInitialSyncProgress();
     log() << "Progress after all but last successful response: " << progress;
-    ASSERT_EQUALS(progress.nFields(), 9) << progress;
+    ASSERT_EQUALS(progress.nFields(), 13) << progress;
     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
+    ASSERT_GREATER_THAN(progress["totalInitialSyncElapsedMillis"].safeNumberLong(), 0) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 10) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 10) << progress;
     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
     ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
+    ASSERT_EQUALS(progress.getIntField("remainingInitialSyncEstimatedMillis"), 0) << progress;
     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
     // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
     ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
     auto databasesProgress = progress.getObjectField("databases");
     ASSERT_EQUALS(1, databasesProgress.getIntField("databasesCloned")) << databasesProgress;
+    ASSERT_EQUALS(0, databasesProgress.getIntField("databasesToClone")) << databasesProgress;
     auto dbProgress = databasesProgress.getObjectField("a");
     ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress;
     ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress;
@@ -4655,6 +4686,8 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
         << collectionProgress;
     ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
     ASSERT_EQUALS(5, collectionProgress.getIntField("receivedBatches")) << collectionProgress;
+    ASSERT_EQUALS(bytesToCopy, collectionProgress.getIntField("bytesToCopy")) << collectionProgress;
+    ASSERT_EQUALS(10, collectionProgress.getIntField("approxBytesCopied")) << collectionProgress;
 
     attempts = progress["initialSyncAttempts"].Obj();
     ASSERT_EQUALS(attempts.nFields(), 1) << progress;
@@ -4694,14 +4727,23 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
 
     progress = initialSyncer->getInitialSyncProgress();
     log() << "Progress at end: " << progress;
-    ASSERT_EQUALS(progress.nFields(), 11) << progress;
+    ASSERT_EQUALS(progress.nFields(), 14) << progress;
     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
     ASSERT_EQUALS(progress["initialSyncEnd"].type(), Date) << progress;
     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
     ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
-    ASSERT_EQUALS(progress["initialSyncElapsedMillis"].type(), NumberInt) << progress;
+    const auto initialSyncEnd = progress["initialSyncEnd"].Date();
+    // We should have elapsed 10 secs (from advancing the clock) + 1ms (initialSyncRetry wait time).
+    ASSERT_EQUALS(progress.getIntField("totalInitialSyncElapsedMillis"), 10001) << progress;
+    const auto prevElapsedMillis = progress["totalInitialSyncElapsedMillis"].safeNumberLong();
+    ASSERT_EQUALS(progress["initialSyncEnd"].Date() - progress["initialSyncStart"].Date(),
+                  Milliseconds{10001})
+        << progress;
+    ASSERT_EQUALS(progress.getIntField("remainingInitialSyncEstimatedMillis"), 0) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 10) << progress;
+    ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 10) << progress;
     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
     // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
     ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
@@ -4725,6 +4767,18 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
     ASSERT_EQUALS(attempt1["durationMillis"].type(), NumberInt) << attempt1;
     ASSERT_EQUALS(attempt1.getStringField("syncSource"), std::string("localhost:27017"))
         << attempt1;
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+        // Advance the clock by 100 seconds
+        net->advanceTime(net->now() + Seconds(100));
+    }
+
+    // Check the initial sync progress again to make sure the duration timer has stopped on finish.
+    progress = initialSyncer->getInitialSyncProgress();
+    ASSERT_EQUALS(progress["initialSyncEnd"].Date(), initialSyncEnd);
+    ASSERT_EQUALS(progress["totalInitialSyncElapsedMillis"].safeNumberLong(), prevElapsedMillis);
 }
 
 TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExceedBsonLimit) {
diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js
index 763f5fedcb7..0b92bb8934c 100644
--- a/src/mongo/shell/db.js
+++ b/src/mongo/shell/db.js
@@ -1139,6 +1139,14 @@ DB.prototype.printSecondaryReplicationInfo = function() {
         }
     }
 
+    function printNodeInitialSyncInfo(syncSourceString, remainingMillis) {
+        print("\tInitialSyncSyncSource: " + syncSourceString);
+        let minutes = Math.floor((remainingMillis / (1000 * 60)) % 60);
+        let hours = Math.floor(remainingMillis / (1000 * 60 * 60));
+        print("\tInitialSyncRemainingEstimatedDuration: " + hours + " hour(s) " + minutes +
+              " minute(s)");
+    }
+
     var L = this.getSiblingDB("local");
 
     if (L.system.replset.count() != 0) {
@@ -1158,7 +1166,19 @@ DB.prototype.printSecondaryReplicationInfo = function() {
         }
 
         for (i in status.members) {
-            r(status.members[i]);
+            // Print initial sync info if node is in the STARTUP2 state.
+            if (status.members[i].self && status.members[i].state === 5) {
+                print("source: " + status.members[i].name);
+                if (!status.initialSyncStatus) {
+                    print("InitialSyncStatus information not found");
+                    continue;
+                }
+                printNodeInitialSyncInfo(
+                    status.members[i].syncSourceHost,
+                    status.initialSyncStatus.remainingInitialSyncEstimatedMillis);
+            } else {
+                r(status.members[i]);
+            }
         }
     }
};
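With the db.js change above, a node in STARTUP2 reports its own initial sync
progress through the shell helpers. For a remaining estimate of roughly
4,440,000 ms, printSecondaryReplicationInfo() would print something like the
following (hostnames are hypothetical):

    > db.getSiblingDB('admin').printSecondaryReplicationInfo()
    source: node2.example.net:27017
            InitialSyncSyncSource: node1.example.net:27017
            InitialSyncRemainingEstimatedDuration: 1 hour(s) 14 minute(s)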