author     Christopher Caplinger <christopher.caplinger@mongodb.com>    2023-03-21 17:49:58 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-03-21 21:03:45 +0000
commit     3c130a69eaddc7cb44895f57af4da6e39556dbb4 (patch)
tree       bdcf8be7bde8dac12106d578f69dff21331dccfd /src/mongo/db
parent     2dbc2a40b841eef00e2ad1b79e3f938bad889c58 (diff)
SERVER-72622: Track TenantOplogApplier progress in replicated collection
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/basic_types.idl2
-rw-r--r--src/mongo/db/namespace_string.cpp5
-rw-r--r--src/mongo/db/namespace_string.h8
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/shard_merge_recipient_service.cpp7
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp70
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h6
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp462
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp112
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.h41
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_progress.idl50
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_test.cpp1145
12 files changed, 836 insertions, 1073 deletions
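In outline: after each applied batch, the TenantOplogApplier now upserts its last applied donor optime into a replicated, per-migration collection (config.repl.migration.progress_<migrationUUID>), and a resuming applier prefers that stored checkpoint over the old local-oplog scan, which is retained only as a fallback for nodes still below FCV 7.0. The progress collection is dropped together with the other temporary migration collections.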
diff --git a/src/mongo/db/basic_types.idl b/src/mongo/db/basic_types.idl
index 70f52ceea6b..e1c1e25ec84 100644
--- a/src/mongo/db/basic_types.idl
+++ b/src/mongo/db/basic_types.idl
@@ -196,7 +196,7 @@ types:
array:
bson_serialization_type: array
- description: "An unowned BSONArray without custom deserialization or seialization"
+ description: "An unowned BSONArray without custom deserialization or serialization"
cpp_type: "mongo::BSONArray"
array_owned:
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 8965f0bcc98..6bd83eaba2c 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -240,6 +240,11 @@ NamespaceString NamespaceString::makeDummyNamespace(const boost::optional<Tenant
return NamespaceString(tenantId, DatabaseName::kConfig.db(), "dummy.namespace");
}
+NamespaceString NamespaceString::makeTenantOplogApplierProgressNSS(const UUID& migrationUUID) {
+ return NamespaceString(DatabaseName::kConfig,
+ NamespaceString::kTenantOplogApplierProgressPrefix +
+ migrationUUID.toString());
+}
std::string NamespaceString::getSisterNS(StringData local) const {
verify(local.size() && local[0] != '.');
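A minimal usage sketch of the new helper (the UUID here is hypothetical): it joins kTenantOplogApplierProgressPrefix, declared in namespace_string.h below, with the migration UUID under the config database.

    const UUID migrationUUID = UUID::gen();
    const NamespaceString progressNss =
        NamespaceString::makeTenantOplogApplierProgressNSS(migrationUUID);
    // progressNss.ns() == "config.repl.migration.progress_" + migrationUUID.toString()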
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 0669d777caa..a19f446f62e 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -188,6 +188,8 @@ public:
static constexpr StringData kAnalyzeShardKeySplitPointsCollectionPrefix =
"analyzeShardKey.splitPoints."_sd;
+ // Prefix for tracking TenantOplogApplier progress during a tenant migration.
+ static constexpr StringData kTenantOplogApplierProgressPrefix = "repl.migration.progress_"_sd;
// Maintainers Note: The large set of `NamespaceString`-typed static data
// members of the `NamespaceString` class representing system-reserved
@@ -383,6 +385,12 @@ public:
static NamespaceString makeDummyNamespace(const boost::optional<TenantId>& tenantId);
/**
+ * Constructs a NamespaceString used for tracking TenantOplogApplier progress during a tenant
+ * migration.
+ */
+ static NamespaceString makeTenantOplogApplierProgressNSS(const UUID& migrationUUID);
+
+ /**
* NOTE: DollarInDbNameBehavior::allow is deprecated.
*
* Please use DollarInDbNameBehavior::disallow and check explicitly for any DB names that must
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 4f7c9df477c..637097b7f65 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -2060,6 +2060,7 @@ env.Library(
source=[
'tenant_oplog_batcher.cpp',
'tenant_oplog_applier.cpp',
+ 'tenant_oplog_applier_progress.idl',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/repl/shard_merge_recipient_service.cpp b/src/mongo/db/repl/shard_merge_recipient_service.cpp
index 8e362697ee3..1392b3cd6f6 100644
--- a/src/mongo/db/repl/shard_merge_recipient_service.cpp
+++ b/src/mongo/db/repl/shard_merge_recipient_service.cpp
@@ -2112,15 +2112,18 @@ void ShardMergeRecipientService::Instance::_startOplogApplier() {
invariant(startApplyingDonorOpTime);
const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime();
invariant(cloneFinishedRecipientOpTime);
+ invariant(!cloneFinishedRecipientOpTime->isNull());
_tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid,
MigrationProtocolEnum::kShardMerge,
boost::none,
+ boost::none,
*startApplyingDonorOpTime,
_donorOplogBuffer.get(),
**_scopedExecutor,
- _writerPool.get());
- _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime);
+ _writerPool.get(),
+ *cloneFinishedRecipientOpTime,
+ false);
LOGV2_DEBUG(7339750,
1,
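Note that the shard merge path passes boost::none for both the tenant id and the new progress-namespace parameter, and isResuming = false, so shard merge recipients neither read nor write the progress collection; only the multitenant-migration recipient below wires it up.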
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 1e01d46a9be..d64e9af6433 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -95,6 +95,8 @@ using namespace fmt;
const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex";
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
+
+constexpr StringData kTenantOplogApplierProgressPrefix = "repl.migration.progress_"_sd;
constexpr int kBackupCursorFileFetcherRetryAttempts = 10;
constexpr int kCheckpointTsBackupCursorErrorCode = 6929900;
constexpr int kCloseCursorBeforeOpenErrorCode = 50886;
@@ -2672,16 +2674,7 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() {
stdx::unique_lock lk(_mutex);
const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime();
invariant(cloneFinishedRecipientOpTime);
-
- OpTime resumeOpTime;
- if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) {
- lk.unlock();
- // We avoid holding the mutex while scanning the local oplog which
- // acquires the RSTL in IX mode. This is to allow us to be interruptable
- // via a concurrent stepDown which acquires the RSTL in X mode.
- resumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime);
- lk.lock();
- }
+ invariant(!cloneFinishedRecipientOpTime->isNull());
// Throwing error when cloner is canceled externally via interrupt(),
// makes the instance to skip the remaining task (i.e., starting oplog
@@ -2696,18 +2689,37 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() {
const auto& startApplyingDonorOpTime = _stateDoc.getStartApplyingDonorOpTime();
invariant(startApplyingDonorOpTime);
- _tenantOplogApplier = std::make_shared<TenantOplogApplier>(
- _migrationUuid,
- _protocol,
- (_protocol != MigrationProtocolEnum::kShardMerge) ? boost::make_optional(_tenantId)
- : boost::none,
- (!resumeOpTime.isNull()) ? std::max(resumeOpTime, *startApplyingDonorOpTime)
- : *startApplyingDonorOpTime,
- _donorOplogBuffer.get(),
- **_scopedExecutor,
- _writerPool.get(),
- resumeOpTime.getTimestamp());
- _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime);
+ OpTime deprecatedResumeOpTime;
+ boost::optional<NamespaceString> nss = boost::none;
+ // If we are running < 7.0, fall back to oplog scanning for TenantOplogApplier resumption. Only
+ // versions 7.0 and later will store oplog applier progress data in a replicated collection.
+ if (serverGlobalParams.featureCompatibility.isLessThan(
+ multiversion::FeatureCompatibilityVersion::kVersion_7_0)) {
+ // We avoid holding the mutex while scanning the local oplog which
+ // acquires the RSTL in IX mode. This is to allow us to be interruptable
+ // via a concurrent stepDown which acquires the RSTL in X mode.
+ lk.unlock();
+ // This node is not aware of the tenant oplog applier progress collection, fall back to
+ // scanning the oplog to ensure that we don't throw away our previous progress.
+ deprecatedResumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime);
+ lk.lock();
+ } else {
+ nss = NamespaceString::makeTenantOplogApplierProgressNSS(_migrationUuid);
+ }
+
+ _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid,
+ _protocol,
+ _tenantId,
+ nss,
+ *startApplyingDonorOpTime,
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get(),
+ *cloneFinishedRecipientOpTime,
+ _sharedData->getResumePhase() ==
+ ResumePhase::kOplogCatchup);
+
+ _tenantOplogApplier->setDeprecatedResumeOpTime(deprecatedResumeOpTime);
LOGV2_DEBUG(4881202,
1,
@@ -2715,8 +2727,7 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() {
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID(),
"startApplyingAfterDonorOpTime"_attr =
- _tenantOplogApplier->getStartApplyingAfterOpTime(),
- "resumeBatchingTs"_attr = _tenantOplogApplier->getResumeBatchingTs());
+ _tenantOplogApplier->getStartApplyingAfterOpTime());
uassertStatusOK(_tenantOplogApplier->startup());
_oplogApplierReady = true;
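With this gating, a resuming recipient has exactly one source of resume progress: below FCV 7.0 it scans the local oplog and hands the result to the applier via setDeprecatedResumeOpTime(), while on 7.0 and later it passes the progress namespace so the applier can read its own checkpoint at startup. The worked example after the _doStartup_inlock() hunk below shows how either value is combined with startApplyingDonorOpTime.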
@@ -2856,15 +2867,18 @@ void TenantMigrationRecipientService::Instance::_dropTempCollections() {
auto opCtx = cc().makeOperationContext();
auto storageInterface = StorageInterface::get(opCtx.get());
- // The donated files and oplog buffer collections can be safely dropped at this
- // point. In case either collection does not exist, dropping will be a no-op.
- // It isn't necessary that a given drop is majority-committed. A new primary will
- // attempt to drop the collection anyway.
+ // The donated files, oplog buffer, and tenant oplog applier progress collections
+ // can be safely dropped at this point. If any of these collections does not exist,
+ // dropping will be a no-op. It isn't necessary that a given drop is
+ // majority-committed. A new primary will attempt to drop the collection anyway.
uassertStatusOK(storageInterface->dropCollection(
opCtx.get(), shard_merge_utils::getDonatedFilesNs(getMigrationUUID())));
uassertStatusOK(
storageInterface->dropCollection(opCtx.get(), getOplogBufferNs(getMigrationUUID())));
+
+ uassertStatusOK(storageInterface->dropCollection(
+ opCtx.get(), NamespaceString::makeTenantOplogApplierProgressNSS(getMigrationUUID())));
}
SemiFuture<void> TenantMigrationRecipientService::Instance::run(
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 69e2020b6a0..254553a626a 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -39,6 +39,7 @@
#include "mongo/db/repl/tenant_all_database_cloner.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_oplog_applier.h"
+#include "mongo/db/repl/tenant_oplog_applier_progress_gen.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/time_support.h"
@@ -489,7 +490,10 @@ public:
/*
* Traverse backwards through the oplog to find the optime which tenant oplog application
* should resume from. The oplog applier should resume applying entries that have a greater
- * optime than the returned value.
+ * optime than the returned value. Note that this is currently only used as a fallback
+ * when _getStoredTenantOplogApplierProgress returns no results, in order to preserve
+ * backwards compatibility and avoid the reapplication of noop entries. This can be
+ * removed once all Serverless nodes are running the new progress collection-aware
+ * version of the code.
*/
OpTime _getOplogResumeApplyingDonorOptime(const OpTime& cloneFinishedRecipientOpTime) const;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index 36c74cdacbe..c883e976300 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -1853,468 +1853,6 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFro
ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
}
-TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplogEntry) {
- const UUID migrationUUID = UUID::gen();
- // Recipient opTimes
- const OpTime clonerFinishedOpTime(Timestamp(1, 1), 1);
- // Donor opTimes
- const OpTime earlierThanResumeOpTime(Timestamp(2, 1), 1);
- const OpTime resumeOpTime(Timestamp(3, 1), 1);
- const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
-
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
- insertTopOfOplog(&replSet, clonerFinishedOpTime);
-
- const auto tenantId = OID::gen().toString();
- TenantMigrationRecipientDocument initialStateDocument(
- migrationUUID,
- replSet.getConnectionString(),
- tenantId,
- kDefaultStartMigrationTimestamp,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly));
- initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations);
- initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
-
- // We skip cloning here as a way to simulate that the recipient service has detected an existing
- // migration on startup and will attempt to resume oplog fetching from the appropriate optime.
- updateStateDocToCloningFinished(initialStateDocument,
- clonerFinishedOpTime /* cloneFinishedRecipientOpTime */,
- dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
- clonerFinishedOpTime /* startApplyingDonorOpTime */,
- clonerFinishedOpTime /* startFetchingDonorOpTime */);
-
- auto opCtx = makeOperationContext();
- std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
-
- // Hang before reading oplog.
- const auto hangAfterStartingOplogFetcher =
- globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
- hangAfterStartingOplogFetcher->setMode(FailPoint::alwaysOn,
- 0,
- BSON("action"
- << "hang"));
-
- // Hang before starting the oplog applier.
- const auto hangAfterStartingOplogApplier =
- globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
- auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
- 0,
- BSON("action"
- << "hang"));
-
- {
- FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
- // Create and start the instance.
- instance = TenantMigrationRecipientService::Instance::getOrCreate(
- opCtx.get(), _service, initialStateDocument.toBSON());
- ASSERT(instance.get());
- instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
- }
- // Create and insert two tenant migration no-op entries into the oplog. The oplog applier should
- // resume from the no-op entry with the most recent donor opTime.
- const auto insertNss = NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar");
- const auto earlierOplogBson = makeOplogEntry(earlierThanResumeOpTime,
- OpTypeEnum::kInsert,
- insertNss,
- UUID::gen(),
- BSON("doc" << 1),
- boost::none /* o2 */)
- .getEntry()
- .toBSON();
- const auto resumeOplogBson = makeOplogEntry(resumeOpTime,
- OpTypeEnum::kInsert,
- insertNss,
- UUID::gen(),
- BSON("doc" << 2),
- boost::none /* o2 */)
- .getEntry()
- .toBSON();
- auto storage = StorageInterface::get(opCtx->getServiceContext());
- const auto oplogNss = NamespaceString::kRsOplogNamespace;
- const OpTime earlierRecipientOpTime(Timestamp(9, 1), 1);
- const OpTime resumeRecipientOpTime(Timestamp(10, 1), 1);
- auto earlierNoOpEntry = makeNoOpOplogEntry(earlierRecipientOpTime,
- insertNss,
- UUID::gen(),
- earlierOplogBson,
- instance->getMigrationUUID());
- auto resumeNoOpEntry = makeNoOpOplogEntry(resumeRecipientOpTime,
- insertNss,
- UUID::gen(),
- resumeOplogBson,
- instance->getMigrationUUID());
- ASSERT_OK(
- storage->insertDocument(opCtx.get(),
- oplogNss,
- {earlierNoOpEntry.toBSON(), earlierRecipientOpTime.getTimestamp()},
- earlierRecipientOpTime.getTerm()));
- ASSERT_OK(
- storage->insertDocument(opCtx.get(),
- oplogNss,
- {resumeNoOpEntry.toBSON(), resumeRecipientOpTime.getTimestamp()},
- resumeRecipientOpTime.getTerm()));
-
- hangAfterStartingOplogFetcher->setMode(FailPoint::off);
- hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
-
- auto oplogFetcher = getDonorOplogFetcher(instance.get());
- auto dataConsistentOplogEntry = makeOplogEntry(dataConsistentOpTime,
- OpTypeEnum::kInsert,
- insertNss,
- UUID::gen(),
- BSON("doc" << 3),
- boost::none /* o2 */);
- // Feed the oplog fetcher the last doc required for the recipient to be considered consistent.
- oplogFetcher->receiveBatch(
- 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp());
-
- // Allow the service to continue.
- hangAfterStartingOplogApplier->setMode(FailPoint::off);
- LOGV2(5272350,
- "Waiting for recipient service to reach consistent state",
- "suite"_attr = _agent.getSuiteName(),
- "test"_attr = _agent.getTestName());
- instance->waitUntilMigrationReachesConsistentState(opCtx.get());
-
- // The oplog applier should have started batching and applying at the donor opTime equal to
- // 'resumeOpTime'.
- const auto oplogApplier = getTenantOplogApplier(instance.get());
- ASSERT_EQUALS(resumeOpTime, oplogApplier->getStartApplyingAfterOpTime());
- ASSERT_EQUALS(resumeOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs());
-
- // Stop the oplog applier.
- instance->stopOplogApplier_forTest();
- // Wait for task completion. Since we're using a test function to cancel the applier,
- // the actual result is not critical.
- ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
- ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
-}
-
-TEST_F(TenantMigrationRecipientServiceTest,
- OplogApplierResumesBatchingAndApplyingAtDifferentTimestamps) {
- const UUID migrationUUID = UUID::gen();
- // Donor opTimes
- const OpTime startApplyingOpTime(Timestamp(2, 1), 1);
- const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
-
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
- insertTopOfOplog(&replSet, startApplyingOpTime);
-
- const auto tenantId = OID::gen().toString();
- TenantMigrationRecipientDocument initialStateDocument(
- migrationUUID,
- replSet.getConnectionString(),
- tenantId,
- kDefaultStartMigrationTimestamp,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly));
- initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations);
- initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
-
- // We skip cloning here as a way to simulate that the recipient service has detected an existing
- // migration on startup and will attempt to resume oplog fetching from the appropriate optime.
- updateStateDocToCloningFinished(initialStateDocument,
- OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime
- */
- ,
- dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
- startApplyingOpTime /* startApplyingDonorOpTime */,
- startApplyingOpTime /* startFetchingDonorOpTime */);
-
- auto opCtx = makeOperationContext();
- std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
-
- // Hang before creating the oplog applier.
- const auto hangBeforeCreatingOplogApplier =
- globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
- hangBeforeCreatingOplogApplier->setMode(FailPoint::alwaysOn,
- 0,
- BSON("action"
- << "hang"));
- // Hang after starting the oplog applier.
- const auto hangAfterStartingOplogApplier =
- globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
- auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
- 0,
- BSON("action"
- << "hang"));
-
- {
- FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
- // Create and start the instance.
- instance = TenantMigrationRecipientService::Instance::getOrCreate(
- opCtx.get(), _service, initialStateDocument.toBSON());
- ASSERT(instance.get());
- instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
- }
-
- // Create and insert the following into the oplog:
- // - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'.
- // - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'.
- // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o2' field. The donor opTime
- // is less than the 'startApplyingDonorOpTime'. We will resume batching from this
- // timestamp.
- // - (4) A no-op oplog entry with an inner oplog entry as the 'o2' field but no
- // 'fromTenantMigrate' field. This oplog entry does not satisfy the conditions
- // for the oplog applier to resume applying from so we default to apply from
- // 'startDonorApplyingOpTime'.
- const auto insertNss = NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar");
- const auto beforeStartApplyingOpTime = OpTime(Timestamp(1, 1), 1);
- const auto entryBeforeStartApplyingOpTime = makeOplogEntry(
- beforeStartApplyingOpTime,
- OpTypeEnum::kInsert,
- insertNss,
- UUID::gen(),
- BSON("doc"
- << "before startApplyingDonorOpTime"),
- boost::none /* o2 */)
- .getEntry()
- .toBSON();
- const auto afterStartApplyingOpTime = OpTime(Timestamp(3, 1), 1);
- const auto entryAfterStartApplyingOpTime = makeOplogEntry(
- afterStartApplyingOpTime,
- OpTypeEnum::kInsert,
- insertNss,
- UUID::gen(),
- BSON("doc"
- << "after startApplyingDonorOpTime"),
- boost::none /* o2 */)
- .getEntry()
- .toBSON();
- auto storage = StorageInterface::get(opCtx->getServiceContext());
- const auto oplogNss = NamespaceString::kRsOplogNamespace;
- const auto collUuid = UUID::gen();
- std::vector<DurableOplogEntry> oplogEntries;
- std::vector<MutableOplogEntry> noOpEntries;
- // (1)
- oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(9, 1), 1),
- OpTypeEnum::kInsert,
- insertNss,
- collUuid,
- BSON("doc"
- << "before clonerFinishedOpTime"),
- boost::none /* o2 */)
- .getEntry());
- // (2)
- oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(11, 1), 1),
- OpTypeEnum::kInsert,
- insertNss,
- collUuid,
- BSON("doc"
- << "after clonerFinishedOpTime"),
- boost::none /* o2 */)
- .getEntry());
- // (3)
- noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(12, 1), 1),
- insertNss,
- collUuid,
- entryBeforeStartApplyingOpTime,
- instance->getMigrationUUID()));
- // (4)
- noOpEntries.push_back(makeNoOpOplogEntry(OpTime(Timestamp(13, 1), 1),
- insertNss,
- collUuid,
- entryAfterStartApplyingOpTime,
- boost::none /* migrationUUID */));
- for (const auto& entry : oplogEntries) {
- auto opTime = entry.getOpTime();
- ASSERT_OK(storage->insertDocument(
- opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm()));
- }
- for (const auto& entry : noOpEntries) {
- auto opTime = entry.getOpTime();
- ASSERT_OK(storage->insertDocument(
- opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm()));
- }
- // Move on to the next failpoint to hang after starting the oplog applier.
- hangBeforeCreatingOplogApplier->setMode(FailPoint::off);
- hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
-
- auto dataConsistentOplogEntry =
- makeOplogEntry(dataConsistentOpTime,
- OpTypeEnum::kInsert,
- NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar"),
- UUID::gen(),
- BSON("doc" << 3),
- boost::none /* o2 */);
-
- auto oplogFetcher = getDonorOplogFetcher(instance.get());
- // Feed the oplog fetcher the last doc required for the recipient to be considered consistent.
- oplogFetcher->receiveBatch(
- 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp());
-
- // Allow the service to continue.
- hangAfterStartingOplogApplier->setMode(FailPoint::off);
- LOGV2(5272340,
- "Waiting for recipient service to reach consistent state",
- "suite"_attr = _agent.getSuiteName(),
- "test"_attr = _agent.getTestName());
- instance->waitUntilMigrationReachesConsistentState(opCtx.get());
-
- const auto oplogApplier = getTenantOplogApplier(instance.get());
- // Resume batching from the first migration no-op oplog entry. In this test, this is before
- // the 'startApplyingDonorOpTime'.
- ASSERT_EQUALS(beforeStartApplyingOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs());
- // The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'.
- ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime());
-
- // Stop the oplog applier.
- instance->stopOplogApplier_forTest();
- // Wait for task completion. Since we're using a test function to cancel the applier,
- // the actual result is not critical.
- ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
- ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
-}
-
-TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApplyingOpTime) {
- const UUID migrationUUID = UUID::gen();
- // Donor opTimes
- const OpTime startApplyingOpTime(Timestamp(2, 1), 1);
- const OpTime dataConsistentOpTime(Timestamp(4, 1), 1);
-
- MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
- insertTopOfOplog(&replSet, startApplyingOpTime);
-
- const auto tenantId = OID::gen().toString();
- TenantMigrationRecipientDocument initialStateDocument(
- migrationUUID,
- replSet.getConnectionString(),
- tenantId,
- kDefaultStartMigrationTimestamp,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly));
- initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations);
- initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
-
- // We skip cloning here as a way to simulate that the recipient service has detected an existing
- // migration on startup and will attempt to resume oplog fetching from the appropriate optime.
- updateStateDocToCloningFinished(initialStateDocument,
- OpTime(Timestamp(10, 1), 1) /* cloneFinishedRecipientOpTime
- */
- ,
- dataConsistentOpTime /* dataConsistentStopDonorOpTime */,
- startApplyingOpTime /* startApplyingDonorOpTime */,
- startApplyingOpTime /* startFetchingDonorOpTime */);
-
- auto opCtx = makeOperationContext();
- std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
-
- // Hang before starting the oplog applier.
- const auto hangAfterStartingOplogApplier =
- globalFailPointRegistry().find("fpAfterStartingOplogApplierMigrationRecipientInstance");
- auto initialTimesEntered = hangAfterStartingOplogApplier->setMode(FailPoint::alwaysOn,
- 0,
- BSON("action"
- << "hang"));
-
- {
- FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
- // Create and start the instance.
- instance = TenantMigrationRecipientService::Instance::getOrCreate(
- opCtx.get(), _service, initialStateDocument.toBSON());
- ASSERT(instance.get());
- instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
- }
-
- // Create and insert the following into the oplog:
- // - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'.
- // - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'.
- // - (3) A no-op oplog entry with an inner oplog entry as the 'o2' field but no
- // 'fromTenantMigrate' field. This oplog entry does not satisfy the conditions
- // for the oplog applier to resume applying from so we default to applying and
- // batching from the start of the buffer collection.
- const auto insertNss = NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar");
- const auto afterStartApplyingOpTime = OpTime(Timestamp(3, 1), 1);
- const auto entryAfterStartApplyingOpTime = makeOplogEntry(
- afterStartApplyingOpTime,
- OpTypeEnum::kInsert,
- insertNss,
- UUID::gen(),
- BSON("doc"
- << "after startApplyingDonorOpTime"),
- boost::none /* o2 */)
- .getEntry()
- .toBSON();
- auto storage = StorageInterface::get(opCtx->getServiceContext());
- const auto oplogNss = NamespaceString::kRsOplogNamespace;
- const auto collUuid = UUID::gen();
- std::vector<DurableOplogEntry> oplogEntries;
- std::vector<MutableOplogEntry> noOpEntries;
- // (1)
- oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(9, 1), 1),
- OpTypeEnum::kInsert,
- insertNss,
- collUuid,
- BSON("doc"
- << "before clonerFinishedOpTime"),
- boost::none /* o2 */)
- .getEntry());
- // (2)
- oplogEntries.push_back(makeOplogEntry(OpTime(Timestamp(11, 1), 1),
- OpTypeEnum::kInsert,
- insertNss,
- collUuid,
- BSON("doc"
- << "after clonerFinishedOpTime"),
- boost::none /* o2 */)
- .getEntry());
- // (3)
- const auto laterOpTime = OpTime(Timestamp(13, 1), 1);
- const auto noOpEntry = makeNoOpOplogEntry(laterOpTime,
- insertNss,
- collUuid,
- entryAfterStartApplyingOpTime,
- boost::none /* migrationUUID */);
-
- for (const auto& entry : oplogEntries) {
- auto opTime = entry.getOpTime();
- ASSERT_OK(storage->insertDocument(
- opCtx.get(), oplogNss, {entry.toBSON(), opTime.getTimestamp()}, opTime.getTerm()));
- }
- ASSERT_OK(storage->insertDocument(opCtx.get(),
- oplogNss,
- {noOpEntry.toBSON(), laterOpTime.getTimestamp()},
- laterOpTime.getTerm()));
-
- hangAfterStartingOplogApplier->waitForTimesEntered(initialTimesEntered + 1);
-
- auto dataConsistentOplogEntry =
- makeOplogEntry(dataConsistentOpTime,
- OpTypeEnum::kInsert,
- NamespaceString::createNamespaceString_forTest(tenantId + "_foo.bar"),
- UUID::gen(),
- BSON("doc" << 3),
- boost::none /* o2 */);
-
- auto oplogFetcher = getDonorOplogFetcher(instance.get());
- // Feed the oplog fetcher the last doc required for the recipient to be considered consistent.
- oplogFetcher->receiveBatch(
- 1, {dataConsistentOplogEntry.getEntry().toBSON()}, dataConsistentOpTime.getTimestamp());
-
- // Allow the service to continue.
- hangAfterStartingOplogApplier->setMode(FailPoint::off);
- LOGV2(5394602,
- "Waiting for recipient service to reach consistent state",
- "suite"_attr = _agent.getSuiteName(),
- "test"_attr = _agent.getTestName());
- instance->waitUntilMigrationReachesConsistentState(opCtx.get());
-
- const auto oplogApplier = getTenantOplogApplier(instance.get());
- // There is no oplog entry to resume batching from, so we treat it as if we are resuming
- // oplog application from the start. The 'resumeBatchingTs' will be a null timestamp.
- ASSERT_EQUALS(Timestamp(), oplogApplier->getResumeBatchingTs());
- // The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'.
- ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime());
-
- // Stop the oplog applier.
- instance->stopOplogApplier_forTest();
- // Wait for task completion. Since we're using a test function to cancel the applier,
- // the actual result is not critical.
- ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
- ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
-}
-
TEST_F(TenantMigrationRecipientServiceTest,
OplogFetcherResumesFromStartFetchingOpTimeWithDocInBuffer) {
const UUID migrationUUID = UUID::gen();
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index aa92bd63a88..95c2643513b 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/op_observer/op_observer.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/cloner_utils.h"
#include "mongo/db/repl/insert_group.h"
@@ -70,21 +71,25 @@ MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch);
TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
const MigrationProtocolEnum& protocol,
boost::optional<std::string> tenantId,
+ boost::optional<NamespaceString> progressNss,
OpTime startApplyingAfterOpTime,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
ThreadPool* writerPool,
- Timestamp resumeBatchingTs)
+ OpTime cloneFinishedRecipientOpTime,
+ const bool isResuming)
: AbstractAsyncComponent(executor.get(),
std::string("TenantOplogApplier_") + migrationUuid.toString()),
_migrationUuid(migrationUuid),
+ _progressNamespaceString(progressNss),
_protocol(protocol),
_tenantId(tenantId),
_startApplyingAfterOpTime(startApplyingAfterOpTime),
_oplogBuffer(oplogBuffer),
_executor(std::move(executor)),
+ _cloneFinishedRecipientOpTime(cloneFinishedRecipientOpTime),
_writerPool(writerPool),
- _resumeBatchingTs(resumeBatchingTs) {
+ _isResuming(isResuming) {
if (_protocol != MigrationProtocolEnum::kShardMerge) {
invariant(_tenantId);
} else {
@@ -120,21 +125,58 @@ OpTime TenantOplogApplier::getStartApplyingAfterOpTime() const {
return _startApplyingAfterOpTime;
}
-Timestamp TenantOplogApplier::getResumeBatchingTs() const {
- return _resumeBatchingTs;
+boost::optional<TenantOplogApplierProgress> TenantOplogApplier::getStoredProgress(
+ OperationContext* opCtx) {
+ if (!_progressNamespaceString) {
+ return boost::none;
+ }
+
+ DBDirectClient client(opCtx);
+ const auto tenantOplogApplierProgress =
+ client.findOne(_progressNamespaceString.get(),
+ BSON(TenantOplogApplierProgress::kMigrationUuidFieldName << _migrationUuid));
+ if (tenantOplogApplierProgress.isEmpty()) {
+ return boost::none;
+ }
+
+ IDLParserContext ctx("TenantOplogApplierProgress");
+ return TenantOplogApplierProgress::parse(ctx, tenantOplogApplierProgress);
}
-void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime) {
- stdx::lock_guard lk(_mutex);
- invariant(!_isActive_inlock());
- invariant(!cloneFinishedRecipientOpTime.isNull());
- invariant(_cloneFinishedRecipientOpTime.isNull());
- _cloneFinishedRecipientOpTime = cloneFinishedRecipientOpTime;
+void TenantOplogApplier::_storeProgress(OperationContext* opCtx, OpTime donorOpTime) {
+ if (!_progressNamespaceString) {
+ return;
+ }
+
+ PersistentTaskStore<TenantOplogApplierProgress> store(_progressNamespaceString.get());
+
+ BSONObjBuilder builder;
+ builder.append("$set", BSON(TenantOplogApplierProgress::kDonorOpTimeFieldName << donorOpTime));
+
+ store.upsert(opCtx,
+ BSON(TenantOplogApplierProgress::kMigrationUuidFieldName << _migrationUuid),
+ builder.obj());
}
void TenantOplogApplier::_doStartup_inlock() {
+ Timestamp resumeBatchingTs = Timestamp();
+ if (_isResuming) {
+ if (_deprecatedResumeOpTime.isNull()) {
+ auto opCtx = cc().makeOperationContext();
+ if (const auto storedProgress = getStoredProgress(opCtx.get())) {
+ auto donorOpTime = storedProgress->getDonorOpTime();
+ _startApplyingAfterOpTime = std::max(donorOpTime, _startApplyingAfterOpTime);
+ resumeBatchingTs = donorOpTime.getTimestamp();
+ }
+ } else {
+ _startApplyingAfterOpTime =
+ std::max(_deprecatedResumeOpTime, _startApplyingAfterOpTime);
+ resumeBatchingTs = _deprecatedResumeOpTime.getTimestamp();
+ }
+ }
+
_oplogBatcher = std::make_shared<TenantOplogBatcher>(
- _migrationUuid, _oplogBuffer, _executor, _resumeBatchingTs, _startApplyingAfterOpTime);
+ _migrationUuid, _oplogBuffer, _executor, resumeBatchingTs, _startApplyingAfterOpTime);
uassertStatusOK(_oplogBatcher->startup());
auto fut = _oplogBatcher->getNextBatch(
TenantOplogBatcher::BatchLimits(std::size_t(tenantApplierBatchSizeBytes.load()),
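A worked example of the resume arithmetic above, with hypothetical optimes:

    // Hypothetical values, tracing the std::max logic in _doStartup_inlock():
    const OpTime storedProgress(Timestamp(5, 1), 1);     // from getStoredProgress()
    const OpTime startApplyingAfter(Timestamp(3, 1), 1);
    invariant(std::max(storedProgress, startApplyingAfter) == storedProgress);
    // => _startApplyingAfterOpTime becomes (5, 1) and the batcher resumes
    //    from Timestamp(5, 1).

If neither a progress document nor a deprecated resume optime exists, resumeBatchingTs stays a null Timestamp and the batcher starts from the beginning of the oplog buffer.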
@@ -322,6 +364,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
"Tenant Oplog Applier starting to write no-ops",
"protocol"_attr = _protocol,
"migrationId"_attr = _migrationUuid);
+
auto lastBatchCompletedOpTimes = _writeNoOpEntries(opCtx.get(), *batch);
stdx::lock_guard lk(_mutex);
_lastAppliedOpTimesUpToLastBatch.donorOpTime = lastBatchCompletedOpTimes.donorOpTime;
@@ -334,6 +377,8 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
_numOpsApplied += batch->ops.size();
+ _storeProgress(opCtx.get(), lastBatchCompletedOpTimes.donorOpTime);
+
LOGV2_DEBUG(4886002,
1,
"Tenant Oplog Applier finished applying batch",
@@ -630,7 +675,12 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
// Check out the session.
if (!scopedSession) {
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx.get());
- scopedSession = mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get());
+ if (_isResuming) {
+ scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get());
+ } else {
+ scopedSession =
+ mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get());
+ }
}
auto txnParticipant = TransactionParticipant::get(opCtx.get());
@@ -736,6 +786,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
noopEntry.setObject2(o2Entry.toBSON());
}
}
+
stmtIds.insert(stmtIds.end(), entryStmtIds.begin(), entryStmtIds.end());
if (!prePostImageEntry && (entry.getPreImageOpTime() || entry.getPostImageOpTime())) {
@@ -779,9 +830,15 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
}
+
if (!scopedSession) {
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx.get());
- scopedSession = mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get());
+ if (_isResuming) {
+ scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get());
+ } else {
+ scopedSession =
+ mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get());
+ }
}
auto txnParticipant = TransactionParticipant::get(opCtx.get());
@@ -790,6 +847,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
"for transaction "
<< txnNumber << " on session " << sessionId,
txnParticipant);
+
// beginOrContinue throws on failure, which will abort the migration. Failure should
// only result from out-of-order processing, which should not happen.
TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber};
@@ -819,7 +877,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
"_cloneFinishedRecipientOpTime"_attr = _cloneFinishedRecipientOpTime,
"sessionId"_attr = sessionId,
"txnNumber"_attr = txnNumber,
- "statementIds"_attr = entryStmtIds,
+ "statementIds"_attr = stmtIds,
"protocol"_attr = _protocol,
"migrationId"_attr = _migrationUuid);
txnParticipant.invalidate(opCtx.get());
@@ -832,19 +890,26 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
}
// We should never process the same donor statement twice, except in failover
- // cases where we'll also have "forgotten" the statement was executed.
- uassert(5350902,
- str::stream() << "Tenant oplog application processed same retryable write "
- "twice for transaction "
- << txnNumber << " statement " << entryStmtIds.front()
- << " on session " << sessionId,
- !txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx.get(),
- entryStmtIds.front()));
+ // cases. In the event of a failover, it is possible that we were able to successfully
+ // log the noop but failed to persist progress checkpoint data. As a result, we can end
+ // up re-applying noop entries. We can safely skip the entry in this case.
+ if (txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx.get(),
+ stmtIds.front())) {
+ LOGV2_DEBUG(7262200,
+ 1,
+ "Tenant Oplog Applier skipping previously processed retryable write",
+ "protocol"_attr = _protocol,
+ "migrationId"_attr = _migrationUuid,
+ "txnNumber"_attr = txnNumber,
+ "statement"_attr = entryStmtIds.front(),
+ "sessionId"_attr = sessionId);
+ continue;
+ }
// Set sessionId, txnNumber, and statementId for all ops in a retryable write.
noopEntry.setSessionId(sessionId);
noopEntry.setTxnNumber(txnNumber);
- noopEntry.setStatementIds(entryStmtIds);
+ noopEntry.setStatementIds(stmtIds);
// set fromMigrate on the no-op so the session update tracker recognizes it.
noopEntry.setFromMigrate(true);
@@ -884,7 +949,6 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
TransactionParticipant::get(opCtx.get())
.onWriteOpCompletedOnPrimary(opCtx.get(), {stmtIds}, *sessionTxnRecord);
}
-
wuow.commit();
});
prePostImageEntry = boost::none;
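One consequence of checkpointing progress only after a batch is applied: a failover can land between logging a retryable-write no-op and persisting the progress document, so the same donor statement may be re-batched on resume. The relaxed check above therefore consults checkStatementExecutedNoOplogEntryFetch() and skips already-executed statements instead of fataling, which is also why a resuming applier checks out sessions with checkOutSession() rather than checkOutSessionWithoutOplogRead().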
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index c4c218b7289..c9d79ca0ec4 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -36,6 +36,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/tenant_oplog_applier_progress_gen.h"
#include "mongo/db/repl/tenant_oplog_batcher.h"
#include "mongo/db/serverless/serverless_types_gen.h"
#include "mongo/util/future.h"
@@ -75,11 +76,13 @@ public:
TenantOplogApplier(const UUID& migrationUuid,
const MigrationProtocolEnum& protocol,
boost::optional<std::string> tenantId,
+ boost::optional<NamespaceString> progressNss,
OpTime StartApplyingAfterOpTime,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
ThreadPool* writerPool,
- Timestamp resumeBatchingTs = Timestamp());
+ OpTime cloneFinishedRecipientOpTime,
+ bool isResuming);
virtual ~TenantOplogApplier();
@@ -98,19 +101,20 @@ public:
}
/**
- * This should only be called once before the applier starts.
+ * Returns information describing TenantOplogApplier progress for the
+ * given migration UUID.
*/
- void setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime);
+ boost::optional<TenantOplogApplierProgress> getStoredProgress(OperationContext* opCtx);
/**
* Returns the optime the applier will start applying from.
*/
OpTime getStartApplyingAfterOpTime() const;
- /**
- * Returns the timestamp the applier will resume batching from.
- */
- Timestamp getResumeBatchingTs() const;
+ void setDeprecatedResumeOpTime(OpTime deprecatedResumeOpTime) {
+ stdx::lock_guard lk(_mutex);
+ _deprecatedResumeOpTime = deprecatedResumeOpTime;
+ }
private:
void _doStartup_inlock() final;
@@ -143,6 +147,11 @@ private:
TenantOplogBatch* batch);
/**
+ * Stores information describing TenantOplogApplier progress in a replicated collection.
+ */
+ void _storeProgress(OperationContext* opCtx, OpTime donorOpTime);
+
+ /**
* Sets the _finalStatus to the new status if and only if the old status is "OK".
*/
void _setFinalStatusIfOk(WithLock, Status newStatus);
@@ -161,13 +170,14 @@ private:
// (X) Access only allowed from the main flow of control called from run() or constructor.
// Handles consuming oplog entries from the OplogBuffer for oplog application.
- std::shared_ptr<TenantOplogBatcher> _oplogBatcher; // (R)
- const UUID _migrationUuid; // (R)
- const MigrationProtocolEnum _protocol; // (R)
+ std::shared_ptr<TenantOplogBatcher> _oplogBatcher; // (R)
+ const UUID _migrationUuid; // (R)
+ const boost::optional<NamespaceString> _progressNamespaceString; // (R)
+ const MigrationProtocolEnum _protocol; // (R)
// For multi-tenant migration protocol, _tenantId is set.
// But, for shard merge protocol, _tenantId is empty.
const boost::optional<std::string> _tenantId; // (R)
- const OpTime _startApplyingAfterOpTime; // (R)
+ OpTime _startApplyingAfterOpTime; // (R)
RandomAccessOplogBuffer* _oplogBuffer; // (R)
std::shared_ptr<executor::TaskExecutor> _executor; // (R)
// All no-op entries written by this tenant migration should have OpTime greater than this
@@ -179,10 +189,11 @@ private:
// Pool of worker threads for writing ops to the databases.
// Not owned by us.
ThreadPool* const _writerPool; // (S)
- // The timestamp to resume batching from. A null timestamp indicates that the oplog applier
- // is starting fresh (not a retry), and will start batching from the beginning of the oplog
- // buffer.
- const Timestamp _resumeBatchingTs; // (R)
+ // Used when resuming oplog applier state via oplog scanning for nodes running FCV <= 6.3. If
+ // set, we will use this as our _startApplyingAfterOpTime, as well as the resume batching
+ // timestamp passed to the TenantOplogBatcher.
+ OpTime _deprecatedResumeOpTime; // (M)
+ const bool _isResuming; // (R)
std::map<OpTime, SharedPromise<OpTimePair>> _opTimeNotificationList; // (M)
Status _finalStatus = Status::OK(); // (M)
stdx::unordered_set<UUID, UUID::Hash> _knownGoodUuids; // (X)
diff --git a/src/mongo/db/repl/tenant_oplog_applier_progress.idl b/src/mongo/db/repl/tenant_oplog_applier_progress.idl
new file mode 100644
index 00000000000..a8cf8c98b03
--- /dev/null
+++ b/src/mongo/db/repl/tenant_oplog_applier_progress.idl
@@ -0,0 +1,50 @@
+# Copyright (C) 2023-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file defines the document used for storing progress by the tenant oplog applier.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/db/basic_types.idl"
+ - "mongo/db/repl/replication_types.idl"
+
+structs:
+ TenantOplogApplierProgress:
+ description: "Used for storing the progress made by the tenant oplog applier."
+ strict: true
+ fields:
+ _id:
+ type: uuid
+ description: "The UUID of the associated tenant migration."
+ cpp_name: migrationUuid
+ donorOpTime:
+ type: optime
+ description: "The last applied donor optime."
+ cpp_name: donorOpTime
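For illustration, a progress document stored under this schema would look like the following (values hypothetical):

    { _id: UUID("..."), donorOpTime: { ts: Timestamp(1679424585, 3), t: 1 } }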
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index 7e2b2bc2862..83ddd04502b 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/repl/oplog_applier_impl_test_fixture.h"
#include "mongo/db/repl/oplog_batcher_test_fixture.h"
#include "mongo/db/repl/oplog_entry_test_helpers.h"
+#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
@@ -155,8 +156,8 @@ public:
// Set up oplog collection. If the WT storage engine is used, the oplog collection is
// expected to exist when fetching the next opTime (LocalOplogInfo::getNextOpTimes) to use
// for a write.
- _opCtx = cc().makeOperationContext();
- repl::createOplog(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ repl::createOplog(opCtx.get());
MongoDSessionCatalog::set(
service,
@@ -164,7 +165,7 @@ public:
std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>()));
// Ensure that we are primary.
- auto replCoord = ReplicationCoordinator::get(_opCtx.get());
+ auto replCoord = ReplicationCoordinator::get(opCtx.get());
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
}
@@ -188,8 +189,42 @@ public:
_oplogBuffer.push(nullptr, bsonOps.begin(), bsonOps.end());
}
+ std::shared_ptr<TenantOplogApplier> makeTenantMigrationOplogApplier(
+ ThreadPool* writerPool,
+ OpTime startApplyingAfterOpTime = OpTime(),
+ OpTime cloneFinishedOpTime = OpTime(),
+ boost::optional<NamespaceString> progressNss = boost::none,
+ const bool isResuming = false) {
+ return std::make_shared<TenantOplogApplier>(_migrationUuid,
+ MigrationProtocolEnum::kMultitenantMigrations,
+ _tenantId,
+ progressNss,
+ startApplyingAfterOpTime,
+ &_oplogBuffer,
+ _executor,
+ writerPool,
+ cloneFinishedOpTime,
+ isResuming);
+ };
+
+ std::shared_ptr<TenantOplogApplier> makeShardMergeOplogApplier(
+ ThreadPool* writerPool,
+ OpTime startApplyingAfterOpTime = OpTime(),
+ OpTime cloneFinishedOpTime = OpTime()) {
+ return std::make_shared<TenantOplogApplier>(_migrationUuid,
+ MigrationProtocolEnum::kShardMerge,
+ boost::none,
+ boost::none,
+ startApplyingAfterOpTime,
+ &_oplogBuffer,
+ _executor,
+ writerPool,
+ cloneFinishedOpTime,
+ false);
+ };
+
StorageInterface* getStorageInterface() {
- return StorageInterface::get(_opCtx->getServiceContext());
+ return StorageInterface::get(getServiceContext());
}
protected:
@@ -199,7 +234,6 @@ protected:
std::string _tenantId = OID::gen().toString();
DatabaseName _dbName = DatabaseName(TenantId(OID(_tenantId)), "test");
UUID _migrationUuid = UUID::gen();
- ServiceContext::UniqueOperationContext _opCtx;
TenantOplogApplierTestOpObserver* _opObserver; // Owned by service context opObserverRegistry
private:
@@ -225,14 +259,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) {
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
// Even if we wait for the first op in a batch, it is the last op we should be notified on.
auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get();
@@ -243,7 +270,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) {
assertNoOpMatches(srcOps[1], entries[1]);
ASSERT_EQ(srcOps.size(), applier->getNumOpsApplied());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -260,14 +288,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) {
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
// Even if we wait for the first op in a batch, it is the last op we should be notified on.
auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get();
@@ -279,7 +300,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) {
}
ASSERT_EQ(srcOps.size(), applier->getNumOpsApplied());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -304,14 +326,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */);
tenantApplierBatchSizeOps.store(2 /* ops */);
@@ -330,7 +345,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
assertNoOpMatches(srcOps[2], entries[2]);
assertNoOpMatches(srcOps[3], entries[3]);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -370,14 +386,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
// The first two ops should come in the first batch.
auto firstBatchFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime());
@@ -391,21 +400,23 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
assertNoOpMatches(srcOps[i], entries[i]);
}
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) {
- createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto opCtx = cc().makeOperationContext();
+ createCollectionWithUuid(opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
{
- DBDirectClient client(_opCtx.get());
+ DBDirectClient client(opCtx.get());
client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace,
{MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
}
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
- auto lsid = makeLogicalSessionId(_opCtx.get());
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
+ auto lsid = makeLogicalSessionId(opCtx.get());
TxnNumber txnNum(0);
const BSONObj doc1 = BSON("_id" << 1 << "data" << 1);
@@ -430,216 +441,195 @@ TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied)
{StmtId(1)},
partialOp.getOpTime());
- ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(),
+ ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(),
nss,
{doc1, commitOp.getOpTime().getTimestamp()},
commitOp.getOpTime().getTerm()));
- ASSERT_TRUE(docExists(_opCtx.get(), nss, doc1));
- ASSERT_FALSE(docExists(_opCtx.get(), nss, doc2));
+ ASSERT_TRUE(docExists(opCtx.get(), nss, doc1));
+ ASSERT_FALSE(docExists(opCtx.get(), nss, doc2));
pushOps({partialOp, commitOp});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(commitOp.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_TRUE(docExists(_opCtx.get(), nss, doc1));
- ASSERT_TRUE(docExists(_opCtx.get(), nss, doc2));
+ ASSERT_TRUE(docExists(opCtx.get(), nss, doc1));
+ ASSERT_TRUE(docExists(opCtx.get(), nss, doc2));
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) {
- auto entry = makeInsertOplogEntry(
- 1,
- NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"),
- UUID::gen());
- bool onInsertsCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) {
- onInsertsCalled = true;
- };
+ auto nss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+ auto entry = makeInsertOplogEntry(1, nss, UUID::gen());
+ bool onInsertsCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>&) {
+ if (onInsertsNss == nss) {
+ onInsertsCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no database was available, the insert shouldn't actually happen.
- ASSERT_FALSE(onInsertsCalled);
+ ASSERT_FALSE(onInsertsCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) {
- createDatabase(_opCtx.get(), _dbName.toString());
- auto entry = makeInsertOplogEntry(
- 1,
- NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"),
- UUID::gen());
- bool onInsertsCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) {
- onInsertsCalled = true;
- };
+ auto opCtx = cc().makeOperationContext();
+ createDatabase(opCtx.get(), _dbName.toString());
+ auto nss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+ auto entry = makeInsertOplogEntry(1, nss, UUID::gen());
+ bool onInsertsCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>&) {
+ if (onInsertsNss == nss) {
+ onInsertsCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no collection was available, the insert shouldn't actually happen.
- ASSERT_FALSE(onInsertsCalled);
+ ASSERT_FALSE(onInsertsCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) {
+ auto opCtx = cc().makeOperationContext();
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
- ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(),
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
+ ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(),
nss,
{BSON("_id" << 1 << "data"
<< "1")},
0));
auto entry = makeInsertOplogEntry(1, nss, uuid);
- bool onInsertsCalled = false;
- bool onUpdateCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) {
- onInsertsCalled = true;
- };
- _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs&) {
- onUpdateCalled = true;
+ bool onInsertsCalledForNss = false;
+ bool onUpdateCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>&) {
+ if (onInsertsNss == nss) {
+ onInsertsCalledForNss = true;
+ }
+ };
+ _opObserver->onUpdateFn = [&](OperationContext* opCtx,
+ const OplogUpdateEntryArgs& onUpdateArgs) {
+ if (onUpdateArgs.coll->ns() == nss) {
+ onUpdateCalledForNss = true;
+ }
};
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// This insert gets converted to an upsert.
- ASSERT_FALSE(onInsertsCalled);
- ASSERT_TRUE(onUpdateCalled);
+ ASSERT_FALSE(onInsertsCalledForNss);
+ ASSERT_TRUE(onUpdateCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_UniqueKey_InsertExisting) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
// Create unique key index on the collection.
auto indexKey = BSON("data" << 1);
auto spec =
BSON("v" << int(IndexDescriptor::kLatestIndexVersion) << "key" << indexKey << "name"
<< (indexKey.firstElementFieldNameStringData() + "_1") << "unique" << true);
- createIndex(_opCtx.get(), nss, uuid, spec);
+ createIndex(opCtx.get(), nss, uuid, spec);
ASSERT_OK(getStorageInterface()->insertDocument(
- _opCtx.get(), nss, {BSON("_id" << 0 << "data" << 2)}, 0));
+ opCtx.get(), nss, {BSON("_id" << 0 << "data" << 2)}, 0));
// Insert an entry that conflicts with the existing document on the indexed field.
auto entry =
makeOplogEntry(repl::OpTypeEnum::kInsert, nss, uuid, BSON("_id" << 1 << "data" << 2));
- bool onInsertsCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString&, const std::vector<BSONObj>&) {
- onInsertsCalled = true;
- };
+ bool onInsertsCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>&) {
+ if (onInsertsNss == nss) {
+ onInsertsCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// The DuplicateKey error should be ignored and insert should succeed.
- ASSERT_TRUE(onInsertsCalled);
+ ASSERT_TRUE(onInsertsCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_Success) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto entry = makeInsertOplogEntry(1, nss, uuid);
- bool onInsertsCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
- ASSERT_FALSE(onInsertsCalled);
- onInsertsCalled = true;
- // TODO Check that (nss.dbName() == _dbName) once the OplogEntry deserializer passes
- // "tid" to the NamespaceString constructor
- ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId());
- ASSERT_EQUALS(nss.coll(), "bar");
+ bool onInsertsCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>& docs) {
+ if (onInsertsNss == nss) {
+ ASSERT_FALSE(onInsertsCalledForNss);
+ onInsertsCalledForNss = true;
+ // TODO Check that (onInsertsNss.dbName() == _dbName) once the OplogEntry deserializer
+ // passes "tid" to the NamespaceString constructor
+ ASSERT_EQUALS(onInsertsNss.dbName().db(), _dbName.toStringWithTenantId());
+ ASSERT_EQUALS(onInsertsNss.coll(), "bar");
ASSERT_EQUALS(1, docs.size());
ASSERT_BSONOBJ_EQ(docs[0], entry.getObject());
- };
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_TRUE(onInsertsCalled);
+ ASSERT_TRUE(onInsertsCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -651,8 +641,9 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) {
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
NamespaceString nss2 =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "baz");
- auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1);
- auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid1 = createCollectionWithUuid(opCtx.get(), nss1);
+ auto uuid2 = createCollectionWithUuid(opCtx.get(), nss2);
std::vector<OplogEntry> entries;
bool onInsertsCalledNss1 = false;
bool onInsertsCalledNss2 = false;
@@ -677,7 +668,7 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) {
for (int i = 3; i < 6; i++) {
ASSERT_BSONOBJ_EQ(docs[i], entries[i + 1].getObject());
}
- } else {
+ } else if (nss == nss2) {
ASSERT_EQUALS(nss2, nss);
ASSERT_FALSE(onInsertsCalledNss2);
onInsertsCalledNss2 = true;
@@ -689,274 +680,248 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) {
// Make sure all ops end up in a single thread so they can be batched.
auto writerPool = makeTenantMigrationWriterPool(1);
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entries.back().getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
ASSERT_TRUE(onInsertsCalledNss1);
ASSERT_TRUE(onInsertsCalledNss2);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto entry = makeOplogEntry(repl::OpTypeEnum::kUpdate,
nss,
uuid,
update_oplog_entry::makeDeltaOplogEntry(
BSON(doc_diff::kUpdateSectionFieldName << fromjson("{a: 1}"))),
BSON("_id" << 0));
- bool onInsertsCalled = false;
- bool onUpdateCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
- onInsertsCalled = true;
- };
- _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs&) {
- onUpdateCalled = true;
+ bool onInsertsCalledForNss = false;
+ bool onUpdateCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>& docs) {
+ if (onInsertsNss == nss) {
+ onInsertsCalledForNss = true;
+ }
+ };
+ _opObserver->onUpdateFn = [&](OperationContext* opCtx,
+ const OplogUpdateEntryArgs& onUpdateArgs) {
+ if (onUpdateArgs.coll->ns() == nss) {
+ onUpdateCalledForNss = true;
+ }
};
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Updates to missing documents should just be dropped, neither inserted nor updated.
- ASSERT_FALSE(onInsertsCalled);
- ASSERT_FALSE(onUpdateCalled);
+ ASSERT_FALSE(onInsertsCalledForNss);
+ ASSERT_FALSE(onUpdateCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
- ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0));
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
+ ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(), nss, {BSON("_id" << 0)}, 0));
auto entry = makeOplogEntry(repl::OpTypeEnum::kUpdate,
nss,
uuid,
update_oplog_entry::makeDeltaOplogEntry(
BSON(doc_diff::kUpdateSectionFieldName << fromjson("{a: 1}"))),
BSON("_id" << 0));
- bool onUpdateCalled = false;
- _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
- onUpdateCalled = true;
- ASSERT_EQUALS(nss, args.coll->ns());
- ASSERT_EQUALS(uuid, args.coll->uuid());
+ bool onUpdateCalledForNss = false;
+ _opObserver->onUpdateFn = [&](OperationContext* opCtx,
+ const OplogUpdateEntryArgs& onUpdateArgs) {
+ if (onUpdateArgs.coll->ns() == nss) {
+ onUpdateCalledForNss = true;
+ ASSERT_EQUALS(nss, onUpdateArgs.coll->ns());
+ ASSERT_EQUALS(uuid, onUpdateArgs.coll->uuid());
+ }
};
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_TRUE(onUpdateCalled);
+ ASSERT_TRUE(onUpdateCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) {
- auto entry = makeOplogEntry(
- OpTypeEnum::kDelete,
- NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"),
- UUID::gen());
- bool onDeleteCalled = false;
- _opObserver->onDeleteFn =
- [&](OperationContext* opCtx, const CollectionPtr&, StmtId, const OplogDeleteEntryArgs&) {
- onDeleteCalled = true;
- };
+ NamespaceString nss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+ auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, UUID::gen());
+ bool onDeleteCalledForNss = false;
+ _opObserver->onDeleteFn = [&](OperationContext* opCtx,
+ const CollectionPtr& coll,
+ StmtId,
+ const OplogDeleteEntryArgs&) {
+ if (coll->ns() == nss) {
+ onDeleteCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no database was available, the delete shouldn't actually happen.
- ASSERT_FALSE(onDeleteCalled);
+ ASSERT_FALSE(onDeleteCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) {
- createDatabase(_opCtx.get(), _dbName.toString());
- auto entry = makeOplogEntry(
- OpTypeEnum::kDelete,
- NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar"),
- UUID::gen());
- bool onDeleteCalled = false;
- _opObserver->onDeleteFn =
- [&](OperationContext* opCtx, const CollectionPtr&, StmtId, const OplogDeleteEntryArgs&) {
- onDeleteCalled = true;
- };
+ auto opCtx = cc().makeOperationContext();
+ createDatabase(opCtx.get(), _dbName.toString());
+ NamespaceString nss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+ auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, UUID::gen());
+ bool onDeleteCalledForNss = false;
+ _opObserver->onDeleteFn = [&](OperationContext* opCtx,
+ const CollectionPtr& coll,
+ StmtId,
+ const OplogDeleteEntryArgs&) {
+ if (coll->ns() == nss) {
+ onDeleteCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since no collection was available, the delete shouldn't actually happen.
- ASSERT_FALSE(onDeleteCalled);
+ ASSERT_FALSE(onDeleteCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0));
- bool onDeleteCalled = false;
- _opObserver->onDeleteFn =
- [&](OperationContext* opCtx, const CollectionPtr&, StmtId, const OplogDeleteEntryArgs&) {
- onDeleteCalled = true;
- };
+ bool onDeleteCalledForNss = false;
+ _opObserver->onDeleteFn = [&](OperationContext* opCtx,
+ const CollectionPtr& coll,
+ StmtId,
+ const OplogDeleteEntryArgs&) {
+ if (coll->ns() == nss) {
+ onDeleteCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since the document wasn't available, onDelete should not be called.
- ASSERT_FALSE(onDeleteCalled);
+ ASSERT_FALSE(onDeleteCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDelete_Success) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
- ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0));
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
+ ASSERT_OK(getStorageInterface()->insertDocument(opCtx.get(), nss, {BSON("_id" << 0)}, 0));
auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0));
- bool onDeleteCalled = false;
+ bool onDeleteCalledForNss = false;
_opObserver->onDeleteFn = [&](OperationContext* opCtx,
const CollectionPtr& coll,
StmtId,
const OplogDeleteEntryArgs& args) {
- onDeleteCalled = true;
- ASSERT_TRUE(opCtx);
- ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX));
- ASSERT_TRUE(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));
- ASSERT_TRUE(opCtx->writesAreReplicated());
- ASSERT_FALSE(args.fromMigrate);
- // TODO SERVER-70007 Check that (nss.dbName() == _dbName) once the OplogEntry deserializer
- // passes "tid" to the NamespaceString constructor
- ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId());
- ASSERT_EQUALS(nss.coll(), "bar");
- ASSERT_EQUALS(uuid, coll->uuid());
+ if (coll->ns() == nss) {
+ onDeleteCalledForNss = true;
+ ASSERT_TRUE(opCtx);
+ ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX));
+ ASSERT_TRUE(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));
+ ASSERT_TRUE(opCtx->writesAreReplicated());
+ ASSERT_FALSE(args.fromMigrate);
+ // TODO SERVER-70007 Check that (nss.dbName() == _dbName) once the OplogEntry
+ // deserializer passes "tid" to the NamespaceString constructor
+ ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId());
+ ASSERT_EQUALS(nss.coll(), "bar");
+ ASSERT_EQUALS(uuid, coll->uuid());
+ }
};
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_TRUE(onDeleteCalled);
+ ASSERT_TRUE(onDeleteCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_CollExisting) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto op = BSON("op"
<< "c"
<< "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o"
<< BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << uuid);
- bool applyCmdCalled = false;
+ bool onCreateCollectionCalledForNss = false;
_opObserver->onCreateCollectionFn = [&](OperationContext* opCtx,
const CollectionPtr&,
const NamespaceString& collNss,
const CollectionOptions&,
const BSONObj&) {
- applyCmdCalled = true;
+ if (collNss == nss) {
+ onCreateCollectionCalledForNss = true;
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since the collection already exists, onCreateCollection should not happen.
- ASSERT_FALSE(applyCmdCalled);
+ ASSERT_FALSE(onCreateCollectionCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -965,14 +930,15 @@ TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) {
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "foo");
NamespaceString nss2 =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss2);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss2);
auto op =
BSON("op"
<< "c"
<< "ns" << nss1.getCommandNS().ns() << "wall" << Date_t() << "o"
<< BSON("renameCollection" << nss1.ns() << "to" << nss2.ns() << "stayTemp" << false)
<< "ts" << Timestamp(1, 1) << "ui" << uuid);
- bool applyCmdCalled = false;
+ bool onRenameCollectionCalledForNss = false;
_opObserver->onRenameCollectionFn = [&](OperationContext* opCtx,
const NamespaceString& fromColl,
const NamespaceString& toColl,
@@ -980,27 +946,22 @@ TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) {
const boost::optional<UUID>& dropTargetUUID,
std::uint64_t numRecords,
bool stayTemp) {
- applyCmdCalled = true;
+ if (nss1 == fromColl) {
+ onRenameCollectionCalledForNss = true;
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// Since the collection already has the target name, onRenameCollection should not happen.
- ASSERT_FALSE(applyCmdCalled);
+ ASSERT_FALSE(onRenameCollectionCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1012,43 +973,40 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_Success) {
<< "c"
<< "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o"
<< BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen());
- bool applyCmdCalled = false;
+ bool onCreateCollectionCalledForNss = false;
_opObserver->onCreateCollectionFn = [&](OperationContext* opCtx,
const CollectionPtr&,
const NamespaceString& collNss,
const CollectionOptions&,
const BSONObj&) {
- applyCmdCalled = true;
- ASSERT_TRUE(opCtx);
- ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX));
- ASSERT_TRUE(opCtx->writesAreReplicated());
- ASSERT_EQUALS(nss, collNss);
+ if (collNss == nss) {
+ onCreateCollectionCalledForNss = true;
+ ASSERT_TRUE(opCtx);
+ ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX));
+ ASSERT_TRUE(opCtx->writesAreReplicated());
+ ASSERT_EQUALS(nss, collNss);
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_TRUE(applyCmdCalled);
+ ASSERT_TRUE(onCreateCollectionCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "t");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto op =
BSON("op"
<< "c"
@@ -1056,47 +1014,43 @@ TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) {
<< BSON("createIndexes" << nss.coll() << "v" << 2 << "key" << BSON("a" << 1) << "name"
<< "a_1")
<< "ts" << Timestamp(1, 1) << "ui" << uuid);
- bool applyCmdCalled = false;
+ bool onCreateIndexCalledForNss = false;
_opObserver->onCreateIndexFn = [&](OperationContext* opCtx,
const NamespaceString& collNss,
const UUID& collUuid,
BSONObj indexDoc,
bool fromMigrate) {
- ASSERT_FALSE(applyCmdCalled);
- applyCmdCalled = true;
- ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX));
- ASSERT_TRUE(opCtx->writesAreReplicated());
- ASSERT_BSONOBJ_EQ(indexDoc,
- BSON("v" << 2 << "key" << BSON("a" << 1) << "name"
- << "a_1"));
- ASSERT_EQUALS(nss, collNss);
- ASSERT_EQUALS(uuid, collUuid);
+ if (collNss == nss) {
+ ASSERT_FALSE(onCreateIndexCalledForNss);
+ onCreateIndexCalledForNss = true;
+ ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.dbName(), MODE_IX));
+ ASSERT_TRUE(opCtx->writesAreReplicated());
+ ASSERT_BSONOBJ_EQ(indexDoc,
+ BSON("v" << 2 << "key" << BSON("a" << 1) << "name"
+ << "a_1"));
+ ASSERT_EQUALS(nss, collNss);
+ ASSERT_EQUALS(uuid, collUuid);
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_TRUE(applyCmdCalled);
+ ASSERT_TRUE(onCreateIndexCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyStartIndexBuildCommand_Failure) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "t");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto op = BSON("op"
<< "c"
<< "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o"
@@ -1108,19 +1062,12 @@ TEST_F(TenantOplogApplierTest, ApplyStartIndexBuildCommand_Failure) {
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_EQUALS(opAppliedFuture.getNoThrow().getStatus().code(), 5434700);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1132,32 +1079,28 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_WrongNSS) {
<< "c"
<< "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o"
<< BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen());
- bool applyCmdCalled = false;
+ bool onCreateCollectionCalledForNss = false;
_opObserver->onCreateCollectionFn = [&](OperationContext* opCtx,
const CollectionPtr&,
const NamespaceString& collNss,
const CollectionOptions&,
const BSONObj&) {
- applyCmdCalled = true;
+ if (collNss == nss) {
+ onCreateCollectionCalledForNss = true;
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_FALSE(applyCmdCalled);
+ ASSERT_FALSE(onCreateCollectionCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1169,79 +1112,71 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_WrongNSS_Merge) {
<< "c"
<< "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o"
<< BSON("create" << nss.coll()) << "ts" << Timestamp(1, 1) << "ui" << UUID::gen());
- bool applyCmdCalled = false;
+ bool onCreateCollectionCalledForNss = false;
_opObserver->onCreateCollectionFn = [&](OperationContext* opCtx,
const CollectionPtr&,
const NamespaceString& collNss,
const CollectionOptions&,
const BSONObj&) {
- applyCmdCalled = true;
+ if (collNss == nss) {
+ onCreateCollectionCalledForNss = true;
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier = std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kShardMerge,
- boost::none,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeShardMergeOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_EQ(opAppliedFuture.getNoThrow().getStatus().code(), ErrorCodes::InvalidTenantId);
- ASSERT_FALSE(applyCmdCalled);
+ ASSERT_FALSE(onCreateCollectionCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyDropIndexesCommand_IndexNotFound) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto op = BSON("op"
<< "c"
<< "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o"
<< BSON("dropIndexes" << nss.coll() << "index"
<< "a_1")
<< "ts" << Timestamp(1, 1) << "ui" << uuid);
- bool applyCmdCalled = false;
+ bool onDropIndexCalledForNss = false;
_opObserver->onDropIndexFn = [&](OperationContext* opCtx,
- const NamespaceString& nss,
+ const NamespaceString& onDropIndexNss,
const boost::optional<UUID>& uuid,
const std::string& indexName,
const BSONObj& idxDescriptor) {
- applyCmdCalled = true;
+        if (onDropIndexNss == nss) {
+            onDropIndexCalledForNss = true;
+        }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// The IndexNotFound error should be ignored and drop index should not happen.
- ASSERT_FALSE(applyCmdCalled);
+ ASSERT_FALSE(onDropIndexCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) {
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto op = BSON("op"
<< "c"
<< "ns" << nss.getCommandNS().ns() << "wall" << Date_t() << "o"
@@ -1250,40 +1185,36 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) {
<< "data_1"
<< "hidden" << true))
<< "ts" << Timestamp(1, 1) << "ui" << uuid);
- bool applyCmdCalled = false;
+ bool onCollModCalledForNss = false;
_opObserver->onCollModFn = [&](OperationContext* opCtx,
- const NamespaceString& nss,
+ const NamespaceString& onCollModNss,
const UUID& uuid,
const BSONObj& collModCmd,
const CollectionOptions& oldCollOptions,
boost::optional<IndexCollModInfo> indexInfo) {
- applyCmdCalled = true;
+ if (onCollModNss == nss) {
+ onCollModCalledForNss = true;
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// The IndexNotFound error should be ignored and collMod should not happen.
- ASSERT_FALSE(applyCmdCalled);
+ ASSERT_FALSE(onCollModCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCollModCommand_CollectionMissing) {
- createDatabase(_opCtx.get(), _dbName.toString());
+ auto opCtx = cc().makeOperationContext();
+ createDatabase(opCtx.get(), _dbName.toString());
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
UUID uuid(UUID::gen());
@@ -1295,65 +1226,57 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_CollectionMissing) {
<< "data_1"
<< "hidden" << true))
<< "ts" << Timestamp(1, 1) << "ui" << uuid);
- bool applyCmdCalled = false;
+ bool onCollModCalledForNss = false;
_opObserver->onCollModFn = [&](OperationContext* opCtx,
- const NamespaceString& nss,
+ const NamespaceString& onCollModNss,
const UUID& uuid,
const BSONObj& collModCmd,
const CollectionOptions& oldCollOptions,
boost::optional<IndexCollModInfo> indexInfo) {
- applyCmdCalled = true;
+ if (onCollModNss == nss) {
+ onCollModCalledForNss = true;
+ }
};
auto entry = OplogEntry(op);
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
// The NamespaceNotFound error should be ignored and collMod should not happen.
- ASSERT_FALSE(applyCmdCalled);
+ ASSERT_FALSE(onCollModCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS) {
// Should not be able to apply a CRUD operation to a namespace not belonging to us.
NamespaceString nss = NamespaceString::createNamespaceString_forTest("notmytenant", "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto entry = makeInsertOplogEntry(1, nss, uuid);
- bool onInsertsCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
- onInsertsCalled = true;
- };
+ bool onInsertsCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>& docs) {
+ if (onInsertsNss == nss) {
+ onInsertsCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_FALSE(onInsertsCalled);
+ ASSERT_FALSE(onInsertsCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1363,29 +1286,27 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS_Merge) {
// Should not be able to apply a CRUD operation to a namespace not belonging to us.
NamespaceString nss =
NamespaceString::createNamespaceString_forTest(DatabaseName(invalidTenant, "test"), "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto entry = makeInsertOplogEntry(1, nss, uuid);
- bool onInsertsCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
- onInsertsCalled = true;
- };
+ bool onInsertsCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>& docs) {
+ if (onInsertsNss == nss) {
+ onInsertsCalledForNss = true;
+ }
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier = std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kShardMerge,
- boost::none,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeShardMergeOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_EQ(opAppliedFuture.getNoThrow().getStatus().code(), ErrorCodes::InvalidTenantId);
- ASSERT_FALSE(onInsertsCalled);
+ ASSERT_FALSE(onInsertsCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1394,30 +1315,25 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) {
// we claim it does in the nss field.
NamespaceString nss = NamespaceString::createNamespaceString_forTest("notmytenant", "bar");
NamespaceString nss_to_apply = NamespaceString::createNamespaceString_forTest(_dbName, "bar");
- auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = createCollectionWithUuid(opCtx.get(), nss);
auto entry = makeInsertOplogEntry(1, nss_to_apply, uuid);
- bool onInsertsCalled = false;
- _opObserver->onInsertsFn =
- [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
- onInsertsCalled = true;
- };
+ bool onInsertsCalledForNss = false;
+ _opObserver->onInsertsFn = [&](OperationContext* opCtx,
+ const NamespaceString& onInsertsNss,
+ const std::vector<BSONObj>& docs) {
+ onInsertsCalledForNss = true;
+ };
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime());
ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_FALSE(onInsertsCalled);
+ ASSERT_FALSE(onInsertsCalledForNss);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1427,14 +1343,7 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) {
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
@@ -1447,7 +1356,8 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) {
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1457,14 +1367,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
@@ -1477,7 +1380,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, OpTime());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1491,14 +1395,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */);
tenantApplierBatchSizeOps.store(1 /* ops */);
@@ -1516,7 +1413,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1530,14 +1428,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
@@ -1551,7 +1442,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1566,14 +1458,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe
ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime());
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
@@ -1587,7 +1472,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
@@ -1601,14 +1487,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();
@@ -1622,22 +1501,24 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
applier->join();
}
TEST_F(TenantOplogApplierTest, ApplyInsert_MultiKeyIndex) {
- createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto opCtx = cc().makeOperationContext();
+ createCollectionWithUuid(opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
NamespaceString indexedNss(_dbName.toStringWithTenantId(), "indexedColl");
NamespaceString nonIndexedNss(_dbName.toStringWithTenantId(), "nonIndexedColl");
- auto indexedCollUUID = createCollectionWithUuid(_opCtx.get(), indexedNss);
- createCollection(_opCtx.get(), nonIndexedNss, CollectionOptions());
+ auto indexedCollUUID = createCollectionWithUuid(opCtx.get(), indexedNss);
+ createCollection(opCtx.get(), nonIndexedNss, CollectionOptions());
// Create index on the collection.
auto indexKey = BSON("val" << 1);
auto spec = BSON("v" << int(IndexDescriptor::kLatestIndexVersion) << "key" << indexKey << "name"
<< "val_1");
- createIndex(_opCtx.get(), indexedNss, indexedCollUUID, spec);
+ createIndex(opCtx.get(), indexedNss, indexedCollUUID, spec);
const BSONObj multiKeyDoc = BSON("_id" << 1 << "val" << BSON_ARRAY(1 << 2));
const BSONObj singleKeyDoc = BSON("_id" << 2 << "val" << 1);
@@ -1653,25 +1534,209 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_MultiKeyIndex) {
// writer worker thread to ensure that the same opCtx is used.
auto writerPool = makeTenantMigrationWriterPool(1);
- auto applier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- MigrationProtocolEnum::kMultitenantMigrations,
- _tenantId,
- OpTime(),
- &_oplogBuffer,
- _executor,
- writerPool.get());
+ auto applier = makeTenantMigrationOplogApplier(writerPool.get());
ASSERT_OK(applier->startup());
auto opAppliedFuture = applier->getNotificationForOpTime(unindexedOp.getOpTime());
ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
- ASSERT_TRUE(docExists(_opCtx.get(), indexedNss, multiKeyDoc));
- ASSERT_TRUE(docExists(_opCtx.get(), nonIndexedNss, singleKeyDoc));
+ ASSERT_TRUE(docExists(opCtx.get(), indexedNss, multiKeyDoc));
+ ASSERT_TRUE(docExists(opCtx.get(), nonIndexedNss, singleKeyDoc));
+
+ applier->shutdown();
+ _oplogBuffer.shutdown(opCtx.get());
+ applier->join();
+}
+
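+// Verifies that the applier, when constructed with a progress namespace, persists the donor
+// opTime of the last applied entry to that collection as batches are applied.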
+TEST_F(TenantOplogApplierTest, StoresOplogApplierProgress) {
+ auto nss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+ auto entry1 = makeInsertOplogEntry(1, nss, UUID::gen());
+ pushOps({entry1});
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ auto progressNss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "progress");
+
+ auto applier =
+ makeTenantMigrationOplogApplier(writerPool.get(), OpTime(), OpTime(), progressNss);
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry1.getOpTime());
+ ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
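+    // The persisted progress document should now record entry1's opTime as the donor opTime.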
+ auto opCtx = cc().makeOperationContext();
+ auto progress = applier->getStoredProgress(opCtx.get());
+ ASSERT_TRUE(progress.has_value());
+ ASSERT_EQ(progress->getDonorOpTime(), entry1.getOpTime());
+
+ auto entry2 = makeInsertOplogEntry(2, nss, UUID::gen());
+ pushOps({entry2});
+
+ opAppliedFuture = applier->getNotificationForOpTime(entry2.getOpTime());
+ ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+ progress = applier->getStoredProgress(opCtx.get());
+ ASSERT_TRUE(progress.has_value());
+ ASSERT_EQ(progress->getDonorOpTime(), entry2.getOpTime());
+
+ ASSERT_EQ(applier->getNumOpsApplied(), 2);
+
+ applier->shutdown();
+ _oplogBuffer.shutdown(opCtx.get());
+ applier->join();
+}
+
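+// Verifies that an applier restarted with resume enabled skips entries already covered by the
+// stored progress and applies only entries past the recorded donor opTime.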
+TEST_F(TenantOplogApplierTest, ResumesOplogApplierProgress) {
+ auto nss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+ auto entry1 = makeInsertOplogEntry(1, nss, UUID::gen());
+ auto entry2 = makeInsertOplogEntry(2, nss, UUID::gen());
+ pushOps({entry1, entry2});
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ auto progressNss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "progress");
+
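+    // First pass: apply both entries and let the applier record its progress.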
+ auto applier = makeTenantMigrationOplogApplier(
+ writerPool.get(), OpTime(Timestamp(1, 0), 0), OpTime(Timestamp(1, 0), 0), progressNss);
+ ASSERT_OK(applier->startup());
+ auto opAppliedFuture = applier->getNotificationForOpTime(entry1.getOpTime());
+ ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+ opAppliedFuture = applier->getNotificationForOpTime(entry2.getOpTime());
+ ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+ ASSERT_EQ(applier->getNumOpsApplied(), 2);
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.clear(opCtx.get());
+ }
+
+ applier->shutdown();
+ applier->join();
+
+ auto entry3 = makeInsertOplogEntry(3, nss, UUID::gen());
+ pushOps({entry1, entry2, entry3});
+
+ applier = makeTenantMigrationOplogApplier(writerPool.get(),
+ OpTime(Timestamp(1, 0), 0),
+ OpTime(Timestamp(1, 0), 0),
+ progressNss,
+ true);
+ ASSERT_OK(applier->startup());
+
+ opAppliedFuture = applier->getNotificationForOpTime(entry3.getOpTime());
+ ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+ ASSERT_EQ(applier->getNumOpsApplied(), 1);
+
+ applier->shutdown();
+ applier->join();
+
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
+}
+
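+// Verifies that resuming does not re-log the noop oplog entry for a retryable write that was
+// already applied, even if the stored progress data has been lost.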
+TEST_F(TenantOplogApplierTest, ResumeOplogApplierDoesNotReApplyPreviouslyAppliedRetryableWrites) {
+ {
+ auto opCtx = cc().makeOperationContext();
+ createCollectionWithUuid(opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ DBDirectClient client(opCtx.get());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace,
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
+ }
+
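+    // Counts the entries currently in the local oplog so the test can detect whether the
+    // retryable write noop entry is logged more than once.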
+ auto getOplogEntryCount = [&]() {
+ auto opCtx = cc().makeOperationContext();
+ OplogInterfaceLocal oplog(opCtx.get());
+ auto oplogIter = oplog.makeIterator();
+ auto result = oplogIter->next();
+ auto oplogEntryCount = 0;
+ while (result.isOK()) {
+ oplogEntryCount++;
+ result = oplogIter->next();
+ }
+ return oplogEntryCount;
+ };
+
+ auto nss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "bar");
+
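+    // Build a retryable write: an insert carrying session info and a statement id. Applying it
+    // logs a noop oplog entry recording the write for the session.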
+ const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(0);
+ auto retryableWrite = makeOplogEntry({Timestamp(2, 0), 1},
+ OpTypeEnum::kInsert,
+ nss,
+ BSON("_id" << 1),
+ boost::none,
+ sessionInfo,
+ Date_t::now(),
+ {0});
+
+ pushOps({retryableWrite});
+
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ auto progressNss =
+ NamespaceString::createNamespaceString_forTest(_dbName.toStringWithTenantId(), "progress");
+
+ auto applier = makeTenantMigrationOplogApplier(
+ writerPool.get(), OpTime(Timestamp(1, 0), 0), OpTime(Timestamp(1, 0), 0), progressNss);
+ ASSERT_OK(applier->startup());
+
+ auto opAppliedFuture = applier->getNotificationForOpTime(retryableWrite.getOpTime());
+ ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+ ASSERT_EQ(applier->getNumOpsApplied(), 1);
+
+ // The retryable write noop entry should have been logged.
+ ASSERT_EQ(getOplogEntryCount(), 1);
+
+ applier->shutdown();
+ applier->join();
+
+    // Delete the progress collection documents to simulate the loss or absence of progress data.
+ {
+ auto opCtx = cc().makeOperationContext();
+ auto storageInterface = StorageInterface::get(opCtx.get());
+ ASSERT_OK(storageInterface->deleteDocuments(opCtx.get(),
+ progressNss,
+ boost::none,
+ StorageInterface::ScanDirection::kForward,
+ {},
+ BoundInclusion::kIncludeStartKeyOnly,
+ 1U));
+ _oplogBuffer.clear(opCtx.get());
+ }
+
+ pushOps({retryableWrite});
+
+ // Create a new TenantOplogApplier with resume enabled. The first retryable write will be
+ // re-applied.
+ applier = makeTenantMigrationOplogApplier(writerPool.get(),
+ OpTime(Timestamp(1, 0), 0),
+ OpTime(Timestamp(1, 0), 0),
+ progressNss,
+ true);
+ ASSERT_OK(applier->startup());
+
+ opAppliedFuture = applier->getNotificationForOpTime(retryableWrite.getOpTime());
+ ASSERT_OK(opAppliedFuture.getNoThrow().getStatus());
+
+ ASSERT_EQ(applier->getNumOpsApplied(), 1);
+
+ // The retryable write noop entry should not have been logged again, so the count should be the
+ // same.
+ ASSERT_EQ(getOplogEntryCount(), 1);
applier->shutdown();
- _oplogBuffer.shutdown(_opCtx.get());
applier->join();
+
+ auto opCtx = cc().makeOperationContext();
+ _oplogBuffer.shutdown(opCtx.get());
}
} // namespace repl