summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_fetcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.cpp')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp84
1 files changed, 75 insertions, 9 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index d8aacca5a04..052de5cba7f 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -144,21 +144,75 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) {
* Checks the first batch of results from query.
* 'documents' are the first batch of results returned from tailing the remote oplog.
* 'lastFetched' optime and hash should be consistent with the predicate in the query.
- * Returns RemoteOplogStale if the oplog query has no results.
+ * 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for
+ * compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'requiredRBID' is a RollbackID received when we chose the sync source that we use here to
+ * guarantee we have not rolled back since we confirmed the sync source had our minValid.
+ * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is optional
+ * for compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'requireFresherSyncSource' is a boolean indicating whether we should require the sync source's
+ * oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point
+ * as ours, but still cannot be behind ours.
+ *
+ * TODO (SERVER-27668): Make remoteLastOpApplied and remoteRBID non-optional in mongodb 3.8.
+ *
* Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
* the remote oplog.
*/
-Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) {
+Status checkRemoteOplogStart(const Fetcher::Documents& documents,
+ OpTimeWithHash lastFetched,
+ boost::optional<OpTime> remoteLastOpApplied,
+ int requiredRBID,
+ boost::optional<int> remoteRBID,
+ bool requireFresherSyncSource) {
+ // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
+ // since that could cause it to not have our required minValid point. The cursor will be
+ // killed if the upstream node rolls back so we don't need to keep checking once the cursor
+ // is established.
+ if (remoteRBID && (*remoteRBID != requiredRBID)) {
+ return Status(ErrorCodes::InvalidSyncSource,
+ "Upstream node rolled back after choosing it as a sync source. Choosing "
+ "new sync source.");
+ }
+
+ // The SyncSourceResolver never checks that the sync source candidate is actually ahead of
+ // us. Rather than have it check there with an extra network roundtrip, we check here.
+ if (requireFresherSyncSource && remoteLastOpApplied &&
+ (*remoteLastOpApplied <= lastFetched.opTime)) {
+ return Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "Sync source's last applied OpTime "
+ << remoteLastOpApplied->toString()
+ << " is not greater than our last fetched OpTime "
+ << lastFetched.opTime.toString()
+ << ". Choosing new sync source.");
+ } else if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
+ // In initial sync, the lastFetched OpTime will almost always equal the remoteLastOpApplied
+ // since we fetch the sync source's last applied OpTime to determine where to start our
+ // OplogFetcher. This is fine since no other node can sync off of an initial syncing node
+ // and thus cannot form a sync source cycle. To account for this, we must relax the
+ // constraint on our sync source being fresher.
+ return Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "Sync source's last applied OpTime "
+ << remoteLastOpApplied->toString()
+ << " is older than our last fetched OpTime "
+ << lastFetched.opTime.toString()
+ << ". Choosing new sync source.");
+ }
+
+ // At this point we know that our sync source has our minValid and is ahead of us, so if our
+ // history diverges from our sync source's we should prefer its history and roll back ours.
+
+ // Since we checked for rollback and our sync source is ahead of us, an empty batch means that
+ // we have a higher timestamp on our last fetched OpTime than our sync source's last applied
+ // OpTime, but a lower term. When this occurs, we must roll back our inconsistent oplog entry.
if (documents.empty()) {
- // The GTE query from upstream returns nothing, so we're ahead of the upstream.
- return Status(ErrorCodes::RemoteOplogStale,
- str::stream() << "We are ahead of the sync source. Our last op time fetched: "
- << lastFetched.opTime.toString());
+ return Status(ErrorCodes::OplogStartMissing, "Received an empty batch from sync source.");
}
+
const auto& o = documents.front();
auto opTimeResult = OpTime::parseFromOplogEntry(o);
if (!opTimeResult.isOK()) {
- return Status(ErrorCodes::OplogStartMissing,
+ return Status(ErrorCodes::InvalidBSON,
str::stream() << "our last op time fetched: " << lastFetched.opTime.toString()
<< " (hash: "
<< lastFetched.value
@@ -277,6 +331,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
NamespaceString nss,
ReplSetConfig config,
std::size_t maxFetcherRestarts,
+ int requiredRBID,
+ bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn)
@@ -285,6 +341,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_nss(nss),
_metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))),
_maxFetcherRestarts(maxFetcherRestarts),
+ _requiredRBID(requiredRBID),
+ _requireFresherSyncSource(requireFresherSyncSource),
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
_awaitDataTimeout(calculateAwaitDataTimeout(config)),
@@ -480,9 +538,17 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
// Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
if (queryResponse.first) {
- auto status = checkRemoteOplogStart(documents, opTimeWithHash);
+ auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
+ auto remoteLastApplied =
+ oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
+ auto status = checkRemoteOplogStart(documents,
+ opTimeWithHash,
+ remoteLastApplied,
+ _requiredRBID,
+ remoteRBID,
+ _requireFresherSyncSource);
if (!status.isOK()) {
- // Stop oplog fetcher and execute rollback.
+ // Stop oplog fetcher and execute rollback if necessary.
_finishCallback(status, opTimeWithHash);
return;
}