summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp63
1 files changed, 46 insertions, 17 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index d55a0c58528..6759bf85230 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -319,10 +319,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
syncSourceReader.resetConnection();
// no more references to oplog reader from here on.
- // If this status is not OK after the fetcher returns from wait(),
- // proceed to execute rollback
- Status remoteOplogStartStatus = Status::OK();
-
+ Status fetcherReturnStatus = Status::OK();
auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback,
this,
stdx::placeholders::_1,
@@ -331,7 +328,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
lastOpTimeFetched,
lastHashFetched,
fetcherMaxTimeMS,
- &remoteOplogStartStatus);
+ &fetcherReturnStatus);
BSONObjBuilder cmdBob;
@@ -376,10 +373,18 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
- // Execute rollback if necessary.
- // Rollback is a synchronous operation that uses the task executor and may not be
- // executed inside the fetcher callback.
- if (!remoteOplogStartStatus.isOK()) {
+ if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) {
+ // This is bad because it means that our source
+ // has not returned oplog entries in ascending ts order, and they need to be.
+
+ warning() << fetcherReturnStatus.toString();
+ // Do not blacklist the server here, it will be blacklisted when we try to reuse it,
+ // if it can't return a matching oplog start from the last fetch oplog ts field.
+ return;
+ } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
+ fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
+ // Rollback is a synchronous operation that uses the task executor and may not be
+ // executed inside the fetcher callback.
const int messagingPortTags = 0;
ConnectionPool connectionPool(messagingPortTags);
std::unique_ptr<ConnectionPool::ConnectionPtr> connection;
@@ -392,9 +397,11 @@ void BackgroundSync::_produce(OperationContext* txn) {
return connection->get();
};
- log() << "starting rollback: " << remoteOplogStartStatus;
+ log() << "starting rollback: " << fetcherReturnStatus;
_rollback(txn, source, getConnection);
stop();
+ } else if (!fetcherReturnStatus.isOK()) {
+ warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString();
}
}
@@ -404,7 +411,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* remoteOplogStartStatus) {
+ Status* returnStatus) {
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
if (!result.isOK()) {
@@ -465,9 +472,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return *(firstDocToApply++);
};
- *remoteOplogStartStatus =
- checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
- if (!remoteOplogStartStatus->isOK()) {
+ *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
+ if (!returnStatus->isOK()) {
// Stop fetcher and execute rollback.
return;
}
@@ -485,9 +491,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
// The count of the bytes of the documents read off the network.
int networkDocumentBytes = 0;
- std::for_each(documents.cbegin(),
- documents.cend(),
- [&networkDocumentBytes](BSONObj doc) { networkDocumentBytes += doc.objsize(); });
+ Timestamp lastTS = _lastOpTimeFetched.getTimestamp();
+ int count = 0;
+ for (auto&& doc : documents) {
+ networkDocumentBytes += doc.objsize();
+ ++count;
+
+ // If this is the first response (to the $gte query) then we already applied the first doc.
+ if (queryResponse.first && count == 1) {
+ continue;
+ }
+
+ // Check to see if the oplog entry goes back in time for this document.
+ const auto docOpTime = OpTime::parseFromOplogEntry(doc);
+ fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field.
+ const auto docTS = docOpTime.getValue().getTimestamp();
+
+ if (lastTS >= docTS) {
+ *returnStatus = Status(
+ ErrorCodes::OplogOutOfOrder,
+ str::stream() << "Reading the oplog from" << source.toString()
+ << " returned out of order entries. lastTS: " << lastTS.toString()
+ << " outOfOrderTS:" << docTS.toString() << " at count:" << count);
+ return;
+ }
+ lastTS = docTS;
+ }
// These numbers are for the documents we will apply.
auto toApplyDocumentCount = documents.size();