summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/master_slave.cpp
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2016-08-25 14:02:29 -0400
committerScott Hernandez <scotthernandez@gmail.com>2016-08-26 13:19:14 -0400
commit65925c449db6150307b1b0caf183b2988a692d77 (patch)
treeaa6df8cea51a9bdef6ee72940745e01c8216a29c /src/mongo/db/repl/master_slave.cpp
parentcc0af2d39b5cbec0fcb5cf224b985d52514a9b04 (diff)
downloadmongo-65925c449db6150307b1b0caf183b2988a692d77.tar.gz
SERVER-25618: do not report progress during m/s initial sync
Diffstat (limited to 'src/mongo/db/repl/master_slave.cpp')
-rw-r--r--src/mongo/db/repl/master_slave.cpp82
1 files changed, 59 insertions, 23 deletions
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index 544fc607f37..005427d0ba4 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -85,6 +85,14 @@ using std::vector;
namespace mongo {
namespace repl {
+namespace {
+time_t lastForcedResync = 0;
+
+const int forceReconnect = -1;
+const int restartSync = 0;
+const int restartSyncAfterSleep = 1;
+} // namespace
+
void pretouchOperation(OperationContext* txn, const BSONObj& op);
void pretouchN(vector<BSONObj>&, unsigned a, unsigned b);
@@ -92,8 +100,6 @@ void pretouchN(vector<BSONObj>&, unsigned a, unsigned b);
volatile int syncing = 0;
volatile int relinquishSyncingSome = 0;
-static time_t lastForcedResync = 0;
-
/* output by the web console */
const char* replInfo = "";
struct ReplInfo {
@@ -413,7 +419,7 @@ bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OI
return false;
}
- if (!replHandshake(reader->conn(), myRID)) {
+ if (_doHandshake && !replHandshake(reader->conn(), myRID)) {
return false;
}
@@ -458,6 +464,23 @@ void ReplSource::forceResync(OperationContext* txn, const char* requester) {
save(txn);
}
+Status ReplSource::_updateIfDoneWithInitialSync() {
+ const auto usedToDoHandshake = _doHandshake;
+ if (!usedToDoHandshake && addDbNextPass.empty() && incompleteCloneDbs.empty()) {
+ _doHandshake = true;
+ oplogReader.resetConnection();
+ const auto myRID = getGlobalReplicationCoordinator()->getMyRID();
+ if (!_connect(&oplogReader, HostAndPort{hostName}, myRID)) {
+ return {ErrorCodes::MasterSlaveConnectionFailure,
+ str::stream() << "could not connect to " << hostName << " with rid: "
+ << myRID.toString()};
+ } else {
+ return {ErrorCodes::Interrupted, "Initial Sync is done."};
+ }
+ }
+ return Status::OK();
+}
+
void ReplSource::resyncDrop(OperationContext* txn, const string& dbName) {
log() << "resync: dropping database " << dbName;
invariant(txn->lockState()->isW());
@@ -672,9 +695,9 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn,
if (op.getStringField("op")[0] == 'n')
return;
- char clientName[MaxDatabaseNameLen];
+ char dbName[MaxDatabaseNameLen];
const char* ns = op.getStringField("ns");
- nsToDatabase(ns, clientName);
+ nsToDatabase(ns, dbName);
if (*ns == '.') {
log() << "skipping bad op in oplog: " << op.toString() << endl;
@@ -689,7 +712,7 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn,
// nsToDatabase(ns, clientName);
}
- if (!only.empty() && only != clientName)
+ if (!only.empty() && only != dbName)
return;
// Push the CurOp stack for "txn" so each individual oplog entry application is separately
@@ -744,7 +767,7 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn,
throw SyncException();
}
- if (!handleDuplicateDbName(txn, op, ns, clientName)) {
+ if (!handleDuplicateDbName(txn, op, ns, dbName)) {
return;
}
@@ -760,37 +783,37 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn,
OldClientContext ctx(txn, ns, false);
bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData();
- bool incompleteClone = incompleteCloneDbs.count(clientName) != 0;
+ bool incompleteClone = incompleteCloneDbs.count(dbName) != 0;
LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty
<< ", incompleteClone: " << incompleteClone << endl;
if (ctx.justCreated() || empty || incompleteClone) {
// we must add to incomplete list now that setClient has been called
- incompleteCloneDbs.insert(clientName);
+ incompleteCloneDbs.insert(dbName);
if (nClonedThisPass) {
/* we only clone one database per pass, even if a lot need done. This helps us
avoid overflowing the master's transaction log by doing too much work before going
back to read more transactions. (Imagine a scenario of slave startup where we try to
clone 100 databases in one pass.)
*/
- addDbNextPass.insert(clientName);
+ addDbNextPass.insert(dbName);
} else {
if (incompleteClone) {
- log() << "An earlier initial clone of '" << clientName
+ log() << "An earlier initial clone of '" << dbName
<< "' did not complete, now resyncing." << endl;
}
save(txn);
OldClientContext ctx(txn, ns, false);
nClonedThisPass++;
resync(txn, ctx.db()->name());
- addDbNextPass.erase(clientName);
- incompleteCloneDbs.erase(clientName);
+ addDbNextPass.erase(dbName);
+ incompleteCloneDbs.erase(dbName);
}
save(txn);
} else {
applyOperation(txn, ctx.db(), op);
- addDbNextPass.erase(clientName);
+ addDbNextPass.erase(dbName);
}
}
@@ -845,7 +868,7 @@ public:
1 ok, sleep
*/
int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) {
- int okResultCode = 1;
+ int okResultCode = restartSyncAfterSleep;
string ns = string("local.oplog.$") + sourceName();
LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n';
@@ -904,7 +927,7 @@ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) {
if (!oplogReader.haveCursor()) {
log() << "dbclient::query returns null (conn closed?)" << endl;
oplogReader.resetConnection();
- return -1;
+ return forceReconnect;
}
// show any deferred database creates from a previous pass
@@ -919,10 +942,23 @@ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) {
}
}
+ auto status = _updateIfDoneWithInitialSync();
+ if (!status.isOK()) {
+ switch (status.code()) {
+ case ErrorCodes::Interrupted: {
+ return restartSync; // don't sleep;
+ }
+ default: {
+ error() << status;
+ return forceReconnect; // causes reconnect.
+ }
+ }
+ }
+
if (!oplogReader.more()) {
if (tailing) {
LOG(2) << "tailing & no new activity\n";
- okResultCode = 0; // don't sleep
+ okResultCode = restartSync; // don't sleep
} else {
log() << ns << " oplog is empty" << endl;
@@ -1000,7 +1036,7 @@ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) {
Lock::GlobalWrite lk(txn->lockState());
if (tailing) {
- okResultCode = 0; // don't sleep
+ okResultCode = restartSync; // don't sleep
}
syncedTo = nextOpTime;
@@ -1160,7 +1196,7 @@ int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nAp
int sleepAdvice = 1;
for (ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++) {
ReplSource* s = i->get();
- int res = -1;
+ int res = forceReconnect;
try {
res = s->sync(txn, nApplied);
bool moreToSync = s->haveMoreDbsToSync();
@@ -1202,7 +1238,7 @@ int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nAp
static void replMain(OperationContext* txn) {
ReplSource::SourceVector sources;
while (1) {
- int s = 0;
+ auto s = restartSync;
{
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite lk(txn->lockState());
@@ -1224,13 +1260,13 @@ static void replMain(OperationContext* txn) {
try {
int nApplied = 0;
s = _replMain(txn, sources, nApplied);
- if (s == 1) {
+ if (s == restartSyncAfterSleep) {
if (nApplied == 0)
s = 2;
else if (nApplied > 100) {
// sleep very little - just enough that we aren't truly hammering master
sleepmillis(75);
- s = 0;
+ s = restartSync;
}
}
} catch (...) {
@@ -1247,7 +1283,7 @@ static void replMain(OperationContext* txn) {
if (relinquishSyncingSome) {
relinquishSyncingSome = 0;
- s = 1; // sleep before going back in to syncing=1
+ s = restartSyncAfterSleep; // sleep before going back in to syncing=1
}
if (s) {