diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-08-25 14:02:29 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-08-26 13:19:14 -0400 |
commit | 65925c449db6150307b1b0caf183b2988a692d77 (patch) | |
tree | aa6df8cea51a9bdef6ee72940745e01c8216a29c /src/mongo/db/repl/master_slave.cpp | |
parent | cc0af2d39b5cbec0fcb5cf224b985d52514a9b04 (diff) | |
download | mongo-65925c449db6150307b1b0caf183b2988a692d77.tar.gz |
SERVER-25618: do not report progress during m/s initial sync
Diffstat (limited to 'src/mongo/db/repl/master_slave.cpp')
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 82 |
1 files changed, 59 insertions, 23 deletions
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 544fc607f37..005427d0ba4 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -85,6 +85,14 @@ using std::vector; namespace mongo { namespace repl { +namespace { +time_t lastForcedResync = 0; + +const int forceReconnect = -1; +const int restartSync = 0; +const int restartSyncAfterSleep = 1; +} // namespace + void pretouchOperation(OperationContext* txn, const BSONObj& op); void pretouchN(vector<BSONObj>&, unsigned a, unsigned b); @@ -92,8 +100,6 @@ void pretouchN(vector<BSONObj>&, unsigned a, unsigned b); volatile int syncing = 0; volatile int relinquishSyncingSome = 0; -static time_t lastForcedResync = 0; - /* output by the web console */ const char* replInfo = ""; struct ReplInfo { @@ -413,7 +419,7 @@ bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OI return false; } - if (!replHandshake(reader->conn(), myRID)) { + if (_doHandshake && !replHandshake(reader->conn(), myRID)) { return false; } @@ -458,6 +464,23 @@ void ReplSource::forceResync(OperationContext* txn, const char* requester) { save(txn); } +Status ReplSource::_updateIfDoneWithInitialSync() { + const auto usedToDoHandshake = _doHandshake; + if (!usedToDoHandshake && addDbNextPass.empty() && incompleteCloneDbs.empty()) { + _doHandshake = true; + oplogReader.resetConnection(); + const auto myRID = getGlobalReplicationCoordinator()->getMyRID(); + if (!_connect(&oplogReader, HostAndPort{hostName}, myRID)) { + return {ErrorCodes::MasterSlaveConnectionFailure, + str::stream() << "could not connect to " << hostName << " with rid: " + << myRID.toString()}; + } else { + return {ErrorCodes::Interrupted, "Initial Sync is done."}; + } + } + return Status::OK(); +} + void ReplSource::resyncDrop(OperationContext* txn, const string& dbName) { log() << "resync: dropping database " << dbName; invariant(txn->lockState()->isW()); @@ -672,9 +695,9 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, if (op.getStringField("op")[0] == 'n') return; - char clientName[MaxDatabaseNameLen]; + char dbName[MaxDatabaseNameLen]; const char* ns = op.getStringField("ns"); - nsToDatabase(ns, clientName); + nsToDatabase(ns, dbName); if (*ns == '.') { log() << "skipping bad op in oplog: " << op.toString() << endl; @@ -689,7 +712,7 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, // nsToDatabase(ns, clientName); } - if (!only.empty() && only != clientName) + if (!only.empty() && only != dbName) return; // Push the CurOp stack for "txn" so each individual oplog entry application is separately @@ -744,7 +767,7 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, throw SyncException(); } - if (!handleDuplicateDbName(txn, op, ns, clientName)) { + if (!handleDuplicateDbName(txn, op, ns, dbName)) { return; } @@ -760,37 +783,37 @@ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, OldClientContext ctx(txn, ns, false); bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData(); - bool incompleteClone = incompleteCloneDbs.count(clientName) != 0; + bool incompleteClone = incompleteCloneDbs.count(dbName) != 0; LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl; if (ctx.justCreated() || empty || incompleteClone) { // we must add to incomplete list now that setClient has been called - incompleteCloneDbs.insert(clientName); + incompleteCloneDbs.insert(dbName); if (nClonedThisPass) { /* we only clone one database per pass, even if a lot need done. This helps us avoid overflowing the master's transaction log by doing too much work before going back to read more transactions. (Imagine a scenario of slave startup where we try to clone 100 databases in one pass.) */ - addDbNextPass.insert(clientName); + addDbNextPass.insert(dbName); } else { if (incompleteClone) { - log() << "An earlier initial clone of '" << clientName + log() << "An earlier initial clone of '" << dbName << "' did not complete, now resyncing." << endl; } save(txn); OldClientContext ctx(txn, ns, false); nClonedThisPass++; resync(txn, ctx.db()->name()); - addDbNextPass.erase(clientName); - incompleteCloneDbs.erase(clientName); + addDbNextPass.erase(dbName); + incompleteCloneDbs.erase(dbName); } save(txn); } else { applyOperation(txn, ctx.db(), op); - addDbNextPass.erase(clientName); + addDbNextPass.erase(dbName); } } @@ -845,7 +868,7 @@ public: 1 ok, sleep */ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { - int okResultCode = 1; + int okResultCode = restartSyncAfterSleep; string ns = string("local.oplog.$") + sourceName(); LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n'; @@ -904,7 +927,7 @@ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { if (!oplogReader.haveCursor()) { log() << "dbclient::query returns null (conn closed?)" << endl; oplogReader.resetConnection(); - return -1; + return forceReconnect; } // show any deferred database creates from a previous pass @@ -919,10 +942,23 @@ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { } } + auto status = _updateIfDoneWithInitialSync(); + if (!status.isOK()) { + switch (status.code()) { + case ErrorCodes::Interrupted: { + return restartSync; // don't sleep; + } + default: { + error() << status; + return forceReconnect; // causes reconnect. + } + } + } + if (!oplogReader.more()) { if (tailing) { LOG(2) << "tailing & no new activity\n"; - okResultCode = 0; // don't sleep + okResultCode = restartSync; // don't sleep } else { log() << ns << " oplog is empty" << endl; @@ -1000,7 +1036,7 @@ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { Lock::GlobalWrite lk(txn->lockState()); if (tailing) { - okResultCode = 0; // don't sleep + okResultCode = restartSync; // don't sleep } syncedTo = nextOpTime; @@ -1160,7 +1196,7 @@ int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nAp int sleepAdvice = 1; for (ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++) { ReplSource* s = i->get(); - int res = -1; + int res = forceReconnect; try { res = s->sync(txn, nApplied); bool moreToSync = s->haveMoreDbsToSync(); @@ -1202,7 +1238,7 @@ int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nAp static void replMain(OperationContext* txn) { ReplSource::SourceVector sources; while (1) { - int s = 0; + auto s = restartSync; { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); @@ -1224,13 +1260,13 @@ static void replMain(OperationContext* txn) { try { int nApplied = 0; s = _replMain(txn, sources, nApplied); - if (s == 1) { + if (s == restartSyncAfterSleep) { if (nApplied == 0) s = 2; else if (nApplied > 100) { // sleep very little - just enough that we aren't truly hammering master sleepmillis(75); - s = 0; + s = restartSync; } } } catch (...) { @@ -1247,7 +1283,7 @@ static void replMain(OperationContext* txn) { if (relinquishSyncingSome) { relinquishSyncingSome = 0; - s = 1; // sleep before going back in to syncing=1 + s = restartSyncAfterSleep; // sleep before going back in to syncing=1 } if (s) { |