author     Samy Lanka <samy.lanka@mongodb.com>    2019-01-24 21:57:25 -0500
committer  Samy Lanka <samy.lanka@mongodb.com>    2019-02-11 12:27:33 -0500
commit     4f858c52b05ecc49d2ae19bbaf59fc0aad445b7e
tree       2af6afbcb4d27b4b1ce22eb25750583666396a8a
parent     d568e329a67eee8ba241d52067750a3d8c42dc0f
SERVER-36489 Start initial sync oplog fetching from the 'oldest active transaction timestamp'
SERVER-36490 Initial sync should not actually prepare transactions on applying prepareTransaction oplog entries
SERVER-36491 During initial sync, make commitTransaction oplog entries apply the transaction from the prepare oplog entry
20 files changed, 1118 insertions(+), 251 deletions(-)
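In short, initial sync now derives two start points instead of one: the beginFetchingTimestamp (where the OplogFetcher starts) comes from the oldest active transaction that has written an oplog entry, as reported by the sync source's serverStatus, while the beginApplyingTimestamp (where oplog application starts) remains the sync source's last applied oplog entry. A rough mongo-shell sketch of the serverStatus lookup the new InitialSyncer step performs: the command shape and the transactions.oldestActiveOplogEntryOpTime field match this patch, but the surrounding shell logic (including the `syncSource` connection variable and the local-oplog fallback query) is illustrative only, not the server's C++ implementation:

    // Request only the sections we need; metrics and wiredTiger are projected out
    // because they are large and unneeded (same projection as the patch's request).
    const status = assert.commandWorked(
        syncSource.adminCommand({serverStatus: 1, transactions: 1, metrics: 0, wiredTiger: 0}));

    let beginFetchingTs;
    if (status.transactions.hasOwnProperty("oldestActiveOplogEntryOpTime")) {
        // An active (e.g. prepared) transaction has an oplog entry: fetch from its
        // optime so the transaction's earlier entries are available at apply time.
        beginFetchingTs = status.transactions.oldestActiveOplogEntryOpTime.ts;
    } else {
        // No active transactions: fetching can start where applying starts, i.e. at
        // the sync source's last applied oplog entry.
        beginFetchingTs =
            syncSource.getDB("local").oplog.rs.find().sort({$natural: -1}).limit(1).next().ts;
    }
    jsTestLog("initial sync would begin fetching at " + tojson(beginFetchingTs));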
diff --git a/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js b/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js
index 4924143dfc9..7d1406ac936 100644
--- a/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js
+++ b/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js
@@ -88,7 +88,7 @@
     verifyServerStatusChange(
         initialStatus.transactions, newStatus.transactions, "currentPrepared", 1);
-    assert.eq(newStatus.transactions.oldestActiveOplogEntryTimestamp, prepareTimestampForCommit);
+    assert.eq(newStatus.transactions.oldestActiveOplogEntryOpTime.ts, prepareTimestampForCommit);
     // Verify that the oldestOpenUnpreparedReadTimestamp is a null timestamp since the transaction
     // has been prepared.
     assert.eq(newStatus.transactions.oldestOpenUnpreparedReadTimestamp, Timestamp(0, 0));
@@ -137,7 +137,7 @@
     verifyServerStatusChange(
         initialStatus.transactions, newStatus.transactions, "currentPrepared", 1);
-    assert.eq(newStatus.transactions.oldestActiveOplogEntryTimestamp, prepareTimestampForAbort);
+    assert.eq(newStatus.transactions.oldestActiveOplogEntryOpTime.ts, prepareTimestampForAbort);
     // Verify that the oldestOpenUnpreparedReadTimestamp is a null timestamp since the transaction
     // has been prepared.
     assert.eq(newStatus.transactions.oldestOpenUnpreparedReadTimestamp, Timestamp(0, 0));
diff --git a/jstests/replsets/initial_sync_commit_prepared_transaction.js b/jstests/replsets/initial_sync_commit_prepared_transaction.js
new file mode 100644
index 00000000000..1135237a25c
--- /dev/null
+++ b/jstests/replsets/initial_sync_commit_prepared_transaction.js
@@ -0,0 +1,107 @@
+/**
+ * Tests that initial sync successfully applies the commitTransaction oplog entry. To be able to
+ * test this, we have to pause collection cloning and run commitTransaction so that the oplog entry
+ * is applied during the oplog application phase of initial sync.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+    "use strict";
+
+    load("jstests/libs/check_log.js");
+    load("jstests/core/txns/libs/prepare_helpers.js");
+
+    const replTest = new ReplSetTest({nodes: 2});
+    replTest.startSet();
+
+    const config = replTest.getReplSetConfig();
+    // Increase the election timeout so that we do not accidentally trigger an election while the
+    // secondary is restarting.
+    config.settings = {"electionTimeoutMillis": 12 * 60 * 60 * 1000};
+    replTest.initiate(config);
+
+    const primary = replTest.getPrimary();
+    let secondary = replTest.getSecondary();
+
+    const dbName = "test";
+    const collName = "initial_sync_commit_prepared_transaction";
+    const testDB = primary.getDB(dbName);
+    const testColl = testDB.getCollection(collName);
+
+    assert.commandWorked(testColl.insert({_id: 1}));
+
+    jsTestLog("Preparing a transaction that will be the oldest active transaction");
+
+    // Prepare a transaction so that there is an active transaction with an oplog entry. The prepare
+    // timestamp will become the beginFetchingTimestamp during initial sync.
+    const session1 = primary.startSession({causalConsistency: false});
+    const sessionDB1 = session1.getDatabase(dbName);
+    const sessionColl1 = sessionDB1.getCollection(collName);
+    session1.startTransaction();
+    assert.commandWorked(sessionColl1.insert({_id: 2}));
+    let prepareTimestamp1 = PrepareHelpers.prepareTransaction(session1);
+
+    // Do another operation so that the beginFetchingTimestamp will be different from the
+    // beginApplyingTimestamp.
+    assert.commandWorked(testColl.insert({_id: 3}));
+
+    jsTestLog("Restarting the secondary");
+
+    // Restart the secondary with startClean set to true so that it goes through initial sync. Also
+    // restart the node with a failpoint turned on that will pause initial sync after the secondary
+    // has copied {_id: 1} and {_id: 3}. This way we can try to commit the prepared transaction
+    // while initial sync is paused and know that its operations won't be copied during collection
+    // cloning. Instead, the commitTransaction oplog entry must be applied during oplog application.
+    secondary = replTest.restart(secondary, {
+        startClean: true,
+        setParameter: {
+            'failpoint.initialSyncHangDuringCollectionClone': tojson(
+                {mode: 'alwaysOn', data: {namespace: testColl.getFullName(), numDocsToClone: 2}}),
+            'numInitialSyncAttempts': 1
+        }
+    });
+
+    // Wait for failpoint to be reached so we know that collection cloning is paused.
+    checkLog.contains(secondary, "initialSyncHangDuringCollectionClone fail point enabled");
+
+    jsTestLog("Running operations while collection cloning is paused");
+
+    // Commit a transaction on the sync source while collection cloning is paused so that we know
+    // it must be applied during the oplog application stage of initial sync.
+    assert.commandWorked(
+        PrepareHelpers.commitTransactionAfterPrepareTS(session1, prepareTimestamp1));
+
+    jsTestLog("Resuming initial sync");
+
+    // Resume initial sync.
+    assert.commandWorked(secondary.adminCommand(
+        {configureFailPoint: "initialSyncHangDuringCollectionClone", mode: "off"}));
+
+    // Wait for the secondary to complete initial sync.
+    replTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+    jsTestLog("Initial sync completed");
+
+    // Make sure the transaction committed properly and is reflected after the initial sync.
+    let res = secondary.getDB(dbName).getCollection(collName).findOne({_id: 2});
+    assert.docEq(res, {_id: 2}, res);
+
+    // Step up the secondary after initial sync is done and make sure we can successfully run
+    // another transaction.
+    replTest.stepUp(secondary);
+    replTest.waitForState(secondary, ReplSetTest.State.PRIMARY);
+    let newPrimary = replTest.getPrimary();
+    const session2 = newPrimary.startSession({causalConsistency: false});
+    const sessionDB2 = session2.getDatabase(dbName);
+    const sessionColl2 = sessionDB2.getCollection(collName);
+    session2.startTransaction();
+    assert.commandWorked(sessionColl2.insert({_id: 4}));
+    let prepareTimestamp2 = PrepareHelpers.prepareTransaction(session2);
+    assert.commandWorked(
+        PrepareHelpers.commitTransactionAfterPrepareTS(session2, prepareTimestamp2));
+    res = newPrimary.getDB(dbName).getCollection(collName).findOne({_id: 4});
+    assert.docEq(res, {_id: 4}, res);
+
+    replTest.stopSet();
+})();
\ No newline at end of file
diff --git a/jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js b/jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
new file mode 100644
index 00000000000..eeff7092001
--- /dev/null
+++ b/jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp.js
@@ -0,0 +1,134 @@
+/**
+ * Tests that initial sync properly fetches from the oldest active transaction timestamp, but that
+ * it only applies from the beginApplyingTimestamp. The beginApplyingTimestamp is the timestamp of
+ * the oplog entry that was last applied on the sync source before initial sync begins. It is also
+ * the timestamp after which we will start applying operations during initial sync.
+ *
+ * To make sure that it is applying from the correct point, the test prepares a transaction before
+ * the beginFetchingTimestamp and commits it before the beginApplyingTimestamp. Since the
+ * transaction is not active by the time initial sync begins, its prepareTransaction oplog entry
+ * won't be fetched during oplog application and trying to apply the commitTransaction oplog entry
+ * will cause initial sync to fail.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+    "use strict";
+
+    load("jstests/libs/check_log.js");
+    load("jstests/core/txns/libs/prepare_helpers.js");
+
+    const replTest = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]});
+    replTest.startSet();
+    replTest.initiate();
+
+    const primary = replTest.getPrimary();
+    let secondary = replTest.getSecondary();
+
+    const dbName = "test";
+    const collName = "initial_sync_fetch_from_oldest_active_transaction_timestamp";
+    const testDB = primary.getDB(dbName);
+    const testColl = testDB.getCollection(collName);
+
+    assert.commandWorked(testColl.insert({_id: 1}));
+
+    jsTestLog("Preparing a transaction that will later be committed");
+
+    const session1 = primary.startSession({causalConsistency: false});
+    const sessionDB1 = session1.getDatabase(dbName);
+    const sessionColl1 = sessionDB1.getCollection(collName);
+    session1.startTransaction();
+    assert.commandWorked(sessionColl1.insert({_id: 2}));
+    const prepareTimestamp1 = PrepareHelpers.prepareTransaction(session1);
+
+    jsTestLog("Preparing a transaction that will later be the oldest active transaction");
+
+    // Prepare a transaction so that there is an active transaction with an oplog entry. The prepare
+    // timestamp will become the beginFetchingTimestamp during initial sync.
+    const session2 = primary.startSession({causalConsistency: false});
+    const sessionDB2 = session2.getDatabase(dbName);
+    const sessionColl2 = sessionDB2.getCollection(collName);
+    session2.startTransaction();
+    assert.commandWorked(sessionColl2.insert({_id: 3}));
+    const prepareTimestamp2 = PrepareHelpers.prepareTransaction(session2);
+
+    jsTestLog("beginFetchingTimestamp: " + prepareTimestamp2);
+
+    // Commit the first transaction so that we have an operation that is fetched during initial sync
+    // but should not be applied. If this is applied, initial sync will fail because while trying to
+    // apply the commitTransaction oplog entry, it will fail to get the prepareTransaction oplog
+    // entry since its optime is before the beginFetchingTimestamp. Doing another operation will
+    // also cause the beginApplyingTimestamp to be different from the beginFetchingTimestamp. Note
+    // that since the beginApplyingTimestamp is the timestamp after which operations are applied
+    // during initial sync, this commitTransaction will not be applied.
+    const beginApplyingTimestamp =
+        assert
+            .commandWorked(
+                PrepareHelpers.commitTransactionAfterPrepareTS(session1, prepareTimestamp1))
+            .operationTime;
+
+    jsTestLog("beginApplyingTimestamp: " + beginApplyingTimestamp);
+
+    // Restart the secondary with startClean set to true so that it goes through initial sync. Also
+    // restart the node with a failpoint turned on that will pause initial sync after the secondary
+    // has copied {_id: 1} and {_id: 2}. This way we can insert more documents when initial sync is
+    // paused and know that they won't be copied during collection cloning but instead must be
+    // applied during oplog application.
+    secondary = replTest.restart(secondary, {
+        startClean: true,
+        setParameter: {
+            'failpoint.initialSyncHangDuringCollectionClone': tojson(
+                {mode: 'alwaysOn', data: {namespace: testColl.getFullName(), numDocsToClone: 2}}),
+            'numInitialSyncAttempts': 1
+        }
+    });
+
+    jsTestLog("Secondary was restarted");
+
+    // Wait for failpoint to be reached so we know that collection cloning is paused.
+    checkLog.contains(secondary, "initialSyncHangDuringCollectionClone fail point enabled");
+
+    jsTestLog("Running operations while collection cloning is paused");
+
+    // Run some operations on the sync source while collection cloning is paused so that we know
+    // they must be applied during the oplog application stage of initial sync. This will also make
+    // sure that the beginApplyingTimestamp and the stopTimestamp in initial sync are different. The
+    // stopTimestamp is the timestamp of the oplog entry that was last applied on the sync source
+    // when the oplog application phase of initial sync begins.
+    const stopTimestamp =
+        assert.commandWorked(testColl.runCommand("insert", {documents: [{_id: 4}]})).operationTime;
+
+    jsTestLog("stopTimestamp: " + stopTimestamp);
+
+    // Resume initial sync.
+    assert.commandWorked(secondary.adminCommand(
+        {configureFailPoint: "initialSyncHangDuringCollectionClone", mode: "off"}));
+
+    jsTestLog("Initial sync resumed");
+
+    // Wait for the secondary to complete initial sync.
+    replTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+    jsTestLog("Initial sync completed");
+
+    // Make sure the secondary has the prepare transaction oplog entry of the active transaction.
+    secondary.setSlaveOk();
+    const localDB = secondary.getDB("local");
+    const oplog = localDB.getCollection("oplog.rs");
+    let res = oplog.find({"prepare": true});
+    assert.eq(res.count(), 1, res);
+
+    // Make sure the first transaction committed properly and is reflected after the initial sync.
+    res = secondary.getDB(dbName).getCollection(collName).findOne({_id: 2});
+    assert.docEq(res, {_id: 2}, res);
+
+    // TODO SERVER-36492: Step up the secondary, make sure that we get the prepare conflicts and
+    // lock conflicts we expect, make sure we can commit the second transaction after initial sync
+    // is done and that we can successfully run another transaction.
+
+    jsTestLog("Aborting the second transaction");
+    session2.abortTransaction_forTesting();
+
+    replTest.stopSet();
+})();
\ No newline at end of file
diff --git a/jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js b/jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js
new file mode 100644
index 00000000000..c05f22b0219
--- /dev/null
+++ b/jstests/replsets/initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application.js
@@ -0,0 +1,110 @@
+/**
+ * Tests that initial sync properly fetches from the oldest active transaction timestamp, even if
+ * the beginApplyingTimestamp and the stopTimestamp are the same. The beginApplyingTimestamp is the
+ * timestamp of the oplog entry that was last applied on the sync source before initial sync begins.
+ * It is also the timestamp after which we will start applying operations during initial sync. The
+ * stopTimestamp is the timestamp of the oplog entry that was last applied on the sync source when
+ * the oplog application phase of initial sync begins. If they are the same, it means that
+ * no operations were run on the sync source during the collection cloning phase of initial sync and
+ * so no oplog entries need to be applied.
+ *
+ * To make sure that it is applying from the correct point, the test prepares a transaction before
+ * the beginFetchingTimestamp and commits it before the beginApplyingTimestamp. Since the
+ * transaction is not active by the time initial sync begins, its prepareTransaction oplog entry
+ * won't be fetched during oplog application and trying to apply the commitTransaction oplog entry
+ * will cause initial sync to fail.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+    "use strict";
+
+    load("jstests/libs/check_log.js");
+    load("jstests/core/txns/libs/prepare_helpers.js");
+
+    const replTest = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]});
+    replTest.startSet();
+    replTest.initiate();
+
+    const primary = replTest.getPrimary();
+    let secondary = replTest.getSecondary();
+
+    const dbName = "test";
+    const collName =
+        "initial_sync_fetch_from_oldest_active_transaction_timestamp_no_oplog_application";
+    const testDB = primary.getDB(dbName);
+    const testColl = testDB.getCollection(collName);
+
+    assert.commandWorked(testColl.insert({_id: 1}));
+
+    jsTestLog("Preparing a transaction that will later be committed");
+
+    const session1 = primary.startSession({causalConsistency: false});
+    const sessionDB1 = session1.getDatabase(dbName);
+    const sessionColl1 = sessionDB1.getCollection(collName);
+    session1.startTransaction();
+    assert.commandWorked(sessionColl1.insert({_id: 2}));
+    const prepareTimestamp1 = PrepareHelpers.prepareTransaction(session1);
+
+    jsTestLog("Preparing a transaction that will be the oldest active transaction");
+
+    // Prepare a transaction so that there is an active transaction with an oplog entry. The prepare
+    // timestamp will become the beginFetchingTimestamp during initial sync.
+    const session2 = primary.startSession({causalConsistency: false});
+    const sessionDB2 = session2.getDatabase(dbName);
+    const sessionColl2 = sessionDB2.getCollection(collName);
+    session2.startTransaction();
+    assert.commandWorked(sessionColl2.insert({_id: 3}));
+    const prepareTimestamp2 = PrepareHelpers.prepareTransaction(session2);
+
+    jsTestLog("beginFetchingTimestamp: " + prepareTimestamp2);
+
+    // Commit the first transaction so that we have an operation that is fetched during initial sync
+    // but should not be applied. If this is applied, initial sync will fail because while trying to
+    // apply the commitTransaction oplog entry, it will fail to get the prepareTransaction oplog
+    // entry since its optime is before the beginFetchingTimestamp. Doing another operation will
+    // also cause the beginApplyingTimestamp to be different from the beginFetchingTimestamp. Note
+    // that since the beginApplyingTimestamp is the timestamp after which operations are applied
+    // during initial sync, this commitTransaction will not be applied.
+    const beginApplyingTimestamp =
+        assert
+            .commandWorked(
+                PrepareHelpers.commitTransactionAfterPrepareTS(session1, prepareTimestamp1))
+            .operationTime;
+
+    jsTestLog("beginApplyingTimestamp/stopTimestamp: " + beginApplyingTimestamp);
+
+    // Restart the secondary with startClean set to true so that it goes through initial sync. Since
+    // we won't be running any operations during collection cloning, the beginApplyingTimestamp and
+    // stopTimestamp should be the same.
+    secondary = replTest.restart(secondary,
+                                 {startClean: true, setParameter: {'numInitialSyncAttempts': 1}});
+    replTest.awaitSecondaryNodes();
+    replTest.awaitReplication();
+
+    jsTestLog("Secondary was restarted");
+
+    // Wait for the secondary to complete initial sync.
+    replTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+    jsTestLog("Initial sync completed");
+
+    // Make sure the secondary has the prepare transaction oplog entry of the active transaction
+    // even though the beginApplyingTimestamp and stopTimestamp were the same.
+    secondary.setSlaveOk();
+    const localDB = secondary.getDB("local");
+    const oplog = localDB.getCollection("oplog.rs");
+    let res = oplog.find({"prepare": true});
+    assert.eq(res.count(), 1, res);
+
+    // Make sure the first transaction committed properly and is reflected after the initial sync.
+    res = secondary.getDB(dbName).getCollection(collName).findOne({_id: 2});
+    assert.docEq(res, {_id: 2}, res);
+
+    jsTestLog("Aborting the second transaction");
+
+    session2.abortTransaction_forTesting();
+
+    replTest.stopSet();
+})();
\ No newline at end of file
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index a32a0d212c1..6f90eedf4c1 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -322,14 +322,16 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
 }
 
 /**
- * Make sure that if we are in replication recovery, we don't apply the prepare transaction oplog
- * entry as part of recovery until the very end of recovery. Otherwise, only apply the prepare
- * transaction oplog entry if we are a secondary.
+ * Make sure that if we are in replication recovery or initial sync, we don't apply the prepare
+ * transaction oplog entry until we either see a commit transaction oplog entry or are at the very
+ * end of recovery/initial sync. Otherwise, only apply the prepare transaction oplog entry if we are
+ * a secondary.
 */
 Status _applyPrepareTransactionOplogEntry(OperationContext* opCtx,
                                           const repl::OplogEntry& entry,
                                           repl::OplogApplication::Mode oplogApplicationMode) {
-    // Wait until the end of recovery to apply the operations from the prepared transaction.
+    // Don't apply the operations from the prepared transaction until either we see a commit
+    // transaction oplog entry during recovery or are at the end of recovery.
     if (oplogApplicationMode == OplogApplication::Mode::kRecovering) {
         if (!serverGlobalParams.enableMajorityReadConcern) {
             error() << "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
@@ -339,12 +341,19 @@ Status _applyPrepareTransactionOplogEntry(OperationContext* opCtx,
             fassert(50964, serverGlobalParams.enableMajorityReadConcern);
         return Status::OK();
     }
+
+    // Don't apply the operations from the prepared transaction until either we see a commit
+    // transaction oplog entry during the oplog application phase of initial sync or are at the end
+    // of initial sync.
+    if (oplogApplicationMode == OplogApplication::Mode::kInitialSync) {
+        return Status::OK();
+    }
+
     // Return error if run via applyOps command.
     uassert(50945,
             "applyOps with prepared flag is only used internally by secondaries.",
             oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd);
 
-    // TODO: SERVER-36492 Only run on secondary until we support initial sync.
     invariant(oplogApplicationMode == repl::OplogApplication::Mode::kSecondary);
 
     return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
diff --git a/src/mongo/db/repl/initial_sync_state.h b/src/mongo/db/repl/initial_sync_state.h
index 88208a3b97e..5d4a1bc12bd 100644
--- a/src/mongo/db/repl/initial_sync_state.h
+++ b/src/mongo/db/repl/initial_sync_state.h
@@ -52,10 +52,14 @@ struct InitialSyncState {
     InitialSyncState(std::unique_ptr<DatabasesCloner> cloner) : dbsCloner(std::move(cloner)){};
 
     std::unique_ptr<DatabasesCloner>
-        dbsCloner;             // Cloner for all databases included in initial sync.
-    Timestamp beginTimestamp;  // Timestamp from the latest entry in oplog when started.
-    Timestamp stopTimestamp;   // Referred to as minvalid, or the place we can transition states.
-    Timer timer;               // Timer for timing how long each initial sync attempt takes.
+        dbsCloner;  // Cloner for all databases included in initial sync.
+    Timestamp beginApplyingTimestamp;  // Timestamp from the latest entry in oplog when started. It
+                                       // is also the timestamp after which we will start applying
+                                       // operations during initial sync.
+    Timestamp beginFetchingTimestamp;  // Timestamp from the earliest active transaction that had an
+                                       // oplog entry.
+    Timestamp stopTimestamp;  // Referred to as minvalid, or the place we can transition states.
+    Timer timer;              // Timer for timing how long each initial sync attempt takes.
     size_t fetchedMissingDocs = 0;
     size_t appliedOps = 0;
 };
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 80cd33bda8b..82d27390b7d 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -353,8 +353,13 @@ void InitialSyncer::_appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob
     }
     bob->appendNumber("fetchedMissingDocs", _initialSyncState->fetchedMissingDocs);
     bob->appendNumber("appliedOps", _initialSyncState->appliedOps);
-    if (!_initialSyncState->beginTimestamp.isNull()) {
-        bob->append("initialSyncOplogStart", _initialSyncState->beginTimestamp);
+    if (!_initialSyncState->beginApplyingTimestamp.isNull()) {
+        bob->append("initialSyncOplogStart", _initialSyncState->beginApplyingTimestamp);
+    }
+    // Only include the beginFetchingTimestamp if it's different from the beginApplyingTimestamp.
+    if (!_initialSyncState->beginFetchingTimestamp.isNull() &&
+        _initialSyncState->beginFetchingTimestamp != _initialSyncState->beginApplyingTimestamp) {
+        bob->append("initialSyncOplogFetchingStart", _initialSyncState->beginFetchingTimestamp);
     }
     if (!_initialSyncState->stopTimestamp.isNull()) {
         bob->append("initialSyncOplogEnd", _initialSyncState->stopTimestamp);
@@ -590,14 +595,6 @@ void InitialSyncer::_chooseSyncSourceCallback(
 
     _syncSource = syncSource.getValue();
 
-    // Create oplog applier.
-    auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
-    OplogApplier::Options options;
-    options.allowNamespaceNotFoundErrorsOnCrudOps = true;
-    options.missingDocumentSourceForInitialSync = _syncSource;
-    _oplogApplier = _dataReplicatorExternalState->makeOplogApplier(
-        _oplogBuffer.get(), _observer.get(), consistencyMarkers, _storage, options, _writerPool);
-
     // Schedule rollback ID checker.
     _rollbackChecker = stdx::make_unique<RollbackChecker>(_exec, _syncSource);
     auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
@@ -640,6 +637,25 @@ Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() {
     return _storage->dropReplicatedDatabases(opCtx.get());
 }
 
+Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
+    std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+    // We will ultimately be getting oldestActiveOplogEntryOpTime from the transactions field in
+    // serverStatus to use as the beginFetchingTimestamp. Also, we project out the metrics and
+    // wiredTiger fields because we don't need them and they're very large.
+    executor::RemoteCommandRequest request(
+        _syncSource,
+        "admin",
+        BSON("serverStatus" << 1 << "transactions" << 1 << "metrics" << 0 << "wiredTiger" << 0),
+        nullptr);
+
+    auto scheduleResultSW = _exec->scheduleRemoteCommand(
+        request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackData) {
+            _getBeginFetchingOpTimeCallback(callbackData.response, onCompletionGuard);
+        });
+
+    return scheduleResultSW.getStatus();
+}
+
 void InitialSyncer::_rollbackCheckerResetCallback(
     const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
     stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -650,11 +666,40 @@ void InitialSyncer::_rollbackCheckerResetCallback(
         return;
     }
 
+    status = _scheduleGetBeginFetchingOpTime_inlock(onCompletionGuard);
+    if (!status.isOK()) {
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+        return;
+    }
+}
+
+void InitialSyncer::_getBeginFetchingOpTimeCallback(
+    const executor::TaskExecutor::ResponseStatus& response,
+    std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+    stdx::unique_lock<stdx::mutex> lock(_mutex);
+    auto status = _checkForShutdownAndConvertStatus_inlock(
+        response.status,
+        "error while getting oldest active transaction timestamp for begin fetching timestamp");
+    if (!status.isOK()) {
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+        return;
+    }
+
+    invariant(response.data.hasField("transactions"));
+
+    // Only set beginFetchingOpTime if the oldestActiveOplogEntryTimestamp actually exists.
+    OpTime beginFetchingOpTime = OpTime();
+    if (response.data["transactions"].embeddedObject().hasField("oldestActiveOplogEntryOpTime")) {
+        beginFetchingOpTime = repl::OpTime::parse(
+            response.data["transactions"]["oldestActiveOplogEntryOpTime"].Obj());
+    }
+
     status = _scheduleLastOplogEntryFetcher_inlock(
         [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
             mongo::Fetcher::NextAction*,
-            mongo::BSONObjBuilder*) {
-            _lastOplogEntryFetcherCallbackForBeginTimestamp(response, onCompletionGuard);
+            mongo::BSONObjBuilder*) mutable {
+            _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
+                response, onCompletionGuard, beginFetchingOpTime);
         });
     if (!status.isOK()) {
         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
@@ -662,9 +707,10 @@ void InitialSyncer::_rollbackCheckerResetCallback(
     }
 }
 
-void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp(
+void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
     const StatusWith<Fetcher::QueryResponse>& result,
-    std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+    std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+    OpTime& beginFetchingOpTime) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     auto status = _checkForShutdownAndConvertStatus_inlock(
         result.getStatus(), "error while getting last oplog entry for begin timestamp");
@@ -695,23 +741,23 @@
     readConcernBob.append("afterClusterTime", lastOpTime.getTimestamp());
     readConcernBob.done();
 
-    _fCVFetcher =
-        stdx::make_unique<Fetcher>(_exec,
-                                   _syncSource,
-                                   NamespaceString::kServerConfigurationNamespace.db().toString(),
-                                   queryBob.obj(),
-                                   [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
-                                       mongo::Fetcher::NextAction*,
-                                       mongo::BSONObjBuilder*) {
-                                       _fcvFetcherCallback(response, onCompletionGuard, lastOpTime);
-                                   },
-                                   ReadPreferenceSetting::secondaryPreferredMetadata(),
-                                   RemoteCommandRequest::kNoTimeout /* find network timeout */,
-                                   RemoteCommandRequest::kNoTimeout /* getMore network timeout */,
-                                   RemoteCommandRetryScheduler::makeRetryPolicy(
-                                       numInitialSyncOplogFindAttempts.load(),
-                                       executor::RemoteCommandRequest::kNoTimeout,
-                                       RemoteCommandRetryScheduler::kAllRetriableErrors));
+    _fCVFetcher = stdx::make_unique<Fetcher>(
+        _exec,
+        _syncSource,
+        NamespaceString::kServerConfigurationNamespace.db().toString(),
+        queryBob.obj(),
+        [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+            mongo::Fetcher::NextAction*,
+            mongo::BSONObjBuilder*) mutable {
+            _fcvFetcherCallback(response, onCompletionGuard, lastOpTime, beginFetchingOpTime);
+        },
+        ReadPreferenceSetting::secondaryPreferredMetadata(),
+        RemoteCommandRequest::kNoTimeout /* find network timeout */,
+        RemoteCommandRequest::kNoTimeout /* getMore network timeout */,
+        RemoteCommandRetryScheduler::makeRetryPolicy(
+            numInitialSyncOplogFindAttempts.load(),
+            executor::RemoteCommandRequest::kNoTimeout,
+            RemoteCommandRetryScheduler::kAllRetriableErrors));
     Status scheduleStatus = _fCVFetcher->schedule();
     if (!scheduleStatus.isOK()) {
         _fCVFetcher.reset();
@@ -722,7 +768,8 @@
 
 void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
                                         std::shared_ptr<OnCompletionGuard> onCompletionGuard,
-                                        const OpTime& lastOpTime) {
+                                        const OpTime& lastOpTime,
+                                        OpTime& beginFetchingOpTime) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     auto status = _checkForShutdownAndConvertStatus_inlock(
         result.getStatus(), "error while getting the remote feature compatibility version");
@@ -790,12 +837,41 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
             _databasesClonerCallback(status, onCompletionGuard);
         }));
 
-    _initialSyncState->beginTimestamp = lastOpTime.getTimestamp();
+    // Create oplog applier.
+    auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
+    OplogApplier::Options options;
+    options.allowNamespaceNotFoundErrorsOnCrudOps = true;
+    options.missingDocumentSourceForInitialSync = _syncSource;
+    options.beginApplyingOpTime = lastOpTime;
+    _oplogApplier = _dataReplicatorExternalState->makeOplogApplier(
+        _oplogBuffer.get(), _observer.get(), consistencyMarkers, _storage, options, _writerPool);
+
+    const auto beginApplyingTimestamp = lastOpTime.getTimestamp();
+    _initialSyncState->beginApplyingTimestamp = beginApplyingTimestamp;
+
+    // If there is no beginFetchingOpTime, then it means that there were no open active transactions
+    // with an oplog entry so we can safely start fetching at the same point that we are applying
+    // from.
+    if (beginFetchingOpTime.isNull()) {
+        _initialSyncState->beginFetchingTimestamp = beginApplyingTimestamp;
+        beginFetchingOpTime = lastOpTime;
+    } else {
+        _initialSyncState->beginFetchingTimestamp = beginFetchingOpTime.getTimestamp();
+    }
+
+    invariant(_initialSyncState->beginApplyingTimestamp >=
+                  _initialSyncState->beginFetchingTimestamp,
+              str::stream() << "beginApplyingTimestamp was less than beginFetchingTimestamp. "
+                               "beginApplyingTimestamp: "
+                            << _initialSyncState->beginApplyingTimestamp.toBSON()
+                            << " beginFetchingTimestamp: "
+                            << _initialSyncState->beginFetchingTimestamp.toBSON());
 
     invariant(!result.getValue().documents.empty());
-    LOG(2) << "Setting begin timestamp to " << _initialSyncState->beginTimestamp
+    LOG(2) << "Setting begin applying timestamp to " << _initialSyncState->beginApplyingTimestamp
            << " using last oplog entry: " << redact(result.getValue().documents.front())
-           << ", ns: " << _opts.localOplogNS;
+           << ", ns: " << _opts.localOplogNS << " and the begin fetching timestamp to "
+           << _initialSyncState->beginFetchingTimestamp;
 
     const auto configResult = _dataReplicatorExternalState->getCurrentConfig();
@@ -809,7 +885,7 @@
     const auto& config = configResult.getValue();
     _oplogFetcher = stdx::make_unique<OplogFetcher>(
         _exec,
-        lastOpTime,
+        beginFetchingOpTime,
        _syncSource,
         _opts.remoteOplogNS,
         config,
@@ -823,7 +899,8 @@
             return _enqueueDocuments(first, last, info);
         },
         [=](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); },
-        initialSyncOplogFetcherBatchSize);
+        initialSyncOplogFetcherBatchSize,
+        OplogFetcher::StartingPoint::kEnqueueFirstDoc);
 
     LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString();
@@ -961,7 +1038,10 @@
         optime = optimeStatus.getValue();
         _initialSyncState->stopTimestamp = optime.getTimestamp();
 
-        if (_initialSyncState->beginTimestamp != _initialSyncState->stopTimestamp) {
+        // If the beginFetchingTimestamp is different from the stopTimestamp, it indicates that
+        // there are oplog entries fetched by the oplog fetcher that need to be written to the oplog
+        // and/or there are operations that need to be applied.
+        if (_initialSyncState->beginFetchingTimestamp != _initialSyncState->stopTimestamp) {
             invariant(_lastApplied.isNull());
             _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
             return;
@@ -1107,6 +1187,17 @@
     _lastApplied = lastApplied;
     _opts.setMyLastOptime(_lastApplied, ReplicationCoordinator::DataConsistency::Inconsistent);
 
+    // Update oplog visibility after applying a batch so that while applying transaction oplog
+    // entries, the TransactionHistoryIterator can get earlier oplog entries associated with the
+    // transaction. Note that setting the oplog visibility timestamp here will be safe even if
+    // initial sync was restarted because until initial sync ends, no one else will try to read our
+    // oplog. It is also safe even if we tried to read from our own oplog because we never try to
+    // read from the oplog before applying at least one batch and therefore setting a value for the
+    // oplog visibility timestamp.
+    auto opCtx = makeOpCtx();
+    const bool orderedCommit = true;
+    _storage->oplogDiskLocRegister(opCtx.get(), lastApplied.getTimestamp(), orderedCommit);
+
     auto fetchCount = _fetchCount.load();
     if (fetchCount > 0) {
         _initialSyncState->fetchedMissingDocs += fetchCount;
@@ -1369,11 +1460,11 @@
     }
 
     // Basic sanity check on begin/stop timestamps.
-    if (_initialSyncState->beginTimestamp > _initialSyncState->stopTimestamp) {
+    if (_initialSyncState->beginApplyingTimestamp > _initialSyncState->stopTimestamp) {
         std::string msg = str::stream()
             << "Possible rollback on sync source " << _syncSource.toString() << ". Currently at "
             << _initialSyncState->stopTimestamp.toBSON() << ". Started at "
-            << _initialSyncState->beginTimestamp.toBSON();
+            << _initialSyncState->beginApplyingTimestamp.toBSON();
         error() << msg;
         onCompletionGuard->setResultAndCancelRemainingWork_inlock(
             lock, Status(ErrorCodes::OplogOutOfOrder, msg));
@@ -1381,11 +1472,13 @@
     }
 
     if (_lastApplied.isNull()) {
-        // Check if any ops occurred while cloning.
-        invariant(_initialSyncState->beginTimestamp < _initialSyncState->stopTimestamp);
-        log() << "Applying operations until " << _initialSyncState->stopTimestamp.toBSON()
-              << " before initial sync can complete. (starting at "
-              << _initialSyncState->beginTimestamp.toBSON() << ")";
+        // Check if any ops occurred while cloning or any ops need to be fetched.
+        invariant(_initialSyncState->beginFetchingTimestamp < _initialSyncState->stopTimestamp);
+        log() << "Writing to the oplog and applying operations until "
+              << _initialSyncState->stopTimestamp.toBSON()
+              << " before initial sync can complete. (started fetching at "
+              << _initialSyncState->beginFetchingTimestamp.toBSON() << " and applying at "
+              << _initialSyncState->beginApplyingTimestamp.toBSON() << ")";
         // Fall through to scheduling _getNextApplierBatchCallback().
     } else if (_lastApplied.getTimestamp() >= _initialSyncState->stopTimestamp) {
         // Check for rollback if we have applied far enough to be consistent.
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index c84658092fb..6fbf2d14ef3 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -296,7 +296,11 @@ private:
      *      |
      *      |
      *      V
-     *  _lastOplogEntryFetcherCallbackForBeginTimestamp()
+     *  _getBeginFetchingTimestampCallback()
+     *      |
+     *      |
+     *      V
+     *  _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp()
      *      |
      *      |
      *      V
@@ -399,10 +403,27 @@ private:
      * setting a reference point for the state of the sync source's oplog when data cloning
      * completes.
      */
-    void _lastOplogEntryFetcherCallbackForBeginTimestamp(
+    void _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
         const StatusWith<Fetcher::QueryResponse>& result,
-        std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+        std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+        OpTime& beginFetchingOpTime);
+
+    /**
+     * Callback that gets the oldestActiveOplogEntryOptime from the transactions field in the
+     * serverStatus response, which refers to the optime of the oldest active transaction with an
+     * oplog entry. It will be used as the beginFetchingTimestamp.
+     */
+    void _getBeginFetchingOpTimeCallback(const executor::TaskExecutor::ResponseStatus& response,
+                                         std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+    /**
+     * Schedules a remote command to call serverStatus on the sync source. From the response, we
+     * will be able to get the oldestActiveOplogEntryOptime from the transactions field, which
+     * refers to the optime of the oldest active transaction with an oplog entry. It will be used as
+     * the beginFetchingTimestamp.
+     */
+    Status _scheduleGetBeginFetchingOpTime_inlock(
+        std::shared_ptr<OnCompletionGuard> onCompletionGuard);
 
     /**
      * Callback for the '_fCVFetcher'. A successful response lets us check if the remote node
@@ -410,7 +431,8 @@ private:
      */
     void _fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
                              std::shared_ptr<OnCompletionGuard> onCompletionGuard,
-                             const OpTime& lastOpTime);
+                             const OpTime& lastOpTime,
+                             OpTime& beginFetchingOpTime);
 
     /**
     * Callback for oplog fetcher.
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 408557c823d..dc7b084f07c 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -507,6 +507,14 @@ BSONObj makeRollbackCheckerResponse(int rollbackId) {
 }
 
 /**
+ * Generates a serverStatus response with an 'oldestActiveOplogEntryOpTime' field.
+ */
+BSONObj makeServerStatusResponse(OpTime oldestActiveTxnOpTime) {
+    return BSON("ok" << 1 << "transactions"
+                     << BSON("oldestActiveOplogEntryOpTime" << oldestActiveTxnOpTime));
+}
+
+/**
  * Generates a cursor response for a Fetcher to consume.
  */
 RemoteCommandResponse makeCursorResponse(CursorId cursorId,
@@ -1156,6 +1164,67 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughRollbackCheckerCallbackError
     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
 }
 
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetBeginFetchingTimestampScheduleError) {
+    auto initialSyncer = &getInitialSyncer();
+    auto opCtx = makeOpCtx();
+
+    // Getting the begin fetching timestamp is the only time a serverStatus command is sent, so we
+    // reject the serverStatus command and save the request for inspection at the end of this test
+    // case.
+    executor::RemoteCommandRequest request;
+    _executorProxy->shouldFailScheduleRemoteCommandRequest =
+        [&request](const executor::RemoteCommandRequest& requestToSend) {
+            request = requestToSend;
+            return "serverStatus" == requestToSend.cmdObj.firstElement().fieldNameStringData();
+        };
+
+    HostAndPort syncSource("localhost", 12345);
+    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
+    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+
+    auto net = getNet();
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+        // Base rollback ID.
+        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
+        net->runReadyNetworkOperations();
+    }
+
+    initialSyncer->join();
+    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
+
+    ASSERT_EQUALS(syncSource, request.target);
+    ASSERT_EQUALS(NamespaceString::kAdminDb, request.dbname);
+    assertRemoteCommandNameEquals("serverStatus", request);
+}
+
+TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetBeginFetchingTimestampCallbackError) {
+    auto initialSyncer = &getInitialSyncer();
+    auto opCtx = makeOpCtx();
+
+    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
+    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
+
+    auto net = getNet();
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+        // Base rollback ID.
+ net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + net->runReadyNetworkOperations(); + + assertRemoteCommandNameEquals( + "serverStatus", + net->scheduleErrorResponse( + Status(ErrorCodes::OperationFailed, "serverStatus command failed at sync source"))); + net->runReadyNetworkOperations(); + } + + initialSyncer->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); +} + TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherScheduleError) { auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -1179,6 +1248,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherSchedul // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); } @@ -1205,6 +1279,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherCallbac // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); assertRemoteCommandNameEquals( @@ -1231,6 +1310,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastOplogEntryFetcherOnShutdown) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); ASSERT_TRUE(net->hasReadyRequests()); @@ -1257,6 +1341,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1280,6 +1369,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry first attempt - retriable error. @@ -1308,6 +1402,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. 
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1334,6 +1433,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1371,6 +1475,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherScheduleError) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1397,12 +1506,17 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherCallbackError) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)}); - auto request = assertRemoteCommandNameEquals( + request = assertRemoteCommandNameEquals( "find", net->scheduleErrorResponse( Status(ErrorCodes::OperationFailed, "find command failed at sync source"))); @@ -1427,6 +1541,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsFCVFetcherOnShutdown) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1454,6 +1573,11 @@ TEST_F(InitialSyncerTest, InitialSyncerResendsFindCommandIfFCVFetcherReturnsRetr // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1485,6 +1609,11 @@ void InitialSyncerTest::runInitialSyncWithBadFCVResponse(std::vector<BSONObj> do // Base rollback ID. 
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1548,6 +1677,11 @@ TEST_F(InitialSyncerTest, InitialSyncerSucceedsWhenFCVFetcherReturnsOldVersion) // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1594,6 +1728,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherScheduleError) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1631,6 +1770,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1646,7 +1790,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) { net->runReadyNetworkOperations(); // Oplog tailing query. - auto request = assertRemoteCommandNameEquals( + request = assertRemoteCommandNameEquals( "find", net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "dead cursor"))); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->runReadyNetworkOperations(); @@ -1676,10 +1820,15 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. - auto request = + request = assertRemoteCommandNameEquals("find", net->scheduleSuccessfulResponse(makeCursorResponse( 0LL, _options.localOplogNS, {makeOplogEntryObj(1)}))); @@ -1730,6 +1879,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. 
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1744,14 +1898,13 @@ TEST_F( // Oplog tailing query. // Simulate cursor closing on sync source. - auto request = - assertRemoteCommandNameEquals("find", - net->scheduleSuccessfulResponse(makeCursorResponse( - 0LL, - _options.localOplogNS, - {makeOplogEntryObj(1), - makeOplogEntryObj(2, OpTypeEnum::kCommand), - makeOplogEntryObj(3, OpTypeEnum::kCommand)}))); + request = assertRemoteCommandNameEquals("find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, + _options.localOplogNS, + {makeOplogEntryObj(1), + makeOplogEntryObj(2, OpTypeEnum::kCommand), + makeOplogEntryObj(3, OpTypeEnum::kCommand)}))); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->runReadyNetworkOperations(); @@ -1783,6 +1936,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1797,14 +1955,13 @@ TEST_F( // Oplog tailing query. // Simulate cursor closing on sync source. - auto request = - assertRemoteCommandNameEquals("find", - net->scheduleSuccessfulResponse(makeCursorResponse( - 0LL, - _options.localOplogNS, - {makeOplogEntryObj(1), - makeOplogEntryObj(2, OpTypeEnum::kCommand), - makeOplogEntryObj(3, OpTypeEnum::kCommand)}))); + request = assertRemoteCommandNameEquals("find", + net->scheduleSuccessfulResponse(makeCursorResponse( + 0LL, + _options.localOplogNS, + {makeOplogEntryObj(1), + makeOplogEntryObj(2, OpTypeEnum::kCommand), + makeOplogEntryObj(3, OpTypeEnum::kCommand)}))); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->runReadyNetworkOperations(); @@ -1844,6 +2001,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1884,6 +2046,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1921,6 +2088,11 @@ TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. 
+ auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -1937,7 +2109,7 @@ TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases // Oplog tailing query. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -1984,6 +2156,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2007,7 +2184,7 @@ TEST_F(InitialSyncerTest, // Oplog tailing query. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2053,6 +2230,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsBothOplogFetcherAndDatabasesCloner // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2102,6 +2284,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2141,6 +2328,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2158,7 +2350,7 @@ TEST_F(InitialSyncerTest, // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2196,6 +2388,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. 
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2206,7 +2403,7 @@ TEST_F(InitialSyncerTest, // Quickest path to a successful DatabasesCloner completion is to respond to the // listDatabases with an empty list of database names. - auto request = assertRemoteCommandNameEquals( + request = assertRemoteCommandNameEquals( "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); net->runReadyNetworkOperations(); @@ -2246,6 +2443,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2262,7 +2464,7 @@ TEST_F(InitialSyncerTest, // Save request for OplogFetcher's oplog tailing query. This request will be canceled. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); auto oplogFetcherNetworkOperationIterator = noi; @@ -2307,6 +2509,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2324,7 +2531,7 @@ TEST_F( // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2360,6 +2567,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2377,7 +2589,7 @@ TEST_F(InitialSyncerTest, // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. 
auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2426,6 +2638,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2443,7 +2660,7 @@ TEST_F( // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2495,6 +2712,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2512,7 +2734,7 @@ TEST_F( // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2564,6 +2786,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2581,7 +2808,7 @@ TEST_F( // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2615,6 +2842,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. 
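The hunks above and below all repeat one new exchange: every InitialSyncerTest now answers a serverStatus request between the rollback-ID check and the last-oplog-entry query. The excerpt does not show the makeServerStatusResponse helper itself; a plausible shape for it, assuming it only wraps the optime in the transactions section the initial syncer reads (a hypothetical sketch, not code from the tree), would be:

// Hypothetical sketch of the test helper; the real definition lives in the test fixture. The
// field name mirrors the renamed serverStatus metric, transactions.oldestActiveOplogEntryOpTime.
BSONObj makeServerStatusResponse(const repl::OpTime& oldestActiveTxnOpTime) {
    return BSON("ok" << 1 << "transactions"
                     << BSON("oldestActiveOplogEntryOpTime" << oldestActiveTxnOpTime.toBSON()));
}

Responding with OpTime() yields a null optime, meaning no active transaction on the sync source, so the beginFetchingTimestamp collapses to the beginApplyingTimestamp.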
@@ -2632,7 +2864,7 @@ TEST_F( // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2671,6 +2903,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2688,7 +2925,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) { // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2728,6 +2965,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherC // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2744,7 +2986,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherC // Save request for OplogFetcher's oplog tailing query. This request will be canceled. auto noi = net->getNextReadyRequest(); - auto request = assertRemoteCommandNameEquals("find", noi->getRequest()); + request = assertRemoteCommandNameEquals("find", noi->getRequest()); ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); auto oplogFetcherNetworkOperationIterator = noi; @@ -2790,6 +3032,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2807,7 +3054,7 @@ TEST_F(InitialSyncerTest, // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. 
auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2843,6 +3090,11 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2855,8 +3107,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter // database names, we'll simulate copying a single database with a single collection on the // sync source. NamespaceString nss("a.a"); - auto request = - net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); + request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); assertRemoteCommandNameEquals("listDatabases", request); net->runReadyNetworkOperations(); @@ -2930,6 +3181,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleE // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -2947,7 +3203,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleE // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -2987,6 +3243,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchSch // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3004,7 +3265,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchSch // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -3044,6 +3305,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) { // Base rollback ID. 
net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3061,7 +3327,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) { // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move // on to the DatabasesCloner's request. auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("tailable")); net->blackHole(noi); @@ -3103,6 +3369,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3119,7 +3390,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the // oplog buffer and processed by _getNextApplierBatch_inlock(). - auto request = assertRemoteCommandNameEquals( + request = assertRemoteCommandNameEquals( "find", net->scheduleSuccessfulResponse(makeCursorResponse( 1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion}))); @@ -3170,6 +3441,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3186,7 +3462,7 @@ TEST_F( // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the // oplog buffer and processed by _getNextApplierBatch_inlock(). - auto request = net->scheduleSuccessfulResponse(makeCursorResponse( + request = net->scheduleSuccessfulResponse(makeCursorResponse( 1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion})); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); @@ -3228,6 +3504,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. 
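These tests all exercise the begin-fetching decision that SERVER-36489 introduces: fetch from the oldest active transaction's prepare optime when one exists, otherwise from the begin-applying point. A minimal sketch of that decision (the function name and signature are illustrative, not the patch's API):

// Illustrative only: pick where initial sync starts tailing the sync source's oplog.
repl::OpTime chooseBeginFetchingOpTime(const repl::OpTime& oldestActiveTxnOpTime,
                                       const repl::OpTime& beginApplyingOpTime) {
    // A null optime from serverStatus means no transaction was active on the sync source.
    return oldestActiveTxnOpTime.isNull() ? beginApplyingOpTime : oldestActiveTxnOpTime;
}

Since every test here schedules an empty serverStatus optime, the two points coincide and the rest of each test's expected request sequence is unchanged.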
@@ -3244,7 +3525,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) { // OplogFetcher's oplog tailing query. Save for later. auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); auto oplogFetcherNoi = noi; @@ -3302,6 +3583,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3317,7 +3603,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) { net->runReadyNetworkOperations(); // OplogFetcher's oplog tailing query. Provide enough operations to trigger MultiApplier. - auto request = net->scheduleSuccessfulResponse(makeCursorResponse( + request = net->scheduleSuccessfulResponse(makeCursorResponse( 1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)})); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); @@ -3351,6 +3637,11 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplog // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3367,7 +3658,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplog // OplogFetcher's oplog tailing query. Save for later. auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); auto oplogFetcherNoi = noi; @@ -3406,6 +3697,11 @@ OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch(bool shouldSetFCV) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3422,7 +3718,7 @@ OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch(bool shouldSetFCV) { // OplogFetcher's oplog tailing query. Response has enough operations to reach // end timestamp. 
- auto request = net->scheduleSuccessfulResponse(makeCursorResponse( + request = net->scheduleSuccessfulResponse(makeCursorResponse( 1LL, _options.localOplogNS, {makeOplogEntryObj(1), lastOp.toBSON()})); assertRemoteCommandNameEquals("find", request); ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); @@ -3498,6 +3794,11 @@ TEST_F(InitialSyncerTest, // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3510,8 +3811,7 @@ TEST_F(InitialSyncerTest, // database names, we'll simulate copying a single database with a single collection on the // sync source. NamespaceString nss("a.a"); - auto request = - net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); + request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); assertRemoteCommandNameEquals("listDatabases", request); net->runReadyNetworkOperations(); @@ -3612,6 +3912,11 @@ TEST_F( // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3628,7 +3933,7 @@ TEST_F( // OplogFetcher's oplog tailing query. Response has enough operations to reach // end timestamp. - auto request = net->scheduleSuccessfulResponse( + request = net->scheduleSuccessfulResponse( makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()})); @@ -3703,6 +4008,11 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -3713,7 +4023,7 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) { // Ignore listDatabases request. auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); + request = noi->getRequest(); assertRemoteCommandNameEquals("listDatabases", request); net->blackHole(noi); @@ -3761,6 +4071,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. 
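Taken together, the test edits standardize the mock-network conversation each initial sync attempt walks through. Summarized from the responses scheduled above (an inference from these hunks, not text from the tree):

// 1. replSetGetRBID   -> base rollback ID
// 2. serverStatus     -> transactions.oldestActiveOplogEntryOpTime; an empty optime makes the
//                        beginFetchingTimestamp equal the beginApplyingTimestamp
// 3. find (oplog)     -> last oplog entry, which supplies the beginApplyingTimestamp
// 4. listDatabases    -> databases for the DatabasesCloner
// 5. find (tailable)  -> the OplogFetcher's oplog tailing query

The serverStatus step is new, which is why each test now declares auto request at step 2 and the later declarations become plain assignments.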
@@ -3810,6 +4125,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { // Base rollback ID. auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); assertRemoteCommandNameEquals("replSetGetRBID", request); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -4020,6 +4340,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc // Base rollback ID. net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); + + // Send an empty optime as the response to the serverStatus request, which will cause the + // beginFetchingTimestamp to be the same as the beginApplyingTimestamp. + auto request = net->scheduleSuccessfulResponse(makeServerStatusResponse(OpTime())); + assertRemoteCommandNameEquals("serverStatus", request); net->runReadyNetworkOperations(); // Last oplog entry. @@ -4030,8 +4355,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc // listDatabases NamespaceString nss("a.a"); - auto request = - net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); + request = net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()})); assertRemoteCommandNameEquals("listDatabases", request); net->runReadyNetworkOperations(); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index be5095a3a19..12df8c58003 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1846,12 +1846,13 @@ Status applyCommand_inlock(OperationContext* opCtx, } // The feature compatibility version in the server configuration collection cannot change during - // initial sync. - // We do not attempt to parse the whitelisted ops because they do not have a collection - // namespace. If we drop the 'admin' database we will also log a 'drop' oplog entry for each - // collection dropped. 'applyOps' will try to apply each individual operation, and those - // will be caught then if they are a problem. - auto whitelistedOps = std::vector<std::string>{"dropDatabase", "applyOps", "dbCheck"}; + // initial sync. We do not attempt to parse the whitelisted ops because they do not have a + // collection namespace. If we drop the 'admin' database we will also log a 'drop' oplog entry + // for each collection dropped. 'applyOps' and 'commitTransaction' will try to apply each + // individual operation, and those will be caught then if they are a problem. 'abortTransaction' + // won't ever change the server configuration collection. + auto whitelistedOps = std::vector<std::string>{ + "dropDatabase", "applyOps", "dbCheck", "commitTransaction", "abortTransaction"}; if ((mode == OplogApplication::Mode::kInitialSync) && (std::find(whitelistedOps.begin(), whitelistedOps.end(), o.firstElementFieldName()) == whitelistedOps.end()) && diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index cf6f2405b2a..ecc45bf76df 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -68,6 +68,10 @@ public: // For initial sync only. If an update fails, the missing document is fetched from // this sync source to insert into the local collection.
boost::optional<HostAndPort> missingDocumentSourceForInitialSync; + + // Used to determine which operations should be applied. Only initial sync will set this to + // be something other than the null optime. + OpTime beginApplyingOpTime = OpTime(); }; /** diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index b000e006620..5574f77deb5 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -46,7 +46,8 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, : OplogApplier(executor, oplogBuffer, observer), _replCoord(replCoord), _syncTail(std::make_unique<SyncTail>( - observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options)) { + observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options)), + _beginApplyingOpTime(options.beginApplyingOpTime) { invariant(!options.relaxUniqueIndexConstraints); } diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index 197d7796879..6e51136f892 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -73,6 +73,10 @@ private: // Used to run oplog application loop. std::unique_ptr<SyncTail> _syncTail; + + // Used to determine which operations should be applied during initial sync. If this is null, + // we will apply all operations that were fetched. + OpTime _beginApplyingOpTime = OpTime(); }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 7850334f478..0b71deac9d2 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -323,7 +323,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize) + const int batchSize, + StartingPoint startingPoint) : AbstractOplogFetcher(executor, lastFetched, source, @@ -337,7 +338,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, _dataReplicatorExternalState(dataReplicatorExternalState), _enqueueDocumentsFn(enqueueDocumentsFn), _awaitDataTimeout(calculateAwaitDataTimeout(config)), - _batchSize(batchSize) { + _batchSize(batchSize), + _startingPoint(startingPoint) { invariant(config.isInitialized()); invariant(enqueueDocumentsFn); @@ -444,8 +446,11 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons LOG(1) << "oplog fetcher successfully fetched from " << _getSource(); - // If this is the first batch and no rollback is needed, skip the first document. - firstDocToApply++; + // If this is the first batch, no rollback is needed, and we don't want to enqueue the first + // document, skip it. + if (_startingPoint == StartingPoint::kSkipFirstDoc) { + firstDocToApply++; + } } auto validateResult = diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 42eace2f077..20ff4668a8a 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -90,6 +90,13 @@ public: }; /** + * An enum that indicates if we want to skip the first document during oplog fetching or not. + * Currently, the only time we don't want to skip the first document is during initial sync + * if there was an oldest active transaction timestamp.
+ */ + enum class StartingPoint { kSkipFirstDoc, kEnqueueFirstDoc }; + + /** * Type of function that accepts a pair of iterators into a range of operations * within the current batch of results and copies the operations into * a buffer to be consumed by the next stage of the replication process. @@ -125,7 +132,8 @@ public: DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize); + const int batchSize, + StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); virtual ~OplogFetcher(); @@ -176,6 +184,9 @@ private: const EnqueueDocumentsFn _enqueueDocumentsFn; const Milliseconds _awaitDataTimeout; const int _batchSize; + + // Indicates if we want to skip the first document during oplog fetching or not. + StartingPoint _startingPoint; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 1b879b6ba4b..81208eeb3da 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -69,8 +69,6 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/session_update_tracker.h" -#include "mongo/db/service_context.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/stats/timer_stats.h" @@ -548,106 +546,6 @@ private: StringMap<CollectionProperties> _cache; }; -/** - * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops - * vector in any other way. - * writerVectors - Set of operations for each worker thread to apply. - * derivedOps - If provided, this function inserts a decomposition of applyOps operations - * and instructions for updating the transactions table. - * sessionUpdateTracker - if provided, keeps track of session info from ops. - */ -void fillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker) { - const auto serviceContext = opCtx->getServiceContext(); - const auto storageEngine = serviceContext->getStorageEngine(); - - const bool supportsDocLocking = storageEngine->supportsDocLocking(); - const uint32_t numWriters = writerVectors->size(); - - CachedCollectionProperties collPropertiesCache; - - for (auto&& op : *ops) { - auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns()); - // Reduce the hash from 64bit down to 32bit, just to allow combinations with murmur3 later - // on. Bit depth not important, we end up just doing integer modulo with this in the end. - // The hash function should provide entropy in the lower bits as it's used in hash tables. - uint32_t hash = static_cast<uint32_t>(hashedNs.hash()); - - // We need to track all types of ops, including type 'n' (these are generated from chunk - // migrations). - if (sessionUpdateTracker) { - if (auto newOplogWrites = sessionUpdateTracker->updateOrFlush(op)) { - derivedOps->emplace_back(std::move(*newOplogWrites)); - fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); - } - } - - if (op.isCrudOpType()) { - auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs); - - // For doc locking engines, include the _id of the document in the hash so we get - // parallelism even if all writes are to a single collection.
- // - // For capped collections, this is illegal, since capped collections must preserve - // insertion order. - if (supportsDocLocking && !collProperties.isCapped) { - BSONElement id = op.getIdElement(); - BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore, - collProperties.collator); - const size_t idHash = elementHasher.hash(id); - MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); - } - - if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) { - // Mark capped collection ops before storing them to ensure we do not attempt to - // bulk insert them. - op.isForCappedCollection = true; - } - } - - // Extract applyOps operations and fill writers with extracted operations using this - // function. - if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) { - try { - derivedOps->emplace_back(ApplyOps::extractOperations(op)); - - // Nested entries cannot have different session updates. - fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); - } catch (...) { - fassertFailedWithStatusNoTrace( - 50711, - exceptionToStatus().withContext(str::stream() - << "Unable to extract operations from applyOps " - << redact(op.toBSON()))); - } - continue; - } - - auto& writer = (*writerVectors)[hash % numWriters]; - if (writer.empty()) { - writer.reserve(8); // Skip a few growth rounds - } - writer.push_back(&op); - } -} - -void fillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps) { - SessionUpdateTracker sessionUpdateTracker; - fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); - - auto newOplogWrites = sessionUpdateTracker.flushAll(); - if (!newOplogWrites.empty()) { - derivedOps->emplace_back(std::move(newOplogWrites)); - fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); - } -} - void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* replCoord, OpTime minValid) { @@ -1295,6 +1193,113 @@ Status multiSyncApply(OperationContext* opCtx, return Status::OK(); } + +/** + * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops + * vector in any other way. + * writerVectors - Set of operations for each worker thread to apply. + * derivedOps - If provided, this function inserts a decomposition of applyOps operations + * and instructions for updating the transactions table. + * sessionUpdateTracker - if provided, keeps track of session info from ops. + */ +void SyncTail::_fillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps, + SessionUpdateTracker* sessionUpdateTracker) { + const auto serviceContext = opCtx->getServiceContext(); + const auto storageEngine = serviceContext->getStorageEngine(); + + const bool supportsDocLocking = storageEngine->supportsDocLocking(); + const uint32_t numWriters = writerVectors->size(); + + CachedCollectionProperties collPropertiesCache; + + for (auto&& op : *ops) { + // If the operation's optime is before or the same as the beginApplyingOpTime, we don't want + // to apply it, so don't include it in writerVectors.
+ if (op.getOpTime() <= _options.beginApplyingOpTime) { + continue; + } + + auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns()); + // Reduce the hash from 64bit down to 32bit, just to allow combinations with murmur3 later + // on. Bit depth not important, we end up just doing integer modulo with this in the end. + // The hash function should provide entropy in the lower bits as it's used in hash tables. + uint32_t hash = static_cast<uint32_t>(hashedNs.hash()); + + // We need to track all types of ops, including type 'n' (these are generated from chunk + // migrations). + if (sessionUpdateTracker) { + if (auto newOplogWrites = sessionUpdateTracker->updateOrFlush(op)) { + derivedOps->emplace_back(std::move(*newOplogWrites)); + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + } + } + + if (op.isCrudOpType()) { + auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs); + + // For doc locking engines, include the _id of the document in the hash so we get + // parallelism even if all writes are to a single collection. + // + // For capped collections, this is illegal, since capped collections must preserve + // insertion order. + if (supportsDocLocking && !collProperties.isCapped) { + BSONElement id = op.getIdElement(); + BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore, + collProperties.collator); + const size_t idHash = elementHasher.hash(id); + MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); + } + + if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) { + // Mark capped collection ops before storing them to ensure we do not attempt to + // bulk insert them. + op.isForCappedCollection = true; + } + } + + // Extract applyOps operations and fill writers with extracted operations using this + // function. + if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) { + try { + derivedOps->emplace_back(ApplyOps::extractOperations(op)); + + // Nested entries cannot have different session updates. + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + } catch (...) 
{ + fassertFailedWithStatusNoTrace( + 50711, + exceptionToStatus().withContext(str::stream() + << "Unable to extract operations from applyOps " + << redact(op.toBSON()))); + } + continue; + } + + auto& writer = (*writerVectors)[hash % numWriters]; + if (writer.empty()) { + writer.reserve(8); // Skip a few growth rounds + } + writer.push_back(&op); + } +} + +void SyncTail::_fillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps) { + SessionUpdateTracker sessionUpdateTracker; + _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); + + auto newOplogWrites = sessionUpdateTracker.flushAll(); + if (!newOplogWrites.empty()) { + derivedOps->emplace_back(std::move(newOplogWrites)); + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + } +} + StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { invariant(!ops.empty()); @@ -1338,7 +1343,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O std::vector<MultiApplier::Operations> derivedOps; std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); - fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); + _fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); // Wait for writes to finish before applying ops. _writerPool->waitForIdle(); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index c651b17b743..aebce6af205 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -41,6 +41,7 @@ #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/replication_consistency_markers.h" +#include "mongo/db/repl/session_update_tracker.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" @@ -225,7 +226,9 @@ public: /** * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then - * using a set of threads to apply the operations. + * using a set of threads to apply the operations. It will only apply (but will + * still write to the oplog) oplog entries with an optime greater than the + * beginApplyingOpTime. * * If the batch application is successful, returns the optime of the last op applied, which * should be the last op in the batch.
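A self-contained analogue of the gate _fillWriterVectors now applies may make the multiApply contract above concrete (the types are simplified stand-ins, not the tree's classes): operations at or before the begin-applying point are still written to the local oplog, but never handed to the writer threads.

#include <vector>

struct FakeOpTime {
    long long t;
    bool operator<=(const FakeOpTime& rhs) const { return t <= rhs.t; }
};

struct FakeOp {
    FakeOpTime opTime;
};

// Keep only the operations that should actually be applied.
std::vector<FakeOp> opsToApply(const std::vector<FakeOp>& batch, const FakeOpTime& beginApplying) {
    std::vector<FakeOp> result;
    for (const auto& op : batch) {
        if (op.opTime <= beginApplying) {
            continue;  // already reflected in the cloned data, so skip application
        }
        result.push_back(op);
    }
    return result;
}

Outside initial sync beginApplyingOpTime stays null, the comparison never matches, and every fetched operation is applied as before.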
@@ -250,6 +253,17 @@ private: ReplicationCoordinator* replCoord, OpQueueBatcher* batcher) noexcept; + void _fillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps, + SessionUpdateTracker* sessionUpdateTracker); + + void _fillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps); + OplogApplier::Observer* const _observer; ReplicationConsistencyMarkers* const _consistencyMarkers; StorageInterface* const _storageInterface; diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index 27c57aa5e87..1a6e53d7a24 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -40,6 +40,36 @@ namespace mongo { +/** + * Helper that finds the previous oplog entry for the transaction, transforms it into a normal + * applyOps command, and applies the oplog entry. Currently used for oplog application of a + * commitTransaction oplog entry during recovery, rollback, and initial sync. + */ +Status _applyTransactionFromOplogChain(OperationContext* opCtx, + const repl::OplogEntry& entry, + repl::OplogApplication::Mode mode) { + invariant(mode == repl::OplogApplication::Mode::kRecovering || + mode == repl::OplogApplication::Mode::kInitialSync); + + // Since the TransactionHistoryIterator uses DBDirectClient, it cannot be used with snapshot + // isolation. + invariant(!opCtx->recoveryUnit()->getPointInTimeReadTimestamp()); + + // Get the corresponding prepareTransaction oplog entry. + const auto prepareOpTime = entry.getPrevWriteOpTimeInTransaction(); + invariant(prepareOpTime); + TransactionHistoryIterator iter(prepareOpTime.get()); + invariant(iter.hasNext()); + const auto prepareOplogEntry = iter.next(opCtx); + + // Transform prepare command into a normal applyOps command. + const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare"); + + BSONObjBuilder resultWeDontCareAbout; + return applyOps( + opCtx, entry.getNss().db().toString(), prepareCmd, mode, &resultWeDontCareAbout); +} + Status applyCommitTransaction(OperationContext* opCtx, const repl::OplogEntry& entry, repl::OplogApplication::Mode mode) { @@ -63,26 +93,13 @@ Status applyCommitTransaction(OperationContext* opCtx, return Status::OK(); } - // Since the TransactionHistoryIterator uses DBDirectClient, it cannot come with snapshot - // isolation. - invariant(!opCtx->recoveryUnit()->getPointInTimeReadTimestamp()); - - // Get the corresponding prepareTransaction oplog entry. - const auto prepareOpTime = entry.getPrevWriteOpTimeInTransaction(); - invariant(prepareOpTime); - TransactionHistoryIterator iter(prepareOpTime.get()); - invariant(iter.hasNext()); - const auto prepareOplogEntry = iter.next(opCtx); - - // Transform prepare command into a normal applyOps command.
- const auto prepareCmd = prepareOplogEntry.getOperationToApply().removeField("prepare"); + return _applyTransactionFromOplogChain(opCtx, entry, mode); + } - BSONObjBuilder resultWeDontCareAbout; - return applyOps( - opCtx, entry.getNss().db().toString(), prepareCmd, mode, &resultWeDontCareAbout); + if (mode == repl::OplogApplication::Mode::kInitialSync) { + return _applyTransactionFromOplogChain(opCtx, entry, mode); } - // TODO: SERVER-36492 Only run on secondary until we support initial sync. invariant(mode == repl::OplogApplication::Mode::kSecondary); // Transaction operations are in its own batch, so we can modify their opCtx. diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp index 11f6480d887..a46a663e0b1 100644 --- a/src/mongo/db/server_transactions_metrics.cpp +++ b/src/mongo/db/server_transactions_metrics.cpp @@ -34,6 +34,7 @@ #include "mongo/db/commands/server_status.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/retryable_writes_stats.h" #include "mongo/db/service_context.h" #include "mongo/db/transactions_stats_gen.h" @@ -295,12 +296,12 @@ void ServerTransactionsMetrics::updateStats(TransactionsStats* stats, OperationC ServerTransactionsMetrics::_getOldestOpenUnpreparedReadTimestamp(opCtx)); // Acquire _mutex before reading _oldestActiveOplogEntryOpTime. stdx::lock_guard<stdx::mutex> lm(_mutex); - // To avoid compression loss, we have Timestamp(0, 0) be the default value if no oldest active - // transaction optime is stored. - Timestamp oldestActiveOplogEntryTimestamp = (_oldestActiveOplogEntryOpTime != boost::none) - ? _oldestActiveOplogEntryOpTime->getTimestamp() - : Timestamp(); - stats->setOldestActiveOplogEntryTimestamp(oldestActiveOplogEntryTimestamp); + // To avoid compression loss, we use the null OpTime if no oldest active transaction optime is + // stored. + repl::OpTime oldestActiveOplogEntryOpTime = (_oldestActiveOplogEntryOpTime != boost::none) + ? _oldestActiveOplogEntryOpTime.get() + : repl::OpTime(); + stats->setOldestActiveOplogEntryOpTime(oldestActiveOplogEntryOpTime); } void ServerTransactionsMetrics::clearOpTimes() { diff --git a/src/mongo/db/transactions_stats.idl b/src/mongo/db/transactions_stats.idl index 834399a331c..070ca433080 100644 --- a/src/mongo/db/transactions_stats.idl +++ b/src/mongo/db/transactions_stats.idl @@ -35,6 +35,7 @@ global: imports: - "mongo/idl/basic_types.idl" + - "mongo/db/repl/replication_types.idl" structs: @@ -82,7 +83,7 @@ structs: currentPrepared: type: long default: 0 - oldestActiveOplogEntryTimestamp: - type: timestamp + oldestActiveOplogEntryOpTime: + type: optime oldestOpenUnpreparedReadTimestamp: type: timestamp
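Closing the loop on the OplogFetcher::StartingPoint enum introduced earlier: it exists so that, when initial sync begins fetching at the oldest active transaction's prepare entry, that entry is actually enqueued instead of being discarded as the usual overlap document. A runnable analogue of the first-batch handling (the iterator-based shape is an assumption; the real logic advances firstDocToApply inside _onSuccessfulBatch):

enum class StartingPoint { kSkipFirstDoc, kEnqueueFirstDoc };

// On the first batch the leading document normally duplicates the last optime the node already
// has, so it is skipped; kEnqueueFirstDoc keeps it so a prepare entry fetched from the oldest
// active transaction timestamp can be applied later.
template <typename Iter>
Iter firstDocToApply(Iter batchBegin, bool isFirstBatch, StartingPoint startingPoint) {
    if (isFirstBatch && startingPoint == StartingPoint::kSkipFirstDoc) {
        ++batchBegin;
    }
    return batchBegin;
}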