summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h3
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp16
-rw-r--r--src/mongo/db/repl/initial_syncer.h2
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp4
-rw-r--r--src/mongo/db/repl/sync_tail.cpp12
-rw-r--r--src/mongo/db/repl/sync_tail.h11
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp41
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp2
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp39
9 files changed, 54 insertions, 76 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index a53ecbcf96b..c2e9f4174e0 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -132,8 +132,7 @@ private:
const HostAndPort& source,
ThreadPool* writerPool) = 0;
- // Provides InitialSyncer with access to _multiApply, _multiSyncApply and
- // _multiInitialSyncApply.
+ // Provides InitialSyncer with access to multiApply and multiSyncApply.
friend class InitialSyncer;
};
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 05d29bc1aae..050a99e15c6 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -172,13 +172,15 @@ StatusWith<OpTime> DataReplicatorExternalStateImpl::_multiApply(OperationContext
OplogApplier::Options options;
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.missingDocumentSourceForInitialSync = source;
- SyncTail syncTail(observer,
- consistencyMarkers,
- storageInterface,
- repl::multiInitialSyncApply,
- writerPool,
- options);
- return syncTail.multiApply(opCtx, std::move(ops));
+ OplogApplier oplogApplier(getTaskExecutor(),
+ nullptr, // oplog buffer
+ observer,
+ _replicationCoordinator,
+ consistencyMarkers,
+ storageInterface,
+ options,
+ writerPool);
+ return oplogApplier.multiApply(opCtx, std::move(ops));
}
ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const {
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 7f0e5112857..9ac496196bb 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -436,7 +436,7 @@ private:
/**
* Callback for third '_lastOplogEntryFetcher' callback. This is scheduled after MultiApplier
* completed successfully and missing documents were fetched from the sync source while
- * DataReplicatorExternalState::_multiInitialSyncApply() was processing operations.
+ * DataReplicatorExternalState::_multiApply() was processing operations.
* This callback will update InitialSyncState::stopTimestamp on success.
*/
void _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 14ffb0c5455..98a18d6c75f 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -3598,8 +3598,8 @@ TEST_F(
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
- // Override DataReplicatorExternalState::_multiInitialSyncApply() so that it will also fetch a
- // missing document.
+ // Override DataReplicatorExternalState::_multiApply() so that it will also fetch a missing
+ // document.
// This forces InitialSyncer to evaluate its end timestamp for applying operations after each
// batch.
bool fetchCountIncremented = false;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index fd6bca1ce7f..73f82665d88 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1315,18 +1315,6 @@ Status multiSyncApply(OperationContext* opCtx,
return Status::OK();
}
-Status multiInitialSyncApply(OperationContext* opCtx,
- MultiApplier::OperationPtrs* ops,
- SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
- invariant(st);
- invariant(!st->getOptions().skipWritesToOplog);
- invariant(st->getOptions().allowNamespaceNotFoundErrorsOnCrudOps);
- invariant(st->getOptions().missingDocumentSourceForInitialSync);
-
- return multiSyncApply(opCtx, ops, st, workerMultikeyPathInfo);
-}
-
StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
invariant(!ops.empty());
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 90fec1bc686..dff6fc32e32 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -288,19 +288,14 @@ private:
bool _inShutdown = false;
};
-// These free functions are used by the thread pool workers to write ops to the db.
-// They consume the passed in OperationPtrs and callers should not make any assumptions about the
-// state of the container after calling. However, these functions cannot modify the pointed-to
+// This free function is used by the thread pool workers to write ops to the db.
+// This consumes the passed in OperationPtrs and callers should not make any assumptions about the
+// state of the container after calling. However, this function cannot modify the pointed-to
// operations because the OperationPtrs container contains const pointers.
Status multiSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
WorkerMultikeyPathInfo* workerMultikeyPathInfo);
-Status multiInitialSyncApply(OperationContext* opCtx,
- MultiApplier::OperationPtrs* ops,
- SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo);
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index d037267cdc9..7edcc147eef 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -637,18 +637,6 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) {
multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr));
}
-TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) {
- ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_STARTUP2));
- NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName());
-
- auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
-
- SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr, makeInitialSyncOptions());
- MultiApplier::OperationPtrs ops = {&op};
- ASSERT_EQUALS(ErrorCodes::InvalidOptions,
- multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, nullptr));
-}
-
TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
bool onInsertsCalled = false;
@@ -1004,19 +992,7 @@ TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGro
ASSERT_EQUALS(1U, numFailedGroupedInserts);
}
-TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
- SyncTailWithOperationContextChecker syncTail;
- NamespaceString nss("test.t");
- createCollection(_opCtx.get(), nss, {});
- auto op = makeUpdateDocumentOplogEntry(
- {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
- MultiApplier::OperationPtrs ops = {&op};
- WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
- ASSERT(syncTail.called);
-}
-
-TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) {
+TEST_F(SyncTailTest, MultiSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) {
BSONObj emptyDoc;
SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
NamespaceString nss("test.t");
@@ -1032,7 +1008,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
// Since the missing document is not found on the sync source, the collection referenced by
// the failed operation should not be automatically created.
@@ -1042,7 +1018,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss
ASSERT_EQUALS(syncTail.numFetched, 0U);
}
-TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) {
+TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitialSync) {
BSONObj emptyDoc;
SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
@@ -1056,7 +1032,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) {
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
ASSERT_EQUALS(syncTail.numFetched, 0U);
OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
@@ -1066,7 +1042,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) {
ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
}
-TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) {
+TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) {
BSONObj emptyDoc;
SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
@@ -1082,7 +1058,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound)
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
AtomicUInt32 fetchCount(0);
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
ASSERT_EQUALS(syncTail.numFetched, 0U);
OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
@@ -1095,8 +1071,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound)
ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), badNss).getCollection());
}
-TEST_F(SyncTailTest,
- MultiInitialSyncApplyFetchesMissingDocumentIfDocumentIsAvailableFromSyncSource) {
+TEST_F(SyncTailTest, MultiSyncApplyFetchesMissingDocumentIfDocumentIsAvailableFromSyncSource) {
SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1));
NamespaceString nss("test.t");
createCollection(_opCtx.get(), nss, {});
@@ -1105,7 +1080,7 @@ TEST_F(SyncTailTest,
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument);
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
ASSERT_EQUALS(syncTail.numFetched, 1U);
// The collection referenced by "ns" in the failed operation is automatically created to hold
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 3f6029b15a6..b3b7980a470 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -229,7 +229,7 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
MultiApplier::OperationPtrs opsPtrs;
opsPtrs.push_back(&op);
WorkerMultikeyPathInfo pathInfo;
- auto status = multiInitialSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo);
+ auto status = multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index d40972a7956..8a842c8ff2c 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/repl/drop_pending_collection_reaper.h"
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -108,6 +109,14 @@ BSONCollectionCatalogEntry::IndexMetaData getIndexMetaData(
return collMetaData.indexes[idxOffset];
}
+class DoNothingOplogApplierObserver : public repl::OplogApplier::Observer {
+public:
+ void onBatchBegin(const repl::OplogApplier::Operations&) final {}
+ void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {}
+ void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {}
+ void onOperationConsumed(const BSONObj&) final {}
+};
+
class StorageTimestampTest {
public:
ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext();
@@ -1329,11 +1338,18 @@ public:
<< doc2));
std::vector<repl::OplogEntry> ops = {op0, op1, op2};
+ DoNothingOplogApplierObserver observer;
auto storageInterface = repl::StorageInterface::get(_opCtx);
auto writerPool = repl::SyncTail::makeWriterPool();
- repl::SyncTail syncTail(
- nullptr, _consistencyMarkers, storageInterface, repl::multiSyncApply, writerPool.get());
- ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(syncTail.multiApply(_opCtx, ops)));
+ repl::OplogApplier oplogApplier(nullptr,
+ nullptr,
+ &observer,
+ nullptr,
+ _consistencyMarkers,
+ storageInterface,
+ {},
+ writerPool.get());
+ ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)));
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);
assertMultikeyPaths(
@@ -1434,18 +1450,21 @@ public:
// after bulk index builds.
std::vector<repl::OplogEntry> ops = {op0, createIndexOp, op1, op2};
+ DoNothingOplogApplierObserver observer;
auto storageInterface = repl::StorageInterface::get(_opCtx);
auto writerPool = repl::SyncTail::makeWriterPool();
repl::OplogApplier::Options options;
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
- repl::SyncTail syncTail(nullptr,
- _consistencyMarkers,
- storageInterface,
- repl::multiInitialSyncApply,
- writerPool.get(),
- options);
- auto lastTime = unittest::assertGet(syncTail.multiApply(_opCtx, ops));
+ repl::OplogApplier oplogApplier(nullptr,
+ nullptr,
+ &observer,
+ nullptr,
+ _consistencyMarkers,
+ storageInterface,
+ options,
+ writerPool.get());
+ auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops));
ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp());
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);