author    Eric Cox <eric.cox@mongodb.com>  2022-06-24 13:52:42 +0000
committer Eric Cox <eric.cox@mongodb.com>  2022-06-24 13:52:42 +0000
commit    e41eb06388b603a2575e826d87051eebd38d52f5 (patch)
tree      2fd04f7aa3047bacb6b5f81ea802ae51ecd7b844 /src/mongo/db/repl
parent    e27fb371450c1aecbf3045c13c9a5257560ee615 (diff)
parent    d37641e0439f48745a656272a09eb121636ae7a2 (diff)
download  mongo-e41eb06388b603a2575e826d87051eebd38d52f5.tar.gz
Merge branch 'master' into eric/id-hack-ix-scan-refactor
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/SConscript                                          |   5
-rw-r--r--  src/mongo/db/repl/apply_ops.cpp                                       |  12
-rw-r--r--  src/mongo/db/repl/collection_bulk_loader_impl.cpp                     |   3
-rw-r--r--  src/mongo/db/repl/collection_cloner.cpp                               |  45
-rw-r--r--  src/mongo/db/repl/collection_cloner.h                                 |   6
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state.h                    |   2
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_impl.cpp             |   2
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_impl.h               |   2
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_mock.cpp             |   6
-rw-r--r--  src/mongo/db/repl/data_replicator_external_state_mock.h               |   2
-rw-r--r--  src/mongo/db/repl/idempotency_test.cpp                                |   2
-rw-r--r--  src/mongo/db/repl/oplog.cpp                                           |   4
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.cpp                              |  15
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl_test.cpp                         |  41
-rw-r--r--  src/mongo/db/repl/oplog_entry.idl                                     |   3
-rw-r--r--  src/mongo/db/repl/oplog_entry_test.cpp                                |   4
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                                   |  80
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.h                                     |   9
-rw-r--r--  src/mongo/db/repl/oplog_fetcher_test.cpp                              |  43
-rw-r--r--  src/mongo/db/repl/primary_only_service.cpp                            |   3
-rw-r--r--  src/mongo/db/repl/repl_set_commands.cpp                               |   7
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp     |   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                    |  81
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                      |  29
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp          |  74
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp  |  99
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp                    |  16
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h                      |   4
-rw-r--r--  src/mongo/db/repl/roll_back_local_operations_test.cpp                 |   3
-rw-r--r--  src/mongo/db/repl/rollback_source_impl.cpp                            |   3
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                                     |  36
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp                          |  24
-rw-r--r--  src/mongo/db/repl/storage_interface_impl_test.cpp                     |   5
-rw-r--r--  src/mongo/db/repl/storage_timestamp_test.cpp                          |   6
-rw-r--r--  src/mongo/db/repl/tenant_collection_cloner.cpp                        |  56
-rw-r--r--  src/mongo/db/repl/tenant_collection_cloner.h                          |   6
-rw-r--r--  src/mongo/db/repl/tenant_file_cloner.cpp                              |  13
-rw-r--r--  src/mongo/db/repl/tenant_file_cloner.h                                |   2
-rw-r--r--  src/mongo/db/repl/tenant_file_importer_service.cpp                    |  62
-rw-r--r--  src/mongo/db/repl/tenant_file_importer_service.h                      |  92
-rw-r--r--  src/mongo/db/repl/tenant_migration_access_blocker_util.cpp            |   4
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp          |   9
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp              |  21
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier_test.cpp                       | 117
-rw-r--r--  src/mongo/db/repl/topology_coordinator.cpp                            |   6
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h                              |   2
46 files changed, 584 insertions(+), 484 deletions(-)
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e9bcecfbdbf..962477568b3 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -261,6 +261,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbhelpers',
@@ -529,9 +530,11 @@ env.Library(
'roll_back_local_operations',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/catalog/import_collection_oplog_entry',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/multitenancy',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/storage/historical_ident_tracker',
'$BUILD_DIR/mongo/idl/server_parameter',
@@ -619,6 +622,7 @@ env.Library(
'storage_interface',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/commands/mongod_fsync',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/storage/storage_control',
@@ -1705,6 +1709,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/logical_time',
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/op_observer_impl',
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 972c1fb2580..4887982c95c 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -28,11 +28,10 @@
*/
-#include "mongo/platform/basic.h"
-
#include "mongo/db/repl/apply_ops.h"
#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/client_deprecated.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
@@ -297,12 +296,11 @@ Status _checkPrecondition(OperationContext* opCtx,
DBDirectClient db(opCtx);
// The preconditions come in "q: {{query: {...}, orderby: ..., etc.}}" format. This format
// is no longer used either internally or over the wire in other contexts. We are using a
- // legacy API from 'DBDirectClient' in order to parse this format and convert it into the
+ // legacy API from 'client_deprecated' in order to parse this format and convert it into the
// corresponding find command.
- auto preconditionQuery = Query::fromBSONDeprecated(preCondition["q"].Obj());
- auto cursor =
- db.query_DEPRECATED(nss, preconditionQuery.getFilter(), preconditionQuery, 1 /*limit*/);
- BSONObj realres = cursor->more() ? cursor->nextSafe() : BSONObj{};
+ FindCommandRequest findCmd{nss};
+ client_deprecated::initFindFromLegacyOptions(preCondition["q"].Obj(), 0, &findCmd);
+ BSONObj realres = db.findOne(std::move(findCmd));
// Get collection default collation.
auto databaseHolder = DatabaseHolder::get(opCtx);
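
Note: the hunk above replaces the removed 'query_DEPRECATED' path with an explicit
FindCommandRequest. A minimal sketch of the same conversion in isolation ('db', 'nss',
and the legacy object below are stand-ins, not code from this commit):

    // Legacy OP_QUERY-style object: filter and sort wrapped in one "q" document.
    BSONObj legacyQ = BSON("query" << BSON("_id" << 1) << "orderby" << BSON("ts" << 1));

    // Translate the legacy options onto a modern find command, then run it.
    FindCommandRequest findCmd{nss};
    client_deprecated::initFindFromLegacyOptions(legacyQ, 0 /*options*/, &findCmd);
    BSONObj result = db.findOne(std::move(findCmd));
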
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index 574607ea257..f000e93150c 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -95,7 +95,8 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
UnreplicatedWritesBlock uwb(_opCtx.get());
// This enforces the buildIndexes setting in the replica set configuration.
CollectionWriter collWriter(_opCtx.get(), *_collection);
- auto indexCatalog = collWriter.getWritableCollection()->getIndexCatalog();
+ auto indexCatalog =
+ collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog();
auto specs = indexCatalog->removeExistingIndexesNoChecks(
_opCtx.get(), collWriter.get(), secondaryIndexSpecs);
if (specs.size()) {
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index bde00eef906..e380fbe6238 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -317,38 +317,43 @@ BaseCloner::AfterStageBehavior CollectionCloner::setupIndexBuildersForUnfinished
}
void CollectionCloner::runQuery() {
- // Non-resumable query.
- Query query;
+ FindCommandRequest findCmd{_sourceDbAndUuid};
if (_resumeToken) {
// Resume the query from where we left off.
LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query");
- query.requestResumeToken(true).resumeAfter(_resumeToken.get());
+ findCmd.setRequestResumeToken(true);
+ findCmd.setResumeAfter(_resumeToken.get());
} else {
// New attempt at a resumable query.
LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query");
- query.requestResumeToken(true);
+ findCmd.setRequestResumeToken(true);
}
- query.hint(BSON("$natural" << 1));
+
+ findCmd.setHint(BSON("$natural" << 1));
+ findCmd.setNoCursorTimeout(true);
+ findCmd.setReadConcern(ReadConcernArgs::kLocal);
+ if (_collectionClonerBatchSize) {
+ findCmd.setBatchSize(_collectionClonerBatchSize);
+ }
+
+ ExhaustMode exhaustMode = collectionClonerUsesExhaust ? ExhaustMode::kOn : ExhaustMode::kOff;
// We reset this every time we retry or resume a query.
// We distinguish the first batch from the rest so that we only store the remote cursor id
// the first time we get it.
_firstBatchOfQueryRound = true;
- getClient()->query_DEPRECATED(
- [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
- _sourceDbAndUuid,
- BSONObj{},
- query,
- nullptr /* fieldsToReturn */,
- QueryOption_NoCursorTimeout | QueryOption_SecondaryOk |
- (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
- _collectionClonerBatchSize,
- ReadConcernArgs::kLocal);
+ auto cursor = getClient()->find(
+ std::move(findCmd), ReadPreferenceSetting{ReadPreference::SecondaryPreferred}, exhaustMode);
+
+ // Process the results of the cursor one batch at a time.
+ while (cursor->more()) {
+ handleNextBatch(*cursor);
+ }
}
-void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
+void CollectionCloner::handleNextBatch(DBClientCursor& cursor) {
{
stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
if (!getSharedData()->getStatus(lk).isOK()) {
@@ -370,15 +375,15 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
if (_firstBatchOfQueryRound) {
// Store the cursorId of the remote cursor.
- _remoteCursorId = iter.getCursorId();
+ _remoteCursorId = cursor.getCursorId();
}
_firstBatchOfQueryRound = false;
{
stdx::lock_guard<Latch> lk(_mutex);
_stats.receivedBatches++;
- while (iter.moreInCurrentBatch()) {
- _documentsToInsert.emplace_back(iter.nextSafe());
+ while (cursor.moreInCurrentBatch()) {
+ _documentsToInsert.emplace_back(cursor.nextSafe());
}
}
@@ -394,7 +399,7 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
}
// Store the resume token for this batch.
- _resumeToken = iter.getPostBatchResumeToken();
+ _resumeToken = cursor.getPostBatchResumeToken();
initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf(
[&](const BSONObj&) {
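
Note: taken together, the cloner's query path now builds one FindCommandRequest and
drains the cursor batch by batch. A condensed sketch of the new flow ('client',
'dbAndUuid', 'batchSize', 'useExhaust', and 'processBatch' are hypothetical stand-ins):

    FindCommandRequest findCmd{dbAndUuid};
    findCmd.setRequestResumeToken(true);          // ask for a post-batch resume token
    findCmd.setHint(BSON("$natural" << 1));       // scan in storage order
    findCmd.setNoCursorTimeout(true);
    findCmd.setReadConcern(ReadConcernArgs::kLocal);
    findCmd.setBatchSize(batchSize);

    auto cursor = client->find(std::move(findCmd),
                               ReadPreferenceSetting{ReadPreference::SecondaryPreferred},
                               useExhaust ? ExhaustMode::kOn : ExhaustMode::kOff);
    while (cursor->more()) {
        processBatch(*cursor);  // consumes one batch; the resume token is saved per batch
    }

The callback-driven query_DEPRECATED API is gone; the caller now owns the cursor loop,
which is what lets handleNextBatch take a plain DBClientCursor& instead of a batch
iterator.
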
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 80d8a9d72bc..085c6abdb3f 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -207,10 +207,10 @@ private:
AfterStageBehavior setupIndexBuildersForUnfinishedIndexesStage();
/**
- * Put all results from a query batch into a buffer to be inserted, and schedule
- * it to be inserted.
+ * Put all results from a query batch into a buffer to be inserted, and schedule it to be
+ * inserted.
*/
- void handleNextBatch(DBClientCursorBatchIterator& iter);
+ void handleNextBatch(DBClientCursor& cursor);
/**
* Called whenever there is a new batch of documents ready from the DBClientConnection.
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index 87826b0f199..219b5a7ec31 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -90,7 +90,7 @@ public:
* Forwards the parsed metadata in the query results to the replication system.
*/
virtual void processMetadata(const rpc::ReplSetMetadata& replMetadata,
- rpc::OplogQueryMetadata oqMetadata) = 0;
+ const rpc::OplogQueryMetadata& oqMetadata) = 0;
/**
* Evaluates quality of sync source. Accepts the current sync source; the last optime on this
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 8c43a013e9a..330cdf51305 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -84,7 +84,7 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp
}
void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& replMetadata,
- rpc::OplogQueryMetadata oqMetadata) {
+ const rpc::OplogQueryMetadata& oqMetadata) {
OpTimeAndWallTime newCommitPoint = oqMetadata.getLastOpCommitted();
const bool fromSyncSource = true;
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index c408c484dc9..284cea32b41 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -53,7 +53,7 @@ public:
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
void processMetadata(const rpc::ReplSetMetadata& replMetadata,
- rpc::OplogQueryMetadata oqMetadata) override;
+ const rpc::OplogQueryMetadata& oqMetadata) override;
ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
const rpc::ReplSetMetadata& replMetadata,
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index ddcfc701ca6..0ee71071f03 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -87,9 +87,9 @@ OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOp
}
void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata& replMetadata,
- rpc::OplogQueryMetadata oqMetadata) {
- replMetadataProcessed = replMetadata;
- oqMetadataProcessed = oqMetadata;
+ const rpc::OplogQueryMetadata& oqMetadata) {
+ replMetadataProcessed = rpc::ReplSetMetadata(replMetadata);
+ oqMetadataProcessed = rpc::OplogQueryMetadata(oqMetadata);
metadataWasProcessed = true;
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index 535ee513102..7ec17591a44 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -50,7 +50,7 @@ public:
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
void processMetadata(const rpc::ReplSetMetadata& metadata,
- rpc::OplogQueryMetadata oqMetadata) override;
+ const rpc::OplogQueryMetadata& oqMetadata) override;
ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
const rpc::ReplSetMetadata& replMetadata,
diff --git a/src/mongo/db/repl/idempotency_test.cpp b/src/mongo/db/repl/idempotency_test.cpp
index 9e94154f1c0..69777fdbc55 100644
--- a/src/mongo/db/repl/idempotency_test.cpp
+++ b/src/mongo/db/repl/idempotency_test.cpp
@@ -131,7 +131,7 @@ BSONObj RandomizedIdempotencyTest::canonicalizeDocumentForDataHash(const BSONObj
BSONObj RandomizedIdempotencyTest::getDoc() {
AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss);
BSONObj doc;
- Helpers::findById(_opCtx.get(), autoColl.getDb(), nss.ns(), kDocIdQuery, doc);
+ Helpers::findById(_opCtx.get(), nss.ns(), kDocIdQuery, doc);
return doc.getOwned();
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 0908b06213a..7ffffbbf2c1 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -389,7 +389,7 @@ void _logOpsInner(OperationContext* opCtx,
}
// Insert the oplog records to the respective tenants change collections.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) {
+ if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
opCtx, *records, timestamps);
}
@@ -1578,7 +1578,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
invariant(op.getObject2());
auto&& documentId = *op.getObject2();
auto documentFound = Helpers::findById(
- opCtx, db, collection->ns().ns(), documentId, changeStreamPreImage);
+ opCtx, collection->ns().ns(), documentId, changeStreamPreImage);
invariant(documentFound);
}
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index e9ca22da35c..575035711e0 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -623,8 +623,6 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps;
CachedCollectionProperties collPropertiesCache;
- // Used to serialize writes to the tenant migrations donor and recipient namespaces.
- boost::optional<uint32_t> tenantMigrationsWriterId;
for (auto&& op : *ops) {
// If the operation's optime is before or the same as the beginApplyingOpTime we don't want
// to apply it, so don't include it in writerVectors.
@@ -706,19 +704,6 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
continue;
}
- // Writes to the tenant migration namespaces must be serialized to preserve the order of
- // migration and access blocker states.
- if (op.getNss() == NamespaceString::kTenantMigrationDonorsNamespace ||
- op.getNss() == NamespaceString::kTenantMigrationRecipientsNamespace) {
- auto writerId = OplogApplierUtils::addToWriterVector(
- opCtx, &op, writerVectors, &collPropertiesCache, tenantMigrationsWriterId);
- if (!tenantMigrationsWriterId) {
- tenantMigrationsWriterId.emplace(writerId);
- } else {
- invariant(writerId == *tenantMigrationsWriterId);
- }
- continue;
- }
OplogApplierUtils::addToWriterVector(opCtx, &op, writerVectors, &collPropertiesCache);
}
}
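
Note: for readers unfamiliar with writer vectors, ops are distributed across applier
threads such that ops on the same collection land on the same thread and apply in
order. A conceptual model only, not the real OplogApplierUtils implementation:

    // Route each op to a writer by hashing its namespace: same namespace => same
    // writer => ordered application; different namespaces => parallel application.
    uint32_t writerId = std::hash<std::string>{}(op.getNss().ns()) % writerVectors.size();
    writerVectors[writerId].push_back(&op);

The deleted block additionally pinned every op on the tenant-migration donor and
recipient namespaces to one shared writerId, serializing them across the two
collections; this commit drops that special case and routes them through the default
per-namespace path.
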
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index 5784b645cc5..b734004bb28 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -2644,42 +2644,6 @@ TEST_F(OplogApplierImplWithSlowAutoAdvancingClockTest, DoNotLogNonSlowOpApplicat
ASSERT_EQUALS(0, countTextFormatLogLinesContaining(expected.str()));
}
-TEST_F(OplogApplierImplTest, SerializeOplogApplicationOfWritesToTenantMigrationNamespaces) {
- auto writerPool = makeReplWriterPool();
- NoopOplogApplierObserver observer;
- TrackOpsAppliedApplier oplogApplier(
- nullptr, // executor
- nullptr, // oplogBuffer
- &observer,
- ReplicationCoordinator::get(_opCtx.get()),
- getConsistencyMarkers(),
- getStorageInterface(),
- repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
- writerPool.get());
-
- const auto donorNss = NamespaceString::kTenantMigrationDonorsNamespace;
- const auto recipientNss = NamespaceString::kTenantMigrationRecipientsNamespace;
-
- std::vector<OplogEntry> opsToApply;
- opsToApply.push_back(
- makeDeleteDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, donorNss, BSON("_id" << 2)));
- opsToApply.push_back(makeInsertDocumentOplogEntry(
- {Timestamp(Seconds(3), 0), 1LL}, recipientNss, BSON("_id" << 3)));
- opsToApply.push_back(makeDeleteDocumentOplogEntry(
- {Timestamp(Seconds(4), 0), 1LL}, recipientNss, BSON("_id" << 3)));
- opsToApply.push_back(
- makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, donorNss, BSON("_id" << 4)));
-
- ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), opsToApply));
- const auto applied = oplogApplier.getOperationsApplied();
- ASSERT_EQ(4U, applied.size());
- ASSERT_BSONOBJ_EQ(opsToApply[0].getEntry().toBSON(), applied[0].getEntry().toBSON());
- ASSERT_BSONOBJ_EQ(opsToApply[1].getEntry().toBSON(), applied[1].getEntry().toBSON());
- ASSERT_BSONOBJ_EQ(opsToApply[2].getEntry().toBSON(), applied[2].getEntry().toBSON());
- ASSERT_BSONOBJ_EQ(opsToApply[3].getEntry().toBSON(), applied[3].getEntry().toBSON());
-}
-
-
class OplogApplierImplTxnTableTest : public OplogApplierImplTest {
public:
void setUp() override {
@@ -3319,10 +3283,7 @@ TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) {
ASSERT_OK(runOpInitialSync(emptyCappedOp));
AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss);
-
- // Ensure that autoColl.getCollection() and autoColl.getDb() are both null.
- ASSERT_FALSE(autoColl.getCollection());
- ASSERT_FALSE(autoColl.getDb());
+ ASSERT_FALSE(autoColl);
}
TEST_F(IdempotencyTest, UpdateTwoFields) {
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 7c1ba09f320..987f5806cbf 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -59,6 +59,9 @@ enums:
kPostImage: "postImage"
structs:
+ # TODO SERVER-67155 Ensure the tenantId is included in the serialized "ns" field when
+ # multitenancySupport is on but featureFlagRequireTenantId is off. Currently it will not be
+ # included in either place
DurableReplOperation:
description: "A document that represents an operation. Should never be used directly in
server code. Instead, create an instance of ReplOperation."
diff --git a/src/mongo/db/repl/oplog_entry_test.cpp b/src/mongo/db/repl/oplog_entry_test.cpp
index ae5039be724..4bcc4adfeb0 100644
--- a/src/mongo/db/repl/oplog_entry_test.cpp
+++ b/src/mongo/db/repl/oplog_entry_test.cpp
@@ -150,7 +150,9 @@ TEST(OplogEntryTest, InsertIncludesTidField) {
ASSERT(entry.getTid());
ASSERT_EQ(*entry.getTid(), tid);
- ASSERT_EQ(entry.getNss(), nss);
+ // TODO SERVER-66708 Check that (entry.getNss() == nss) once the OplogEntry deserializer
+ // passes "tid" to the NamespaceString constructor
+ ASSERT_EQ(entry.getNss(), NamespaceString(boost::none, nss.ns()));
ASSERT_BSONOBJ_EQ(entry.getIdElement().wrap("_id"), BSON("_id" << docId));
ASSERT_BSONOBJ_EQ(entry.getOperationToApply(), doc);
}
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index d50917d7fd7..6ec6c9778de 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -265,12 +265,8 @@ OpTime OplogFetcher::getLastOpTimeFetched_forTest() const {
return _getLastOpTimeFetched();
}
-BSONObj OplogFetcher::getFindQueryFilter_forTest() const {
- return _makeFindQueryFilter();
-}
-
-Query OplogFetcher::getFindQuerySettings_forTest(long long findTimeout) const {
- return _makeFindQuerySettings(findTimeout);
+FindCommandRequest OplogFetcher::makeFindCmdRequest_forTest(long long findTimeout) const {
+ return _makeFindCmdRequest(findTimeout);
}
Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
@@ -584,46 +580,56 @@ AggregateCommandRequest OplogFetcher::_makeAggregateCommandRequest(long long max
return aggRequest;
}
-BSONObj OplogFetcher::_makeFindQueryFilter() const {
- BSONObjBuilder queryBob;
-
- auto lastOpTimeFetched = _getLastOpTimeFetched();
- BSONObjBuilder filterBob;
- filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp()));
- // Handle caller-provided filter.
- if (!_config.queryFilter.isEmpty()) {
- filterBob.append(
- "$or",
- BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp())));
+FindCommandRequest OplogFetcher::_makeFindCmdRequest(long long findTimeout) const {
+ FindCommandRequest findCmd{_nss};
+
+ // Construct the find command's filter and set it on the 'FindCommandRequest'.
+ {
+ BSONObjBuilder queryBob;
+
+ auto lastOpTimeFetched = _getLastOpTimeFetched();
+ BSONObjBuilder filterBob;
+ filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp()));
+ // Handle caller-provided filter.
+ if (!_config.queryFilter.isEmpty()) {
+ filterBob.append(
+ "$or",
+ BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp())));
+ }
+ findCmd.setFilter(filterBob.obj());
+ }
+
+ findCmd.setTailable(true);
+ findCmd.setAwaitData(true);
+ findCmd.setMaxTimeMS(findTimeout);
+
+ if (_config.batchSize) {
+ findCmd.setBatchSize(_config.batchSize);
}
- return filterBob.obj();
-}
-Query OplogFetcher::_makeFindQuerySettings(long long findTimeout) const {
- Query query = Query().maxTimeMS(findTimeout);
if (_config.requestResumeToken) {
- query.hint(BSON("$natural" << 1)).requestResumeToken(true);
+ findCmd.setHint(BSON("$natural" << 1));
+ findCmd.setRequestResumeToken(true);
}
auto lastCommittedWithCurrentTerm =
_dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
auto term = lastCommittedWithCurrentTerm.value;
if (term != OpTime::kUninitializedTerm) {
- query.term(term);
+ findCmd.setTerm(term);
}
if (_config.queryReadConcern.isEmpty()) {
// This ensures that the sync source waits for all earlier oplog writes to be visible.
// Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use.
- query.readConcern(BSON("level"
- << "local"
- << "afterClusterTime" << Timestamp(0, 1)));
+ findCmd.setReadConcern(BSON("level"
+ << "local"
+ << "afterClusterTime" << Timestamp(0, 1)));
} else {
// Caller-provided read concern.
- query.appendElements(_config.queryReadConcern.toBSON());
+ findCmd.setReadConcern(_config.queryReadConcern.toBSONInner());
}
-
- return query;
+ return findCmd;
}
Status OplogFetcher::_createNewCursor(bool initialFind) {
@@ -651,17 +657,9 @@ Status OplogFetcher::_createNewCursor(bool initialFind) {
}
_cursor = std::move(ret.getValue());
} else {
+ auto findCmd = _makeFindCmdRequest(maxTimeMs);
_cursor = std::make_unique<DBClientCursor>(
- _conn.get(),
- _nss,
- _makeFindQueryFilter(),
- _makeFindQuerySettings(maxTimeMs),
- 0 /* limit */,
- 0 /* nToSkip */,
- nullptr /* fieldsToReturn */,
- QueryOption_CursorTailable | QueryOption_AwaitData |
- (oplogFetcherUsesExhaust ? QueryOption_Exhaust : 0),
- _config.batchSize);
+ _conn.get(), std::move(findCmd), ReadPreferenceSetting{}, oplogFetcherUsesExhaust);
}
_firstBatch = true;
@@ -817,7 +815,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
"metadata"_attr = _metadataObj);
return oqMetadataResult.getStatus();
}
- auto oqMetadata = oqMetadataResult.getValue();
+ const auto& oqMetadata = oqMetadataResult.getValue();
if (_firstBatch) {
auto status =
@@ -884,7 +882,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
"metadata"_attr = _metadataObj);
return metadataResult.getStatus();
}
- auto replSetMetadata = metadataResult.getValue();
+ const auto& replSetMetadata = metadataResult.getValue();
// Determine if we should stop syncing from our current sync source.
auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching(
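
Note: the net effect of _makeFindCmdRequest is a tailable, awaitData oplog query
expressed as a typed request instead of loose query flags. Roughly, with hypothetical
'conn', 'lastFetchedTimestamp', and 'findTimeoutMillis' (field values mirror the hunks
above):

    FindCommandRequest findCmd{NamespaceString::kRsOplogNamespace};
    findCmd.setFilter(BSON("ts" << BSON("$gte" << lastFetchedTimestamp)));
    findCmd.setTailable(true);    // cursor remains open at the end of the oplog
    findCmd.setAwaitData(true);   // server blocks up to maxTimeMS waiting for new entries
    findCmd.setMaxTimeMS(findTimeoutMillis);
    // Timestamp(0, 1) is the minimum legal afterClusterTime; it makes the sync source
    // wait for all earlier oplog writes to become visible before answering.
    findCmd.setReadConcern(BSON("level"
                                << "local"
                                << "afterClusterTime" << Timestamp(0, 1)));

    auto cursor = std::make_unique<DBClientCursor>(
        conn, std::move(findCmd), ReadPreferenceSetting{}, true /*isExhaust*/);

This also collapses the old two-piece API (a filter plus Query settings) into one
object, which is why the test hook shrinks to a single makeFindCmdRequest_forTest.
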
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 01a4347669b..2147eb9ebde 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -275,8 +275,7 @@ public:
/**
* Returns the `find` query run on the sync source's oplog.
*/
- BSONObj getFindQueryFilter_forTest() const;
- Query getFindQuerySettings_forTest(long long findTimeout) const;
+ FindCommandRequest makeFindCmdRequest_forTest(long long findTimeout) const;
/**
* Returns the OpTime of the last oplog entry fetched and processed.
@@ -387,11 +386,9 @@ private:
/**
* This function will create the `find` query to issue to the sync source. It is provided with
- * whether this is the initial attempt to create the `find` query to determine what the find
- * timeout should be.
+ * the value to use as the "maxTimeMS" for the find command.
*/
- BSONObj _makeFindQueryFilter() const;
- Query _makeFindQuerySettings(long long findTimeout) const;
+ FindCommandRequest _makeFindCmdRequest(long long findTimeout) const;
/**
* Gets the next batch from the exhaust cursor.
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index e98039a0f8a..adc09da1300 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -806,19 +806,25 @@ TEST_F(OplogFetcherTest,
auto oplogFetcher = makeOplogFetcher();
auto findTimeout = durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest());
- auto filter = oplogFetcher->getFindQueryFilter_forTest();
+ auto findCmdRequest = oplogFetcher->makeFindCmdRequest_forTest(findTimeout);
+
+ auto filter = findCmdRequest.getFilter();
ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), filter);
- auto queryObj =
- (oplogFetcher->getFindQuerySettings_forTest(findTimeout)).getFullSettingsDeprecated();
- ASSERT_EQUALS(60000, queryObj.getIntField("$maxTimeMS"));
+ auto maxTimeMS = findCmdRequest.getMaxTimeMS();
+ ASSERT(maxTimeMS);
+ ASSERT_EQUALS(60000, *maxTimeMS);
- ASSERT_EQUALS(mongo::BSONType::Object, queryObj["readConcern"].type());
+ auto readConcern = findCmdRequest.getReadConcern();
+ ASSERT(readConcern);
ASSERT_BSONOBJ_EQ(BSON("level"
<< "local"
<< "afterClusterTime" << Timestamp(0, 1)),
- queryObj["readConcern"].Obj());
- ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, queryObj["term"].numberLong());
+ *readConcern);
+
+ auto term = findCmdRequest.getTerm();
+ ASSERT(term);
+ ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, *term);
}
TEST_F(OplogFetcherTest,
@@ -826,21 +832,26 @@ TEST_F(OplogFetcherTest,
dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
auto oplogFetcher = makeOplogFetcher();
- auto filter = oplogFetcher->getFindQueryFilter_forTest();
- ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), filter);
-
// Test that the correct maxTimeMS is set if we are retrying the 'find' query.
auto findTimeout = durationCount<Milliseconds>(oplogFetcher->getRetriedFindMaxTime_forTest());
- auto queryObj =
- (oplogFetcher->getFindQuerySettings_forTest(findTimeout)).getFullSettingsDeprecated();
- ASSERT_EQUALS(2000, queryObj.getIntField("$maxTimeMS"));
+ auto findCmdRequest = oplogFetcher->makeFindCmdRequest_forTest(findTimeout);
- ASSERT_EQUALS(mongo::BSONType::Object, queryObj["readConcern"].type());
+ auto filter = findCmdRequest.getFilter();
+ ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), filter);
+
+ auto maxTimeMS = findCmdRequest.getMaxTimeMS();
+ ASSERT(maxTimeMS);
+ ASSERT_EQUALS(2000, *maxTimeMS);
+
+ auto readConcern = findCmdRequest.getReadConcern();
+ ASSERT(readConcern);
ASSERT_BSONOBJ_EQ(BSON("level"
<< "local"
<< "afterClusterTime" << Timestamp(0, 1)),
- queryObj["readConcern"].Obj());
- ASSERT_FALSE(queryObj.hasField("term"));
+ *readConcern);
+
+ auto term = findCmdRequest.getTerm();
+ ASSERT(!term);
}
TEST_F(
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index cb79c007ced..dbe696ecce7 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -362,6 +362,9 @@ void PrimaryOnlyService::onStepUp(const OpTime& stepUpOpTime) {
instance.second.waitForCompletion();
}
+ savedInstances.clear();
+ newThenOldScopedExecutor.reset();
+
PrimaryOnlyServiceHangBeforeLaunchingStepUpLogic.pauseWhileSet();
// Now wait for the first write of the new term to be majority committed, so that we know
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index 5823d880c14..7f35d2cfb31 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -528,6 +528,11 @@ public:
"primary.)\n"
"http://dochub.mongodb.org/core/replicasetcommands";
}
+
+ bool shouldCheckoutSession() const final {
+ return false;
+ }
+
CmdReplSetStepDown()
: ReplSetCommand("replSetStepDown"),
_stepDownCmdsWithForceExecutedMetric("commands.replSetStepDownWithForce.total",
@@ -685,7 +690,7 @@ public:
if (metadataResult.isOK()) {
// New style update position command has metadata, which may inform the
// upstream of a higher term.
- auto metadata = metadataResult.getValue();
+ const auto& metadata = metadataResult.getValue();
replCoord->processReplSetMetadata(metadata);
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 7cbc79f9aed..5d450af12d7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -560,7 +560,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
// TODO: SERVER-65948 move the change collection creation logic from here to the PM-2502 hooks.
// The change collection will be created when the change stream is enabled.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) {
+ if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
auto status = ChangeStreamChangeCollectionManager::get(opCtx).createChangeCollection(
opCtx, boost::none);
if (!status.isOK()) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c2f2aa1ad08..fe769df7572 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1340,7 +1340,6 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
_updateMemberStateFromTopologyCoordinator(lk);
LOGV2(21331, "Transition to primary complete; database writes are now permitted");
- _drainFinishedCond.notify_all();
_externalState->startNoopWriter(_getMyLastAppliedOpTime_inlock());
}
@@ -1830,8 +1829,9 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer
const UpdatePositionArgs::UpdateInfo update(
OpTime(), Date_t(), opTime, wallTime, cfgVer, memberId);
- const auto status = _setLastOptime(lock, update);
- return status;
+ const auto statusWithOpTime = _setLastOptimeForMember(lock, update);
+ _updateStateAfterRemoteOpTimeUpdates(lock, statusWithOpTime.getValue());
+ return statusWithOpTime.getStatus();
}
Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer,
@@ -1847,25 +1847,29 @@ Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer
const UpdatePositionArgs::UpdateInfo update(
opTime, wallTime, OpTime(), Date_t(), cfgVer, memberId);
- const auto status = _setLastOptime(lock, update);
- return status;
+ const auto statusWithOpTime = _setLastOptimeForMember(lock, update);
+ _updateStateAfterRemoteOpTimeUpdates(lock, statusWithOpTime.getValue());
+ return statusWithOpTime.getStatus();
}
-Status ReplicationCoordinatorImpl::_setLastOptime(WithLock lk,
- const UpdatePositionArgs::UpdateInfo& args) {
- auto result = _topCoord->setLastOptime(args, _replExecutor->now());
+StatusWith<OpTime> ReplicationCoordinatorImpl::_setLastOptimeForMember(
+ WithLock lk, const UpdatePositionArgs::UpdateInfo& args) {
+ auto result = _topCoord->setLastOptimeForMember(args, _replExecutor->now());
if (!result.isOK())
return result.getStatus();
const bool advancedOpTime = result.getValue();
+ _rescheduleLivenessUpdate_inlock(args.memberId);
+ return advancedOpTime ? std::max(args.appliedOpTime, args.durableOpTime) : OpTime();
+}
+
+void ReplicationCoordinatorImpl::_updateStateAfterRemoteOpTimeUpdates(
+ WithLock lk, const OpTime& maxRemoteOpTime) {
// Only update committed optime if the remote optimes increased.
- if (advancedOpTime) {
+ if (!maxRemoteOpTime.isNull()) {
_updateLastCommittedOpTimeAndWallTime(lk);
// Wake up replication waiters on optime changes.
- _wakeReadyWaiters(lk, std::max(args.appliedOpTime, args.durableOpTime));
+ _wakeReadyWaiters(lk, maxRemoteOpTime);
}
-
- _rescheduleLivenessUpdate_inlock(args.memberId);
- return Status::OK();
}
bool ReplicationCoordinatorImpl::isCommitQuorumSatisfied(
@@ -4415,7 +4419,7 @@ void ReplicationCoordinatorImpl::_errorOnPromisesIfHorizonChanged(WithLock lk,
HelloMetrics::get(opCtx)->resetNumAwaitingTopologyChanges();
}
- if (oldIndex >= 0 && newIndex >= 0) {
+ if (oldIndex >= 0) {
invariant(_sniToValidConfigPromiseMap.empty());
const auto oldHorizonMappings = oldConfig.getMemberAt(oldIndex).getHorizonMappings();
@@ -5079,18 +5083,22 @@ void ReplicationCoordinatorImpl::_wakeReadyWaiters(WithLock lk, boost::optional<
Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates) {
stdx::unique_lock<Latch> lock(_mutex);
Status status = Status::OK();
- bool somethingChanged = false;
+ bool gotValidUpdate = false;
+ OpTime maxRemoteOpTime;
for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin();
update != updates.updatesEnd();
++update) {
- status = _setLastOptime(lock, *update);
- if (!status.isOK()) {
+ auto statusWithOpTime = _setLastOptimeForMember(lock, *update);
+ if (!statusWithOpTime.isOK()) {
+ status = statusWithOpTime.getStatus();
break;
}
- somethingChanged = true;
+ maxRemoteOpTime = std::max(maxRemoteOpTime, statusWithOpTime.getValue());
+ gotValidUpdate = true;
}
+ _updateStateAfterRemoteOpTimeUpdates(lock, maxRemoteOpTime);
- if (somethingChanged && !_getMemberState_inlock().primary()) {
+ if (gotValidUpdate && !_getMemberState_inlock().primary()) {
lock.unlock();
// Must do this outside _mutex
_externalState->forwardSecondaryProgress();
@@ -5716,28 +5724,27 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ
invariant(-1 != rbid);
}
- stdx::lock_guard<Latch> lk(_mutex);
+ boost::optional<rpc::ReplSetMetadata> replSetMetadata;
+ boost::optional<rpc::OplogQueryMetadata> oplogQueryMetadata;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
- if (hasReplSetMetadata) {
- _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder);
- }
+ if (hasReplSetMetadata) {
+ OpTime lastVisibleOpTime =
+ std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock());
+ replSetMetadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime);
+ }
- if (hasOplogQueryMetadata) {
- _prepareOplogQueryMetadata_inlock(rbid, builder);
+ if (hasOplogQueryMetadata) {
+ oplogQueryMetadata = _topCoord->prepareOplogQueryMetadata(rbid);
+ }
}
-}
-
-void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) const {
- OpTime lastVisibleOpTime =
- std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock());
- auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime);
- metadata.writeToMetadata(builder).transitional_ignore();
-}
-void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid,
- BSONObjBuilder* builder) const {
- _topCoord->prepareOplogQueryMetadata(rbid).writeToMetadata(builder).transitional_ignore();
+ // Do BSON serialization outside lock.
+ if (replSetMetadata)
+ invariantStatusOK(replSetMetadata->writeToMetadata(builder));
+ if (oplogQueryMetadata)
+ invariantStatusOK(oplogQueryMetadata->writeToMetadata(builder));
}
bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal() {
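
Note: the prepareReplMetadata change is an instance of a general idiom this commit
applies: copy what you need while holding the mutex, then do the slow serialization
after releasing it. A self-contained sketch of the idiom with hypothetical types (not
the coordinator's real state):

    #include <mutex>
    #include <string>

    struct Snapshot { long long term = 0, opTime = 0; };

    std::mutex mu;
    Snapshot state;

    std::string serializeMetadata() {
        Snapshot copy;
        {
            std::lock_guard<std::mutex> lk(mu);
            copy = state;                  // cheap copy inside the critical section
        }                                  // mutex released here
        // Expensive formatting runs with no lock held, so it cannot block other
        // threads contending for 'mu'.
        return "term=" + std::to_string(copy.term) +
               ", opTime=" + std::to_string(copy.opTime);
    }

In the same spirit, processReplSetUpdatePosition now folds each member's optime into a
running maximum under the lock and performs a single commit-point advance and waiter
wake-up per batch, rather than one per update.
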
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index a6dc8fe9066..9ac44fdc62e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -469,7 +469,7 @@ public:
executor::TaskExecutor::CallbackHandle getCatchupTakeoverCbh_forTest() const;
/**
- * Simple wrappers around _setLastOptime to make it easier to test.
+ * Simple wrappers around _setLastOptimeForMember to make it easier to test.
*/
Status setLastAppliedOptime_forTest(long long cfgVer,
long long memberId,
@@ -1099,8 +1099,19 @@ private:
* This is only valid to call on replica sets.
* "configVersion" will be populated with our config version if it and the configVersion
* of "args" differ.
+ *
+ * If either applied or durable optime has changed, returns the later of the two (even if
+ * that's not the one which changed). Otherwise returns a null optime.
+ */
+ StatusWith<OpTime> _setLastOptimeForMember(WithLock lk,
+ const UpdatePositionArgs::UpdateInfo& args);
+
+ /**
+ * Helper for processReplSetUpdatePosition, companion to _setLastOptimeForMember above. Updates
+ * replication coordinator state and notifies waiters after remote optime updates. Must be
+ * called within the same critical section as _setLastOptimeForMember.
*/
- Status _setLastOptime(WithLock lk, const UpdatePositionArgs::UpdateInfo& args);
+ void _updateStateAfterRemoteOpTimeUpdates(WithLock lk, const OpTime& maxRemoteOpTime);
/**
* This function will report our position externally (like upstream) if necessary.
@@ -1463,17 +1474,6 @@ private:
EventHandle _processReplSetMetadata_inlock(const rpc::ReplSetMetadata& replMetadata);
/**
- * Prepares a metadata object for ReplSetMetadata.
- */
- void _prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) const;
-
- /**
- * Prepares a metadata object for OplogQueryMetadata.
- */
- void _prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const;
-
- /**
* Blesses a snapshot to be used for new committed reads.
*
* Returns true if the value was updated to `newCommittedSnapshot`.
@@ -1719,9 +1719,6 @@ private:
// Current ReplicaSet state.
MemberState _memberState; // (M)
- // Used to signal threads waiting for changes to _memberState.
- stdx::condition_variable _drainFinishedCond; // (M)
-
ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M)
// Used to signal threads waiting for changes to _rsConfigState.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index cfb8b355366..1392cceb923 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -661,51 +661,51 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply(
const ReplSetConfig& config) {
+ if (!_settings.isServerless() || !config.isSplitConfig()) {
+ return {config, false};
+ }
+
stdx::unique_lock<Latch> lk(_mutex);
- if (config.isSplitConfig()) {
- if (!_rsConfig.isInitialized()) {
- // Unlock the lock because isSelf performs network I/O.
- lk.unlock();
+ if (!_rsConfig.isInitialized()) {
+ // Unlock the lock because isSelf performs network I/O.
+ lk.unlock();
- // If this node is listed in the members of incoming config, accept the config.
- const auto foundSelfInMembers =
- std::any_of(config.membersBegin(),
- config.membersEnd(),
- [externalState = _externalState.get()](const MemberConfig& config) {
- return externalState->isSelf(config.getHostAndPort(),
- getGlobalServiceContext());
- });
-
- if (foundSelfInMembers) {
- return {config, false};
- }
+ // If this node is listed in the members of incoming config, accept the config.
+ const auto foundSelfInMembers = std::any_of(
+ config.membersBegin(),
+ config.membersEnd(),
+ [externalState = _externalState.get()](const MemberConfig& config) {
+ return externalState->isSelf(config.getHostAndPort(), getGlobalServiceContext());
+ });
- return {Status(ErrorCodes::NotYetInitialized,
- "Cannot apply a split config if the current config is uninitialized"),
- false};
+ if (foundSelfInMembers) {
+ return {config, false};
}
- auto recipientConfig = config.getRecipientConfig();
- const auto& selfMember = _rsConfig.getMemberAt(_selfIndex);
- if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) {
- if (selfMember.getNumVotes() > 0) {
- return {
- Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"),
- false};
- }
+ return {Status(ErrorCodes::NotYetInitialized,
+ "Cannot apply a split config if the current config is uninitialized"),
+ false};
+ }
- if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) {
- return {Status(ErrorCodes::InvalidReplicaSetConfig,
- "Cannot apply recipient config since current config and recipient "
- "config have the same set name."),
- false};
- }
+ auto recipientConfig = config.getRecipientConfig();
+ const auto& selfMember = _rsConfig.getMemberAt(_selfIndex);
+ if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) {
+ if (selfMember.getNumVotes() > 0) {
+ return {Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"),
+ false};
+ }
- auto mutableConfig = recipientConfig->getMutable();
- mutableConfig.setConfigVersion(1);
- mutableConfig.setConfigTerm(1);
- return {ReplSetConfig(std::move(mutableConfig)), true};
+ if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) {
+ return {Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Cannot apply recipient config since current config and recipient "
+ "config have the same set name."),
+ false};
}
+
+ auto mutableConfig = recipientConfig->getMutable();
+ mutableConfig.setConfigVersion(1);
+ mutableConfig.setConfigTerm(1);
+ return {ReplSetConfig(std::move(mutableConfig)), true};
}
return {config, false};
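
Note: structurally this is a guard-clause refactor: the non-serverless and non-split
cases exit up front, so the split-config handling loses a level of nesting. It also
carries a behavioral change: a node not started in serverless mode now never applies a
split config (the new test below exercises exactly this). A shape-only sketch with
hypothetical simplified types (the real function returns a {StatusWith<ReplSetConfig>,
bool} tuple):

    struct Config { bool isSplit = false; };

    Config resolveConfigToApply(const Config& config, bool serverless) {
        if (!serverless || !config.isSplit) {
            return config;               // common case: pass the config through unchanged
        }
        // Split-config handling now proceeds at a single indentation level.
        Config recipient = config;
        recipient.isSplit = false;       // stand-in for deriving the recipient config
        return recipient;
    }
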
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 5203980b575..e619276b129 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -58,6 +58,7 @@ namespace {
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using InNetworkGuard = NetworkInterfaceMock::InNetworkGuard;
TEST(ReplSetHeartbeatArgs, AcceptsUnknownField) {
ReplSetHeartbeatArgsV1 hbArgs;
@@ -116,7 +117,8 @@ protected:
void processResponseFromPrimary(const ReplSetConfig& config,
long long version = -2,
- long long term = OpTime::kInitialTerm);
+ long long term = OpTime::kInitialTerm,
+ const HostAndPort& target = HostAndPort{"h1", 1});
};
void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) {
@@ -160,13 +162,14 @@ ReplCoordHBV1Test::performSyncToFinishReconfigHeartbeat() {
void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config,
long long version,
- long long term) {
+ long long term,
+ const HostAndPort& target) {
NetworkInterfaceMock* net = getNet();
const Date_t startDate = getNet()->now();
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
- ASSERT_EQUALS(HostAndPort("h1", 1), request.target);
+ ASSERT_EQUALS(target, request.target);
ReplSetHeartbeatArgsV1 hbArgs;
ASSERT_OK(hbArgs.initialize(request.cmdObj));
ASSERT_EQUALS("mySet", hbArgs.getSetName());
@@ -266,6 +269,85 @@ TEST_F(ReplCoordHBV1Test,
ASSERT_TRUE(getExternalState()->threadsStarted());
}
+TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) {
+ auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kDefault,
+ logv2::LogSeverity::Debug(3)};
+
+ // Start up with three nodes, and assume the role of "node2" as a secondary. Notably, the local
+ // node is NOT started in serverless mode. "node2" is configured as having no votes, no
+ // priority, so that we can pass validation for accepting a split config.
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "protocolVersion" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345"
+ << "votes" << 0 << "priority" << 0)
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))),
+ HostAndPort("node2", 12345));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->updateTerm_forTest(1, nullptr);
+ ASSERT_EQ(getReplCoord()->getTerm(), 1);
+ // respond to initial heartbeat requests
+ for (int j = 0; j < 2; ++j) {
+ replyToReceivedHeartbeatV1();
+ }
+
+ // Verify that there are no further heartbeat requests, since the heartbeat requests should be
+ // scheduled for the future.
+ {
+ InNetworkGuard guard(getNet());
+ assertMemberState(MemberState::RS_SECONDARY);
+ ASSERT_FALSE(getNet()->hasReadyRequests());
+ }
+
+ ReplSetConfig splitConfig =
+ assertMakeRSConfig(BSON("_id"
+ << "mySet"
+ << "version" << 3 << "term" << 1 << "protocolVersion" << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "recipientConfig"
+ << BSON("_id"
+ << "recipientSet"
+ << "version" << 1 << "term" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345")))));
+
+ // Accept a heartbeat from `node1` which has a split config. The split config lists this node
+ // ("node2") in the recipient member list, but a node started not in serverless mode should not
+ // accept and install the recipient config.
+ receiveHeartbeatFrom(splitConfig, 1, HostAndPort("node1", 12345));
+
+ {
+ InNetworkGuard guard(getNet());
+ processResponseFromPrimary(splitConfig, 2, 1, HostAndPort{"node1", 12345});
+ assertMemberState(MemberState::RS_SECONDARY);
+ OperationContextNoop opCtx;
+ auto storedConfig = ReplSetConfig::parse(
+ unittest::assertGet(getExternalState()->loadLocalConfigDocument(&opCtx)));
+ ASSERT_OK(storedConfig.validate());
+
+ // Verify that the recipient config was not accepted. A successfully applied splitConfig
+ // will install at version and term {1, 1}.
+ ASSERT_EQUALS(ConfigVersionAndTerm(3, 1), storedConfig.getConfigVersionAndTerm());
+ ASSERT_EQUALS("mySet", storedConfig.getReplSetName());
+ }
+
+ ASSERT_TRUE(getExternalState()->threadsStarted());
+}
+
TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) {
ReplSetConfig rsConfig =
assertMakeRSConfig(BSON("_id"
@@ -556,6 +638,10 @@ TEST_F(
class ReplCoordHBV1SplitConfigTest : public ReplCoordHBV1Test {
public:
void startUp(const std::string& hostAndPort) {
+ ReplSettings settings;
+ settings.setServerlessMode();
+ init(settings);
+
BSONObj configBson =
BSON("_id" << _donorSetName << "version" << _configVersion << "term" << _configTerm
<< "members" << _members << "protocolVersion" << 1);
@@ -740,7 +826,6 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeApplyConfig) {
validateNextRequest("", _recipientSetName, 1, 1);
}
-using InNetworkGuard = NetworkInterfaceMock::InNetworkGuard;
TEST_F(ReplCoordHBV1SplitConfigTest, RejectMismatchedSetNameInHeartbeatResponse) {
startUp(_recipientSecondaryNode);
@@ -813,9 +898,9 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeNonZeroVotes) {
getNet()->runReadyNetworkOperations();
// The node rejected the config as it's a voting node and its version has not changed.
- ASSERT_EQ(getReplCoord()->getConfigVersion(), _configVersion);
- ASSERT_EQ(getReplCoord()->getConfigTerm(), _configTerm);
- ASSERT_EQ(getReplCoord()->getSettings().ourSetName(), _donorSetName);
+ auto config = getReplCoord()->getConfig();
+ ASSERT_EQ(config.getConfigVersionAndTerm(), ConfigVersionAndTerm(_configVersion, _configTerm));
+ ASSERT_EQ(config.getReplSetName(), _donorSetName);
}
class ReplCoordHBV1ReconfigTest : public ReplCoordHBV1Test {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index bbe14690c7a..31a307a96b0 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -236,11 +236,11 @@ void ReplicationCoordinatorMock::setMyHeartbeatMessage(const std::string& msg) {
}
void ReplicationCoordinatorMock::_setMyLastAppliedOpTimeAndWallTime(
- const OpTimeAndWallTime& opTimeAndWallTime) {
+ WithLock lk, const OpTimeAndWallTime& opTimeAndWallTime) {
_myLastAppliedOpTime = opTimeAndWallTime.opTime;
_myLastAppliedWallTime = opTimeAndWallTime.wallTime;
- setCurrentCommittedSnapshotOpTime(opTimeAndWallTime.opTime);
+ _setCurrentCommittedSnapshotOpTime(lk, opTimeAndWallTime.opTime);
if (auto storageEngine = _service->getStorageEngine()) {
if (auto snapshotManager = storageEngine->getSnapshotManager()) {
@@ -253,7 +253,7 @@ void ReplicationCoordinatorMock::setMyLastAppliedOpTimeAndWallTime(
const OpTimeAndWallTime& opTimeAndWallTime) {
stdx::lock_guard<Mutex> lk(_mutex);
- _setMyLastAppliedOpTimeAndWallTime(opTimeAndWallTime);
+ _setMyLastAppliedOpTimeAndWallTime(lk, opTimeAndWallTime);
}
void ReplicationCoordinatorMock::setMyLastDurableOpTimeAndWallTime(
@@ -269,7 +269,7 @@ void ReplicationCoordinatorMock::setMyLastAppliedOpTimeAndWallTimeForward(
stdx::lock_guard<Mutex> lk(_mutex);
if (opTimeAndWallTime.opTime > _myLastAppliedOpTime) {
- _setMyLastAppliedOpTimeAndWallTime(opTimeAndWallTime);
+ _setMyLastAppliedOpTimeAndWallTime(lk, opTimeAndWallTime);
}
}
@@ -657,11 +657,17 @@ Status ReplicationCoordinatorMock::updateTerm(OperationContext* opCtx, long long
void ReplicationCoordinatorMock::clearCommittedSnapshot() {}
-void ReplicationCoordinatorMock::setCurrentCommittedSnapshotOpTime(OpTime time) {
+void ReplicationCoordinatorMock::_setCurrentCommittedSnapshotOpTime(WithLock lk, OpTime time) {
_currentCommittedSnapshotOpTime = time;
}
+void ReplicationCoordinatorMock::setCurrentCommittedSnapshotOpTime(OpTime time) {
+ stdx::lock_guard<Mutex> lk(_mutex);
+ _setCurrentCommittedSnapshotOpTime(lk, time);
+}
+
OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const {
+ stdx::lock_guard<Mutex> lk(_mutex);
return _currentCommittedSnapshotOpTime;
}
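
Note: the mock now follows the WithLock convention: public methods take _mutex and
delegate to private helpers that accept the lock as a parameter, proving at the call
site that the mutex is held. A minimal stand-alone sketch of the pattern (hypothetical
class, using plain std types rather than mongo's WithLock):

    #include <mutex>

    class Counter {
    public:
        void set(int v) {
            std::lock_guard<std::mutex> lk(_mutex);
            _set(lk, v);                 // pass the guard down as evidence
        }
        void setAndSnapshot(int v) {
            std::lock_guard<std::mutex> lk(_mutex);
            _set(lk, v);                 // helpers compose without re-locking
            _snapshot = v;
        }
    private:
        // Taking the guard by reference documents that _mutex is already held; the
        // helper never locks, so there is no recursive-locking hazard.
        void _set(const std::lock_guard<std::mutex>&, int v) { _value = v; }
        std::mutex _mutex;
        int _value = 0;
        int _snapshot = 0;
    };
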
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 3ac7686ea34..dbe7b28ef83 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -422,7 +422,9 @@ public:
virtual WriteConcernTagChanges* getWriteConcernTagChanges() override;
private:
- void _setMyLastAppliedOpTimeAndWallTime(const OpTimeAndWallTime& opTimeAndWallTime);
+ void _setMyLastAppliedOpTimeAndWallTime(WithLock lk,
+ const OpTimeAndWallTime& opTimeAndWallTime);
+ void _setCurrentCommittedSnapshotOpTime(WithLock lk, OpTime time);
ServiceContext* const _service;
ReplSettings _settings;
diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp
index b71765e33d3..70421f959e1 100644
--- a/src/mongo/db/repl/roll_back_local_operations_test.cpp
+++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp
@@ -321,7 +321,8 @@ public:
DBClientConnectionForTest(int numInitFailures) : _initFailuresLeft(numInitFailures) {}
std::unique_ptr<DBClientCursor> find(FindCommandRequest findRequest,
- const ReadPreferenceSetting& readPref) override {
+ const ReadPreferenceSetting& readPref,
+ ExhaustMode exhaustMode) override {
if (_initFailuresLeft > 0) {
_initFailuresLeft--;
LOGV2(21657,
diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp
index 9c56b0ff21e..8b427be197c 100644
--- a/src/mongo/db/repl/rollback_source_impl.cpp
+++ b/src/mongo/db/repl/rollback_source_impl.cpp
@@ -94,7 +94,8 @@ std::pair<BSONObj, NamespaceString> RollbackSourceImpl::findOneByUUID(const std:
auto cursor =
std::make_unique<DBClientCursor>(_getConnection(),
std::move(findRequest),
- ReadPreferenceSetting{ReadPreference::SecondaryPreferred});
+ ReadPreferenceSetting{ReadPreference::SecondaryPreferred},
+ false /*isExhaust*/);
uassert(6138500, "find one by UUID failed", cursor->init());
BSONObj result = cursor->more() ? cursor->nextSafe() : BSONObj{};
NamespaceString nss = cursor->getNamespaceString();
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index e527aa204eb..8777903803c 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -949,7 +949,7 @@ void rollbackCreateIndexes(OperationContext* opCtx, UUID uuid, std::set<std::str
"indexName"_attr = indexName);
WriteUnitOfWork wuow(opCtx);
- dropIndex(opCtx, collection.getWritableCollection(), indexName, *nss);
+ dropIndex(opCtx, collection.getWritableCollection(opCtx), indexName, *nss);
wuow.commit();
LOGV2_DEBUG(21673,
@@ -1634,12 +1634,12 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx);
// Set collection to whatever temp status is on the sync source.
- collection.getWritableCollection()->setIsTemp(opCtx, options.temp);
+ collection.getWritableCollection(opCtx)->setIsTemp(opCtx, options.temp);
// Set any document validation options. We update the validator fields without
// parsing/validation, since we fetched the options object directly from the sync
// source, and we should set our validation options to match it exactly.
- auto validatorStatus = collection.getWritableCollection()->updateValidator(
+ auto validatorStatus = collection.getWritableCollection(opCtx)->updateValidator(
opCtx, options.validator, options.validationLevel, options.validationAction);
if (!validatorStatus.isOK()) {
throw RSFatalException(str::stream()
@@ -1811,16 +1811,16 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
// RecordId loc = Helpers::findById(nsd, pattern);
if (!loc.isNull()) {
try {
- writeConflictRetry(opCtx,
- "cappedTruncateAfter",
- collection->ns().ns(),
- [&] {
- WriteUnitOfWork wunit(opCtx);
- collection.getWritableCollection()
- ->cappedTruncateAfter(
- opCtx, loc, true);
- wunit.commit();
- });
+ writeConflictRetry(
+ opCtx,
+ "cappedTruncateAfter",
+ collection->ns().ns(),
+ [&] {
+ WriteUnitOfWork wunit(opCtx);
+ collection.getWritableCollection(opCtx)
+ ->cappedTruncateAfter(opCtx, loc, true);
+ wunit.commit();
+ });
} catch (const DBException& e) {
if (e.code() == 13415) {
// hack: need to just make cappedTruncate do this...
@@ -1828,7 +1828,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
opCtx, "truncate", collection->ns().ns(), [&] {
WriteUnitOfWork wunit(opCtx);
uassertStatusOK(
- collection.getWritableCollection()
+ collection.getWritableCollection(opCtx)
->truncate(opCtx));
wunit.commit();
});
@@ -2012,14 +2012,6 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
validator->resetKeyManagerCache();
}
- // Force the config server to update its shard registry on next access. Otherwise it may have
- // the stale data that has been just rolled back.
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- if (auto shardRegistry = Grid::get(opCtx)->shardRegistry()) {
- shardRegistry->clearEntries();
- }
- }
-
// Force the default read/write concern cache to reload on next access in case the defaults
// document was rolled back.
ReadWriteConcernDefaults::get(opCtx).invalidate();
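The rs_rollback changes above are mechanical: CollectionWriter::getWritableCollection() now requires the OperationContext, and the capped-truncate call is reflowed inside the usual writeConflictRetry loop. A standard-library sketch of that retry pattern (WriteConflictException and the callback shape are stand-ins for the server's helpers):

    #include <stdexcept>

    struct WriteConflictException : std::runtime_error {
        WriteConflictException() : std::runtime_error("write conflict") {}
    };

    // Re-run the write callback until it commits without a conflict.
    template <typename F>
    void writeConflictRetry(const char* opName, F&& f) {
        (void)opName;
        while (true) {
            try {
                f();  // the callback opens its own unit of work and commits
                return;
            } catch (const WriteConflictException&) {
                // a concurrent writer won; everything rolled back, try again
            }
        }
    }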
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 1a90e3a57c8..22d7c7648e4 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/exception_util.h"
@@ -323,12 +324,6 @@ template <typename AutoGetCollectionType>
StatusWith<const CollectionPtr*> getCollection(const AutoGetCollectionType& autoGetCollection,
const NamespaceStringOrUUID& nsOrUUID,
const std::string& message) {
- if (!autoGetCollection.getDb()) {
- StringData dbName = nsOrUUID.nss() ? nsOrUUID.nss()->db() : nsOrUUID.dbname();
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "Database [" << dbName << "] not found. " << message};
- }
-
const auto& collection = autoGetCollection.getCollection();
if (!collection) {
return {ErrorCodes::NamespaceNotFound,
@@ -347,6 +342,8 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
boost::optional<AutoGetOplog> autoOplog;
const CollectionPtr* collection;
+ bool shouldWriteToChangeCollections = false;
+
auto nss = nsOrUUID.nss();
if (nss && nss->isOplog()) {
// Simplify locking rules for oplog collection.
@@ -355,6 +352,9 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
if (!*collection) {
return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"};
}
+
+ shouldWriteToChangeCollections =
+ ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive();
} else {
autoColl.emplace(opCtx, nsOrUUID, MODE_IX);
auto collectionResult = getCollection(
@@ -371,6 +371,18 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
if (!status.isOK()) {
return status;
}
+
+ // Insert oplog entries into the change collections if we are running in serverless mode and
+ // the 'nss' is 'local.oplog.rs'.
+ if (shouldWriteToChangeCollections) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ status = changeCollectionManager.insertDocumentsToChangeCollection(
+ opCtx, begin, end, nullOpDebug);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
wunit.commit();
return Status::OK();
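The storage-interface hunks above make oplog batches dual-write: when the namespace is the oplog and change-collections mode is active, the same documents are also inserted into the tenant change collections before wunit.commit(), so both writes commit or roll back together. A toy sketch of that atomicity property (Txn and the vectors are illustrative stand-ins, not server types):

    #include <string>
    #include <vector>

    struct Txn {
        std::vector<std::string>& oplog;
        std::vector<std::string>& changeColl;
        std::vector<std::string> stagedOplog, stagedChange;

        void commit() {  // nothing becomes visible until commit
            oplog.insert(oplog.end(), stagedOplog.begin(), stagedOplog.end());
            changeColl.insert(changeColl.end(), stagedChange.begin(), stagedChange.end());
        }
    };

    void insertBatch(Txn& txn, const std::vector<std::string>& docs, bool changeCollActive) {
        txn.stagedOplog.insert(txn.stagedOplog.end(), docs.begin(), docs.end());
        if (changeCollActive)  // mirrors the isChangeCollectionsModeActive() gating
            txn.stagedChange.insert(txn.stagedChange.end(), docs.begin(), docs.end());
        txn.commit();
    }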
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 3c942ed7361..14362a56821 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -2684,7 +2684,6 @@ TEST_F(StorageInterfaceImplTest,
auto doc = BSON("_id" << 0 << "x" << 1);
auto status = storage.upsertById(opCtx, nss, doc["_id"], doc);
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
- ASSERT_EQUALS("Database [nosuchdb] not found. Unable to update document.", status.reason());
}
TEST_F(StorageInterfaceImplTest,
@@ -2879,10 +2878,6 @@ TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsNamespaceNotFoundWhenDatab
auto filter = BSON("x" << 1);
auto status = storage.deleteByFilter(opCtx, nss, filter);
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
- ASSERT_EQUALS(std::string(str::stream()
- << "Database [nosuchdb] not found. Unable to delete documents in "
- << nss.ns() << " using filter " << filter),
- status.reason());
}
TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsBadValueWhenFilterContainsUnknownOperator) {
diff --git a/src/mongo/db/repl/storage_timestamp_test.cpp b/src/mongo/db/repl/storage_timestamp_test.cpp
index cc0f88d0779..fb9325c1978 100644
--- a/src/mongo/db/repl/storage_timestamp_test.cpp
+++ b/src/mongo/db/repl/storage_timestamp_test.cpp
@@ -162,7 +162,7 @@ Status createIndexFromSpec(OperationContext* opCtx,
}
WriteUnitOfWork wunit(opCtx);
ASSERT_OK(indexer.commit(opCtx,
- collection.getWritableCollection(),
+ collection.getWritableCollection(opCtx),
MultiIndexBlock::kNoopOnCreateEachFn,
MultiIndexBlock::kNoopOnCommitFn));
LogicalTime indexTs = clock->tickClusterTime(1);
@@ -394,7 +394,7 @@ public:
// Timestamping index completion. Primaries write an oplog entry.
ASSERT_OK(
indexer.commit(_opCtx,
- coll.getWritableCollection(),
+ coll.getWritableCollection(_opCtx),
[&](const BSONObj& indexSpec) {
_opCtx->getServiceContext()->getOpObserver()->onCreateIndex(
_opCtx, coll->ns(), coll->uuid(), indexSpec, false);
@@ -2787,7 +2787,7 @@ TEST_F(StorageTimestampTest, IndexBuildsResolveErrorsDuringStateChangeToPrimary)
WriteUnitOfWork wuow(_opCtx);
ASSERT_OK(
indexer.commit(_opCtx,
- collection.getWritableCollection(),
+ collection.getWritableCollection(_opCtx),
[&](const BSONObj& indexSpec) {
_opCtx->getServiceContext()->getOpObserver()->onCreateIndex(
_opCtx, collection->ns(), collection->uuid(), indexSpec, false);
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp
index 9e6d5f7e02a..165538954bd 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner.cpp
@@ -474,36 +474,42 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() {
}
void TenantCollectionCloner::runQuery() {
- const BSONObj& filter = _lastDocId.isEmpty()
- ? BSONObj{} // Use $expr and the aggregation version of $gt to avoid type bracketing.
- : BSON("$expr" << BSON("$gt" << BSON_ARRAY("$_id" << _lastDocId["_id"])));
-
- auto query = _collectionOptions.clusteredIndex
- // RecordIds are _id values and has no separate _id index
- ? Query().hint(BSON("$natural" << 1))
- : Query().hint(BSON("_id" << 1));
-
-
- // Any errors that are thrown here (including NamespaceNotFound) will be handled on the stage
- // level.
- getClient()->query_DEPRECATED(
- [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
- _sourceDbAndUuid,
- filter,
- query,
- nullptr /* fieldsToReturn */,
- QueryOption_NoCursorTimeout | QueryOption_SecondaryOk |
- (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
- _collectionClonerBatchSize,
- ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+ FindCommandRequest findCmd{_sourceDbAndUuid};
+
+ findCmd.setFilter(
+ _lastDocId.isEmpty()
+ ? BSONObj{} // Use $expr and the aggregation version of $gt to avoid type bracketing.
+ : BSON("$expr" << BSON("$gt" << BSON_ARRAY("$_id" << _lastDocId["_id"]))));
+
+ if (_collectionOptions.clusteredIndex) {
+ findCmd.setHint(BSON("$natural" << 1));
+ } else {
+ findCmd.setHint(BSON("_id" << 1));
+ }
+
+ findCmd.setNoCursorTimeout(true);
+ findCmd.setReadConcern(ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
+ if (_collectionClonerBatchSize) {
+ findCmd.setBatchSize(_collectionClonerBatchSize);
+ }
+
+ ExhaustMode exhaustMode = collectionClonerUsesExhaust ? ExhaustMode::kOn : ExhaustMode::kOff;
+
+ auto cursor = getClient()->find(
+ std::move(findCmd), ReadPreferenceSetting{ReadPreference::SecondaryPreferred}, exhaustMode);
+
+ // Process the results of the cursor one batch at a time.
+ while (cursor->more()) {
+ handleNextBatch(*cursor);
+ }
}
-void TenantCollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
+void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) {
{
stdx::lock_guard<Latch> lk(_mutex);
_stats.receivedBatches++;
- while (iter.moreInCurrentBatch()) {
- _documentsToInsert.emplace_back(iter.nextSafe());
+ while (cursor.moreInCurrentBatch()) {
+ _documentsToInsert.emplace_back(cursor.nextSafe());
}
}
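The cloner hunk above replaces the callback-based query_DEPRECATED() with a typed FindCommandRequest plus an explicit cursor loop. Condensed, the new read path looks like this (member variables abbreviated to local names such as nssOrUuid, resumeFilter, clustered, batchSize, and useExhaust; error handling elided because, as in the hunk, it still happens at the stage level):

    FindCommandRequest findCmd{nssOrUuid};
    findCmd.setFilter(resumeFilter);  // {} on the first pass, else $expr/$gt on _id
    findCmd.setHint(clustered ? BSON("$natural" << 1) : BSON("_id" << 1));
    findCmd.setNoCursorTimeout(true);
    findCmd.setReadConcern(ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner());
    if (batchSize)
        findCmd.setBatchSize(batchSize);

    auto cursor = client->find(std::move(findCmd),
                               ReadPreferenceSetting{ReadPreference::SecondaryPreferred},
                               useExhaust ? ExhaustMode::kOn : ExhaustMode::kOff);
    while (cursor->more())
        handleNextBatch(*cursor);  // drains cursor.moreInCurrentBatch()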
diff --git a/src/mongo/db/repl/tenant_collection_cloner.h b/src/mongo/db/repl/tenant_collection_cloner.h
index b9c22928917..12bd9bbb832 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.h
+++ b/src/mongo/db/repl/tenant_collection_cloner.h
@@ -209,10 +209,10 @@ private:
AfterStageBehavior queryStage();
/**
- * Put all results from a query batch into a buffer to be inserted, and schedule
- * it to be inserted.
+ * Put all results from a query batch into a buffer to be inserted, and schedule it to be
+ * inserted.
*/
- void handleNextBatch(DBClientCursorBatchIterator& iter);
+ void handleNextBatch(DBClientCursor& cursor);
/**
* Called whenever there is a new batch of documents ready from the DBClientConnection.
diff --git a/src/mongo/db/repl/tenant_file_cloner.cpp b/src/mongo/db/repl/tenant_file_cloner.cpp
index 83ae3c65fc8..b909039eed1 100644
--- a/src/mongo/db/repl/tenant_file_cloner.cpp
+++ b/src/mongo/db/repl/tenant_file_cloner.cpp
@@ -188,8 +188,7 @@ void TenantFileCloner::runQuery() {
getClient(), std::move(aggRequest), true /* secondaryOk */, useExhaust));
try {
while (cursor->more()) {
- DBClientCursorBatchIterator iter(*cursor);
- handleNextBatch(iter);
+ handleNextBatch(*cursor);
}
} catch (const DBException& e) {
// We cannot continue after an error when processing exhaust cursors. Instead we must
@@ -207,7 +206,7 @@ void TenantFileCloner::runQuery() {
}
}
-void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
+void TenantFileCloner::handleNextBatch(DBClientCursor& cursor) {
LOGV2_DEBUG(6113307,
3,
"TenantFileCloner handleNextBatch",
@@ -215,7 +214,7 @@ void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
"backupId"_attr = _backupId,
"remoteFile"_attr = _remoteFileName,
"fileOffset"_attr = getFileOffset(),
- "moreInCurrentBatch"_attr = iter.moreInCurrentBatch());
+ "moreInCurrentBatch"_attr = cursor.moreInCurrentBatch());
{
stdx::lock_guard<TenantMigrationSharedData> lk(*getSharedData());
if (!getSharedData()->getStatus(lk).isOK()) {
@@ -225,11 +224,11 @@ void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
str::stream() << message << ": " << getSharedData()->getStatus(lk));
}
}
- while (iter.moreInCurrentBatch()) {
+ while (cursor.moreInCurrentBatch()) {
stdx::lock_guard<Latch> lk(_mutex);
_stats.receivedBatches++;
- while (iter.moreInCurrentBatch()) {
- _dataToWrite.emplace_back(iter.nextSafe());
+ while (cursor.moreInCurrentBatch()) {
+ _dataToWrite.emplace_back(cursor.nextSafe());
}
}
diff --git a/src/mongo/db/repl/tenant_file_cloner.h b/src/mongo/db/repl/tenant_file_cloner.h
index 90e37946224..27ff89fbc3a 100644
--- a/src/mongo/db/repl/tenant_file_cloner.h
+++ b/src/mongo/db/repl/tenant_file_cloner.h
@@ -160,7 +160,7 @@ private:
/**
* Put all results from a query batch into a buffer, and schedule it to be written to disk.
*/
- void handleNextBatch(DBClientCursorBatchIterator& iter);
+ void handleNextBatch(DBClientCursor& cursor);
/**
* Called whenever there is a new batch of documents ready from the DBClientConnection.
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp
index 85d95d7e22d..af565c3c713 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.cpp
+++ b/src/mongo/db/repl/tenant_file_importer_service.cpp
@@ -118,14 +118,21 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic
void TenantFileImporterService::startMigration(const UUID& migrationId,
const StringData& donorConnectionString) {
stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) {
+ return;
+ }
+
_reset(lk);
_migrationId = migrationId;
_donorConnectionString = donorConnectionString.toString();
- _eventQueue = std::make_unique<Queue>();
- _state.setState(ImporterState::State::kStarted);
+ _eventQueue = std::make_shared<Queue>();
+ _state = State::kStarted;
- _thread = std::make_unique<stdx::thread>([this] {
+ _thread = std::make_unique<stdx::thread>([this, migrationId] {
Client::initThread("TenantFileImporterService");
+ LOGV2_INFO(6378904,
+ "TenantFileImporterService starting worker thread",
+ "migrationId"_attr = migrationId.toString());
auto opCtx = cc().makeOperationContext();
_handleEvents(opCtx.get());
});
@@ -134,48 +141,55 @@ void TenantFileImporterService::startMigration(const UUID& migrationId,
void TenantFileImporterService::learnedFilename(const UUID& migrationId,
const BSONObj& metadataDoc) {
stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) {
+ return;
+ }
+
tassert(8423347,
"Called learnedFilename with migrationId {}, but {} is active"_format(
migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"),
migrationId == _migrationId);
- _state.setState(ImporterState::State::kLearnedFilename);
+ _state = State::kLearnedFilename;
ImporterEvent event{ImporterEvent::Type::kLearnedFileName, migrationId};
event.metadataDoc = metadataDoc.getOwned();
+ invariant(_eventQueue);
auto success = _eventQueue->tryPush(std::move(event));
- uassert(6378904,
+ uassert(6378903,
"TenantFileImporterService failed to push '{}' event without blocking"_format(
- _state.toString()),
+ stateToString(_state)),
success);
}
void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) {
stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) {
+ return;
+ }
+
tassert(8423345,
"Called learnedAllFilenames with migrationId {}, but {} is active"_format(
migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"),
migrationId == _migrationId);
- _state.setState(ImporterState::State::kLearnedAllFilenames);
+ _state = State::kLearnedAllFilenames;
+ invariant(_eventQueue);
auto success = _eventQueue->tryPush({ImporterEvent::Type::kLearnedAllFilenames, migrationId});
- uassert(6378905,
+ uassert(6378902,
"TenantFileImporterService failed to push '{}' event without blocking"_format(
- _state.toString()),
+ stateToString(_state)),
success);
}
void TenantFileImporterService::interrupt(const UUID& migrationId) {
stdx::lock_guard lk(_mutex);
- if (!_migrationId) {
- return;
- }
if (migrationId != _migrationId) {
LOGV2_WARNING(
- 6378907,
+ 6378901,
"Called interrupt with migrationId {migrationId}, but {activeMigrationId} is active",
"migrationId"_attr = migrationId.toString(),
- "activeMigrationId"_attr = _migrationId->toString());
+ "activeMigrationId"_attr = _migrationId ? _migrationId->toString() : "no migration");
return;
}
_interrupt(lk);
@@ -195,8 +209,11 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
std::string donorConnectionString;
boost::optional<UUID> migrationId;
+ std::shared_ptr<Queue> eventQueueRef;
{
stdx::lock_guard lk(_mutex);
+ invariant(_eventQueue);
+ eventQueueRef = _eventQueue;
donorConnectionString = _donorConnectionString;
migrationId = _migrationId;
}
@@ -206,9 +223,9 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
opCtx->checkForInterrupt();
try {
- event = _eventQueue->pop(opCtx);
+ event = eventQueueRef->pop(opCtx);
} catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) {
- LOGV2_WARNING(6378908, "Event queue was interrupted", "error"_attr = err);
+ LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err);
break;
}
@@ -259,7 +276,7 @@ void TenantFileImporterService::_voteImportedFiles(OperationContext* opCtx) {
}
void TenantFileImporterService::_interrupt(WithLock) {
- if (_state.is(ImporterState::State::kInterrupted)) {
+ if (_state == State::kInterrupted) {
return;
}
@@ -276,11 +293,16 @@ void TenantFileImporterService::_interrupt(WithLock) {
// _opCtx->markKilled(ErrorCodes::Interrupted);
}
- _state.setState(ImporterState::State::kInterrupted);
+ _state = State::kInterrupted;
}
void TenantFileImporterService::_reset(WithLock) {
- _migrationId.reset();
+ if (_migrationId) {
+ LOGV2_INFO(6378905,
+ "TenantFileImporterService resetting migration",
+ "migrationId"_attr = _migrationId->toString());
+ _migrationId.reset();
+ }
if (_thread && _thread->joinable()) {
_thread->join();
@@ -292,6 +314,6 @@ void TenantFileImporterService::_reset(WithLock) {
}
// TODO SERVER-66907: how should we be resetting _opCtx?
- _state.setState(ImporterState::State::kUninitialized);
+ _state = State::kUninitialized;
}
} // namespace mongo::repl
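A recurring theme in the tenant_file_importer_service.cpp changes is lifetime: _eventQueue becomes a shared_ptr and _handleEvents copies it under the mutex, so the consumer thread keeps the queue alive even if a concurrent _reset() replaces it. A standard-library sketch of the pattern (Queue and Service are illustrative):

    #include <memory>
    #include <mutex>

    struct Queue { /* blocking pop(); close() unblocks waiters */ };

    class Service {
    public:
        void handleEvents() {
            std::shared_ptr<Queue> queueRef;
            {
                std::lock_guard<std::mutex> lk(_mutex);
                queueRef = _eventQueue;  // pin the current queue
            }
            // Loop on queueRef->pop() without holding _mutex; even if
            // reset() swaps _eventQueue, this thread's queue stays valid.
        }

    private:
        std::mutex _mutex;
        std::shared_ptr<Queue> _eventQueue;  // replaced on reset()
    };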
diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h
index 9a27af816da..d7188f9a0e6 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.h
+++ b/src/mongo/db/repl/tenant_file_importer_service.h
@@ -82,75 +82,35 @@ private:
boost::optional<UUID> _migrationId;
std::string _donorConnectionString;
Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex");
- class ImporterState {
- public:
- enum class State {
- kUninitialized,
- kStarted,
- kLearnedFilename,
- kLearnedAllFilenames,
- kInterrupted
- };
- void setState(State nextState) {
- tassert(6114403,
- str::stream() << "current state: " << toString(_state)
- << ", new state: " << toString(nextState),
- isValidTransition(nextState));
- _state = nextState;
- }
-
- bool is(State state) const {
- return _state == state;
- }
-
- StringData toString() const {
- return toString(_state);
- }
- private:
- static StringData toString(State value) {
- switch (value) {
- case State::kUninitialized:
- return "uninitialized";
- case State::kStarted:
- return "started";
- case State::kLearnedFilename:
- return "learned filename";
- case State::kLearnedAllFilenames:
- return "learned all filenames";
- case State::kInterrupted:
- return "interrupted";
- }
- MONGO_UNREACHABLE;
- return StringData();
- }
+ // Explicit State enum ordering is defined here because we rely on comparison
+ // operators for state checks in various TenantFileImporterService methods.
+ enum class State {
+ kUninitialized = 0,
+ kStarted = 1,
+ kLearnedFilename = 2,
+ kLearnedAllFilenames = 3,
+ kInterrupted = 4
+ };
- bool isValidTransition(State newState) {
- if (_state == newState) {
- return true;
- }
-
- switch (_state) {
- case State::kUninitialized:
- return newState == State::kStarted || newState == State::kInterrupted;
- case State::kStarted:
- return newState == State::kInterrupted || newState == State::kLearnedFilename ||
- newState == State::kLearnedAllFilenames;
- case State::kLearnedFilename:
- return newState == State::kInterrupted || newState == State::kLearnedFilename ||
- newState == State::kLearnedAllFilenames;
- case State::kLearnedAllFilenames:
- return newState == State::kInterrupted;
- case State::kInterrupted:
- return newState == State::kUninitialized || newState == State::kStarted;
- }
- MONGO_UNREACHABLE;
+ static StringData stateToString(State state) {
+ switch (state) {
+ case State::kUninitialized:
+ return "uninitialized";
+ case State::kStarted:
+ return "started";
+ case State::kLearnedFilename:
+ return "learned filename";
+ case State::kLearnedAllFilenames:
+ return "learned all filenames";
+ case State::kInterrupted:
+ return "interrupted";
}
+ MONGO_UNREACHABLE;
+ return StringData();
+ }
- State _state = State::kUninitialized;
- };
-
- ImporterState _state;
+ State _state;
struct ImporterEvent {
enum class Type { kNone, kLearnedFileName, kLearnedAllFilenames };
@@ -166,6 +126,6 @@ private:
MultiProducerSingleConsumerQueue<ImporterEvent,
producer_consumer_queue_detail::DefaultCostFunction>;
- std::unique_ptr<Queue> _eventQueue;
+ std::shared_ptr<Queue> _eventQueue;
};
} // namespace mongo::repl
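With the validating ImporterState wrapper gone, correctness rests on the explicit enum values: lifecycle checks become ordinary comparisons, as in the startMigration() guard earlier in this diff. A minimal sketch:

    enum class State {
        kUninitialized = 0,
        kStarted = 1,
        kLearnedFilename = 2,
        kLearnedAllFilenames = 3,
        kInterrupted = 4,
    };

    // "Started but not yet interrupted", as used by startMigration().
    bool isActive(State s) {
        return s >= State::kStarted && s < State::kInterrupted;
    }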
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
index 53e7b24f135..fc693f64c20 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
@@ -437,7 +437,7 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) {
// Recover TenantMigrationDonorAccessBlockers for ShardSplit.
PersistentTaskStore<ShardSplitDonorDocument> shardSplitDonorStore(
- NamespaceString::kTenantSplitDonorsNamespace);
+ NamespaceString::kShardSplitDonorsNamespace);
shardSplitDonorStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) {
// Skip creating a TenantMigrationDonorAccessBlocker for terminal shard split that have been
@@ -462,6 +462,8 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) {
.add(tenantId.toString(), mtab);
switch (doc.getState()) {
+ case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ break;
case ShardSplitDonorStateEnum::kBlocking:
invariant(doc.getBlockTimestamp());
mtab->startBlockingWrites();
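Two small fixes above: the persistent-store namespace constant is renamed to kShardSplitDonorsNamespace, and the recovery switch gains an explicit kAbortingIndexBuilds case. The new case is a deliberate no-op: in that state the split has not begun blocking, so there is no blocker state to restore. Shape of the switch after the change (abridged from the hunk):

    switch (doc.getState()) {
        case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
            break;  // split has not begun blocking; nothing to recover
        case ShardSplitDonorStateEnum::kBlocking:
            invariant(doc.getBlockTimestamp());
            mtab->startBlockingWrites();
            break;
        // ... later states handled as before ...
    }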
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 34700086793..4cfdb60b43c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -282,11 +282,11 @@ void TenantMigrationRecipientOpObserver::onDelete(OperationContext* opCtx,
if (nss == NamespaceString::kTenantMigrationRecipientsNamespace &&
!tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
if (tenantIdToDeleteDecoration(opCtx)) {
+ auto tenantId = tenantIdToDeleteDecoration(opCtx).get();
LOGV2_INFO(8423337, "Removing expired 'multitenant migration' migration");
- opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
+ opCtx->recoveryUnit()->onCommit([opCtx, tenantId](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .remove(tenantIdToDeleteDecoration(opCtx).get(),
- TenantMigrationAccessBlocker::BlockerType::kRecipient);
+ .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kRecipient);
});
}
@@ -297,8 +297,7 @@ void TenantMigrationRecipientOpObserver::onDelete(OperationContext* opCtx,
"migrationId"_attr = migrationId);
opCtx->recoveryUnit()->onCommit([opCtx, migrationId](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .removeRecipientAccessBlockersForMigration(
- migrationIdToDeleteDecoration(opCtx).get());
+ .removeRecipientAccessBlockersForMigration(migrationId);
repl::TenantFileImporterService::get(opCtx->getServiceContext())
->interrupt(migrationId);
});
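The op-observer fix above addresses a lambda-capture hazard: the onCommit callback runs later, so it must capture the tenant id by value at registration time rather than re-read the opCtx decoration when it fires (the second hunk applies the same treatment to migrationId). Generic shape of the fix, with illustrative names:

    #include <functional>
    #include <string>
    #include <vector>

    std::vector<std::function<void()>> onCommitHandlers;

    void onDelete(const std::string& decoratedTenantId) {
        auto tenantId = decoratedTenantId;       // snapshot the value now...
        onCommitHandlers.push_back([tenantId] {  // ...and capture it by value
            // runs at commit time; self-contained, no stale context reads
        });
    }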
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index facaf190ab8..f355b7a3ac6 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/commands/tenant_migration_donor_cmds_gen.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
@@ -213,7 +214,7 @@ public:
// Tenant migration does not require the metadata from the oplog query.
void processMetadata(const rpc::ReplSetMetadata& replMetadata,
- rpc::OplogQueryMetadata oqMetadata) final {}
+ const rpc::OplogQueryMetadata& oqMetadata) final {}
// Tenant migration does not change sync source depending on metadata.
ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
@@ -2516,7 +2517,8 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() {
}
void TenantMigrationRecipientService::Instance::_setup() {
- auto opCtx = cc().makeOperationContext();
+ auto uniqueOpCtx = cc().makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
{
stdx::lock_guard lk(_mutex);
// Do not set the internal states if the migration is already interrupted.
@@ -2543,12 +2545,23 @@ void TenantMigrationRecipientService::Instance::_setup() {
_sharedData = std::make_unique<TenantMigrationSharedData>(
getGlobalServiceContext()->getFastClockSource(), getMigrationUUID(), resumePhase);
- _createOplogBuffer(lk, opCtx.get());
+ _createOplogBuffer(lk, opCtx);
}
// Start the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown.
try {
- _donorOplogBuffer->startup(opCtx.get());
+ // It is illegal to start the replicated donor buffer when the node is not primary.
+ // So ensure we are primary before trying to start up the oplog buffer.
+ repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX);
+
+ auto oplogBufferNS = getOplogBufferNs(getMigrationUUID());
+ if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, oplogBufferNS.db())) {
+ uassertStatusOK(
+ Status(ErrorCodes::NotWritablePrimary, "Recipient node is no longer a primary."));
+ }
+
+ _donorOplogBuffer->startup(opCtx);
} catch (DBException& ex) {
ex.addContext("Failed to create oplog buffer collection.");
throw;
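The _setup() change above guards a replicated-collection startup: it takes the RSTL in MODE_IX so the node's replication state cannot change underneath it, verifies the node can still accept writes for the oplog-buffer database, and only then starts the buffer. Condensed from the hunk:

    repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX);  // pin repl state

    auto oplogBufferNS = getOplogBufferNs(getMigrationUUID());
    if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
            opCtx, oplogBufferNS.db())) {
        uassertStatusOK(
            Status(ErrorCodes::NotWritablePrimary, "Recipient node is no longer a primary."));
    }

    _donorOplogBuffer->startup(opCtx);  // safe: we are still primary here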
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index 4215b04043a..864960d84d7 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -201,10 +201,14 @@ private:
logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)};
};
+// TODO SERVER-67155 Remove all calls to DatabaseName::toStringWithTenantId() once the OplogEntry
+// deserializer passes "tid" to the NamespaceString constructor
TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) {
std::vector<OplogEntry> srcOps;
- srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen()));
- srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(_dbName, "bar"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 2, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
@@ -235,7 +239,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) {
std::vector<OplogEntry> srcOps;
// This should be big enough to use several threads to do the writing
for (int i = 0; i < 64; i++) {
- srcOps.push_back(makeInsertOplogEntry(i + 1, NamespaceString(_dbName, "foo"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ i + 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen()));
}
pushOps(srcOps);
@@ -266,10 +271,14 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) {
TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
std::vector<OplogEntry> srcOps;
- srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen()));
- srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(_dbName, "bar"), UUID::gen()));
- srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(_dbName, "baz"), UUID::gen()));
- srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(_dbName, "bif"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 2, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 3, NamespaceString(_dbName.toStringWithTenantId(), "baz"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 4, NamespaceString(_dbName.toStringWithTenantId(), "bif"), UUID::gen()));
auto writerPool = makeTenantMigrationWriterPool();
@@ -305,14 +314,20 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
std::vector<OplogEntry> innerOps1;
- innerOps1.push_back(makeInsertOplogEntry(11, NamespaceString(_dbName, "bar"), UUID::gen()));
- innerOps1.push_back(makeInsertOplogEntry(12, NamespaceString(_dbName, "bar"), UUID::gen()));
+ innerOps1.push_back(makeInsertOplogEntry(
+ 11, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
+ innerOps1.push_back(makeInsertOplogEntry(
+ 12, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
std::vector<OplogEntry> innerOps2;
- innerOps2.push_back(makeInsertOplogEntry(21, NamespaceString(_dbName, "bar"), UUID::gen()));
- innerOps2.push_back(makeInsertOplogEntry(22, NamespaceString(_dbName, "bar"), UUID::gen()));
+ innerOps2.push_back(makeInsertOplogEntry(
+ 21, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
+ innerOps2.push_back(makeInsertOplogEntry(
+ 22, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
std::vector<OplogEntry> innerOps3;
- innerOps3.push_back(makeInsertOplogEntry(31, NamespaceString(_dbName, "bar"), UUID::gen()));
- innerOps3.push_back(makeInsertOplogEntry(32, NamespaceString(_dbName, "bar"), UUID::gen()));
+ innerOps3.push_back(makeInsertOplogEntry(
+ 31, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
+ innerOps3.push_back(makeInsertOplogEntry(
+ 32, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()));
// Makes entries with ts from range [2, 5).
std::vector<OplogEntry> srcOps = makeMultiEntryTransactionOplogEntries(
@@ -353,7 +368,7 @@ TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied)
client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
{MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
}
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto lsid = makeLogicalSessionId(_opCtx.get());
TxnNumber txnNum(0);
@@ -411,7 +426,8 @@ TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied)
}
TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) {
- auto entry = makeInsertOplogEntry(1, NamespaceString(_dbName, "bar"), UUID::gen());
+ auto entry = makeInsertOplogEntry(
+ 1, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen());
bool onInsertsCalled = false;
_opObserver->onInsertsFn = [&](OperationContext* opCtx,
const NamespaceString&,
@@ -439,7 +455,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) {
TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) {
createDatabase(_opCtx.get(), _dbName.toString());
- auto entry = makeInsertOplogEntry(1, NamespaceString(_dbName, "bar"), UUID::gen());
+ auto entry = makeInsertOplogEntry(
+ 1, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen());
bool onInsertsCalled = false;
_opObserver->onInsertsFn = [&](OperationContext* opCtx,
const NamespaceString&,
@@ -466,7 +483,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) {
}
TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(),
nss,
@@ -504,7 +521,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) {
}
TEST_F(TenantOplogApplierTest, ApplyInsert_UniqueKey_InsertExisting) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
// Create unique key index on the collection.
@@ -545,7 +562,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_UniqueKey_InsertExisting) {
}
TEST_F(TenantOplogApplierTest, ApplyInsert_Success) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto entry = makeInsertOplogEntry(1, nss, uuid);
bool onInsertsCalled = false;
@@ -553,7 +570,9 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_Success) {
[&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
ASSERT_FALSE(onInsertsCalled);
onInsertsCalled = true;
- ASSERT_EQUALS(nss.db(), _dbName.toString());
+ // TODO Check that (nss.dbName() == _dbName) once the OplogEntry deserializer passes
+ // "tid" to the NamespaceString constructor
+ ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId());
ASSERT_EQUALS(nss.coll(), "bar");
ASSERT_EQUALS(1, docs.size());
ASSERT_BSONOBJ_EQ(docs[0], entry.getObject());
@@ -581,9 +600,9 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_Success) {
TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) {
// TODO(SERVER-50256): remove nss_workaround, which is used to work around a bug where
// the first operation assigned to a worker cannot be grouped.
- NamespaceString nss_workaround(_dbName, "a");
- NamespaceString nss1(_dbName, "bar");
- NamespaceString nss2(_dbName, "baz");
+ NamespaceString nss_workaround(_dbName.toStringWithTenantId(), "a");
+ NamespaceString nss1(_dbName.toStringWithTenantId(), "bar");
+ NamespaceString nss2(_dbName.toStringWithTenantId(), "baz");
auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1);
auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2);
std::vector<OplogEntry> entries;
@@ -641,7 +660,7 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) {
}
TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto entry = makeOplogEntry(
repl::OpTypeEnum::kUpdate, nss, uuid, BSON("$set" << BSON("a" << 1)), BSON("_id" << 0));
@@ -676,7 +695,7 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) {
}
TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0));
auto entry = makeOplogEntry(
@@ -708,7 +727,8 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) {
}
TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) {
- auto entry = makeOplogEntry(OpTypeEnum::kDelete, NamespaceString(_dbName, "bar"), UUID::gen());
+ auto entry = makeOplogEntry(
+ OpTypeEnum::kDelete, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen());
bool onDeleteCalled = false;
_opObserver->onDeleteFn = [&](OperationContext* opCtx,
const NamespaceString&,
@@ -738,7 +758,8 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) {
TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) {
createDatabase(_opCtx.get(), _dbName.toString());
- auto entry = makeOplogEntry(OpTypeEnum::kDelete, NamespaceString(_dbName, "bar"), UUID::gen());
+ auto entry = makeOplogEntry(
+ OpTypeEnum::kDelete, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen());
bool onDeleteCalled = false;
_opObserver->onDeleteFn = [&](OperationContext* opCtx,
const NamespaceString&,
@@ -767,7 +788,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) {
}
TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0));
bool onDeleteCalled = false;
@@ -798,7 +819,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) {
}
TEST_F(TenantOplogApplierTest, ApplyDelete_Success) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0));
auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0));
@@ -814,7 +835,9 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_Success) {
ASSERT_TRUE(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));
ASSERT_TRUE(opCtx->writesAreReplicated());
ASSERT_FALSE(args.fromMigrate);
- ASSERT_EQUALS(nss.db(), _dbName.toString());
+ // TODO SERVER-66708 Check that (nss.dbName() == _dbName) once the OplogEntry deserializer
+ // passes "tid" to the NamespaceString constructor
+ ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId());
ASSERT_EQUALS(nss.coll(), "bar");
ASSERT_EQUALS(uuid, observer_uuid);
};
@@ -839,7 +862,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_Success) {
}
TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_CollExisting) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto op = BSON("op"
<< "c"
@@ -874,8 +897,8 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_CollExisting) {
}
TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) {
- NamespaceString nss1(_dbName, "foo");
- NamespaceString nss2(_dbName, "bar");
+ NamespaceString nss1(_dbName.toStringWithTenantId(), "foo");
+ NamespaceString nss2(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss2);
auto op =
BSON("op"
@@ -914,7 +937,7 @@ TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) {
}
TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_Success) {
- NamespaceString nss(_dbName, "t");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "t");
auto op =
BSON("op"
<< "c"
@@ -954,7 +977,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_Success) {
}
TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) {
- NamespaceString nss(_dbName, "t");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "t");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto op =
BSON("op"
@@ -1001,7 +1024,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) {
}
TEST_F(TenantOplogApplierTest, ApplyStartIndexBuildCommand_Failure) {
- NamespaceString nss(_dbName, "t");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "t");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto op = BSON("op"
<< "c"
@@ -1066,7 +1089,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_WrongNSS) {
}
TEST_F(TenantOplogApplierTest, ApplyDropIndexesCommand_IndexNotFound) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto op = BSON("op"
<< "c"
@@ -1104,7 +1127,7 @@ TEST_F(TenantOplogApplierTest, ApplyDropIndexesCommand_IndexNotFound) {
}
TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) {
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto op = BSON("op"
<< "c"
@@ -1148,7 +1171,7 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) {
TEST_F(TenantOplogApplierTest, ApplyCollModCommand_CollectionMissing) {
createDatabase(_opCtx.get(), _dbName.toString());
- NamespaceString nss(_dbName, "bar");
+ NamespaceString nss(_dbName.toStringWithTenantId(), "bar");
UUID uuid(UUID::gen());
auto op = BSON("op"
<< "c"
@@ -1312,7 +1335,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Success) {
std::vector<OplogEntry> srcOps;
- srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen()));
srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg));
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
@@ -1349,7 +1373,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su
TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success) {
std::vector<OplogEntry> srcOps;
srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
- srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(_dbName, "foo"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 2, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen()));
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
@@ -1380,7 +1405,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success
TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Success) {
std::vector<OplogEntry> srcOps;
- srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen()));
srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
pushOps(srcOps);
ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime());
@@ -1413,7 +1439,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe
TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
std::vector<OplogEntry> srcOps;
- srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen()));
+ srcOps.push_back(makeInsertOplogEntry(
+ 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen()));
srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg));
pushOps(srcOps);
auto writerPool = makeTenantMigrationWriterPool();
@@ -1445,8 +1472,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
TEST_F(TenantOplogApplierTest, ApplyInsert_MultiKeyIndex) {
createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
- NamespaceString indexedNss(_dbName, "indexedColl");
- NamespaceString nonIndexedNss(_dbName, "nonIndexedColl");
+ NamespaceString indexedNss(_dbName.toStringWithTenantId(), "indexedColl");
+ NamespaceString nonIndexedNss(_dbName.toStringWithTenantId(), "nonIndexedColl");
auto indexedCollUUID = createCollectionWithUuid(_opCtx.get(), indexedNss);
createCollection(_opCtx.get(), nonIndexedNss, CollectionOptions());
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 7f30b7b113d..c72bb2ddfb3 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -1364,14 +1364,14 @@ void TopologyCoordinator::setMyLastDurableOpTimeAndWallTime(OpTimeAndWallTime op
myMemberData.setLastDurableOpTimeAndWallTime(opTimeAndWallTime, now);
}
-StatusWith<bool> TopologyCoordinator::setLastOptime(const UpdatePositionArgs::UpdateInfo& args,
- Date_t now) {
+StatusWith<bool> TopologyCoordinator::setLastOptimeForMember(
+ const UpdatePositionArgs::UpdateInfo& args, Date_t now) {
if (_selfIndex == -1) {
// Ignore updates when we're in state REMOVED.
return Status(ErrorCodes::NotPrimaryOrSecondary,
"Received replSetUpdatePosition command but we are in state REMOVED");
}
- invariant(_rsConfig.isInitialized()); // Can only use setLastOptime in replSet mode.
+ invariant(_rsConfig.isInitialized()); // Can only use setLastOptimeForMember in replSet mode.
MemberId memberId;
try {
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index fb9f7a196f7..3285a5b4825 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -585,7 +585,7 @@ public:
* Returns a Status if the position could not be set, false if the last optimes for the node
* did not change, or true if either the last applied or last durable optime did change.
*/
- StatusWith<bool> setLastOptime(const UpdatePositionArgs::UpdateInfo& args, Date_t now);
+ StatusWith<bool> setLastOptimeForMember(const UpdatePositionArgs::UpdateInfo& args, Date_t now);
/**
* Sets the latest optime committed in the previous config to the current lastCommitted optime.