diff options
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_impl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.cpp | 2 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 39 |
9 files changed, 54 insertions, 76 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index a53ecbcf96b..c2e9f4174e0 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -132,8 +132,7 @@ private: const HostAndPort& source, ThreadPool* writerPool) = 0; - // Provides InitialSyncer with access to _multiApply, _multiSyncApply and - // _multiInitialSyncApply. + // Provides InitialSyncer with access to multiApply and multiSyncApply. friend class InitialSyncer; }; diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 05d29bc1aae..050a99e15c6 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -172,13 +172,15 @@ StatusWith<OpTime> DataReplicatorExternalStateImpl::_multiApply(OperationContext OplogApplier::Options options; options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.missingDocumentSourceForInitialSync = source; - SyncTail syncTail(observer, - consistencyMarkers, - storageInterface, - repl::multiInitialSyncApply, - writerPool, - options); - return syncTail.multiApply(opCtx, std::move(ops)); + OplogApplier oplogApplier(getTaskExecutor(), + nullptr, // oplog buffer + observer, + _replicationCoordinator, + consistencyMarkers, + storageInterface, + options, + writerPool); + return oplogApplier.multiApply(opCtx, std::move(ops)); } ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const { diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 7f0e5112857..9ac496196bb 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -436,7 +436,7 @@ private: /** * Callback for third '_lastOplogEntryFetcher' callback. This is scheduled after MultiApplier * completed successfully and missing documents were fetched from the sync source while - * DataReplicatorExternalState::_multiInitialSyncApply() was processing operations. + * DataReplicatorExternalState::_multiApply() was processing operations. * This callback will update InitialSyncState::stopTimestamp on success. */ void _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 14ffb0c5455..98a18d6c75f 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -3598,8 +3598,8 @@ TEST_F( auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); - // Override DataReplicatorExternalState::_multiInitialSyncApply() so that it will also fetch a - // missing document. + // Override DataReplicatorExternalState::_multiApply() so that it will also fetch a missing + // document. // This forces InitialSyncer to evaluate its end timestamp for applying operations after each // batch. bool fetchCountIncremented = false; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index fd6bca1ce7f..73f82665d88 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1315,18 +1315,6 @@ Status multiSyncApply(OperationContext* opCtx, return Status::OK(); } -Status multiInitialSyncApply(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - invariant(st); - invariant(!st->getOptions().skipWritesToOplog); - invariant(st->getOptions().allowNamespaceNotFoundErrorsOnCrudOps); - invariant(st->getOptions().missingDocumentSourceForInitialSync); - - return multiSyncApply(opCtx, ops, st, workerMultikeyPathInfo); -} - StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { invariant(!ops.empty()); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 90fec1bc686..dff6fc32e32 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -288,19 +288,14 @@ private: bool _inShutdown = false; }; -// These free functions are used by the thread pool workers to write ops to the db. -// They consume the passed in OperationPtrs and callers should not make any assumptions about the -// state of the container after calling. However, these functions cannot modify the pointed-to +// This free function is used by the thread pool workers to write ops to the db. +// This consumes the passed in OperationPtrs and callers should not make any assumptions about the +// state of the container after calling. However, this function cannot modify the pointed-to // operations because the OperationPtrs container contains const pointers. Status multiSyncApply(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, WorkerMultikeyPathInfo* workerMultikeyPathInfo); -Status multiInitialSyncApply(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo); - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index d037267cdc9..7edcc147eef 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -637,18 +637,6 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr)); } -TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { - ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_STARTUP2)); - NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); - - auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - - SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr, makeInitialSyncOptions()); - MultiApplier::OperationPtrs ops = {&op}; - ASSERT_EQUALS(ErrorCodes::InvalidOptions, - multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, nullptr)); -} - TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); bool onInsertsCalled = false; @@ -1004,19 +992,7 @@ TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGro ASSERT_EQUALS(1U, numFailedGroupedInserts); } -TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyingOperations) { - SyncTailWithOperationContextChecker syncTail; - NamespaceString nss("test.t"); - createCollection(_opCtx.get(), nss, {}); - auto op = makeUpdateDocumentOplogEntry( - {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); - MultiApplier::OperationPtrs ops = {&op}; - WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - ASSERT(syncTail.called); -} - -TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) { +TEST_F(SyncTailTest, MultiSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) { BSONObj emptyDoc; SyncTailWithLocalDocumentFetcher syncTail(emptyDoc); NamespaceString nss("test.t"); @@ -1032,7 +1008,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); // Since the missing document is not found on the sync source, the collection referenced by // the failed operation should not be automatically created. @@ -1042,7 +1018,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss ASSERT_EQUALS(syncTail.numFetched, 0U); } -TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) { +TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitialSync) { BSONObj emptyDoc; SyncTailWithLocalDocumentFetcher syncTail(emptyDoc); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); @@ -1056,7 +1032,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) { auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); ASSERT_EQUALS(syncTail.numFetched, 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1066,7 +1042,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) { ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); } -TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) { +TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) { BSONObj emptyDoc; SyncTailWithLocalDocumentFetcher syncTail(emptyDoc); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); @@ -1082,7 +1058,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; AtomicUInt32 fetchCount(0); WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); ASSERT_EQUALS(syncTail.numFetched, 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1095,8 +1071,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), badNss).getCollection()); } -TEST_F(SyncTailTest, - MultiInitialSyncApplyFetchesMissingDocumentIfDocumentIsAvailableFromSyncSource) { +TEST_F(SyncTailTest, MultiSyncApplyFetchesMissingDocumentIfDocumentIsAvailableFromSyncSource) { SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1)); NamespaceString nss("test.t"); createCollection(_opCtx.get(), nss, {}); @@ -1105,7 +1080,7 @@ TEST_F(SyncTailTest, {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); ASSERT_EQUALS(syncTail.numFetched, 1U); // The collection referenced by "ns" in the failed operation is automatically created to hold diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 3f6029b15a6..b3b7980a470 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -229,7 +229,7 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { MultiApplier::OperationPtrs opsPtrs; opsPtrs.push_back(&op); WorkerMultikeyPathInfo pathInfo; - auto status = multiInitialSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); + auto status = multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); if (!status.isOK()) { return status; } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index d40972a7956..8a842c8ff2c 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -53,6 +53,7 @@ #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_client_info.h" @@ -108,6 +109,14 @@ BSONCollectionCatalogEntry::IndexMetaData getIndexMetaData( return collMetaData.indexes[idxOffset]; } +class DoNothingOplogApplierObserver : public repl::OplogApplier::Observer { +public: + void onBatchBegin(const repl::OplogApplier::Operations&) final {} + void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {} + void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {} + void onOperationConsumed(const BSONObj&) final {} +}; + class StorageTimestampTest { public: ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext(); @@ -1329,11 +1338,18 @@ public: << doc2)); std::vector<repl::OplogEntry> ops = {op0, op1, op2}; + DoNothingOplogApplierObserver observer; auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::SyncTail::makeWriterPool(); - repl::SyncTail syncTail( - nullptr, _consistencyMarkers, storageInterface, repl::multiSyncApply, writerPool.get()); - ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(syncTail.multiApply(_opCtx, ops))); + repl::OplogApplier oplogApplier(nullptr, + nullptr, + &observer, + nullptr, + _consistencyMarkers, + storageInterface, + {}, + writerPool.get()); + ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops))); AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); assertMultikeyPaths( @@ -1434,18 +1450,21 @@ public: // after bulk index builds. std::vector<repl::OplogEntry> ops = {op0, createIndexOp, op1, op2}; + DoNothingOplogApplierObserver observer; auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::SyncTail::makeWriterPool(); repl::OplogApplier::Options options; options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); - repl::SyncTail syncTail(nullptr, - _consistencyMarkers, - storageInterface, - repl::multiInitialSyncApply, - writerPool.get(), - options); - auto lastTime = unittest::assertGet(syncTail.multiApply(_opCtx, ops)); + repl::OplogApplier oplogApplier(nullptr, + nullptr, + &observer, + nullptr, + _consistencyMarkers, + storageInterface, + options, + writerPool.get()); + auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)); ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp()); AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); |