diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2015-12-17 11:10:14 -0500 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2015-12-17 16:12:21 -0500 |
commit | 6331c474bae740e4b54c9741925013b5cbdbc5f9 (patch) | |
tree | 7178382f3bc9130bc42cc32e2fbf50d05f62d78f | |
parent | db3259d651227d00a658dff26e2f05167f1a5aea (diff) | |
download | mongo-6331c474bae740e4b54c9741925013b5cbdbc5f9.tar.gz |
SERVER-21930 - Restart oplog query if oplog entries are out of order
(cherry picked from commit 06ff25a41c6ac560c5b9d2fc6a32c13b1346c48d)
(cherry picked from commit ac919e016150819dde59f21c2e6327e5f24754bd)
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 63 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 82 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 2 |
5 files changed, 94 insertions, 56 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index ece75d7797e..faed0dbb14a 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -151,6 +151,7 @@ error_code("ReadConcernMajorityNotEnabled", 148) error_code("NoConfigMaster", 149) error_code("StaleEpoch", 150) error_code("OperationCannotBeBatched", 151) +error_code("OplogOutOfOrder", 152) # Non-sequential error codes (for compatibility only) error_code("RecvStaleConfig", 9996) diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index d55a0c58528..6759bf85230 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -319,10 +319,7 @@ void BackgroundSync::_produce(OperationContext* txn) { syncSourceReader.resetConnection(); // no more references to oplog reader from here on. - // If this status is not OK after the fetcher returns from wait(), - // proceed to execute rollback - Status remoteOplogStartStatus = Status::OK(); - + Status fetcherReturnStatus = Status::OK(); auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback, this, stdx::placeholders::_1, @@ -331,7 +328,7 @@ void BackgroundSync::_produce(OperationContext* txn) { lastOpTimeFetched, lastHashFetched, fetcherMaxTimeMS, - &remoteOplogStartStatus); + &fetcherReturnStatus); BSONObjBuilder cmdBob; @@ -376,10 +373,18 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } - // Execute rollback if necessary. - // Rollback is a synchronous operation that uses the task executor and may not be - // executed inside the fetcher callback. - if (!remoteOplogStartStatus.isOK()) { + if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) { + // This is bad because it means that our source + // has not returned oplog entries in ascending ts order, and they need to be. + + warning() << fetcherReturnStatus.toString(); + // Do not blacklist the server here, it will be blacklisted when we try to reuse it, + // if it can't return a matching oplog start from the last fetch oplog ts field. + return; + } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing || + fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) { + // Rollback is a synchronous operation that uses the task executor and may not be + // executed inside the fetcher callback. const int messagingPortTags = 0; ConnectionPool connectionPool(messagingPortTags); std::unique_ptr<ConnectionPool::ConnectionPtr> connection; @@ -392,9 +397,11 @@ void BackgroundSync::_produce(OperationContext* txn) { return connection->get(); }; - log() << "starting rollback: " << remoteOplogStartStatus; + log() << "starting rollback: " << fetcherReturnStatus; _rollback(txn, source, getConnection); stop(); + } else if (!fetcherReturnStatus.isOK()) { + warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString(); } } @@ -404,7 +411,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* remoteOplogStartStatus) { + Status* returnStatus) { // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor if (!result.isOK()) { @@ -465,9 +472,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return *(firstDocToApply++); }; - *remoteOplogStartStatus = - checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); - if (!remoteOplogStartStatus->isOK()) { + *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); + if (!returnStatus->isOK()) { // Stop fetcher and execute rollback. return; } @@ -485,9 +491,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& // The count of the bytes of the documents read off the network. int networkDocumentBytes = 0; - std::for_each(documents.cbegin(), - documents.cend(), - [&networkDocumentBytes](BSONObj doc) { networkDocumentBytes += doc.objsize(); }); + Timestamp lastTS = _lastOpTimeFetched.getTimestamp(); + int count = 0; + for (auto&& doc : documents) { + networkDocumentBytes += doc.objsize(); + ++count; + + // If this is the first response (to the $gte query) then we already applied the first doc. + if (queryResponse.first && count == 1) { + continue; + } + + // Check to see if the oplog entry goes back in time for this document. + const auto docOpTime = OpTime::parseFromOplogEntry(doc); + fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field. + const auto docTS = docOpTime.getValue().getTimestamp(); + + if (lastTS >= docTS) { + *returnStatus = Status( + ErrorCodes::OplogOutOfOrder, + str::stream() << "Reading the oplog from" << source.toString() + << " returned out of order entries. lastTS: " << lastTS.toString() + << " outOfOrderTS:" << docTS.toString() << " at count:" << count); + return; + } + lastTS = docTS; + } // These numbers are for the documents we will apply. auto toApplyDocumentCount = documents.size(); diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index f723dfe1cb5..0afe7728af9 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -174,7 +174,7 @@ private: OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* remoteOplogStartStatus); + Status* returnStatus); /** * Executes a rollback. diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 2ae54b06eaa..1f8ed3a875a 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -131,6 +131,39 @@ bool isCrudOpType(const char* field) { } return false; } + +void handleSlaveDelay(const Timestamp& ts) { + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); + + // ignore slaveDelay if the box is still initializing. once + // it becomes secondary we can worry about it. + if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) { + long long a = ts.getSecs(); + long long b = time(0); + long long lag = b - a; + long long sleeptime = slaveDelaySecs - lag; + if (sleeptime > 0) { + uassert(12000, + "rs slaveDelay differential too big check clocks and systems", + sleeptime < 0x40000000); + if (sleeptime < 60) { + sleepsecs((int)sleeptime); + } else { + warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds"; + // sleep(hours) would prevent reconfigs from taking effect & such! + long long waitUntil = b + sleeptime; + while (time(0) < waitUntil) { + sleepsecs(6); + + // Handle reconfigs that changed the slave delay + if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs) + break; + } + } + } + } // endif slaveDelay +} } namespace { @@ -717,7 +750,18 @@ void SyncTail::oplogApplication() { continue; // This wasn't a real op. Don't try to apply it. } - handleSlaveDelay(lastOp); + const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp)); + if (lastWriteOpTime >= lastOpTime) { + // Error for the oplog to go back in time. + fassert(34361, + Status(ErrorCodes::OplogOutOfOrder, + str::stream() << "Attempted to apply an earlier oplog entry (ts: " + << lastOpTime.getTimestamp().toStringPretty() + << ") when our lastWrittenOptime was " + << lastWriteOpTime.toString())); + } + + handleSlaveDelay(lastOpTime.getTimestamp()); // Set minValid to the last OpTime that needs to be applied, in this batch or from the // (last) failed batch, whichever is larger. @@ -725,8 +769,8 @@ void SyncTail::oplogApplication() { // if we should crash and restart before updating finishing. const OpTime start(getLastSetTimestamp(), OpTime::kUninitializedTerm); + // Take the max of the first endOptime (if we recovered) and the end of our batch. - const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp)); // Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch) // ensures that we keep pushing out the point where we can become consistent @@ -843,40 +887,6 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op return false; } -void SyncTail::handleSlaveDelay(const BSONObj& lastOp) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); - - // ignore slaveDelay if the box is still initializing. once - // it becomes secondary we can worry about it. - if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) { - const Timestamp ts = lastOp["ts"].timestamp(); - long long a = ts.getSecs(); - long long b = time(0); - long long lag = b - a; - long long sleeptime = slaveDelaySecs - lag; - if (sleeptime > 0) { - uassert(12000, - "rs slaveDelay differential too big check clocks and systems", - sleeptime < 0x40000000); - if (sleeptime < 60) { - sleepsecs((int)sleeptime); - } else { - warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds"; - // sleep(hours) would prevent reconfigs from taking effect & such! - long long waitUntil = b + sleeptime; - while (time(0) < waitUntil) { - sleepsecs(6); - - // Handle reconfigs that changed the slave delay - if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs) - break; - } - } - } - } // endif slaveDelay -} - void SyncTail::setHostname(const std::string& hostname) { _hostname = hostname; } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index e3edacaed2a..8f331cc8cf6 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -187,8 +187,6 @@ private: // Function to use during applyOps MultiSyncApplyFunc _applyFunc; - void handleSlaveDelay(const BSONObj& op); - // persistent pool of worker threads for writing ops to the databases OldThreadPool _writerPool; // persistent pool of worker threads for prefetching |