diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2015-12-17 15:41:11 -0500 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2015-12-17 15:41:30 -0500 |
commit | db3259d651227d00a658dff26e2f05167f1a5aea (patch) | |
tree | 87ba2af009d335b0b10d57e31445e43793e7d34d | |
parent | 466dae32f1ad27bc867e13b5c763a9f48d88981b (diff) | |
download | mongo-db3259d651227d00a658dff26e2f05167f1a5aea.tar.gz |
Revert "SERVER-21930 - Restart oplog query if oplog entries are out of order"
This reverts commit 06ff25a41c6ac560c5b9d2fc6a32c13b1346c48d.
(cherry picked from commit 9ef32d72f37319fabf49296671b6fd1c23ecb46c)
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 57 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 82 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 2 |
5 files changed, 56 insertions, 88 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index faed0dbb14a..ece75d7797e 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -151,7 +151,6 @@ error_code("ReadConcernMajorityNotEnabled", 148) error_code("NoConfigMaster", 149) error_code("StaleEpoch", 150) error_code("OperationCannotBeBatched", 151) -error_code("OplogOutOfOrder", 152) # Non-sequential error codes (for compatibility only) error_code("RecvStaleConfig", 9996) diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 72b6719821b..d55a0c58528 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -319,7 +319,10 @@ void BackgroundSync::_produce(OperationContext* txn) { syncSourceReader.resetConnection(); // no more references to oplog reader from here on. - Status fetcherReturnStatus = Status::OK(); + // If this status is not OK after the fetcher returns from wait(), + // proceed to execute rollback + Status remoteOplogStartStatus = Status::OK(); + auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback, this, stdx::placeholders::_1, @@ -328,7 +331,7 @@ void BackgroundSync::_produce(OperationContext* txn) { lastOpTimeFetched, lastHashFetched, fetcherMaxTimeMS, - &fetcherReturnStatus); + &remoteOplogStartStatus); BSONObjBuilder cmdBob; @@ -373,18 +376,10 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } - if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) { - // This is bad because it means that our source - // has not returned oplog entries in ascending ts order, and they need to be. - - warning() << fetcherReturnStatus.toString(); - // Do not blacklist the server here, it will be blacklisted when we try to reuse it, - // if it can't return a matching oplog start from the last fetch oplog ts field. - return; - } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing || - fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) { - // Rollback is a synchronous operation that uses the task executor and may not be - // executed inside the fetcher callback. + // Execute rollback if necessary. + // Rollback is a synchronous operation that uses the task executor and may not be + // executed inside the fetcher callback. + if (!remoteOplogStartStatus.isOK()) { const int messagingPortTags = 0; ConnectionPool connectionPool(messagingPortTags); std::unique_ptr<ConnectionPool::ConnectionPtr> connection; @@ -397,11 +392,9 @@ void BackgroundSync::_produce(OperationContext* txn) { return connection->get(); }; - log() << "starting rollback: " << fetcherReturnStatus; + log() << "starting rollback: " << remoteOplogStartStatus; _rollback(txn, source, getConnection); stop(); - } else if (!fetcherReturnStatus.isOK()) { - warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString(); } } @@ -411,7 +404,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* returnStatus) { + Status* remoteOplogStartStatus) { // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor if (!result.isOK()) { @@ -472,8 +465,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return *(firstDocToApply++); }; - *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); - if (!returnStatus->isOK()) { + *remoteOplogStartStatus = + checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); + if (!remoteOplogStartStatus->isOK()) { // Stop fetcher and execute rollback. return; } @@ -491,26 +485,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& // The count of the bytes of the documents read off the network. int networkDocumentBytes = 0; - Timestamp lastTS = _lastOpTimeFetched.getTimestamp(); - int count = 0; - for (auto&& doc : documents) { - networkDocumentBytes += doc.objsize(); - // Check to see if the oplog entry goes back in time for this document. - const auto docOpTime = OpTime::parseFromOplogEntry(doc); - fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field. - const auto docTS = docOpTime.getValue().getTimestamp(); - - if (lastTS >= docTS) { - *returnStatus = Status( - ErrorCodes::OplogOutOfOrder, - str::stream() << "Reading the oplog from" << source.toString() - << " returned out of order entries. lastTS: " << lastTS.toString() - << " outOfOrderTS:" << docTS.toString() << " at count:" << count); - return; - } - lastTS = docTS; - ++count; - } + std::for_each(documents.cbegin(), + documents.cend(), + [&networkDocumentBytes](BSONObj doc) { networkDocumentBytes += doc.objsize(); }); // These numbers are for the documents we will apply. auto toApplyDocumentCount = documents.size(); diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 0afe7728af9..f723dfe1cb5 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -174,7 +174,7 @@ private: OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* returnStatus); + Status* remoteOplogStartStatus); /** * Executes a rollback. diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 1f8ed3a875a..2ae54b06eaa 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -131,39 +131,6 @@ bool isCrudOpType(const char* field) { } return false; } - -void handleSlaveDelay(const Timestamp& ts) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); - - // ignore slaveDelay if the box is still initializing. once - // it becomes secondary we can worry about it. - if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) { - long long a = ts.getSecs(); - long long b = time(0); - long long lag = b - a; - long long sleeptime = slaveDelaySecs - lag; - if (sleeptime > 0) { - uassert(12000, - "rs slaveDelay differential too big check clocks and systems", - sleeptime < 0x40000000); - if (sleeptime < 60) { - sleepsecs((int)sleeptime); - } else { - warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds"; - // sleep(hours) would prevent reconfigs from taking effect & such! - long long waitUntil = b + sleeptime; - while (time(0) < waitUntil) { - sleepsecs(6); - - // Handle reconfigs that changed the slave delay - if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs) - break; - } - } - } - } // endif slaveDelay -} } namespace { @@ -750,18 +717,7 @@ void SyncTail::oplogApplication() { continue; // This wasn't a real op. Don't try to apply it. } - const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp)); - if (lastWriteOpTime >= lastOpTime) { - // Error for the oplog to go back in time. - fassert(34361, - Status(ErrorCodes::OplogOutOfOrder, - str::stream() << "Attempted to apply an earlier oplog entry (ts: " - << lastOpTime.getTimestamp().toStringPretty() - << ") when our lastWrittenOptime was " - << lastWriteOpTime.toString())); - } - - handleSlaveDelay(lastOpTime.getTimestamp()); + handleSlaveDelay(lastOp); // Set minValid to the last OpTime that needs to be applied, in this batch or from the // (last) failed batch, whichever is larger. @@ -769,8 +725,8 @@ void SyncTail::oplogApplication() { // if we should crash and restart before updating finishing. const OpTime start(getLastSetTimestamp(), OpTime::kUninitializedTerm); - // Take the max of the first endOptime (if we recovered) and the end of our batch. + const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp)); // Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch) // ensures that we keep pushing out the point where we can become consistent @@ -887,6 +843,40 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op return false; } +void SyncTail::handleSlaveDelay(const BSONObj& lastOp) { + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); + + // ignore slaveDelay if the box is still initializing. once + // it becomes secondary we can worry about it. + if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) { + const Timestamp ts = lastOp["ts"].timestamp(); + long long a = ts.getSecs(); + long long b = time(0); + long long lag = b - a; + long long sleeptime = slaveDelaySecs - lag; + if (sleeptime > 0) { + uassert(12000, + "rs slaveDelay differential too big check clocks and systems", + sleeptime < 0x40000000); + if (sleeptime < 60) { + sleepsecs((int)sleeptime); + } else { + warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds"; + // sleep(hours) would prevent reconfigs from taking effect & such! + long long waitUntil = b + sleeptime; + while (time(0) < waitUntil) { + sleepsecs(6); + + // Handle reconfigs that changed the slave delay + if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs) + break; + } + } + } + } // endif slaveDelay +} + void SyncTail::setHostname(const std::string& hostname) { _hostname = hostname; } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 8f331cc8cf6..e3edacaed2a 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -187,6 +187,8 @@ private: // Function to use during applyOps MultiSyncApplyFunc _applyFunc; + void handleSlaveDelay(const BSONObj& op); + // persistent pool of worker threads for writing ops to the databases OldThreadPool _writerPool; // persistent pool of worker threads for prefetching |