summary refs log tree commit diff
diff options
context:
space:
mode:
author Scott Hernandez <scotthernandez@gmail.com> 2015-12-17 15:41:11 -0500
committer Scott Hernandez <scotthernandez@gmail.com> 2015-12-17 15:41:30 -0500
commit db3259d651227d00a658dff26e2f05167f1a5aea (patch)
tree 87ba2af009d335b0b10d57e31445e43793e7d34d
parent 466dae32f1ad27bc867e13b5c763a9f48d88981b (diff)
download mongo-db3259d651227d00a658dff26e2f05167f1a5aea.tar.gz
Revert "SERVER-21930 - Restart oplog query if oplog entries are out of order"
This reverts commit 06ff25a41c6ac560c5b9d2fc6a32c13b1346c48d. (cherry picked from commit 9ef32d72f37319fabf49296671b6fd1c23ecb46c)
-rw-r--r-- src/mongo/base/error_codes.err 1
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 57
-rw-r--r-- src/mongo/db/repl/bgsync.h 2
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 82
-rw-r--r-- src/mongo/db/repl/sync_tail.h 2
5 files changed, 56 insertions, 88 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index faed0dbb14a..ece75d7797e 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -151,7 +151,6 @@ error_code("ReadConcernMajorityNotEnabled", 148)
error_code("NoConfigMaster", 149)
error_code("StaleEpoch", 150)
error_code("OperationCannotBeBatched", 151)
-error_code("OplogOutOfOrder", 152)
# Non-sequential error codes (for compatibility only)
error_code("RecvStaleConfig", 9996)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 72b6719821b..d55a0c58528 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -319,7 +319,10 @@ void BackgroundSync::_produce(OperationContext* txn) {
syncSourceReader.resetConnection();
// no more references to oplog reader from here on.
- Status fetcherReturnStatus = Status::OK();
+ // If this status is not OK after the fetcher returns from wait(),
+ // proceed to execute rollback
+ Status remoteOplogStartStatus = Status::OK();
+
auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback,
this,
stdx::placeholders::_1,
@@ -328,7 +331,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
lastOpTimeFetched,
lastHashFetched,
fetcherMaxTimeMS,
- &fetcherReturnStatus);
+ &remoteOplogStartStatus);
BSONObjBuilder cmdBob;
@@ -373,18 +376,10 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
- if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) {
- // This is bad because it means that our source
- // has not returned oplog entries in ascending ts order, and they need to be.
-
- warning() << fetcherReturnStatus.toString();
- // Do not blacklist the server here, it will be blacklisted when we try to reuse it,
- // if it can't return a matching oplog start from the last fetch oplog ts field.
- return;
- } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
- fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
- // Rollback is a synchronous operation that uses the task executor and may not be
- // executed inside the fetcher callback.
+ // Execute rollback if necessary.
+ // Rollback is a synchronous operation that uses the task executor and may not be
+ // executed inside the fetcher callback.
+ if (!remoteOplogStartStatus.isOK()) {
const int messagingPortTags = 0;
ConnectionPool connectionPool(messagingPortTags);
std::unique_ptr<ConnectionPool::ConnectionPtr> connection;
@@ -397,11 +392,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
return connection->get();
};
- log() << "starting rollback: " << fetcherReturnStatus;
+ log() << "starting rollback: " << remoteOplogStartStatus;
_rollback(txn, source, getConnection);
stop();
- } else if (!fetcherReturnStatus.isOK()) {
- warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString();
}
}
@@ -411,7 +404,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* returnStatus) {
+ Status* remoteOplogStartStatus) {
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
if (!result.isOK()) {
@@ -472,8 +465,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return *(firstDocToApply++);
};
- *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
- if (!returnStatus->isOK()) {
+ *remoteOplogStartStatus =
+ checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
+ if (!remoteOplogStartStatus->isOK()) {
// Stop fetcher and execute rollback.
return;
}
@@ -491,26 +485,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
// The count of the bytes of the documents read off the network.
int networkDocumentBytes = 0;
- Timestamp lastTS = _lastOpTimeFetched.getTimestamp();
- int count = 0;
- for (auto&& doc : documents) {
- networkDocumentBytes += doc.objsize();
- // Check to see if the oplog entry goes back in time for this document.
- const auto docOpTime = OpTime::parseFromOplogEntry(doc);
- fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field.
- const auto docTS = docOpTime.getValue().getTimestamp();
-
- if (lastTS >= docTS) {
- *returnStatus = Status(
- ErrorCodes::OplogOutOfOrder,
- str::stream() << "Reading the oplog from" << source.toString()
- << " returned out of order entries. lastTS: " << lastTS.toString()
- << " outOfOrderTS:" << docTS.toString() << " at count:" << count);
- return;
- }
- lastTS = docTS;
- ++count;
- }
+ std::for_each(documents.cbegin(),
+ documents.cend(),
+ [&networkDocumentBytes](BSONObj doc) { networkDocumentBytes += doc.objsize(); });
// These numbers are for the documents we will apply.
auto toApplyDocumentCount = documents.size();
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 0afe7728af9..f723dfe1cb5 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -174,7 +174,7 @@ private:
OpTime lastOpTimeFetched,
long long lastFetchedHash,
Milliseconds fetcherMaxTimeMS,
- Status* returnStatus);
+ Status* remoteOplogStartStatus);
/**
* Executes a rollback.
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 1f8ed3a875a..2ae54b06eaa 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -131,39 +131,6 @@ bool isCrudOpType(const char* field) {
}
return false;
}
-
-void handleSlaveDelay(const Timestamp& ts) {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
-
- // ignore slaveDelay if the box is still initializing. once
- // it becomes secondary we can worry about it.
- if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) {
- long long a = ts.getSecs();
- long long b = time(0);
- long long lag = b - a;
- long long sleeptime = slaveDelaySecs - lag;
- if (sleeptime > 0) {
- uassert(12000,
- "rs slaveDelay differential too big check clocks and systems",
- sleeptime < 0x40000000);
- if (sleeptime < 60) {
- sleepsecs((int)sleeptime);
- } else {
- warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds";
- // sleep(hours) would prevent reconfigs from taking effect & such!
- long long waitUntil = b + sleeptime;
- while (time(0) < waitUntil) {
- sleepsecs(6);
-
- // Handle reconfigs that changed the slave delay
- if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs)
- break;
- }
- }
- }
- } // endif slaveDelay
-}
}
namespace {
@@ -750,18 +717,7 @@ void SyncTail::oplogApplication() {
continue; // This wasn't a real op. Don't try to apply it.
}
- const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
- if (lastWriteOpTime >= lastOpTime) {
- // Error for the oplog to go back in time.
- fassert(34361,
- Status(ErrorCodes::OplogOutOfOrder,
- str::stream() << "Attempted to apply an earlier oplog entry (ts: "
- << lastOpTime.getTimestamp().toStringPretty()
- << ") when our lastWrittenOptime was "
- << lastWriteOpTime.toString()));
- }
-
- handleSlaveDelay(lastOpTime.getTimestamp());
+ handleSlaveDelay(lastOp);
// Set minValid to the last OpTime that needs to be applied, in this batch or from the
// (last) failed batch, whichever is larger.
@@ -769,8 +725,8 @@ void SyncTail::oplogApplication() {
// if we should crash and restart before updating finishing.
const OpTime start(getLastSetTimestamp(), OpTime::kUninitializedTerm);
-
// Take the max of the first endOptime (if we recovered) and the end of our batch.
+ const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
// Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch)
// ensures that we keep pushing out the point where we can become consistent
@@ -887,6 +843,40 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
return false;
}
+void SyncTail::handleSlaveDelay(const BSONObj& lastOp) {
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
+
+ // ignore slaveDelay if the box is still initializing. once
+ // it becomes secondary we can worry about it.
+ if (slaveDelaySecs > 0 && replCoord->getMemberState().secondary()) {
+ const Timestamp ts = lastOp["ts"].timestamp();
+ long long a = ts.getSecs();
+ long long b = time(0);
+ long long lag = b - a;
+ long long sleeptime = slaveDelaySecs - lag;
+ if (sleeptime > 0) {
+ uassert(12000,
+ "rs slaveDelay differential too big check clocks and systems",
+ sleeptime < 0x40000000);
+ if (sleeptime < 60) {
+ sleepsecs((int)sleeptime);
+ } else {
+ warning() << "slavedelay causing a long sleep of " << sleeptime << " seconds";
+ // sleep(hours) would prevent reconfigs from taking effect & such!
+ long long waitUntil = b + sleeptime;
+ while (time(0) < waitUntil) {
+ sleepsecs(6);
+
+ // Handle reconfigs that changed the slave delay
+ if (durationCount<Seconds>(replCoord->getSlaveDelaySecs()) != slaveDelaySecs)
+ break;
+ }
+ }
+ }
+ } // endif slaveDelay
+}
+
void SyncTail::setHostname(const std::string& hostname) {
_hostname = hostname;
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 8f331cc8cf6..e3edacaed2a 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -187,6 +187,8 @@ private:
// Function to use during applyOps
MultiSyncApplyFunc _applyFunc;
+ void handleSlaveDelay(const BSONObj& op);
+
// persistent pool of worker threads for writing ops to the databases
OldThreadPool _writerPool;
// persistent pool of worker threads for prefetching