author     Jason Chan <jason.chan@10gen.com>  2020-12-03 16:30:42 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-12-07 18:06:54 +0000
commit     07430d1cb41b102670783afeb21361d61b564783 (patch)
tree       1ada85335bde8e729492287c31ef38d9a3de80a2
parent     9b518b1c13000e6c0568ee43d520b6bd712e820a (diff)
download   mongo-07430d1cb41b102670783afeb21361d61b564783.tar.gz
SERVER-47863 Add fields to initial sync progress metrics
-rw-r--r--  jstests/replsets/get_replication_info_helper.js     38
-rw-r--r--  jstests/replsets/initial_sync_replSetGetStatus.js   108
-rw-r--r--  src/mongo/db/repl/collection_cloner.cpp              29
-rw-r--r--  src/mongo/db/repl/collection_cloner.h                 3
-rw-r--r--  src/mongo/db/repl/databases_cloner.cpp               48
-rw-r--r--  src/mongo/db/repl/databases_cloner.h                 19
-rw-r--r--  src/mongo/db/repl/databases_cloner_test.cpp          26
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp                 40
-rw-r--r--  src/mongo/db/repl/initial_syncer.h                   18
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp            68
-rw-r--r--  src/mongo/shell/db.js                                22
11 files changed, 385 insertions, 34 deletions
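
For orientation before the per-file diffs: after this change, a replSetGetStatus response taken mid-initial-sync carries the new progress fields sketched below. This is a hypothetical shape with illustrative values; only the field names come from the diffs that follow.

    // Sketch of the new initialSyncStatus shape (values illustrative).
    const res = db.adminCommand({replSetGetStatus: 1});
    printjson(res.initialSyncStatus);
    // {
    //     totalInitialSyncElapsedMillis: 10001,       // keeps counting until initial sync ends
    //     remainingInitialSyncEstimatedMillis: 10001, // omitted until some bytes are copied
    //     approxTotalDataSize: 20,
    //     approxTotalBytesCopied: 10,
    //     ...                                         // existing fields (appliedOps, attempts, ...)
    //     databases: {
    //         databasesToClone: 1,
    //         databasesCloned: 1,
    //         test: {
    //             collections: 1,
    //             clonedCollections: 1,
    //             "test.foo": {documentsToCopy: 4, documentsCopied: 4, indexes: 1,
    //                          fetchedBatches: 2, bytesToCopy: 10, approxBytesCopied: 10}
    //         }
    //     }
    // }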
diff --git a/jstests/replsets/get_replication_info_helper.js b/jstests/replsets/get_replication_info_helper.js
index 39355e2ceda..0558d8ee5ee 100644
--- a/jstests/replsets/get_replication_info_helper.js
+++ b/jstests/replsets/get_replication_info_helper.js
@@ -4,12 +4,46 @@
(function() {
"use strict";
var name = "getReplicationInfo";
-var replSet = new ReplSetTest({name: name, nodes: 3, oplogSize: 50});
-var nodes = replSet.nodeList();
+const replSet = new ReplSetTest({name: name, nodes: 2, oplogSize: 50});
+
replSet.startSet();
replSet.initiate();
var primary = replSet.getPrimary();
+
+// Test that db.printSlaveReplicationInfo() and db.printSecondaryReplicationInfo() both print
+// out initial sync info when called during an initial sync.
+const syncTarget = replSet.add({
+ rsConfig: {votes: 0, priority: 0},
+ setParameter: {
+ 'failpoint.forceSyncSourceCandidate':
+ tojson({mode: 'alwaysOn', data: {hostAndPort: primary.name}})
+ }
+});
+syncTarget.setSecondaryOk();
+assert.commandWorked(syncTarget.getDB('admin').runCommand(
+ {configureFailPoint: 'initialSyncHangBeforeFinish', mode: 'alwaysOn'}));
+replSet.reInitiate();
+
+// Wait for initial sync to pause right before it finishes.
+checkLog.contains(syncTarget, 'initial sync - initialSyncHangBeforeFinish fail point enabled');
+const callPrintSecondaryReplInfo = startParallelShell(
+ "db.getSiblingDB('admin').printSecondaryReplicationInfo();", syncTarget.port);
+callPrintSecondaryReplInfo();
+assert(rawMongoProgramOutput().match("InitialSyncSyncSource: " + primary.name));
+assert(rawMongoProgramOutput().match("InitialSyncRemainingEstimatedDuration: "));
+clearRawMongoProgramOutput();
+
+const callPrintSlaveReplInfo =
+ startParallelShell("db.getSiblingDB('admin').printSlaveReplicationInfo();", syncTarget.port);
+callPrintSlaveReplInfo();
+assert(rawMongoProgramOutput().match("InitialSyncSyncSource: " + primary.name));
+assert(rawMongoProgramOutput().match("InitialSyncRemainingEstimatedDuration: "));
+clearRawMongoProgramOutput();
+assert.commandWorked(syncTarget.getDB('admin').runCommand(
+ {configureFailPoint: 'initialSyncHangBeforeFinish', mode: 'off'}));
+replSet.awaitSecondaryNodes();
+
for (var i = 0; i < 100; i++) {
primary.getDB('test').foo.insert({a: i});
}
diff --git a/jstests/replsets/initial_sync_replSetGetStatus.js b/jstests/replsets/initial_sync_replSetGetStatus.js
index 57289816d51..8b1d7364e9d 100644
--- a/jstests/replsets/initial_sync_replSetGetStatus.js
+++ b/jstests/replsets/initial_sync_replSetGetStatus.js
@@ -16,12 +16,24 @@ replSet.startSet();
replSet.initiate();
var primary = replSet.getPrimary();
+const barColl = primary.getDB('pretest').bar;
+assert.commandWorked(barColl.insert({a: 1}));
+assert.commandWorked(barColl.insert({a: 2}));
+assert.commandWorked(barColl.insert({a: 3}));
+
var coll = primary.getDB('test').foo;
assert.writeOK(coll.insert({a: 1}));
assert.writeOK(coll.insert({a: 2}));
// Add a secondary node but make it hang before copying databases.
-var secondary = replSet.add();
+let secondary = replSet.add({
+ rsConfig: {votes: 0, priority: 0},
+ setParameter: {
+ 'collectionClonerBatchSize': 2,
+ 'failpoint.initialSyncHangDuringCollectionClone':
+ tojson({mode: 'alwaysOn', data: {namespace: barColl.getFullName(), numDocsToClone: 2}})
+ }
+});
secondary.setSlaveOk();
assert.commandWorked(secondary.getDB('admin').runCommand(
@@ -53,23 +65,91 @@ assert.writeOK(coll.insert({a: 4}));
assert.commandWorked(secondary.getDB('admin').runCommand(
{configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'}));
+// Wait for initial sync to pause halfway through cloning the 'pretest.bar' collection.
+checkLog.contains(secondary,
+ "initial sync - initialSyncHangDuringCollectionClone fail point enabled.");
+
+const pretestDbRes = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+
+assert.gt(pretestDbRes.initialSyncStatus.totalInitialSyncElapsedMillis, 0);
+assert.gt(pretestDbRes.initialSyncStatus.remainingInitialSyncEstimatedMillis, 0);
+assert.gt(pretestDbRes.initialSyncStatus.approxTotalDataSize, 0);
+
+assert.eq(pretestDbRes.initialSyncStatus.databases.pretest.collections, 1);
+assert.eq(pretestDbRes.initialSyncStatus.databases.pretest.clonedCollections, 0);
+
+let barCollRes = pretestDbRes.initialSyncStatus.databases.pretest["pretest.bar"];
+assert.eq(barCollRes.documentsToCopy, 3);
+// Even though we set the collectionClonerBatchSize to 2, it is possible for a batch to actually
+// return only 1 document. This can lead to us hitting the failpoint in the next batch instead,
+// causing us to copy up to 3 documents.
+assert.lte(barCollRes.documentsCopied, 3);
+assert.gt(barCollRes.bytesToCopy, 0);
+assert.gt(barCollRes.approxBytesCopied, 0);
+assert.lte(barCollRes.approxBytesCopied, barCollRes.bytesToCopy);
+assert.lt(barCollRes.approxBytesCopied, pretestDbRes.initialSyncStatus.approxTotalDataSize);
+
+const bytesCopiedAdminDb =
+ pretestDbRes.initialSyncStatus.databases.admin["admin.system.version"].approxBytesCopied +
+ pretestDbRes.initialSyncStatus.databases.admin["admin.system.keys"].approxBytesCopied;
+assert.eq(pretestDbRes.initialSyncStatus.approxTotalBytesCopied,
+ bytesCopiedAdminDb + barCollRes.approxBytesCopied);
+assert.gt(pretestDbRes.initialSyncStatus.approxTotalBytesCopied, 0);
+
+// The server still has the 'pretest' and 'test' dbs to finish cloning.
+assert.eq(pretestDbRes.initialSyncStatus.databases.databasesCloned, 2);
+assert.eq(pretestDbRes.initialSyncStatus.databases.databasesToClone, 2);
+
+assert.commandWorked(secondary.adminCommand(
+ {configureFailPoint: 'initialSyncHangDuringCollectionClone', mode: "off"}));
+
// Wait for initial sync to pause right before it finishes.
checkLog.contains(secondary, 'initial sync - initialSyncHangBeforeFinish fail point enabled');
-res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-assert(res.initialSyncStatus,
+const endOfCloningRes = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+assert(endOfCloningRes.initialSyncStatus,
() => "Response should have an 'initialSyncStatus' field: " + tojson(res));
-assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
-assert.eq(res.initialSyncStatus.appliedOps, 3);
-assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 0);
-assert.eq(res.initialSyncStatus.maxFailedInitialSyncAttempts, 10);
-assert.eq(res.initialSyncStatus.databases.databasesCloned, 3);
-assert.eq(res.initialSyncStatus.databases.test.collections, 1);
-assert.eq(res.initialSyncStatus.databases.test.clonedCollections, 1);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsToCopy, 4);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsCopied, 4);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].indexes, 1);
-assert.eq(res.initialSyncStatus.databases.test["test.foo"].fetchedBatches, 1);
+
+// Assert that the metrics have progressed in the right direction since the last check.
+assert.gt(endOfCloningRes.initialSyncStatus.totalInitialSyncElapsedMillis,
+ pretestDbRes.initialSyncStatus.totalInitialSyncElapsedMillis);
+assert.lt(endOfCloningRes.initialSyncStatus.remainingInitialSyncEstimatedMillis,
+ pretestDbRes.initialSyncStatus.remainingInitialSyncEstimatedMillis);
+assert.gt(endOfCloningRes.initialSyncStatus.approxTotalBytesCopied,
+ pretestDbRes.initialSyncStatus.approxTotalBytesCopied);
+assert.eq(endOfCloningRes.initialSyncStatus.approxTotalDataSize,
+ pretestDbRes.initialSyncStatus.approxTotalDataSize);
+
+assert.eq(endOfCloningRes.initialSyncStatus.failedInitialSyncAttempts, 0);
+assert.eq(endOfCloningRes.initialSyncStatus.maxFailedInitialSyncAttempts, 10);
+
+assert.eq(endOfCloningRes.initialSyncStatus.databases.databasesCloned, 4);
+assert.eq(endOfCloningRes.initialSyncStatus.databases.databasesToClone, 0);
+
+assert.eq(endOfCloningRes.initialSyncStatus.databases.pretest.collections, 1);
+assert.eq(endOfCloningRes.initialSyncStatus.databases.pretest.clonedCollections, 1);
+barCollRes = endOfCloningRes.initialSyncStatus.databases.pretest["pretest.bar"];
+assert.eq(barCollRes.documentsToCopy, 3);
+assert.eq(barCollRes.documentsCopied, 3);
+assert.eq(barCollRes.indexes, 1);
+assert.lte(barCollRes.fetchedBatches, 2);
+assert.gt(barCollRes.bytesToCopy, 0);
+assert.eq(barCollRes.approxBytesCopied, barCollRes.bytesToCopy);
+
+let fooCollRes = endOfCloningRes.initialSyncStatus.databases.test["test.foo"];
+assert.eq(endOfCloningRes.initialSyncStatus.databases.test.collections, 1);
+assert.eq(endOfCloningRes.initialSyncStatus.databases.test.clonedCollections, 1);
+assert.eq(fooCollRes.documentsToCopy, 4);
+assert.eq(fooCollRes.documentsCopied, 4);
+assert.eq(fooCollRes.indexes, 1);
+assert.lte(fooCollRes.fetchedBatches, 2);
+assert.gt(fooCollRes.bytesToCopy, 0);
+assert.eq(fooCollRes.approxBytesCopied, fooCollRes.bytesToCopy);
+
+assert.eq(endOfCloningRes.initialSyncStatus.approxTotalDataSize,
+ endOfCloningRes.initialSyncStatus.approxTotalBytesCopied);
+assert.eq(endOfCloningRes.initialSyncStatus.approxTotalBytesCopied,
+ fooCollRes.approxBytesCopied + barCollRes.approxBytesCopied + bytesCopiedAdminDb);
// Let initial sync finish and get into secondary state.
assert.commandWorked(secondary.getDB('admin').runCommand(
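
The byte accounting asserted above boils down to one invariant: the per-collection approxBytesCopied values sum to the top-level approxTotalBytesCopied. A small sketch of that check follows; the helper name is ours, not part of the test, and it assumes the status document shape used in this commit.

    // Sum approxBytesCopied across every cloned collection and compare it to
    // the top-level total, as the assertions above do for admin/pretest/test.
    function sumApproxBytesCopied(initialSyncStatus) {
        let total = 0;
        const dbs = initialSyncStatus.databases;
        for (let dbName of Object.keys(dbs)) {
            // Skip the scalar counters stored alongside the per-db documents.
            if (dbName === "databasesToClone" || dbName === "databasesCloned")
                continue;
            for (let collName of Object.keys(dbs[dbName])) {
                if (collName.startsWith(dbName + "."))
                    total += dbs[dbName][collName].approxBytesCopied || 0;
            }
        }
        return total;
    }
    // Expected: sumApproxBytesCopied(status.initialSyncStatus) ===
    //           status.initialSyncStatus.approxTotalBytesCopied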
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 80efca4571e..09d405c72d9 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -544,6 +544,30 @@ void CollectionCloner::_runQuery(const executor::TaskExecutor::CallbackArgs& cal
}
try {
+ BSONObj res;
+ _clientConnection->runCommand(
+ _sourceNss.db().toString(), BSON("collStats" << _sourceNss.coll().toString()), res);
+ if (auto status = getStatusFromCommandResult(res); status.isOK()) {
+ stdx::lock_guard<Latch> lock(_mutex);
+ _stats.bytesToCopy = res.getField("size").safeNumberLong();
+ if (_stats.bytesToCopy > 0) {
+ // The 'avgObjSize' field is only present when 'collStats' reports a 'size'
+ // greater than zero.
+ _stats.avgObjSize = res.getField("avgObjSize").safeNumberLong();
+ }
+ } else {
+ LOG(1) << "Skipping the recording of some initial sync metrics due to a bad status "
+ "response from calling 'collStats' on source '"
+ << _source.toString() << "': " << status;
+ }
+ } catch (const DBException& e) {
+ LOG(1)
+ << "Skipping the recording of some initial sync metrics due to an error when calling "
+ "'collStats' on source '"
+ << _source.toString() << "': " << e.toStatus();
+ }
+
+ try {
_clientConnection->query(
[this, onCompletionGuard](DBClientCursorBatchIterator& iter) {
_handleNextBatch(onCompletionGuard, iter);
@@ -700,6 +724,7 @@ void CollectionCloner::_insertDocumentsCallback(
}
_documentsToInsert.swap(docs);
_stats.documentsCopied += docs.size();
+ _stats.approxBytesCopied = static_cast<long long>(_stats.documentsCopied) * _stats.avgObjSize;
++_stats.fetchedBatches;
_progressMeter.hit(int(docs.size()));
invariant(_collLoader);
@@ -789,6 +814,10 @@ void CollectionCloner::Stats::append(BSONObjBuilder* builder) const {
builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied);
builder->appendNumber("indexes", indexes);
builder->appendNumber("fetchedBatches", fetchedBatches);
+ builder->appendNumber("bytesToCopy", bytesToCopy);
+ if (bytesToCopy) {
+ builder->appendNumber("approxBytesCopied", approxBytesCopied);
+ }
if (start != Date_t()) {
builder->appendDate("start", start);
if (end != Date_t()) {
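
The two sizing inputs recorded above come from an ordinary collStats call; a shell sketch of the same lookup (database and collection names taken from the test above, values not):

    // 'size' is the collection's uncompressed data size in bytes. 'avgObjSize'
    // is only meaningful when 'size' is non-zero, which is why the cloner
    // guards the read above.
    const stats = db.getSiblingDB("pretest").runCommand({collStats: "bar"});
    const bytesToCopy = stats.size;
    const avgObjSize = bytesToCopy > 0 ? stats.avgObjSize : 0;
    // Each time a batch is inserted, progress is then approximated as:
    //     approxBytesCopied = documentsCopied * avgObjSize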
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 1eb92679c9e..8c3769797e2 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -82,6 +82,9 @@ public:
size_t indexes{0};
size_t fetchedBatches{0}; // This is actually inserted batches.
size_t receivedBatches{0};
+ long long bytesToCopy{0};
+ long long avgObjSize{0};
+ long long approxBytesCopied{0};
std::string toString() const;
BSONObj toBSON() const;
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 1a0857ce6b7..5737ef7e2a6 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/client.h"
#include "mongo/db/repl/databases_cloner_gen.h"
+#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/functional.h"
@@ -74,7 +75,8 @@ DatabasesCloner::DatabasesCloner(StorageInterface* si,
_source(source),
_includeDbFn(includeDbPred),
_finishFn(finishFn),
- _storage(si) {
+ _storage(si),
+ _createClientFn([] { return stdx::make_unique<DBClientConnection>(); }) {
uassert(ErrorCodes::InvalidOptions, "storage interface must be provided.", si);
uassert(ErrorCodes::InvalidOptions, "executor must be provided.", exec);
uassert(
@@ -93,6 +95,7 @@ std::string DatabasesCloner::toString() const {
return str::stream() << "initial sync --"
<< " active:" << _isActive_inlock() << " status:" << _status.toString()
<< " source:" << _source.toString()
+ << " db cloners remaining:" << _stats.databasesToClone
<< " db cloners completed:" << _stats.databasesCloned
<< " db count:" << _databaseCloners.size();
}
@@ -170,6 +173,7 @@ BSONObj DatabasesCloner::Stats::toBSON() const {
}
void DatabasesCloner::Stats::append(BSONObjBuilder* builder) const {
+ builder->appendNumber("databasesToClone", databasesToClone);
builder->appendNumber("databasesCloned", databasesCloned);
for (auto&& db : databaseStats) {
BSONObjBuilder dbBuilder(builder->subobjStart(db.dbname));
@@ -227,6 +231,11 @@ void DatabasesCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work)
_scheduleDbWorkFn = work;
}
+void DatabasesCloner::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _createClientFn = createClientFn;
+}
+
void DatabasesCloner::setStartCollectionClonerFn(
const StartCollectionClonerFn& startCollectionCloner) {
LockGuard lk(_mutex);
@@ -303,6 +312,10 @@ void DatabasesCloner::_onListDatabaseFinish(
// initial sync is occurring.
_setAdminAsFirst(dbsArray);
+ // Create a client connection to call 'dbStats' to gather initial sync metrics.
+ auto clientConnection = _createClientFn();
+ const auto clientConnectionStatus = clientConnection->connect(_source, StringData());
+
for (BSONElement arrayElement : dbsArray) {
const BSONObj dbBSON = arrayElement.Obj();
@@ -384,6 +397,37 @@ void DatabasesCloner::_onListDatabaseFinish(
// add cloner to list.
_databaseCloners.push_back(dbCloner);
+
+ try {
+ if (!replAuthenticate(clientConnection.get())) {
+ LOG(1) << "Skipping the recording of initial sync data size metrics for the '"
+ << dbName << "' database as we failed to authenticate to " << _source;
+ continue;
+ }
+ auto status = clientConnectionStatus;
+ if (status.isOK()) {
+ BSONObj res;
+ clientConnection->runCommand(dbName, BSON("dbStats" << 1), res);
+ auto respStatus = getStatusFromCommandResult(res);
+ if (respStatus.isOK()) {
+ _stats.dataSize += res.getField("dataSize").safeNumberLong();
+ continue;
+ }
+ // 'dbStats' returned a bad status.
+ status = respStatus;
+ }
+ // The 'dbStats' call can fail if the sync source contains invalid views. We
+ // should not fail initial sync in that case: the replica set may have lost
+ // majority availability and so have no primary available to fix the view
+ // definitions. Instead, we simply skip recording the data size metrics.
+ LOG(1) << "Skipping the recording of initial sync data size metrics for the '" << dbName
+ << "' database due to bad status response to 'dbStats' command: " << status;
+ } catch (const DBException& e) {
+ LOG(1) << "Skipping the recording of initial sync data size metrics for the '" << dbName
+ << "' database due to an error when calling the 'dbStats' command: "
+ << e.toStatus();
+ }
}
if (_databaseCloners.size() == 0) {
if (_status.isOK()) {
@@ -392,6 +436,7 @@ void DatabasesCloner::_onListDatabaseFinish(
_fail_inlock(&lk, _status);
}
}
+ _stats.databasesToClone = _databaseCloners.size();
}
std::vector<std::shared_ptr<DatabaseCloner>> DatabasesCloner::_getDatabaseCloners() const {
@@ -435,6 +480,7 @@ void DatabasesCloner::_onEachDBCloneFinish(const Status& status, const std::stri
}
_stats.databasesCloned++;
+ _stats.databasesToClone--;
if (_stats.databasesCloned == _databaseCloners.size()) {
_succeed_inlock(&lk);
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index db6f1129edf..f0d85907386 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -57,6 +57,8 @@ class DatabasesCloner {
public:
struct Stats {
size_t databasesCloned{0};
+ size_t databasesToClone{0};
+ long long dataSize{0};
std::vector<DatabaseCloner::Stats> databaseStats;
std::string toString() const;
@@ -70,6 +72,13 @@ public:
using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
executor::TaskExecutor::CallbackFn)>;
+ /**
+ * Type of function used to create a database client. The default creates a real
+ * DBClientConnection; tests may inject a mock client via setCreateClientFn_forTest().
+ */
+ using CreateClientFn = stdx::function<std::unique_ptr<DBClientConnection>()>;
+
DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
ThreadPool* dbWorkThreadPool,
@@ -119,6 +128,13 @@ public:
*/
StatusWith<std::vector<BSONElement>> parseListDatabasesResponse_forTest(BSONObj dbResponse);
+ /**
+ * Allows a different client class to be injected.
+ *
+ * For testing only.
+ */
+ void setCreateClientFn_forTest(const CreateClientFn& createClientFn);
+
private:
bool _isActive_inlock() const;
@@ -174,6 +190,7 @@ private:
// (R) Read-only in concurrent operation; no synchronization required.
// (M) Reads and writes guarded by _mutex
// (S) Self-synchronizing; access in any way from any context.
+ // (RT) Read-only in concurrent operation; synchronized externally by tests
//
mutable Mutex _mutex = MONGO_MAKE_LATCH("DatabasesCloner::_mutex"); // (S)
Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything.
@@ -191,6 +208,8 @@ private:
std::vector<std::shared_ptr<DatabaseCloner>> _databaseCloners; // (M) database cloners by name
Stats _stats; // (M)
+ CreateClientFn _createClientFn; // (RT) Function for creating a database client.
+
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
// It is possible to skip intermediate states. For example,
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index b09146240b3..25d5a2b2037 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -285,6 +285,15 @@ protected:
return Status::OK();
};
+ void startWithMockClient(DatabasesCloner& cloner) {
// The databases cloner creates a DBClientConnection to call 'dbStats' for initial
// sync metrics; inject a mock connection so the command runs against the mock server.
+ cloner.setCreateClientFn_forTest([this]() {
+ return std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get()));
+ });
+ ASSERT_OK(cloner.startup());
+ }
+
void runCompleteClone(Responses responses) {
Status result{Status::OK()};
bool done = false;
@@ -306,6 +315,11 @@ protected:
return getExecutor().scheduleWork(std::move(work));
});
+ cloner.setCreateClientFn_forTest([this]() {
+ return std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get()));
+ });
+
cloner.setStartCollectionClonerFn([this](CollectionCloner& cloner) {
cloner.setCreateClientFn_forTest([&cloner, this]() {
return std::unique_ptr<DBClientConnection>(
@@ -688,7 +702,7 @@ TEST_F(DBsClonerTest, DatabasesClonerResetsOnFinishCallbackFunctionAfterCompleti
result = status;
}};
- ASSERT_OK(cloner.startup());
+ startWithMockClient(cloner);
ASSERT_TRUE(cloner.isActive());
sharedCallbackData.reset();
@@ -717,7 +731,7 @@ TEST_F(DBsClonerTest, FailsOnListCollectionsOnOnlyDatabase) {
result = status;
}};
- ASSERT_OK(cloner.startup());
+ startWithMockClient(cloner);
ASSERT_TRUE(cloner.isActive());
auto net = getNet();
@@ -747,7 +761,7 @@ TEST_F(DBsClonerTest, FailsOnListCollectionsOnFirstOfTwoDatabases) {
result = status;
}};
- ASSERT_OK(cloner.startup());
+ startWithMockClient(cloner);
ASSERT_TRUE(cloner.isActive());
auto net = getNet();
@@ -805,7 +819,7 @@ TEST_F(DBsClonerTest, FailingToScheduleSecondDatabaseClonerShouldCancelTheCloner
result = status;
}};
- ASSERT_OK(cloner.startup());
+ startWithMockClient(cloner);
ASSERT_TRUE(cloner.isActive());
auto net = getNet();
@@ -850,7 +864,7 @@ TEST_F(DBsClonerTest, DatabaseClonerChecksAdminDbUsingStorageInterfaceAfterCopyi
result = status;
}};
- ASSERT_OK(cloner.startup());
+ startWithMockClient(cloner);
ASSERT_TRUE(cloner.isActive());
auto net = getNet();
@@ -891,7 +905,7 @@ TEST_F(DBsClonerTest, AdminDbValidationErrorShouldAbortTheCloner) {
result = status;
}};
- ASSERT_OK(cloner.startup());
+ startWithMockClient(cloner);
ASSERT_TRUE(cloner.isActive());
auto net = getNet();
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 589736ec2db..b8b7b2ecab0 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -340,6 +340,11 @@ InitialSyncer::State InitialSyncer::getState_forTest() const {
return _state;
}
+void InitialSyncer::setDbsClonerCreateClientFn_forTest(const CreateClientFn& createClientFn) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _dbsClonerCreateClientFn = createClientFn;
+}
+
Date_t InitialSyncer::getWallClockTime_forTest() const {
stdx::lock_guard<Latch> lk(_mutex);
return _lastApplied.wallTime;
@@ -388,6 +393,29 @@ void InitialSyncer::_appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob
if (!_initialSyncState) {
return;
}
+ if (_initialSyncState->dbsCloner) {
+ const auto dbsClonerStats = _initialSyncState->dbsCloner->getStats();
+ const auto approxTotalDataSize = dbsClonerStats.dataSize;
+ bob->appendNumber("approxTotalDataSize", approxTotalDataSize);
+ long long approxTotalBytesCopied = 0;
+ for (const auto& dbClonerStats : dbsClonerStats.databaseStats) {
+ for (const auto& collClonerStats : dbClonerStats.collectionStats) {
+ approxTotalBytesCopied += collClonerStats.approxBytesCopied;
+ }
+ }
+ bob->appendNumber("approxTotalBytesCopied", approxTotalBytesCopied);
+ if (approxTotalBytesCopied > 0) {
+ const auto statsObj = bob->asTempObj();
+ auto totalInitialSyncElapsedMillis =
+ statsObj.getField("totalInitialSyncElapsedMillis").safeNumberLong();
+ // Milliseconds spent per byte copied so far (the inverse of a download rate),
+ // used to extrapolate how long the remaining bytes should take.
+ const auto millisPerByte =
+ (double)totalInitialSyncElapsedMillis / (double)approxTotalBytesCopied;
+ const auto remainingInitialSyncEstimatedMillis =
+ millisPerByte * (double)(approxTotalDataSize - approxTotalBytesCopied);
+ bob->appendNumber("remainingInitialSyncEstimatedMillis",
+ (long long)remainingInitialSyncEstimatedMillis);
+ }
+ }
bob->appendNumber("fetchedMissingDocs", _initialSyncState->fetchedMissingDocs);
bob->appendNumber("appliedOps", _initialSyncState->appliedOps);
if (!_initialSyncState->beginApplyingTimestamp.isNull()) {
@@ -1061,6 +1089,10 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
// facilitate testing.
_initialSyncState->dbsCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn);
}
+ if (_dbsClonerCreateClientFn) {
// For testing only: inject a different client class into the DatabasesCloner.
+ _initialSyncState->dbsCloner->setCreateClientFn_forTest(_dbsClonerCreateClientFn);
+ }
if (_startCollectionClonerFn) {
_initialSyncState->dbsCloner->setStartCollectionClonerFn(_startCollectionClonerFn);
}
@@ -1854,12 +1886,14 @@ void InitialSyncer::Stats::append(BSONObjBuilder* builder) const {
static_cast<long long>(maxFailedInitialSyncAttempts));
if (initialSyncStart != Date_t()) {
builder->appendDate("initialSyncStart", initialSyncStart);
+ auto elapsedDurationEnd = Date_t::now();
if (initialSyncEnd != Date_t()) {
builder->appendDate("initialSyncEnd", initialSyncEnd);
- auto elapsed = initialSyncEnd - initialSyncStart;
- long long elapsedMillis = duration_cast<Milliseconds>(elapsed).count();
- builder->appendNumber("initialSyncElapsedMillis", elapsedMillis);
+ elapsedDurationEnd = initialSyncEnd;
}
+ long long elapsedMillis =
+ duration_cast<Milliseconds>(elapsedDurationEnd - initialSyncStart).count();
+ builder->appendNumber("totalInitialSyncElapsedMillis", elapsedMillis);
}
BSONArrayBuilder arrBuilder(builder->subarrayStart("initialSyncAttempts"));
for (unsigned int i = 0; i < initialSyncAttemptInfos.size(); ++i) {
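
The remaining-time estimate computed above is a linear extrapolation from elapsed time and bytes copied. A sketch, using the numbers the unit test below plays through (10 bytes copied out of 10 after 10001 ms):

    // remaining = (elapsedMillis / bytesCopied) * bytesNotYetCopied
    function remainingEstimatedMillis(elapsedMillis, bytesCopied, totalDataSize) {
        if (bytesCopied <= 0)
            return undefined;  // the server omits the field until bytes are copied
        const millisPerByte = elapsedMillis / bytesCopied;
        return Math.floor(millisPerByte * (totalDataSize - bytesCopied));
    }
    remainingEstimatedMillis(10001, 10, 10);  // 0: nothing left to copy
    remainingEstimatedMillis(10001, 10, 20);  // 10001: second half at the same pace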
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index d8ba0b9474c..4a34795fbc9 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -154,6 +154,13 @@ public:
using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn;
+ /**
+ * Type of function to create a database client.
+ *
+ * Used for testing only.
+ */
+ using CreateClientFn = stdx::function<std::unique_ptr<DBClientConnection>()>;
+
struct InitialSyncAttemptInfo {
int durationMillis;
Status status;
@@ -230,6 +237,13 @@ public:
*/
void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner);
+ /**
+ * Allows a different client class to be injected for the databases cloner.
+ *
+ * For testing only.
+ */
+ void setDbsClonerCreateClientFn_forTest(const CreateClientFn& createClientFn);
+
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
// It is possible to skip intermediate states. For example, calling shutdown() when the data
@@ -674,6 +688,10 @@ private:
// Current initial syncer state. See comments for State enum class for details.
State _state = State::kPreStart; // (M)
+ // Passed to DatabasesCloner.
+ CreateClientFn _dbsClonerCreateClientFn; // (M)
+
// Passed to CollectionCloner via DatabasesCloner.
DatabaseCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
StartCollectionClonerFn _startCollectionClonerFn; // (M)
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 2c32e736d88..7cf799c5bb0 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -407,6 +407,10 @@ protected:
[this](executor::TaskExecutor::CallbackFn work) {
return getExecutor().scheduleWork(std::move(work));
});
+ _initialSyncer->setDbsClonerCreateClientFn_forTest([this]() {
+ return std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get()));
+ });
_initialSyncer->setStartCollectionClonerFn([this](CollectionCloner& cloner) {
cloner.setCreateClientFn_forTest([&cloner, this]() {
return std::unique_ptr<DBClientConnection>(
@@ -4485,15 +4489,20 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
auto progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after first failed response: " << progress;
- ASSERT_EQUALS(progress.nFields(), 8) << progress;
+ ASSERT_EQUALS(progress.nFields(), 11) << progress;
+ ASSERT_FALSE(progress.hasField("remainingInitialSyncEstimatedMillis"));
+ ASSERT_FALSE(progress.hasField("InitialSyncEnd"));
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 0) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 0) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
ASSERT_BSONOBJ_EQ(progress.getObjectField("initialSyncAttempts"), BSONObj());
ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
- ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
+ ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"),
+ BSON("databasesToClone" << 0 << "databasesCloned" << 0));
// Play rest of the failed round of responses.
{
@@ -4544,14 +4553,19 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after failure: " << progress;
- ASSERT_EQUALS(progress.nFields(), 8) << progress;
+ ASSERT_EQUALS(progress.nFields(), 11) << progress;
+ ASSERT_FALSE(progress.hasField("remainingInitialSyncEstimatedMillis"));
+ ASSERT_FALSE(progress.hasField("InitialSyncEnd"));
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 0) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 0) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
- ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
+ ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"),
+ BSON("databasesToClone" << 0 << "databasesCloned" << 0));
BSONObj attempts = progress["initialSyncAttempts"].Obj();
ASSERT_EQUALS(attempts.nFields(), 1) << attempts;
@@ -4565,15 +4579,23 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
<< attempt0;
+ const auto bytesToCopy = 10;
+ const auto avgObjSize = 2;
// Play all but last of the successful round of responses.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ // Advance the clock by 10 seconds.
+ net->advanceTime(net->now() + Seconds(10));
+
// listDatabases
NamespaceString nss("a.a");
auto request =
net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
assertRemoteCommandNameEquals("listDatabases", request);
+
+ // The databases cloner calls 'dbStats' to record initial sync progress metrics.
+ _mockServer->setCommandReply("dbStats", BSON("dataSize" << 10));
net->runReadyNetworkOperations();
// Ignore oplog tailing query.
@@ -4606,6 +4628,10 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
assertRemoteCommandNameEquals("getMore", request);
net->blackHole(noi);
+ // The collection cloner calls 'collStats' to record initial sync progress metrics.
+ _mockServer->setCommandReply("collStats",
+ BSON("size" << bytesToCopy << "avgObjSize" << avgObjSize));
+
// count:a
request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1));
assertRemoteCommandNameEquals("count", request);
@@ -4632,17 +4658,22 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after all but last successful response: " << progress;
- ASSERT_EQUALS(progress.nFields(), 9) << progress;
+ ASSERT_EQUALS(progress.nFields(), 13) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
+ ASSERT_GREATER_THAN(progress["totalInitialSyncElapsedMillis"].safeNumberLong(), 0) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 10) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 10) << progress;
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
+ ASSERT_EQUALS(progress.getIntField("remainingInitialSyncEstimatedMillis"), 0) << progress;
ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
// Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
auto databasesProgress = progress.getObjectField("databases");
ASSERT_EQUALS(1, databasesProgress.getIntField("databasesCloned")) << databasesProgress;
+ ASSERT_EQUALS(0, databasesProgress.getIntField("databasesToClone")) << databasesProgress;
auto dbProgress = databasesProgress.getObjectField("a");
ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress;
ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress;
@@ -4655,6 +4686,8 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
<< collectionProgress;
ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
ASSERT_EQUALS(5, collectionProgress.getIntField("receivedBatches")) << collectionProgress;
+ ASSERT_EQUALS(bytesToCopy, collectionProgress.getIntField("bytesToCopy")) << collectionProgress;
+ ASSERT_EQUALS(10, collectionProgress.getIntField("approxBytesCopied")) << collectionProgress;
attempts = progress["initialSyncAttempts"].Obj();
ASSERT_EQUALS(attempts.nFields(), 1) << progress;
@@ -4694,14 +4727,23 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
progress = initialSyncer->getInitialSyncProgress();
log() << "Progress at end: " << progress;
- ASSERT_EQUALS(progress.nFields(), 11) << progress;
+ ASSERT_EQUALS(progress.nFields(), 14) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
ASSERT_EQUALS(progress["initialSyncEnd"].type(), Date) << progress;
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
- ASSERT_EQUALS(progress["initialSyncElapsedMillis"].type(), NumberInt) << progress;
+ const auto initialSyncEnd = progress["initialSyncEnd"].Date();
+ // We should have elapsed 10 secs (from advancing the clock) + 1ms (initialSyncRetry wait time).
+ ASSERT_EQUALS(progress.getIntField("totalInitialSyncElapsedMillis"), 10001) << progress;
+ const auto prevElapsedMillis = progress["totalInitialSyncElapsedMillis"].safeNumberLong();
+ ASSERT_EQUALS(progress["initialSyncEnd"].Date() - progress["initialSyncStart"].Date(),
+ Milliseconds{10001})
+ << progress;
+ ASSERT_EQUALS(progress.getIntField("remainingInitialSyncEstimatedMillis"), 0) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalDataSize"), 10) << progress;
+ ASSERT_EQUALS(progress.getIntField("approxTotalBytesCopied"), 10) << progress;
ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
// Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
@@ -4725,6 +4767,18 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(attempt1["durationMillis"].type(), NumberInt) << attempt1;
ASSERT_EQUALS(attempt1.getStringField("syncSource"), std::string("localhost:27017"))
<< attempt1;
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Advance the clock by 100 seconds.
+ net->advanceTime(net->now() + Seconds(100));
+ }
+
+ // Check the initial sync progress again to make sure the duration timer has stopped on finish.
+ progress = initialSyncer->getInitialSyncProgress();
+ ASSERT_EQUALS(progress["initialSyncEnd"].Date(), initialSyncEnd);
+ ASSERT_EQUALS(progress["totalInitialSyncElapsedMillis"].safeNumberLong(), prevElapsedMillis);
}
TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExceedBsonLimit) {
diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js
index 763f5fedcb7..0b92bb8934c 100644
--- a/src/mongo/shell/db.js
+++ b/src/mongo/shell/db.js
@@ -1139,6 +1139,14 @@ DB.prototype.printSecondaryReplicationInfo = function() {
}
}
+ function printNodeInitialSyncInfo(syncSourceString, remainingMillis) {
+ print("\tInitialSyncSyncSource: " + syncSourceString);
+ let minutes = Math.floor((remainingMillis / (1000 * 60)) % 60);
+ let hours = Math.floor(remainingMillis / (1000 * 60 * 60));
+ print("\tInitialSyncRemainingEstimatedDuration: " + hours + " hour(s) " + minutes +
+ " minute(s)");
+ }
+
var L = this.getSiblingDB("local");
if (L.system.replset.count() != 0) {
@@ -1158,7 +1166,19 @@ DB.prototype.printSecondaryReplicationInfo = function() {
}
for (i in status.members) {
- r(status.members[i]);
+ // Print initial sync info if the node is in the STARTUP2 state.
+ if (status.members[i].self && status.members[i].state === 5) {
+ print("source: " + status.members[i].name);
+ if (!status.initialSyncStatus) {
+ print("InitialSyncStatus information not found");
+ continue;
+ }
+ printNodeInitialSyncInfo(
+ status.members[i].syncSourceHost,
+ status.initialSyncStatus.remainingInitialSyncEstimatedMillis);
+ } else {
+ r(status.members[i]);
+ }
}
}
};
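
As a quick check of the duration formatting above (value illustrative): 5,400,000 ms is 90 minutes, which prints as 1 hour(s) 30 minute(s).

    const remainingMillis = 5400000;
    const minutes = Math.floor((remainingMillis / (1000 * 60)) % 60);  // 30
    const hours = Math.floor(remainingMillis / (1000 * 60 * 60));      // 1
    print("\tInitialSyncRemainingEstimatedDuration: " + hours + " hour(s) " + minutes + " minute(s)");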