author    Samy Lanka <samy.lanka@mongodb.com>    2019-01-24 21:57:25 -0500
committer Samy Lanka <samy.lanka@mongodb.com>    2019-02-11 12:27:33 -0500
commit    4f858c52b05ecc49d2ae19bbaf59fc0aad445b7e (patch)
tree      2af6afbcb4d27b4b1ce22eb25750583666396a8a /src
parent    d568e329a67eee8ba241d52067750a3d8c42dc0f (diff)
SERVER-36489 Start initial sync oplog fetching from the 'oldest active transaction timestamp'
SERVER-36490 Initial sync should not actually prepare transactions on applying prepareTransaction oplog entries
SERVER-36491 During initial sync, make commitTransaction oplog entries apply the transaction from the prepare oplog entry
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/apply_ops.cpp                      |  19
-rw-r--r--  src/mongo/db/repl/initial_sync_state.h               |  12
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp                 | 183
-rw-r--r--  src/mongo/db/repl/initial_syncer.h                   |  30
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp            | 424
-rw-r--r--  src/mongo/db/repl/oplog.cpp                          |  13
-rw-r--r--  src/mongo/db/repl/oplog_applier.h                    |   4
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.cpp             |   3
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.h               |   4
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                  |  13
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.h                    |  13
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                      | 211
-rw-r--r--  src/mongo/db/repl/sync_tail.h                        |  16
-rw-r--r--  src/mongo/db/repl/transaction_oplog_application.cpp  |  51
-rw-r--r--  src/mongo/db/server_transactions_metrics.cpp         |  13
-rw-r--r--  src/mongo/db/transactions_stats.idl                  |   5
16 files changed, 765 insertions(+), 249 deletions(-)
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index a32a0d212c1..6f90eedf4c1 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -322,14 +322,16 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
}
/**
- * Make sure that if we are in replication recovery, we don't apply the prepare transaction oplog
- * entry as part of recovery until the very end of recovery. Otherwise, only apply the prepare
- * transaction oplog entry if we are a secondary.
+ * Make sure that if we are in replication recovery or initial sync, we don't apply the prepare
+ * transaction oplog entry until we either see a commit transaction oplog entry or are at the very
+ * end of recovery/initial sync. Otherwise, only apply the prepare transaction oplog entry if we are
+ * a secondary.
*/
Status _applyPrepareTransactionOplogEntry(OperationContext* opCtx,
const repl::OplogEntry& entry,
repl::OplogApplication::Mode oplogApplicationMode) {
- // Wait until the end of recovery to apply the operations from the prepared transaction.
+ // Don't apply the operations from the prepared transaction until either we see a commit
+ // transaction oplog entry during recovery or are at the end of recovery.
if (oplogApplicationMode == OplogApplication::Mode::kRecovering) {
if (!serverGlobalParams.enableMajorityReadConcern) {
error() << "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
@@ -339,12 +341,19 @@ Status _applyPrepareTransactionOplogEntry(OperationContext* opCtx,
fassert(50964, serverGlobalParams.enableMajorityReadConcern);
return Status::OK();
}
+
+ // Don't apply the operations from the prepared transaction until either we see a commit
+ // transaction oplog entry during the oplog application phase of initial sync or are at the end
+ // of initial sync.
+ if (oplogApplicationMode == OplogApplication::Mode::kInitialSync) {
+ return Status::OK();
+ }
+
// Return error if run via applyOps command.
uassert(50945,
"applyOps with prepared flag is only used internally by secondaries.",
oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd);
- // TODO: SERVER-36492 Only run on secondary until we support initial sync.
invariant(oplogApplicationMode == repl::OplogApplication::Mode::kSecondary);
return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
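Taken together, these changes make _applyPrepareTransactionOplogEntry a dispatch on the application mode. A minimal sketch of the resulting behavior (illustration only; the real function also carries the enableMajorityReadConcern fassert shown above):

    // Sketch: how a prepareTransaction oplog entry is handled per mode.
    switch (oplogApplicationMode) {
        case OplogApplication::Mode::kRecovering:   // Deferred: applied when the
        case OplogApplication::Mode::kInitialSync:  // commit entry arrives or at
            return Status::OK();                    // the end of the phase.
        case OplogApplication::Mode::kApplyOpsCmd:
            // Rejected: the prepared flag is internal to secondaries.
            uasserted(50945, "applyOps with prepared flag is only used internally");
        case OplogApplication::Mode::kSecondary:
            // Only steady-state secondaries actually prepare the transaction.
            return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
    }
    MONGO_UNREACHABLE;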
diff --git a/src/mongo/db/repl/initial_sync_state.h b/src/mongo/db/repl/initial_sync_state.h
index 88208a3b97e..5d4a1bc12bd 100644
--- a/src/mongo/db/repl/initial_sync_state.h
+++ b/src/mongo/db/repl/initial_sync_state.h
@@ -52,10 +52,14 @@ struct InitialSyncState {
InitialSyncState(std::unique_ptr<DatabasesCloner> cloner) : dbsCloner(std::move(cloner)){};
std::unique_ptr<DatabasesCloner>
- dbsCloner; // Cloner for all databases included in initial sync.
- Timestamp beginTimestamp; // Timestamp from the latest entry in oplog when started.
- Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states.
- Timer timer; // Timer for timing how long each initial sync attempt takes.
+ dbsCloner; // Cloner for all databases included in initial sync.
+ Timestamp beginApplyingTimestamp; // Timestamp from the latest entry in oplog when started. It
+ // is also the timestamp after which we will start applying
+ // operations during initial sync.
+ Timestamp beginFetchingTimestamp; // Timestamp from the earliest active transaction that had an
+ // oplog entry.
+ Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states.
+ Timer timer; // Timer for timing how long each initial sync attempt takes.
size_t fetchedMissingDocs = 0;
size_t appliedOps = 0;
};
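The ordering of these timestamps is what the rest of the commit enforces. A hypothetical timeline for one attempt (all optimes invented for illustration):

    // beginFetchingTimestamp   (90, 1)  oldest active transaction's first oplog
    //                                   entry; the OplogFetcher starts here.
    // beginApplyingTimestamp  (100, 1)  last oplog entry on the sync source when
    //                                   cloning started; application starts here.
    // stopTimestamp           (120, 5)  last oplog entry when cloning finished;
    //                                   consistent once applied through here.
    //
    // Entries in [(90,1), (100,1)) are fetched and written to the local oplog
    // but not applied; entries in [(100,1), (120,5)] are written and applied.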
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 80cd33bda8b..82d27390b7d 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -353,8 +353,13 @@ void InitialSyncer::_appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob
}
bob->appendNumber("fetchedMissingDocs", _initialSyncState->fetchedMissingDocs);
bob->appendNumber("appliedOps", _initialSyncState->appliedOps);
- if (!_initialSyncState->beginTimestamp.isNull()) {
- bob->append("initialSyncOplogStart", _initialSyncState->beginTimestamp);
+ if (!_initialSyncState->beginApplyingTimestamp.isNull()) {
+ bob->append("initialSyncOplogStart", _initialSyncState->beginApplyingTimestamp);
+ }
+ // Only include the beginFetchingTimestamp if it's different from the beginApplyingTimestamp.
+ if (!_initialSyncState->beginFetchingTimestamp.isNull() &&
+ _initialSyncState->beginFetchingTimestamp != _initialSyncState->beginApplyingTimestamp) {
+ bob->append("initialSyncOplogFetchingStart", _initialSyncState->beginFetchingTimestamp);
}
if (!_initialSyncState->stopTimestamp.isNull()) {
bob->append("initialSyncOplogEnd", _initialSyncState->stopTimestamp);
@@ -590,14 +595,6 @@ void InitialSyncer::_chooseSyncSourceCallback(
_syncSource = syncSource.getValue();
- // Create oplog applier.
- auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
- OplogApplier::Options options;
- options.allowNamespaceNotFoundErrorsOnCrudOps = true;
- options.missingDocumentSourceForInitialSync = _syncSource;
- _oplogApplier = _dataReplicatorExternalState->makeOplogApplier(
- _oplogBuffer.get(), _observer.get(), consistencyMarkers, _storage, options, _writerPool);
-
// Schedule rollback ID checker.
_rollbackChecker = stdx::make_unique<RollbackChecker>(_exec, _syncSource);
auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
@@ -640,6 +637,25 @@ Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() {
return _storage->dropReplicatedDatabases(opCtx.get());
}
+Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ // We will ultimately be getting oldestActiveOplogEntryOpTime from the transactions field in
+ // serverStatus to use as the beginFetchingTimestamp. Also, we project out the metrics and
+ // wiredTiger fields because we don't need them and they're very large.
+ executor::RemoteCommandRequest request(
+ _syncSource,
+ "admin",
+ BSON("serverStatus" << 1 << "transactions" << 1 << "metrics" << 0 << "wiredTiger" << 0),
+ nullptr);
+
+ auto scheduleResultSW = _exec->scheduleRemoteCommand(
+ request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackData) {
+ _getBeginFetchingOpTimeCallback(callbackData.response, onCompletionGuard);
+ });
+
+ return scheduleResultSW.getStatus();
+}
+
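For reference, the request built above and the response shape consumed by _getBeginFetchingOpTimeCallback() look roughly like this (a sketch; the optime values are invented, and the response nesting follows makeServerStatusResponse in the tests below):

    // Request: serverStatus with the large sections projected out.
    //   { serverStatus: 1, transactions: 1, metrics: 0, wiredTiger: 0 }
    //
    // Response while a prepared transaction is active:
    //   { ok: 1,
    //     transactions: { oldestActiveOplogEntryOpTime: { ts: Timestamp(90, 1),
    //                                                     t: NumberLong(1) },
    //                     ... } }
    //
    // Response with no active transactions: the oldestActiveOplogEntryOpTime
    // field is absent and beginFetchingOpTime remains a null OpTime.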
void InitialSyncer::_rollbackCheckerResetCallback(
const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -650,11 +666,40 @@ void InitialSyncer::_rollbackCheckerResetCallback(
return;
}
+ status = _scheduleGetBeginFetchingOpTime_inlock(onCompletionGuard);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+}
+
+void InitialSyncer::_getBeginFetchingOpTimeCallback(
+ const executor::TaskExecutor::ResponseStatus& response,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ response.status,
+ "error while getting oldest active transaction timestamp for begin fetching timestamp");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
+
+ invariant(response.data.hasField("transactions"));
+
+ // Only set beginFetchingOpTime if the oldestActiveOplogEntryOpTime actually exists.
+ OpTime beginFetchingOpTime = OpTime();
+ if (response.data["transactions"].embeddedObject().hasField("oldestActiveOplogEntryOpTime")) {
+ beginFetchingOpTime = repl::OpTime::parse(
+ response.data["transactions"]["oldestActiveOplogEntryOpTime"].Obj());
+ }
+
status = _scheduleLastOplogEntryFetcher_inlock(
[=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) {
- _lastOplogEntryFetcherCallbackForBeginTimestamp(response, onCompletionGuard);
+ mongo::BSONObjBuilder*) mutable {
+ _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
+ response, onCompletionGuard, beginFetchingOpTime);
});
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
@@ -662,9 +707,10 @@ void InitialSyncer::_rollbackCheckerResetCallback(
}
}
-void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp(
+void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
const StatusWith<Fetcher::QueryResponse>& result,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+ OpTime& beginFetchingOpTime) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
result.getStatus(), "error while getting last oplog entry for begin timestamp");
@@ -695,23 +741,23 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp(
readConcernBob.append("afterClusterTime", lastOpTime.getTimestamp());
readConcernBob.done();
- _fCVFetcher =
- stdx::make_unique<Fetcher>(_exec,
- _syncSource,
- NamespaceString::kServerConfigurationNamespace.db().toString(),
- queryBob.obj(),
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) {
- _fcvFetcherCallback(response, onCompletionGuard, lastOpTime);
- },
- ReadPreferenceSetting::secondaryPreferredMetadata(),
- RemoteCommandRequest::kNoTimeout /* find network timeout */,
- RemoteCommandRequest::kNoTimeout /* getMore network timeout */,
- RemoteCommandRetryScheduler::makeRetryPolicy(
- numInitialSyncOplogFindAttempts.load(),
- executor::RemoteCommandRequest::kNoTimeout,
- RemoteCommandRetryScheduler::kAllRetriableErrors));
+ _fCVFetcher = stdx::make_unique<Fetcher>(
+ _exec,
+ _syncSource,
+ NamespaceString::kServerConfigurationNamespace.db().toString(),
+ queryBob.obj(),
+ [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) mutable {
+ _fcvFetcherCallback(response, onCompletionGuard, lastOpTime, beginFetchingOpTime);
+ },
+ ReadPreferenceSetting::secondaryPreferredMetadata(),
+ RemoteCommandRequest::kNoTimeout /* find network timeout */,
+ RemoteCommandRequest::kNoTimeout /* getMore network timeout */,
+ RemoteCommandRetryScheduler::makeRetryPolicy(
+ numInitialSyncOplogFindAttempts.load(),
+ executor::RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::kAllRetriableErrors));
Status scheduleStatus = _fCVFetcher->schedule();
if (!scheduleStatus.isOK()) {
_fCVFetcher.reset();
@@ -722,7 +768,8 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp(
void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
- const OpTime& lastOpTime) {
+ const OpTime& lastOpTime,
+ OpTime& beginFetchingOpTime) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
result.getStatus(), "error while getting the remote feature compatibility version");
@@ -790,12 +837,41 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
_databasesClonerCallback(status, onCompletionGuard);
}));
- _initialSyncState->beginTimestamp = lastOpTime.getTimestamp();
+ // Create oplog applier.
+ auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
+ OplogApplier::Options options;
+ options.allowNamespaceNotFoundErrorsOnCrudOps = true;
+ options.missingDocumentSourceForInitialSync = _syncSource;
+ options.beginApplyingOpTime = lastOpTime;
+ _oplogApplier = _dataReplicatorExternalState->makeOplogApplier(
+ _oplogBuffer.get(), _observer.get(), consistencyMarkers, _storage, options, _writerPool);
+
+ const auto beginApplyingTimestamp = lastOpTime.getTimestamp();
+ _initialSyncState->beginApplyingTimestamp = beginApplyingTimestamp;
+
+ // If there is no beginFetchingOpTime, then it means that there were no open active transactions
+ // with an oplog entry so we can safely start fetching at the same point that we are applying
+ // from.
+ if (beginFetchingOpTime.isNull()) {
+ _initialSyncState->beginFetchingTimestamp = beginApplyingTimestamp;
+ beginFetchingOpTime = lastOpTime;
+ } else {
+ _initialSyncState->beginFetchingTimestamp = beginFetchingOpTime.getTimestamp();
+ }
+
+ invariant(_initialSyncState->beginApplyingTimestamp >=
+ _initialSyncState->beginFetchingTimestamp,
+ str::stream() << "beginApplyingTimestamp was less than beginFetchingTimestamp. "
+ "beginApplyingTimestamp: "
+ << _initialSyncState->beginApplyingTimestamp.toBSON()
+ << " beginFetchingTimestamp: "
+ << _initialSyncState->beginFetchingTimestamp.toBSON());
invariant(!result.getValue().documents.empty());
- LOG(2) << "Setting begin timestamp to " << _initialSyncState->beginTimestamp
+ LOG(2) << "Setting begin applying timestamp to " << _initialSyncState->beginApplyingTimestamp
<< " using last oplog entry: " << redact(result.getValue().documents.front())
- << ", ns: " << _opts.localOplogNS;
+ << ", ns: " << _opts.localOplogNS << " and the begin fetching timestamp to "
+ << _initialSyncState->beginFetchingTimestamp;
const auto configResult = _dataReplicatorExternalState->getCurrentConfig();
@@ -809,7 +885,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
const auto& config = configResult.getValue();
_oplogFetcher = stdx::make_unique<OplogFetcher>(
_exec,
- lastOpTime,
+ beginFetchingOpTime,
_syncSource,
_opts.remoteOplogNS,
config,
@@ -823,7 +899,8 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
return _enqueueDocuments(first, last, info);
},
[=](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); },
- initialSyncOplogFetcherBatchSize);
+ initialSyncOplogFetcherBatchSize,
+ OplogFetcher::StartingPoint::kEnqueueFirstDoc);
LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString();
@@ -961,7 +1038,10 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
optime = optimeStatus.getValue();
_initialSyncState->stopTimestamp = optime.getTimestamp();
- if (_initialSyncState->beginTimestamp != _initialSyncState->stopTimestamp) {
+ // If the beginFetchingTimestamp is different from the stopTimestamp, it indicates that
+ // there are oplog entries fetched by the oplog fetcher that need to be written to the oplog
+ // and/or there are operations that need to be applied.
+ if (_initialSyncState->beginFetchingTimestamp != _initialSyncState->stopTimestamp) {
invariant(_lastApplied.isNull());
_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
return;
@@ -1107,6 +1187,17 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus,
_lastApplied = lastApplied;
_opts.setMyLastOptime(_lastApplied, ReplicationCoordinator::DataConsistency::Inconsistent);
+ // Update oplog visibility after applying a batch so that while applying transaction oplog
+ // entries, the TransactionHistoryIterator can get earlier oplog entries associated with the
+ // transaction. Note that setting the oplog visibility timestamp here will be safe even if
+ // initial sync was restarted because until initial sync ends, no one else will try to read our
+ // oplog. It is also safe even if we tried to read from our own oplog because we never try to
+ // read from the oplog before applying at least one batch and therefore setting a value for the
+ // oplog visibility timestamp.
+ auto opCtx = makeOpCtx();
+ const bool orderedCommit = true;
+ _storage->oplogDiskLocRegister(opCtx.get(), lastApplied.getTimestamp(), orderedCommit);
+
auto fetchCount = _fetchCount.load();
if (fetchCount > 0) {
_initialSyncState->fetchedMissingDocs += fetchCount;
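Concretely, the visibility bump is needed because applying a commitTransaction entry walks the transaction's oplog chain backwards through our own oplog. A hypothetical chain (invented optimes):

    // commitTransaction  { ts: (110, 1), prevOpTime: (105, 1) }
    // prepareTransaction { ts: (105, 1), prevOpTime: (0, 0) /* start of chain */ }
    //
    // The TransactionHistoryIterator must read (105, 1) back out of the local
    // oplog, which only succeeds if oplogDiskLocRegister() has advanced the
    // oplog visibility timestamp past it once the batch that wrote it applied.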
@@ -1369,11 +1460,11 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
}
// Basic sanity check on begin/stop timestamps.
- if (_initialSyncState->beginTimestamp > _initialSyncState->stopTimestamp) {
+ if (_initialSyncState->beginApplyingTimestamp > _initialSyncState->stopTimestamp) {
std::string msg = str::stream()
<< "Possible rollback on sync source " << _syncSource.toString() << ". Currently at "
<< _initialSyncState->stopTimestamp.toBSON() << ". Started at "
- << _initialSyncState->beginTimestamp.toBSON();
+ << _initialSyncState->beginApplyingTimestamp.toBSON();
error() << msg;
onCompletionGuard->setResultAndCancelRemainingWork_inlock(
lock, Status(ErrorCodes::OplogOutOfOrder, msg));
@@ -1381,11 +1472,13 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
}
if (_lastApplied.isNull()) {
- // Check if any ops occurred while cloning.
- invariant(_initialSyncState->beginTimestamp < _initialSyncState->stopTimestamp);
- log() << "Applying operations until " << _initialSyncState->stopTimestamp.toBSON()
- << " before initial sync can complete. (starting at "
- << _initialSyncState->beginTimestamp.toBSON() << ")";
+ // Check if any ops occurred while cloning or any ops need to be fetched.
+ invariant(_initialSyncState->beginFetchingTimestamp < _initialSyncState->stopTimestamp);
+ log() << "Writing to the oplog and applying operations until "
+ << _initialSyncState->stopTimestamp.toBSON()
+ << " before initial sync can complete. (started fetching at "
+ << _initialSyncState->beginFetchingTimestamp.toBSON() << " and applying at "
+ << _initialSyncState->beginApplyingTimestamp.toBSON() << ")";
// Fall through to scheduling _getNextApplierBatchCallback().
} else if (_lastApplied.getTimestamp() >= _initialSyncState->stopTimestamp) {
// Check for rollback if we have applied far enough to be consistent.
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index c84658092fb..6fbf2d14ef3 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -296,7 +296,11 @@ private:
* |
* |
* V
- * _lastOplogEntryFetcherCallbackForBeginTimestamp()
+ * _getBeginFetchingOpTimeCallback()
+ * |
+ * |
+ * V
+ * _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp()
* |
* |
* V
@@ -399,10 +403,27 @@ private:
* setting a reference point for the state of the sync source's oplog when data cloning
* completes.
*/
- void _lastOplogEntryFetcherCallbackForBeginTimestamp(
+ void _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
const StatusWith<Fetcher::QueryResponse>& result,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+ OpTime& beginFetchingOpTime);
+
+ /**
+ * Callback that gets the oldestActiveOplogEntryOpTime from the transactions field in the
+ * serverStatus response, which refers to the optime of the oldest active transaction with an
+ * oplog entry. It will be used as the beginFetchingTimestamp.
+ */
+ void _getBeginFetchingOpTimeCallback(const executor::TaskExecutor::ResponseStatus& response,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+ /**
+ * Schedules a remote command to call serverStatus on the sync source. From the response, we
+ * will be able to get the oldestActiveOplogEntryOpTime from the transactions field, which
+ * refers to the optime of the oldest active transaction with an oplog entry. It will be used as
+ * the beginFetchingTimestamp.
+ */
+ Status _scheduleGetBeginFetchingOpTime_inlock(
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
/**
* Callback for the '_fCVFetcher'. A successful response lets us check if the remote node
@@ -410,7 +431,8 @@ private:
*/
void _fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
- const OpTime& lastOpTime);
+ const OpTime& lastOpTime,
+ OpTime& beginFetchingOpTime);
/**
* Callback for oplog fetcher.
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 408557c823d..dc7b084f07c 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -507,6 +507,14 @@ BSONObj makeRollbackCheckerResponse(int rollbackId) {
}
/**
+ * Generates a serverStatus response with an 'oldestActiveOplogEntryOpTime' field.
+ */
+BSONObj makeServerStatusResponse(OpTime oldestActiveTxnOpTime) {
+ return BSON("ok" << 1 << "transactions"
+ << BSON("oldestActiveOplogEntryOpTime" << oldestActiveTxnOpTime));
+}
+
+/**
* Generates a cursor response for a Fetcher to consume.
*/
RemoteCommandResponse makeCursorResponse(CursorId cursorId,
@@ -1156,6 +1164,67 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughRollbackCheckerCallbackError
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetBeginFetchingTimestampScheduleError) {
+ auto initialSyncer = &getInitialSyncer();
+ auto opCtx = makeOpCtx();
+
+ // Getting the begin fetching timestamp is the only time a serverStatus command is sent, so we
+ // reject the serverStatus command and save the request for inspection at the end of this test
+ // case.
+ executor::RemoteCommandRequest request;
+ _executorProxy->shouldFailScheduleRemoteCommandRequest =
+ [&request](const executor::RemoteCommandRequest& requestToSend) {
+ request = requestToSend;
+ return "serverStatus" == requestToSend.cmdObj.firstElement().fieldNameStringData();
+ };
+
+ HostAndPort syncSource("localhost", 12345);
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+ }
+
+ initialSyncer->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+
+ ASSERT_EQUALS(syncSource, request.target);
+ ASSERT_EQUALS(NamespaceString::kAdminDb, request.dbname);
+ assertRemoteCommandNameEquals("serverStatus", request);
+}
+
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetBeginFetchingTimestampCallbackError) {
+ auto initialSyncer = &getInitialSyncer();
+ auto opCtx = makeOpCtx();
+
+ _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+ ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+ net->runReadyNetworkOperations();
+
+ assertRemoteCommandNameEquals(
+ "serverStatus",
+ net->scheduleErrorResponse(
+ Status(ErrorCodes::OperationFailed, "serverStatus command failed at sync source")));
+ net->runReadyNetworkOperations();
+ }
+
+ initialSyncer->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+}
+
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherScheduleError) {
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -1179,6 +1248,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherSchedul
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
}
@@ -1205,6 +1279,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherCallbac
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
assertRemoteCommandNameEquals(
@@ -1231,6 +1310,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastOplogEntryFetcherOnShutdown) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
ASSERT_TRUE(net->hasReadyRequests());
@@ -1257,6 +1341,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1280,6 +1369,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry first attempt - retriable error.
@@ -1308,6 +1402,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1334,6 +1433,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1371,6 +1475,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherScheduleError) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1397,12 +1506,17 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherCallbackError) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
- auto request = assertRemoteCommandNameEquals(
+ request = assertRemoteCommandNameEquals(
"find",
net->scheduleErrorResponse(
Status(ErrorCodes::OperationFailed, "find command failed at sync source")));
@@ -1427,6 +1541,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsFCVFetcherOnShutdown) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1454,6 +1573,11 @@ TEST_F(InitialSyncerTest, InitialSyncerResendsFindCommandIfFCVFetcherReturnsRetr
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1485,6 +1609,11 @@ void InitialSyncerTest::runInitialSyncWithBadFCVResponse(std::vector<BSONObj> do
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1548,6 +1677,11 @@ TEST_F(InitialSyncerTest, InitialSyncerSucceedsWhenFCVFetcherReturnsOldVersion)
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1594,6 +1728,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherScheduleError) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1631,6 +1770,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1646,7 +1790,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
net->runReadyNetworkOperations();
// Oplog tailing query.
- auto request = assertRemoteCommandNameEquals(
+ request = assertRemoteCommandNameEquals(
"find", net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "dead cursor")));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->runReadyNetworkOperations();
@@ -1676,10 +1820,15 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
- auto request =
+ request =
assertRemoteCommandNameEquals("find",
net->scheduleSuccessfulResponse(makeCursorResponse(
0LL, _options.localOplogNS, {makeOplogEntryObj(1)})));
@@ -1730,6 +1879,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1744,14 +1898,13 @@ TEST_F(
// Oplog tailing query.
// Simulate cursor closing on sync source.
- auto request =
- assertRemoteCommandNameEquals("find",
- net->scheduleSuccessfulResponse(makeCursorResponse(
- 0LL,
- _options.localOplogNS,
- {makeOplogEntryObj(1),
- makeOplogEntryObj(2, OpTypeEnum::kCommand),
- makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
+ request = assertRemoteCommandNameEquals("find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ _options.localOplogNS,
+ {makeOplogEntryObj(1),
+ makeOplogEntryObj(2, OpTypeEnum::kCommand),
+ makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->runReadyNetworkOperations();
@@ -1783,6 +1936,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1797,14 +1955,13 @@ TEST_F(
// Oplog tailing query.
// Simulate cursor closing on sync source.
- auto request =
- assertRemoteCommandNameEquals("find",
- net->scheduleSuccessfulResponse(makeCursorResponse(
- 0LL,
- _options.localOplogNS,
- {makeOplogEntryObj(1),
- makeOplogEntryObj(2, OpTypeEnum::kCommand),
- makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
+ request = assertRemoteCommandNameEquals("find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ _options.localOplogNS,
+ {makeOplogEntryObj(1),
+ makeOplogEntryObj(2, OpTypeEnum::kCommand),
+ makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->runReadyNetworkOperations();
@@ -1844,6 +2001,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1884,6 +2046,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1921,6 +2088,11 @@ TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -1937,7 +2109,7 @@ TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases
// Oplog tailing query.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -1984,6 +2156,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2007,7 +2184,7 @@ TEST_F(InitialSyncerTest,
// Oplog tailing query.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2053,6 +2230,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsBothOplogFetcherAndDatabasesCloner
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2102,6 +2284,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2141,6 +2328,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2158,7 +2350,7 @@ TEST_F(InitialSyncerTest,
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2196,6 +2388,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2206,7 +2403,7 @@ TEST_F(InitialSyncerTest,
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
- auto request = assertRemoteCommandNameEquals(
+ request = assertRemoteCommandNameEquals(
"listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
net->runReadyNetworkOperations();
@@ -2246,6 +2443,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2262,7 +2464,7 @@ TEST_F(InitialSyncerTest,
// Save request for OplogFetcher's oplog tailing query. This request will be canceled.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
auto oplogFetcherNetworkOperationIterator = noi;
@@ -2307,6 +2509,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2324,7 +2531,7 @@ TEST_F(
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2360,6 +2567,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2377,7 +2589,7 @@ TEST_F(InitialSyncerTest,
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2426,6 +2638,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2443,7 +2660,7 @@ TEST_F(
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2495,6 +2712,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2512,7 +2734,7 @@ TEST_F(
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2564,6 +2786,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2581,7 +2808,7 @@ TEST_F(
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2615,6 +2842,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2632,7 +2864,7 @@ TEST_F(
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2671,6 +2903,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2688,7 +2925,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) {
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2728,6 +2965,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherC
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2744,7 +2986,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherC
// Save request for OplogFetcher's oplog tailing query. This request will be canceled.
auto noi = net->getNextReadyRequest();
- auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
+ request = assertRemoteCommandNameEquals("find", noi->getRequest());
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
auto oplogFetcherNetworkOperationIterator = noi;
@@ -2790,6 +3032,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2807,7 +3054,7 @@ TEST_F(InitialSyncerTest,
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2843,6 +3090,11 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2855,8 +3107,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
// database names, we'll simulate copying a single database with a single collection on the
// sync source.
NamespaceString nss("a.a");
- auto request =
- net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
+ request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
assertRemoteCommandNameEquals("listDatabases", request);
net->runReadyNetworkOperations();
@@ -2930,6 +3181,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleE
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -2947,7 +3203,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleE
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -2987,6 +3243,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchSch
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3004,7 +3265,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchSch
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -3044,6 +3305,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3061,7 +3327,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
// We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
// on to the DatabasesCloner's request.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->blackHole(noi);
@@ -3103,6 +3369,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3119,7 +3390,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr
// OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
// oplog buffer and processed by _getNextApplierBatch_inlock().
- auto request = assertRemoteCommandNameEquals(
+ request = assertRemoteCommandNameEquals(
"find",
net->scheduleSuccessfulResponse(makeCursorResponse(
1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion})));
@@ -3170,6 +3441,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3186,7 +3462,7 @@ TEST_F(
// OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
// oplog buffer and processed by _getNextApplierBatch_inlock().
- auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
@@ -3228,6 +3504,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3244,7 +3525,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
// OplogFetcher's oplog tailing query. Save for later.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
auto oplogFetcherNoi = noi;
@@ -3302,6 +3583,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3317,7 +3603,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
net->runReadyNetworkOperations();
// OplogFetcher's oplog tailing query. Provide enough operations to trigger MultiApplier.
- auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
@@ -3351,6 +3637,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplog
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3367,7 +3658,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplog
// OplogFetcher's oplog tailing query. Save for later.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
auto oplogFetcherNoi = noi;
@@ -3406,6 +3697,11 @@ OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch(bool shouldSetFCV) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3422,7 +3718,7 @@ OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch(bool shouldSetFCV) {
// OplogFetcher's oplog tailing query. Response has enough operations to reach
// end timestamp.
- auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ request = net->scheduleSuccessfulResponse(makeCursorResponse(
1LL, _options.localOplogNS, {makeOplogEntryObj(1), lastOp.toBSON()}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
@@ -3498,6 +3794,11 @@ TEST_F(InitialSyncerTest,
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3510,8 +3811,7 @@ TEST_F(InitialSyncerTest,
// database names, we'll simulate copying a single database with a single collection on the
// sync source.
NamespaceString nss("a.a");
- auto request =
- net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
+ request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
assertRemoteCommandNameEquals("listDatabases", request);
net->runReadyNetworkOperations();
@@ -3612,6 +3912,11 @@ TEST_F(
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3628,7 +3933,7 @@ TEST_F(
// OplogFetcher's oplog tailing query. Response has enough operations to reach
// end timestamp.
- auto request = net->scheduleSuccessfulResponse(
+ request = net->scheduleSuccessfulResponse(
makeCursorResponse(1LL,
_options.localOplogNS,
{makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
@@ -3703,6 +4008,11 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3713,7 +4023,7 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
// Ignore listDatabases request.
auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
+ request = noi->getRequest();
assertRemoteCommandNameEquals("listDatabases", request);
net->blackHole(noi);
@@ -3761,6 +4071,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -3810,6 +4125,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
// Base rollback ID.
auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
assertRemoteCommandNameEquals("replSetGetRBID", request);
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -4020,6 +4340,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc
// Base rollback ID.
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
+
+ // Send an empty optime as the response to the serverStatus request, which will cause the
+ // beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime()));
+ assertRemoteCommandNameEquals("serverStatus", request);
net->runReadyNetworkOperations();
// Last oplog entry.
@@ -4030,8 +4355,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc
// listDatabases
NamespaceString nss("a.a");
- auto request =
- net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
+ request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
assertRemoteCommandNameEquals("listDatabases", request);
net->runReadyNetworkOperations();
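
Every test above stubs the same new serverStatus round trip, so it is worth spelling out what the initial syncer does with that response. The following is only an illustrative sketch (the helper name chooseBeginFetchingOpTime is hypothetical, not part of this patch): the transactions section's oldest active transaction optime reported by serverStatus decides where oplog fetching begins.

    // Sketch: decide where initial sync starts fetching the oplog.
    // 'oldestActiveTxnOpTime' comes from the sync source's serverStatus
    // response; 'beginApplyingOpTime' is the sync source's last oplog entry.
    repl::OpTime chooseBeginFetchingOpTime(const repl::OpTime& oldestActiveTxnOpTime,
                                           const repl::OpTime& beginApplyingOpTime) {
        // A null optime (the empty optime these tests respond with) means no
        // transaction was active, so fetching starts where applying starts.
        if (oldestActiveTxnOpTime.isNull()) {
            return beginApplyingOpTime;
        }
        // Otherwise fetch from the oldest active transaction so the prepare
        // entry is available when its commitTransaction entry is applied.
        return oldestActiveTxnOpTime;
    }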
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index be5095a3a19..12df8c58003 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1846,12 +1846,13 @@ Status applyCommand_inlock(OperationContext* opCtx,
}
// The feature compatibility version in the server configuration collection cannot change during
- // initial sync.
- // We do not attempt to parse the whitelisted ops because they do not have a collection
- // namespace. If we drop the 'admin' database we will also log a 'drop' oplog entry for each
- // collection dropped. 'applyOps' will try to apply each individual operation, and those
- // will be caught then if they are a problem.
- auto whitelistedOps = std::vector<std::string>{"dropDatabase", "applyOps", "dbCheck"};
+ // initial sync. We do not attempt to parse the whitelisted ops because they do not have a
+ // collection namespace. If we drop the 'admin' database we will also log a 'drop' oplog entry
+ // for each collection dropped. 'applyOps' and 'commitTransaction' will try to apply each
+ // individual operation, and those will be caught then if they are a problem. 'abortTransaction'
+ // won't ever change the server configuration collection.
+ auto whitelistedOps = std::vector<std::string>{
+ "dropDatabase", "applyOps", "dbCheck", "commitTransaction", "abortTransaction"};
if ((mode == OplogApplication::Mode::kInitialSync) &&
(std::find(whitelistedOps.begin(), whitelistedOps.end(), o.firstElementFieldName()) ==
whitelistedOps.end()) &&
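
The hunk above ends mid-condition, so here is a self-contained sketch of the whitelist test as it reads after this change (only the membership check; the surrounding clauses that inspect the target namespace are not shown in this diff and are not reproduced here):

    // Sketch: during initial sync, only whitelisted commands may bypass the
    // FCV parse check, because they carry no collection namespace to inspect.
    #include <algorithm>
    #include <string>
    #include <vector>

    bool isWhitelistedDuringInitialSync(const std::string& commandName) {
        static const std::vector<std::string> whitelistedOps{
            "dropDatabase", "applyOps", "dbCheck", "commitTransaction", "abortTransaction"};
        return std::find(whitelistedOps.begin(), whitelistedOps.end(), commandName) !=
            whitelistedOps.end();
    }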
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index cf6f2405b2a..ecc45bf76df 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -68,6 +68,10 @@ public:
// For initial sync only. If an update fails, the missing document is fetched from
// this sync source to insert into the local collection.
boost::optional<HostAndPort> missingDocumentSourceForInitialSync;
+
+ // Used to determine which operations should be applied. Only initial sync will set this
+ // to something other than the null optime.
+ OpTime beginApplyingOpTime = OpTime();
};
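
A brief usage sketch of the new option (caller code, not part of this patch; variable names are illustrative). Only the initial syncer is expected to set a non-null value:

    // Sketch: only initial sync populates beginApplyingOpTime.
    repl::OplogApplier::Options options;
    options.missingDocumentSourceForInitialSync = syncSource;  // existing initial-sync-only option
    options.beginApplyingOpTime = beginApplyingOpTime;         // stays null outside initial sync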
/**
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index b000e006620..5574f77deb5 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -46,7 +46,8 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
: OplogApplier(executor, oplogBuffer, observer),
_replCoord(replCoord),
_syncTail(std::make_unique<SyncTail>(
- observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options)) {
+ observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options)),
+ _beginApplyingOpTime(options.beginApplyingOpTime) {
invariant(!options.relaxUniqueIndexConstraints);
}
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index 197d7796879..6e51136f892 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -73,6 +73,10 @@ private:
// Used to run oplog application loop.
std::unique_ptr<SyncTail> _syncTail;
+
+ // Used to determine which operations should be applied during initial sync. If this is null,
+ // we will apply all operations that were fetched.
+ OpTime _beginApplyingOpTime = OpTime();
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 7850334f478..0b71deac9d2 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -323,7 +323,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize)
+ const int batchSize,
+ StartingPoint startingPoint)
: AbstractOplogFetcher(executor,
lastFetched,
source,
@@ -337,7 +338,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
_awaitDataTimeout(calculateAwaitDataTimeout(config)),
- _batchSize(batchSize) {
+ _batchSize(batchSize),
+ _startingPoint(startingPoint) {
invariant(config.isInitialized());
invariant(enqueueDocumentsFn);
@@ -444,8 +446,11 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
LOG(1) << "oplog fetcher successfully fetched from " << _getSource();
- // If this is the first batch and no rollback is needed, skip the first document.
- firstDocToApply++;
+ // If this is the first batch, no rollback is needed, and we don't want to enqueue the
+ // first document, then skip it.
+ if (_startingPoint == StartingPoint::kSkipFirstDoc) {
+ firstDocToApply++;
+ }
}
auto validateResult =
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 42eace2f077..20ff4668a8a 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -90,6 +90,13 @@ public:
};
/**
+ * An enum that indicates whether we want to skip the first document during oplog fetching.
+ * Currently, the only time we don't skip the first document is during initial sync, when
+ * there was an oldest active transaction timestamp.
+ */
+ enum class StartingPoint { kSkipFirstDoc, kEnqueueFirstDoc };
+
+ /**
* Type of function that accepts a pair of iterators into a range of operations
* within the current batch of results and copies the operations into
* a buffer to be consumed by the next stage of the replication process.
@@ -125,7 +132,8 @@ public:
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize);
+ const int batchSize,
+ StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
virtual ~OplogFetcher();
@@ -176,6 +184,9 @@ private:
const EnqueueDocumentsFn _enqueueDocumentsFn;
const Milliseconds _awaitDataTimeout;
const int _batchSize;
+
+ // Indicates whether we want to skip the first document during oplog fetching.
+ StartingPoint _startingPoint;
};
} // namespace repl
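
Condensing the two oplog_fetcher hunks above, the first-batch behavior now reduces to the following shape (a sketch only; 'isFirstBatch' and 'requiresRollback' stand in for surrounding conditions this diff does not show in full):

    // Sketch: first-batch handling in _onSuccessfulBatch.
    if (isFirstBatch && !requiresRollback) {
        // Callers default to kSkipFirstDoc because the first document is the
        // last entry already fetched. Initial sync passes kEnqueueFirstDoc
        // when fetching starts at an oldest active transaction timestamp, so
        // that prepare entry is enqueued rather than dropped.
        if (_startingPoint == StartingPoint::kSkipFirstDoc) {
            firstDocToApply++;
        }
    }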
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 1b879b6ba4b..81208eeb3da 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -69,8 +69,6 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/session_update_tracker.h"
-#include "mongo/db/service_context.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/stats/timer_stats.h"
@@ -548,106 +546,6 @@ private:
StringMap<CollectionProperties> _cache;
};
-/**
- * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops
- * vector in any other way.
- * writerVectors - Set of operations for each worker thread to apply.
- * derivedOps - If provided, this function inserts a decomposition of applyOps operations
- * and instructions for updating the transactions table.
- * sessionUpdateTracker - if provided, keeps track of session info from ops.
- */
-void fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) {
- const auto serviceContext = opCtx->getServiceContext();
- const auto storageEngine = serviceContext->getStorageEngine();
-
- const bool supportsDocLocking = storageEngine->supportsDocLocking();
- const uint32_t numWriters = writerVectors->size();
-
- CachedCollectionProperties collPropertiesCache;
-
- for (auto&& op : *ops) {
- auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns());
- // Reduce the hash from 64bit down to 32bit, just to allow combinations with murmur3 later
- // on. Bit depth not important, we end up just doing integer modulo with this in the end.
- // The hash function should provide entropy in the lower bits as it's used in hash tables.
- uint32_t hash = static_cast<uint32_t>(hashedNs.hash());
-
- // We need to track all types of ops, including type 'n' (these are generated from chunk
- // migrations).
- if (sessionUpdateTracker) {
- if (auto newOplogWrites = sessionUpdateTracker->updateOrFlush(op)) {
- derivedOps->emplace_back(std::move(*newOplogWrites));
- fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- }
- }
-
- if (op.isCrudOpType()) {
- auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
-
- // For doc locking engines, include the _id of the document in the hash so we get
- // parallelism even if all writes are to a single collection.
- //
- // For capped collections, this is illegal, since capped collections must preserve
- // insertion order.
- if (supportsDocLocking && !collProperties.isCapped) {
- BSONElement id = op.getIdElement();
- BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore,
- collProperties.collator);
- const size_t idHash = elementHasher.hash(id);
- MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
- }
-
- if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
- // Mark capped collection ops before storing them to ensure we do not attempt to
- // bulk insert them.
- op.isForCappedCollection = true;
- }
- }
-
- // Extract applyOps operations and fill writers with extracted operations using this
- // function.
- if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) {
- try {
- derivedOps->emplace_back(ApplyOps::extractOperations(op));
-
- // Nested entries cannot have different session updates.
- fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- } catch (...) {
- fassertFailedWithStatusNoTrace(
- 50711,
- exceptionToStatus().withContext(str::stream()
- << "Unable to extract operations from applyOps "
- << redact(op.toBSON())));
- }
- continue;
- }
-
- auto& writer = (*writerVectors)[hash % numWriters];
- if (writer.empty()) {
- writer.reserve(8); // Skip a few growth rounds
- }
- writer.push_back(&op);
- }
-}
-
-void fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps) {
- SessionUpdateTracker sessionUpdateTracker;
- fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
-
- auto newOplogWrites = sessionUpdateTracker.flushAll();
- if (!newOplogWrites.empty()) {
- derivedOps->emplace_back(std::move(newOplogWrites));
- fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- }
-}
-
void tryToGoLiveAsASecondary(OperationContext* opCtx,
ReplicationCoordinator* replCoord,
OpTime minValid) {
@@ -1295,6 +1193,113 @@ Status multiSyncApply(OperationContext* opCtx,
return Status::OK();
}
+
+/**
+ * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops
+ * vector in any other way.
+ * writerVectors - Set of operations for each worker thread to apply.
+ * derivedOps - If provided, this function inserts a decomposition of applyOps operations
+ * and instructions for updating the transactions table.
+ * sessionUpdateTracker - if provided, keeps track of session info from ops.
+ */
+void SyncTail::_fillWriterVectors(OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps,
+ SessionUpdateTracker* sessionUpdateTracker) {
+ const auto serviceContext = opCtx->getServiceContext();
+ const auto storageEngine = serviceContext->getStorageEngine();
+
+ const bool supportsDocLocking = storageEngine->supportsDocLocking();
+ const uint32_t numWriters = writerVectors->size();
+
+ CachedCollectionProperties collPropertiesCache;
+
+ for (auto&& op : *ops) {
+ // If the operation's optime is at or before the beginApplyingOpTime, we don't want to
+ // apply it, so don't include it in writerVectors.
+ if (op.getOpTime() <= _options.beginApplyingOpTime) {
+ continue;
+ }
+
+ auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns());
+ // Reduce the hash from 64bit down to 32bit, just to allow combinations with murmur3 later
+ // on. Bit depth not important, we end up just doing integer modulo with this in the end.
+ // The hash function should provide entropy in the lower bits as it's used in hash tables.
+ uint32_t hash = static_cast<uint32_t>(hashedNs.hash());
+
+ // We need to track all types of ops, including type 'n' (these are generated from chunk
+ // migrations).
+ if (sessionUpdateTracker) {
+ if (auto newOplogWrites = sessionUpdateTracker->updateOrFlush(op)) {
+ derivedOps->emplace_back(std::move(*newOplogWrites));
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ }
+ }
+
+ if (op.isCrudOpType()) {
+ auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
+
+ // For doc locking engines, include the _id of the document in the hash so we get
+ // parallelism even if all writes are to a single collection.
+ //
+ // For capped collections, this is illegal, since capped collections must preserve
+ // insertion order.
+ if (supportsDocLocking && !collProperties.isCapped) {
+ BSONElement id = op.getIdElement();
+ BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore,
+ collProperties.collator);
+ const size_t idHash = elementHasher.hash(id);
+ MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
+ }
+
+ if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
+ // Mark capped collection ops before storing them to ensure we do not attempt to
+ // bulk insert them.
+ op.isForCappedCollection = true;
+ }
+ }
+
+ // Extract applyOps operations and fill writers with extracted operations using this
+ // function.
+ if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) {
+ try {
+ derivedOps->emplace_back(ApplyOps::extractOperations(op));
+
+ // Nested entries cannot have different session updates.
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ } catch (...) {
+ fassertFailedWithStatusNoTrace(
+ 50711,
+ exceptionToStatus().withContext(str::stream()
+ << "Unable to extract operations from applyOps "
+ << redact(op.toBSON())));
+ }
+ continue;
+ }
+
+ auto& writer = (*writerVectors)[hash % numWriters];
+ if (writer.empty()) {
+ writer.reserve(8); // Skip a few growth rounds
+ }
+ writer.push_back(&op);
+ }
+}
+
+void SyncTail::_fillWriterVectors(OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps) {
+ SessionUpdateTracker sessionUpdateTracker;
+ _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
+
+ auto newOplogWrites = sessionUpdateTracker.flushAll();
+ if (!newOplogWrites.empty()) {
+ derivedOps->emplace_back(std::move(newOplogWrites));
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ }
+}
+
StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
invariant(!ops.empty());
@@ -1338,7 +1343,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
std::vector<MultiApplier::Operations> derivedOps;
std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
+ _fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();
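
The writer assignment in _fillWriterVectors is easiest to see in isolation. This sketch extracts just the hash combination used above (same MurmurHash3_x86_32 primitive as in the hunk; the function wrapper is illustrative only):

    // Sketch: choose a writer thread for a CRUD op on a doc-locking engine.
    uint32_t pickWriter(uint32_t nsHash, size_t idHash, uint32_t numWriters) {
        uint32_t hash = nsHash;
        // Mix the document _id hash into the namespace hash so writes to a
        // single (non-capped) collection still spread across writer threads.
        MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
        return hash % numWriters;
    }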
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index c651b17b743..aebce6af205 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -41,6 +41,7 @@
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/replication_consistency_markers.h"
+#include "mongo/db/repl/session_update_tracker.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
@@ -225,7 +226,9 @@ public:
/**
* Applies a batch of oplog entries by writing the oplog entries to the local oplog and then
- * using a set of threads to apply the operations.
+ * using a set of threads to apply the operations. Every entry is still written to the
+ * oplog, but only oplog entries with an optime greater than the beginApplyingOpTime
+ * option are actually applied.
*
* If the batch application is successful, returns the optime of the last op applied, which
* should be the last op in the batch.
@@ -250,6 +253,17 @@ private:
ReplicationCoordinator* replCoord,
OpQueueBatcher* batcher) noexcept;
+ void _fillWriterVectors(OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps,
+ SessionUpdateTracker* sessionUpdateTracker);
+
+ void _fillWriterVectors(OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps);
+
OplogApplier::Observer* const _observer;
ReplicationConsistencyMarkers* const _consistencyMarkers;
StorageInterface* const _storageInterface;
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 27c57aa5e87..1a6e53d7a24 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -40,6 +40,36 @@
namespace mongo {
+/**
+ * Helper that finds the previous oplog entry for the transaction, transforms it into a
+ * normal applyOps command, and applies the oplog entry. Currently used for oplog
+ * application of a commitTransaction oplog entry during recovery, rollback and initial sync.
+ */
+Status _applyTransactionFromOplogChain(OperationContext* opCtx,
+ const repl::OplogEntry& entry,
+ repl::OplogApplication::Mode mode) {
+ invariant(mode == repl::OplogApplication::Mode::kRecovering ||
+ mode == repl::OplogApplication::Mode::kInitialSync);
+
+ // Since the TransactionHistoryIterator uses DBDirectClient, it cannot come with snapshot
+ // isolation.
+ invariant(!opCtx->recoveryUnit()->getPointInTimeReadTimestamp());
+
+ // Get the corresponding prepareTransaction oplog entry.
+ const auto prepareOpTime = entry.getPrevWriteOpTimeInTransaction();
+ invariant(prepareOpTime);
+ TransactionHistoryIterator iter(prepareOpTime.get());
+ invariant(iter.hasNext());
+ const auto prepareOplogEntry = iter.next(opCtx);
+
+ // Transform prepare command into a normal applyOps command.
+ const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare");
+
+ BSONObjBuilder resultWeDontCareAbout;
+ return applyOps(
+ opCtx, entry.getNss().db().toString(), prepareCmd, mode, &resultWeDontCareAbout);
+}
+
Status applyCommitTransaction(OperationContext* opCtx,
const repl::OplogEntry& entry,
repl::OplogApplication::Mode mode) {
@@ -63,26 +93,13 @@ Status applyCommitTransaction(OperationContext* opCtx,
return Status::OK();
}
- // Since the TransactionHistoryIterator uses DBDirectClient, it cannot come with snapshot
- // isolation.
- invariant(!opCtx->recoveryUnit()->getPointInTimeReadTimestamp());
-
- // Get the corresponding prepareTransaction oplog entry.
- const auto prepareOpTime = entry.getPrevWriteOpTimeInTransaction();
- invariant(prepareOpTime);
- TransactionHistoryIterator iter(prepareOpTime.get());
- invariant(iter.hasNext());
- const auto prepareOplogEntry = iter.next(opCtx);
-
- // Transform prepare command into a normal applyOps command.
- const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare");
+ return _applyTransactionFromOplogChain(opCtx, entry, mode);
+ }
- BSONObjBuilder resultWeDontCareAbout;
- return applyOps(
- opCtx, entry.getNss().db().toString(), prepareCmd, mode, &resultWeDontCareAbout);
+ if (mode == repl::OplogApplication::Mode::kInitialSync) {
+ return _applyTransactionFromOplogChain(opCtx, entry, mode);
}
- // TODO: SERVER-36492 Only run on secondary until we support initial sync.
invariant(mode == repl::OplogApplication::Mode::kSecondary);
// Transaction operations are in their own batch, so we can modify their opCtx.
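
Putting the hunks back together, the dispatch in applyCommitTransaction for a prepared transaction now has this shape (a condensed sketch; the recovery-only enableMajorityReadConcern and commit-timestamp checks are elided):

    // Sketch: commitTransaction oplog application by mode.
    if (mode == repl::OplogApplication::Mode::kRecovering ||
        mode == repl::OplogApplication::Mode::kInitialSync) {
        // Replay the whole transaction from its prepare oplog entry.
        return _applyTransactionFromOplogChain(opCtx, entry, mode);
    }
    invariant(mode == repl::OplogApplication::Mode::kSecondary);
    // On a live secondary the transaction is already prepared; commit it here.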
diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp
index 11f6480d887..a46a663e0b1 100644
--- a/src/mongo/db/server_transactions_metrics.cpp
+++ b/src/mongo/db/server_transactions_metrics.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/service_context.h"
#include "mongo/db/transactions_stats_gen.h"
@@ -295,12 +296,12 @@ void ServerTransactionsMetrics::updateStats(TransactionsStats* stats, OperationC
ServerTransactionsMetrics::_getOldestOpenUnpreparedReadTimestamp(opCtx));
// Acquire _mutex before reading _oldestActiveOplogEntryOpTime.
stdx::lock_guard<stdx::mutex> lm(_mutex);
- // To avoid compression loss, we have Timestamp(0, 0) be the default value if no oldest active
- // transaction optime is stored.
- Timestamp oldestActiveOplogEntryTimestamp = (_oldestActiveOplogEntryOpTime != boost::none)
- ? _oldestActiveOplogEntryOpTime->getTimestamp()
- : Timestamp();
- stats->setOldestActiveOplogEntryTimestamp(oldestActiveOplogEntryTimestamp);
+ // To avoid compression loss, we use the null OpTime if no oldest active transaction optime is
+ // stored.
+ repl::OpTime oldestActiveOplogEntryOpTime = (_oldestActiveOplogEntryOpTime != boost::none)
+ ? _oldestActiveOplogEntryOpTime.get()
+ : repl::OpTime();
+ stats->setOldestActiveOplogEntryOpTime(oldestActiveOplogEntryOpTime);
}
void ServerTransactionsMetrics::clearOpTimes() {
diff --git a/src/mongo/db/transactions_stats.idl b/src/mongo/db/transactions_stats.idl
index 834399a331c..070ca433080 100644
--- a/src/mongo/db/transactions_stats.idl
+++ b/src/mongo/db/transactions_stats.idl
@@ -35,6 +35,7 @@ global:
imports:
- "mongo/idl/basic_types.idl"
+ - "mongo/db/repl/replication_types.idl"
structs:
@@ -82,7 +83,7 @@ structs:
currentPrepared:
type: long
default: 0
- oldestActiveOplogEntryTimestamp:
- type: timestamp
+ oldestActiveOplogEntryOpTime:
+ type: optime
oldestOpenUnpreparedReadTimestamp:
type: timestamp