summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2015-12-17 11:10:14 -0500
committerScott Hernandez <scotthernandez@gmail.com>2015-12-17 14:32:38 -0500
commit466dae32f1ad27bc867e13b5c763a9f48d88981b (patch)
treeb635563ea81223012cf85d95f724b32b7b9c0653
parentf785174734fcf309c6be9cbc5f8a3ae591ce4dfd (diff)
downloadmongo-466dae32f1ad27bc867e13b5c763a9f48d88981b.tar.gz
SERVER-21930 - Restart oplog query if oplog entries are out of order
(cherry picked from commit 06ff25a41c6ac560c5b9d2fc6a32c13b1346c48d)
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/repl/bgsync.cpp57
-rw-r--r--src/mongo/db/repl/bgsync.h2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp82
-rw-r--r--src/mongo/db/repl/sync_tail.h2
5 files changed, 88 insertions, 56 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index ece75d7797e..faed0dbb14a 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -151,6 +151,7 @@ error_code("ReadConcernMajorityNotEnabled", 148)
error_code("NoConfigMaster", 149)
error_code("StaleEpoch", 150)
error_code("OperationCannotBeBatched", 151)
+error_code("OplogOutOfOrder", 152)
# Non-sequential error codes (for compatibility only)
error_code("RecvStaleConfig", 9996)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index d55a0c58528..72b6719821b 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -319,10 +319,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
syncSourceReader.resetConnection();
// no more references to oplog reader from here on.
- // If this status is not OK after the fetcher returns from wait(),
- // proceed to execute rollback
- Status remoteOplogStartStatus = Status::OK();
-
+ Status fetcherReturnStatus = Status::OK();
auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback,
this,
stdx::placeholders::_1,
@@ -331,7 +328,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
lastOpTimeFetched,
lastHashFetched,
fetcherMaxTimeMS,
- &remoteOplogStartStatus);
+ &fetcherReturnStatus);
BSONObjBuilder cmdBob;
@@ -376,10 +373,18 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
- // Execute rollback if necessary.
- // Rollback is a synchronous operation that uses the task executor and may not be
- // executed inside the fetcher callback.
- if (!remoteOplogStartStatus.isOK()) {
+ if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) {
+ // This is bad because it means that our source
+ // has not returned oplog entries in ascending ts order, and they need to be.
+
+ warning() << fetcherReturnStatus.toString();
+ // Do not blacklist the server here, it will be blacklisted when we try to reuse it,
+ // if it can't return a matching oplog start from the last fetch oplog ts field.
+ return;
+ } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
+ fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
+ // Rollback is a synchronous operation that uses the task executor and may not be
+ // executed inside the fetcher callback.
const int messagingPortTags = 0;
ConnectionPool connectionPool(messagingPortTags);
std::unique_ptr<ConnectionPool::ConnectionPtr> connection;
@@ -392,9 +397,11 @@ void BackgroundSync::_produce(OperationContext* txn) {
return connection->get();
};
- log() << "starting rollback: " << remoteOplogStartStatus;
+ log() << "starting rollback: " << fetcherReturnStatus;
_rollback(txn, source, getConnection);
stop();
+ } else if (!fetcherReturnStatus.isOK()) {
+ warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString();
}
}
@@ -404,7 +411,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* remoteOplogStartStatus) {
+ Status* returnStatus) {
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
if (!result.isOK()) {
@@ -465,9 +472,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return *(firstDocToApply++);
};
- *remoteOplogStartStatus =
- checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
- if (!remoteOplogStartStatus->isOK()) {
+ *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
+ if (!returnStatus->isOK()) {
// Stop fetcher and execute rollback.
return;
}
@@ -485,9 +491,26 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
// The count of the bytes of the documents read off the network.
int networkDocumentBytes = 0;
- std::for_each(documents.cbegin(),
- documents.cend(),
- [&networkDocumentBytes](BSONObj doc) { networkDocumentBytes += doc.objsize(); });
+ Timestamp lastTS = _lastOpTimeFetched.getTimestamp();
+ int count = 0;
+ for (auto&& doc : documents) {
+ networkDocumentBytes += doc.objsize();
+ // Check to see if the oplog entry goes back in time for this document.
+ const auto docOpTime = OpTime::parseFromOplogEntry(doc);
+ fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field.
+ const auto docTS = docOpTime.getValue().getTimestamp();
+
+ if (lastTS >= docTS) {
+ *returnStatus = Status(
+ ErrorCodes::OplogOutOfOrder,
+ str::stream() << "Reading the oplog from" << source.toString()
+ << " returned out of order entries. lastTS: " << lastTS.toString()
+ << " outOfOrderTS:" << docTS.toString() << " at count:" << count);
+ return;
+ }
+ lastTS = docTS;
+ ++count;
+ }
// These numbers are for the documents we will apply.
auto toApplyDocumentCount = documents.size();
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index f723dfe1cb5..0afe7728af9 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -174,7 +174,7 @@ private:
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* remoteOplogStartStatus);
+ Status* returnStatus);
/**
* Executes a rollback.
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 2ae54b06eaa..1f8ed3a875a 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -131,6 +131,39 @@ bool isCrudOpType(const char* field) {
}
return false;
}
+
+void handleSlaveDelay(const Timestamp& ts) {
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
+
+ // ignore slaveDelay if the box is still initializing. once
+ // it becomes secondary we can worry about it.
+ if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) {
+ long long a = ts.getSecs();
+ long long b = time(0);
+ long long lag = b - a;
+ long long sleeptime = slaveDelaySecs - lag;
+ if (sleeptime > 0) {
+ uassert(12000,
+ "rs slaveDelay differential too big check clocks and systems",
+ sleeptime < 0x40000000);
+ if (sleeptime < 60) {
+ sleepsecs((int)sleeptime);
+ } else {
+ warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds";
+ // sleep(hours) would prevent reconfigs from taking effect & such!
+ long long waitUntil = b + sleeptime;
+ while (time(0) < waitUntil) {
+ sleepsecs(6);
+
+ // Handle reconfigs that changed the slave delay
+ if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs)
+ break;
+ }
+ }
+ }
+ } // endif slaveDelay
+}
}
namespace {
@@ -717,7 +750,18 @@ void SyncTail::oplogApplication() {
continue; // This wasn't a real op. Don't try to apply it.
}
- handleSlaveDelay(lastOp);
+ const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
+ if (lastWriteOpTime >= lastOpTime) {
+ // Error for the oplog to go back in time.
+ fassert(34361,
+ Status(ErrorCodes::OplogOutOfOrder,
+ str::stream() << "Attempted to apply an earlier oplog entry (ts: "
+ << lastOpTime.getTimestamp().toStringPretty()
+ << ") when our lastWrittenOptime was "
+ << lastWriteOpTime.toString()));
+ }
+
+ handleSlaveDelay(lastOpTime.getTimestamp());
// Set minValid to the last OpTime that needs to be applied, in this batch or from the
// (last) failed batch, whichever is larger.
@@ -725,8 +769,8 @@ void SyncTail::oplogApplication() {
// if we should crash and restart before updating finishing.
const OpTime start(getLastSetTimestamp(), OpTime::kUninitializedTerm);
+
// Take the max of the first endOptime (if we recovered) and the end of our batch.
- const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
// Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch)
// ensures that we keep pushing out the point where we can become consistent
@@ -843,40 +887,6 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
return false;
}
-void SyncTail::handleSlaveDelay(const BSONObj& lastOp) {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
-
- // ignore slaveDelay if the box is still initializing. once
- // it becomes secondary we can worry about it.
- if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) {
- const Timestamp ts = lastOp["ts"].timestamp();
- long long a = ts.getSecs();
- long long b = time(0);
- long long lag = b - a;
- long long sleeptime = slaveDelaySecs - lag;
- if (sleeptime > 0) {
- uassert(12000,
- "rs slaveDelay differential too big check clocks and systems",
- sleeptime < 0x40000000);
- if (sleeptime < 60) {
- sleepsecs((int)sleeptime);
- } else {
- warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds";
- // sleep(hours) would prevent reconfigs from taking effect & such!
- long long waitUntil = b + sleeptime;
- while (time(0) < waitUntil) {
- sleepsecs(6);
-
- // Handle reconfigs that changed the slave delay
- if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs)
- break;
- }
- }
- }
- } // endif slaveDelay
-}
-
void SyncTail::setHostname(const std::string& hostname) {
_hostname = hostname;
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index e3edacaed2a..8f331cc8cf6 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -187,8 +187,6 @@ private:
// Function to use during applyOps
MultiSyncApplyFunc _applyFunc;
- void handleSlaveDelay(const BSONObj& op);
-
// persistent pool of worker threads for writing ops to the databases
OldThreadPool _writerPool;
// persistent pool of worker threads for prefetching