diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2015-12-17 11:10:14 -0500 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2015-12-17 16:12:21 -0500 |
commit | 6331c474bae740e4b54c9741925013b5cbdbc5f9 (patch) | |
tree | 7178382f3bc9130bc42cc32e2fbf50d05f62d78f /src/mongo/db/repl/bgsync.cpp | |
parent | db3259d651227d00a658dff26e2f05167f1a5aea (diff) | |
download | mongo-6331c474bae740e4b54c9741925013b5cbdbc5f9.tar.gz |
SERVER-21930 - Restart oplog query if oplog entries are out of order
(cherry picked from commit 06ff25a41c6ac560c5b9d2fc6a32c13b1346c48d)
(cherry picked from commit ac919e016150819dde59f21c2e6327e5f24754bd)
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 63 |
1 files changed, 46 insertions, 17 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index d55a0c58528..6759bf85230 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -319,10 +319,7 @@ void BackgroundSync::_produce(OperationContext* txn) { syncSourceReader.resetConnection(); // no more references to oplog reader from here on. - // If this status is not OK after the fetcher returns from wait(), - // proceed to execute rollback - Status remoteOplogStartStatus = Status::OK(); - + Status fetcherReturnStatus = Status::OK(); auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback, this, stdx::placeholders::_1, @@ -331,7 +328,7 @@ void BackgroundSync::_produce(OperationContext* txn) { lastOpTimeFetched, lastHashFetched, fetcherMaxTimeMS, - &remoteOplogStartStatus); + &fetcherReturnStatus); BSONObjBuilder cmdBob; @@ -376,10 +373,18 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } - // Execute rollback if necessary. - // Rollback is a synchronous operation that uses the task executor and may not be - // executed inside the fetcher callback. - if (!remoteOplogStartStatus.isOK()) { + if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) { + // This is bad because it means that our source + // has not returned oplog entries in ascending ts order, and they need to be. + + warning() << fetcherReturnStatus.toString(); + // Do not blacklist the server here, it will be blacklisted when we try to reuse it, + // if it can't return a matching oplog start from the last fetch oplog ts field. + return; + } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing || + fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) { + // Rollback is a synchronous operation that uses the task executor and may not be + // executed inside the fetcher callback. const int messagingPortTags = 0; ConnectionPool connectionPool(messagingPortTags); std::unique_ptr<ConnectionPool::ConnectionPtr> connection; @@ -392,9 +397,11 @@ void BackgroundSync::_produce(OperationContext* txn) { return connection->get(); }; - log() << "starting rollback: " << remoteOplogStartStatus; + log() << "starting rollback: " << fetcherReturnStatus; _rollback(txn, source, getConnection); stop(); + } else if (!fetcherReturnStatus.isOK()) { + warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString(); } } @@ -404,7 +411,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* remoteOplogStartStatus) { + Status* returnStatus) { // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor if (!result.isOK()) { @@ -465,9 +472,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return *(firstDocToApply++); }; - *remoteOplogStartStatus = - checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); - if (!remoteOplogStartStatus->isOK()) { + *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); + if (!returnStatus->isOK()) { // Stop fetcher and execute rollback. return; } @@ -485,9 +491,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& // The count of the bytes of the documents read off the network. int networkDocumentBytes = 0; - std::for_each(documents.cbegin(), - documents.cend(), - [&networkDocumentBytes](BSONObj doc) { networkDocumentBytes += doc.objsize(); }); + Timestamp lastTS = _lastOpTimeFetched.getTimestamp(); + int count = 0; + for (auto&& doc : documents) { + networkDocumentBytes += doc.objsize(); + ++count; + + // If this is the first response (to the $gte query) then we already applied the first doc. + if (queryResponse.first && count == 1) { + continue; + } + + // Check to see if the oplog entry goes back in time for this document. + const auto docOpTime = OpTime::parseFromOplogEntry(doc); + fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field. + const auto docTS = docOpTime.getValue().getTimestamp(); + + if (lastTS >= docTS) { + *returnStatus = Status( + ErrorCodes::OplogOutOfOrder, + str::stream() << "Reading the oplog from" << source.toString() + << " returned out of order entries. lastTS: " << lastTS.toString() + << " outOfOrderTS:" << docTS.toString() << " at count:" << count); + return; + } + lastTS = docTS; + } // These numbers are for the documents we will apply. auto toApplyDocumentCount = documents.size(); |