diff options
author | Eric Cox <eric.cox@mongodb.com> | 2022-06-24 13:52:42 +0000 |
---|---|---|
committer | Eric Cox <eric.cox@mongodb.com> | 2022-06-24 13:52:42 +0000 |
commit | e41eb06388b603a2575e826d87051eebd38d52f5 (patch) | |
tree | 2fd04f7aa3047bacb6b5f81ea802ae51ecd7b844 /src/mongo/db/repl | |
parent | e27fb371450c1aecbf3045c13c9a5257560ee615 (diff) | |
parent | d37641e0439f48745a656272a09eb121636ae7a2 (diff) | |
download | mongo-e41eb06388b603a2575e826d87051eebd38d52f5.tar.gz |
Merge branch 'master' into eric/id-hack-ix-scan-refactor
Diffstat (limited to 'src/mongo/db/repl')
46 files changed, 584 insertions, 484 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e9bcecfbdbf..962477568b3 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -261,6 +261,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/catalog/multi_index_block', + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbhelpers', @@ -529,9 +530,11 @@ env.Library( 'roll_back_local_operations', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/catalog/import_collection_oplog_entry', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/multitenancy', + '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/s/sharding_runtime_d', '$BUILD_DIR/mongo/db/storage/historical_ident_tracker', '$BUILD_DIR/mongo/idl/server_parameter', @@ -619,6 +622,7 @@ env.Library( 'storage_interface', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/commands/mongod_fsync', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/storage/storage_control', @@ -1705,6 +1709,7 @@ if wiredtiger: '$BUILD_DIR/mongo/db/logical_time', '$BUILD_DIR/mongo/db/multitenancy', '$BUILD_DIR/mongo/db/op_observer_impl', + '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover', '$BUILD_DIR/mongo/db/query/command_request_response', '$BUILD_DIR/mongo/db/s/sharding_runtime_d', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 972c1fb2580..4887982c95c 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -28,11 +28,10 @@ */ -#include "mongo/platform/basic.h" - #include "mongo/db/repl/apply_ops.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/client/client_deprecated.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" @@ -297,12 +296,11 @@ Status _checkPrecondition(OperationContext* opCtx, DBDirectClient db(opCtx); // The preconditions come in "q: {{query: {...}, orderby: ..., etc.}}" format. This format // is no longer used either internally or over the wire in other contexts. We are using a - // legacy API from 'DBDirectClient' in order to parse this format and convert it into the + // legacy API from 'client_deprecated' in order to parse this format and convert it into the // corresponding find command. - auto preconditionQuery = Query::fromBSONDeprecated(preCondition["q"].Obj()); - auto cursor = - db.query_DEPRECATED(nss, preconditionQuery.getFilter(), preconditionQuery, 1 /*limit*/); - BSONObj realres = cursor->more() ? cursor->nextSafe() : BSONObj{}; + FindCommandRequest findCmd{nss}; + client_deprecated::initFindFromLegacyOptions(preCondition["q"].Obj(), 0, &findCmd); + BSONObj realres = db.findOne(std::move(findCmd)); // Get collection default collation. auto databaseHolder = DatabaseHolder::get(opCtx); diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index 574607ea257..f000e93150c 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -95,7 +95,8 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex UnreplicatedWritesBlock uwb(_opCtx.get()); // This enforces the buildIndexes setting in the replica set configuration. CollectionWriter collWriter(_opCtx.get(), *_collection); - auto indexCatalog = collWriter.getWritableCollection()->getIndexCatalog(); + auto indexCatalog = + collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog(); auto specs = indexCatalog->removeExistingIndexesNoChecks( _opCtx.get(), collWriter.get(), secondaryIndexSpecs); if (specs.size()) { diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index bde00eef906..e380fbe6238 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -317,38 +317,43 @@ BaseCloner::AfterStageBehavior CollectionCloner::setupIndexBuildersForUnfinished } void CollectionCloner::runQuery() { - // Non-resumable query. - Query query; + FindCommandRequest findCmd{_sourceDbAndUuid}; if (_resumeToken) { // Resume the query from where we left off. LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query"); - query.requestResumeToken(true).resumeAfter(_resumeToken.get()); + findCmd.setRequestResumeToken(true); + findCmd.setResumeAfter(_resumeToken.get()); } else { // New attempt at a resumable query. LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query"); - query.requestResumeToken(true); + findCmd.setRequestResumeToken(true); } - query.hint(BSON("$natural" << 1)); + + findCmd.setHint(BSON("$natural" << 1)); + findCmd.setNoCursorTimeout(true); + findCmd.setReadConcern(ReadConcernArgs::kLocal); + if (_collectionClonerBatchSize) { + findCmd.setBatchSize(_collectionClonerBatchSize); + } + + ExhaustMode exhaustMode = collectionClonerUsesExhaust ? ExhaustMode::kOn : ExhaustMode::kOff; // We reset this every time we retry or resume a query. // We distinguish the first batch from the rest so that we only store the remote cursor id // the first time we get it. _firstBatchOfQueryRound = true; - getClient()->query_DEPRECATED( - [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, - _sourceDbAndUuid, - BSONObj{}, - query, - nullptr /* fieldsToReturn */, - QueryOption_NoCursorTimeout | QueryOption_SecondaryOk | - (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), - _collectionClonerBatchSize, - ReadConcernArgs::kLocal); + auto cursor = getClient()->find( + std::move(findCmd), ReadPreferenceSetting{ReadPreference::SecondaryPreferred}, exhaustMode); + + // Process the results of the cursor one batch at a time. + while (cursor->more()) { + handleNextBatch(*cursor); + } } -void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { +void CollectionCloner::handleNextBatch(DBClientCursor& cursor) { { stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); if (!getSharedData()->getStatus(lk).isOK()) { @@ -370,15 +375,15 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { if (_firstBatchOfQueryRound) { // Store the cursorId of the remote cursor. - _remoteCursorId = iter.getCursorId(); + _remoteCursorId = cursor.getCursorId(); } _firstBatchOfQueryRound = false; { stdx::lock_guard<Latch> lk(_mutex); _stats.receivedBatches++; - while (iter.moreInCurrentBatch()) { - _documentsToInsert.emplace_back(iter.nextSafe()); + while (cursor.moreInCurrentBatch()) { + _documentsToInsert.emplace_back(cursor.nextSafe()); } } @@ -394,7 +399,7 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { } // Store the resume token for this batch. - _resumeToken = iter.getPostBatchResumeToken(); + _resumeToken = cursor.getPostBatchResumeToken(); initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf( [&](const BSONObj&) { diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 80d8a9d72bc..085c6abdb3f 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -207,10 +207,10 @@ private: AfterStageBehavior setupIndexBuildersForUnfinishedIndexesStage(); /** - * Put all results from a query batch into a buffer to be inserted, and schedule - * it to be inserted. + * Put all results from a query batch into a buffer to be inserted, and schedule it to be + * inserted. */ - void handleNextBatch(DBClientCursorBatchIterator& iter); + void handleNextBatch(DBClientCursor& cursor); /** * Called whenever there is a new batch of documents ready from the DBClientConnection. diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index 87826b0f199..219b5a7ec31 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -90,7 +90,7 @@ public: * Forwards the parsed metadata in the query results to the replication system. */ virtual void processMetadata(const rpc::ReplSetMetadata& replMetadata, - rpc::OplogQueryMetadata oqMetadata) = 0; + const rpc::OplogQueryMetadata& oqMetadata) = 0; /** * Evaluates quality of sync source. Accepts the current sync source; the last optime on this diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 8c43a013e9a..330cdf51305 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -84,7 +84,7 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp } void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& replMetadata, - rpc::OplogQueryMetadata oqMetadata) { + const rpc::OplogQueryMetadata& oqMetadata) { OpTimeAndWallTime newCommitPoint = oqMetadata.getLastOpCommitted(); const bool fromSyncSource = true; diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index c408c484dc9..284cea32b41 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -53,7 +53,7 @@ public: OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; void processMetadata(const rpc::ReplSetMetadata& replMetadata, - rpc::OplogQueryMetadata oqMetadata) override; + const rpc::OplogQueryMetadata& oqMetadata) override; ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index ddcfc701ca6..0ee71071f03 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -87,9 +87,9 @@ OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOp } void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata& replMetadata, - rpc::OplogQueryMetadata oqMetadata) { - replMetadataProcessed = replMetadata; - oqMetadataProcessed = oqMetadata; + const rpc::OplogQueryMetadata& oqMetadata) { + replMetadataProcessed = rpc::ReplSetMetadata(replMetadata); + oqMetadataProcessed = rpc::OplogQueryMetadata(oqMetadata); metadataWasProcessed = true; } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 535ee513102..7ec17591a44 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -50,7 +50,7 @@ public: OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; void processMetadata(const rpc::ReplSetMetadata& metadata, - rpc::OplogQueryMetadata oqMetadata) override; + const rpc::OplogQueryMetadata& oqMetadata) override; ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, diff --git a/src/mongo/db/repl/idempotency_test.cpp b/src/mongo/db/repl/idempotency_test.cpp index 9e94154f1c0..69777fdbc55 100644 --- a/src/mongo/db/repl/idempotency_test.cpp +++ b/src/mongo/db/repl/idempotency_test.cpp @@ -131,7 +131,7 @@ BSONObj RandomizedIdempotencyTest::canonicalizeDocumentForDataHash(const BSONObj BSONObj RandomizedIdempotencyTest::getDoc() { AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); BSONObj doc; - Helpers::findById(_opCtx.get(), autoColl.getDb(), nss.ns(), kDocIdQuery, doc); + Helpers::findById(_opCtx.get(), nss.ns(), kDocIdQuery, doc); return doc.getOwned(); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 0908b06213a..7ffffbbf2c1 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -389,7 +389,7 @@ void _logOpsInner(OperationContext* opCtx, } // Insert the oplog records to the respective tenants change collections. - if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) { + if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( opCtx, *records, timestamps); } @@ -1578,7 +1578,7 @@ Status applyOperation_inlock(OperationContext* opCtx, invariant(op.getObject2()); auto&& documentId = *op.getObject2(); auto documentFound = Helpers::findById( - opCtx, db, collection->ns().ns(), documentId, changeStreamPreImage); + opCtx, collection->ns().ns(), documentId, changeStreamPreImage); invariant(documentFound); } diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index e9ca22da35c..575035711e0 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -623,8 +623,6 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps; CachedCollectionProperties collPropertiesCache; - // Used to serialize writes to the tenant migrations donor and recipient namespaces. - boost::optional<uint32_t> tenantMigrationsWriterId; for (auto&& op : *ops) { // If the operation's optime is before or the same as the beginApplyingOpTime we don't want // to apply it, so don't include it in writerVectors. @@ -706,19 +704,6 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( continue; } - // Writes to the tenant migration namespaces must be serialized to preserve the order of - // migration and access blocker states. - if (op.getNss() == NamespaceString::kTenantMigrationDonorsNamespace || - op.getNss() == NamespaceString::kTenantMigrationRecipientsNamespace) { - auto writerId = OplogApplierUtils::addToWriterVector( - opCtx, &op, writerVectors, &collPropertiesCache, tenantMigrationsWriterId); - if (!tenantMigrationsWriterId) { - tenantMigrationsWriterId.emplace(writerId); - } else { - invariant(writerId == *tenantMigrationsWriterId); - } - continue; - } OplogApplierUtils::addToWriterVector(opCtx, &op, writerVectors, &collPropertiesCache); } } diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index 5784b645cc5..b734004bb28 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -2644,42 +2644,6 @@ TEST_F(OplogApplierImplWithSlowAutoAdvancingClockTest, DoNotLogNonSlowOpApplicat ASSERT_EQUALS(0, countTextFormatLogLinesContaining(expected.str())); } -TEST_F(OplogApplierImplTest, SerializeOplogApplicationOfWritesToTenantMigrationNamespaces) { - auto writerPool = makeReplWriterPool(); - NoopOplogApplierObserver observer; - TrackOpsAppliedApplier oplogApplier( - nullptr, // executor - nullptr, // oplogBuffer - &observer, - ReplicationCoordinator::get(_opCtx.get()), - getConsistencyMarkers(), - getStorageInterface(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), - writerPool.get()); - - const auto donorNss = NamespaceString::kTenantMigrationDonorsNamespace; - const auto recipientNss = NamespaceString::kTenantMigrationRecipientsNamespace; - - std::vector<OplogEntry> opsToApply; - opsToApply.push_back( - makeDeleteDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, donorNss, BSON("_id" << 2))); - opsToApply.push_back(makeInsertDocumentOplogEntry( - {Timestamp(Seconds(3), 0), 1LL}, recipientNss, BSON("_id" << 3))); - opsToApply.push_back(makeDeleteDocumentOplogEntry( - {Timestamp(Seconds(4), 0), 1LL}, recipientNss, BSON("_id" << 3))); - opsToApply.push_back( - makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, donorNss, BSON("_id" << 4))); - - ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), opsToApply)); - const auto applied = oplogApplier.getOperationsApplied(); - ASSERT_EQ(4U, applied.size()); - ASSERT_BSONOBJ_EQ(opsToApply[0].getEntry().toBSON(), applied[0].getEntry().toBSON()); - ASSERT_BSONOBJ_EQ(opsToApply[1].getEntry().toBSON(), applied[1].getEntry().toBSON()); - ASSERT_BSONOBJ_EQ(opsToApply[2].getEntry().toBSON(), applied[2].getEntry().toBSON()); - ASSERT_BSONOBJ_EQ(opsToApply[3].getEntry().toBSON(), applied[3].getEntry().toBSON()); -} - - class OplogApplierImplTxnTableTest : public OplogApplierImplTest { public: void setUp() override { @@ -3319,10 +3283,7 @@ TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) { ASSERT_OK(runOpInitialSync(emptyCappedOp)); AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); - - // Ensure that autoColl.getCollection() and autoColl.getDb() are both null. - ASSERT_FALSE(autoColl.getCollection()); - ASSERT_FALSE(autoColl.getDb()); + ASSERT_FALSE(autoColl); } TEST_F(IdempotencyTest, UpdateTwoFields) { diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 7c1ba09f320..987f5806cbf 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -59,6 +59,9 @@ enums: kPostImage: "postImage" structs: + # TODO SERVER-67155 Ensure the tenantId is included in the serialized "ns" field when + # multitenancySupport is on but featureFlagRequireTenantId is off. Currently it will not be + # included in either place DurableReplOperation: description: "A document that represents an operation. Should never be used directly in server code. Instead, create an instance of ReplOperation." diff --git a/src/mongo/db/repl/oplog_entry_test.cpp b/src/mongo/db/repl/oplog_entry_test.cpp index ae5039be724..4bcc4adfeb0 100644 --- a/src/mongo/db/repl/oplog_entry_test.cpp +++ b/src/mongo/db/repl/oplog_entry_test.cpp @@ -150,7 +150,9 @@ TEST(OplogEntryTest, InsertIncludesTidField) { ASSERT(entry.getTid()); ASSERT_EQ(*entry.getTid(), tid); - ASSERT_EQ(entry.getNss(), nss); + // TODO SERVER-66708 Check that (entry.getNss() == nss) once the OplogEntry deserializer + // passes "tid" to the NamespaceString constructor + ASSERT_EQ(entry.getNss(), NamespaceString(boost::none, nss.ns())); ASSERT_BSONOBJ_EQ(entry.getIdElement().wrap("_id"), BSON("_id" << docId)); ASSERT_BSONOBJ_EQ(entry.getOperationToApply(), doc); } diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index d50917d7fd7..6ec6c9778de 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -265,12 +265,8 @@ OpTime OplogFetcher::getLastOpTimeFetched_forTest() const { return _getLastOpTimeFetched(); } -BSONObj OplogFetcher::getFindQueryFilter_forTest() const { - return _makeFindQueryFilter(); -} - -Query OplogFetcher::getFindQuerySettings_forTest(long long findTimeout) const { - return _makeFindQuerySettings(findTimeout); +FindCommandRequest OplogFetcher::makeFindCmdRequest_forTest(long long findTimeout) const { + return _makeFindCmdRequest(findTimeout); } Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const { @@ -584,46 +580,56 @@ AggregateCommandRequest OplogFetcher::_makeAggregateCommandRequest(long long max return aggRequest; } -BSONObj OplogFetcher::_makeFindQueryFilter() const { - BSONObjBuilder queryBob; - - auto lastOpTimeFetched = _getLastOpTimeFetched(); - BSONObjBuilder filterBob; - filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp())); - // Handle caller-provided filter. - if (!_config.queryFilter.isEmpty()) { - filterBob.append( - "$or", - BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp()))); +FindCommandRequest OplogFetcher::_makeFindCmdRequest(long long findTimeout) const { + FindCommandRequest findCmd{_nss}; + + // Construct the find command's filter and set it on the 'FindCommandRequest'. + { + BSONObjBuilder queryBob; + + auto lastOpTimeFetched = _getLastOpTimeFetched(); + BSONObjBuilder filterBob; + filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp())); + // Handle caller-provided filter. + if (!_config.queryFilter.isEmpty()) { + filterBob.append( + "$or", + BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp()))); + } + findCmd.setFilter(filterBob.obj()); + } + + findCmd.setTailable(true); + findCmd.setAwaitData(true); + findCmd.setMaxTimeMS(findTimeout); + + if (_config.batchSize) { + findCmd.setBatchSize(_config.batchSize); } - return filterBob.obj(); -} -Query OplogFetcher::_makeFindQuerySettings(long long findTimeout) const { - Query query = Query().maxTimeMS(findTimeout); if (_config.requestResumeToken) { - query.hint(BSON("$natural" << 1)).requestResumeToken(true); + findCmd.setHint(BSON("$natural" << 1)); + findCmd.setRequestResumeToken(true); } auto lastCommittedWithCurrentTerm = _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); auto term = lastCommittedWithCurrentTerm.value; if (term != OpTime::kUninitializedTerm) { - query.term(term); + findCmd.setTerm(term); } if (_config.queryReadConcern.isEmpty()) { // This ensures that the sync source waits for all earlier oplog writes to be visible. // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use. - query.readConcern(BSON("level" - << "local" - << "afterClusterTime" << Timestamp(0, 1))); + findCmd.setReadConcern(BSON("level" + << "local" + << "afterClusterTime" << Timestamp(0, 1))); } else { // Caller-provided read concern. - query.appendElements(_config.queryReadConcern.toBSON()); + findCmd.setReadConcern(_config.queryReadConcern.toBSONInner()); } - - return query; + return findCmd; } Status OplogFetcher::_createNewCursor(bool initialFind) { @@ -651,17 +657,9 @@ Status OplogFetcher::_createNewCursor(bool initialFind) { } _cursor = std::move(ret.getValue()); } else { + auto findCmd = _makeFindCmdRequest(maxTimeMs); _cursor = std::make_unique<DBClientCursor>( - _conn.get(), - _nss, - _makeFindQueryFilter(), - _makeFindQuerySettings(maxTimeMs), - 0 /* limit */, - 0 /* nToSkip */, - nullptr /* fieldsToReturn */, - QueryOption_CursorTailable | QueryOption_AwaitData | - (oplogFetcherUsesExhaust ? QueryOption_Exhaust : 0), - _config.batchSize); + _conn.get(), std::move(findCmd), ReadPreferenceSetting{}, oplogFetcherUsesExhaust); } _firstBatch = true; @@ -817,7 +815,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { "metadata"_attr = _metadataObj); return oqMetadataResult.getStatus(); } - auto oqMetadata = oqMetadataResult.getValue(); + const auto& oqMetadata = oqMetadataResult.getValue(); if (_firstBatch) { auto status = @@ -884,7 +882,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { "metadata"_attr = _metadataObj); return metadataResult.getStatus(); } - auto replSetMetadata = metadataResult.getValue(); + const auto& replSetMetadata = metadataResult.getValue(); // Determine if we should stop syncing from our current sync source. auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching( diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 01a4347669b..2147eb9ebde 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -275,8 +275,7 @@ public: /** * Returns the `find` query run on the sync source's oplog. */ - BSONObj getFindQueryFilter_forTest() const; - Query getFindQuerySettings_forTest(long long findTimeout) const; + FindCommandRequest makeFindCmdRequest_forTest(long long findTimeout) const; /** * Returns the OpTime of the last oplog entry fetched and processed. @@ -387,11 +386,9 @@ private: /** * This function will create the `find` query to issue to the sync source. It is provided with - * whether this is the initial attempt to create the `find` query to determine what the find - * timeout should be. + * the value to use as the "maxTimeMS" for the find command. */ - BSONObj _makeFindQueryFilter() const; - Query _makeFindQuerySettings(long long findTimeout) const; + FindCommandRequest _makeFindCmdRequest(long long findTimeout) const; /** * Gets the next batch from the exhaust cursor. diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index e98039a0f8a..adc09da1300 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -806,19 +806,25 @@ TEST_F(OplogFetcherTest, auto oplogFetcher = makeOplogFetcher(); auto findTimeout = durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()); - auto filter = oplogFetcher->getFindQueryFilter_forTest(); + auto findCmdRequest = oplogFetcher->makeFindCmdRequest_forTest(findTimeout); + + auto filter = findCmdRequest.getFilter(); ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), filter); - auto queryObj = - (oplogFetcher->getFindQuerySettings_forTest(findTimeout)).getFullSettingsDeprecated(); - ASSERT_EQUALS(60000, queryObj.getIntField("$maxTimeMS")); + auto maxTimeMS = findCmdRequest.getMaxTimeMS(); + ASSERT(maxTimeMS); + ASSERT_EQUALS(60000, *maxTimeMS); - ASSERT_EQUALS(mongo::BSONType::Object, queryObj["readConcern"].type()); + auto readConcern = findCmdRequest.getReadConcern(); + ASSERT(readConcern); ASSERT_BSONOBJ_EQ(BSON("level" << "local" << "afterClusterTime" << Timestamp(0, 1)), - queryObj["readConcern"].Obj()); - ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, queryObj["term"].numberLong()); + *readConcern); + + auto term = findCmdRequest.getTerm(); + ASSERT(term); + ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, *term); } TEST_F(OplogFetcherTest, @@ -826,21 +832,26 @@ TEST_F(OplogFetcherTest, dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; auto oplogFetcher = makeOplogFetcher(); - auto filter = oplogFetcher->getFindQueryFilter_forTest(); - ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), filter); - // Test that the correct maxTimeMS is set if we are retrying the 'find' query. auto findTimeout = durationCount<Milliseconds>(oplogFetcher->getRetriedFindMaxTime_forTest()); - auto queryObj = - (oplogFetcher->getFindQuerySettings_forTest(findTimeout)).getFullSettingsDeprecated(); - ASSERT_EQUALS(2000, queryObj.getIntField("$maxTimeMS")); + auto findCmdRequest = oplogFetcher->makeFindCmdRequest_forTest(findTimeout); - ASSERT_EQUALS(mongo::BSONType::Object, queryObj["readConcern"].type()); + auto filter = findCmdRequest.getFilter(); + ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), filter); + + auto maxTimeMS = findCmdRequest.getMaxTimeMS(); + ASSERT(maxTimeMS); + ASSERT_EQUALS(2000, *maxTimeMS); + + auto readConcern = findCmdRequest.getReadConcern(); + ASSERT(readConcern); ASSERT_BSONOBJ_EQ(BSON("level" << "local" << "afterClusterTime" << Timestamp(0, 1)), - queryObj["readConcern"].Obj()); - ASSERT_FALSE(queryObj.hasField("term")); + *readConcern); + + auto term = findCmdRequest.getTerm(); + ASSERT(!term); } TEST_F( diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index cb79c007ced..dbe696ecce7 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -362,6 +362,9 @@ void PrimaryOnlyService::onStepUp(const OpTime& stepUpOpTime) { instance.second.waitForCompletion(); } + savedInstances.clear(); + newThenOldScopedExecutor.reset(); + PrimaryOnlyServiceHangBeforeLaunchingStepUpLogic.pauseWhileSet(); // Now wait for the first write of the new term to be majority committed, so that we know diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index 5823d880c14..7f35d2cfb31 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -528,6 +528,11 @@ public: "primary.)\n" "http://dochub.mongodb.org/core/replicasetcommands"; } + + bool shouldCheckoutSession() const final { + return false; + } + CmdReplSetStepDown() : ReplSetCommand("replSetStepDown"), _stepDownCmdsWithForceExecutedMetric("commands.replSetStepDownWithForce.total", @@ -685,7 +690,7 @@ public: if (metadataResult.isOK()) { // New style update position command has metadata, which may inform the // upstream of a higher term. - auto metadata = metadataResult.getValue(); + const auto& metadata = metadataResult.getValue(); replCoord->processReplSetMetadata(metadata); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 7cbc79f9aed..5d450af12d7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -560,7 +560,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC // TODO: SERVER-65948 move the change collection creation logic from here to the PM-2502 hooks. // The change collection will be created when the change stream is enabled. - if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) { + if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { auto status = ChangeStreamChangeCollectionManager::get(opCtx).createChangeCollection( opCtx, boost::none); if (!status.isOK()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index c2f2aa1ad08..fe769df7572 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1340,7 +1340,6 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, _updateMemberStateFromTopologyCoordinator(lk); LOGV2(21331, "Transition to primary complete; database writes are now permitted"); - _drainFinishedCond.notify_all(); _externalState->startNoopWriter(_getMyLastAppliedOpTime_inlock()); } @@ -1830,8 +1829,9 @@ Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer const UpdatePositionArgs::UpdateInfo update( OpTime(), Date_t(), opTime, wallTime, cfgVer, memberId); - const auto status = _setLastOptime(lock, update); - return status; + const auto statusWithOpTime = _setLastOptimeForMember(lock, update); + _updateStateAfterRemoteOpTimeUpdates(lock, statusWithOpTime.getValue()); + return statusWithOpTime.getStatus(); } Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer, @@ -1847,25 +1847,29 @@ Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer const UpdatePositionArgs::UpdateInfo update( opTime, wallTime, OpTime(), Date_t(), cfgVer, memberId); - const auto status = _setLastOptime(lock, update); - return status; + const auto statusWithOpTime = _setLastOptimeForMember(lock, update); + _updateStateAfterRemoteOpTimeUpdates(lock, statusWithOpTime.getValue()); + return statusWithOpTime.getStatus(); } -Status ReplicationCoordinatorImpl::_setLastOptime(WithLock lk, - const UpdatePositionArgs::UpdateInfo& args) { - auto result = _topCoord->setLastOptime(args, _replExecutor->now()); +StatusWith<OpTime> ReplicationCoordinatorImpl::_setLastOptimeForMember( + WithLock lk, const UpdatePositionArgs::UpdateInfo& args) { + auto result = _topCoord->setLastOptimeForMember(args, _replExecutor->now()); if (!result.isOK()) return result.getStatus(); const bool advancedOpTime = result.getValue(); + _rescheduleLivenessUpdate_inlock(args.memberId); + return advancedOpTime ? std::max(args.appliedOpTime, args.durableOpTime) : OpTime(); +} + +void ReplicationCoordinatorImpl::_updateStateAfterRemoteOpTimeUpdates( + WithLock lk, const OpTime& maxRemoteOpTime) { // Only update committed optime if the remote optimes increased. - if (advancedOpTime) { + if (!maxRemoteOpTime.isNull()) { _updateLastCommittedOpTimeAndWallTime(lk); // Wait up replication waiters on optime changes. - _wakeReadyWaiters(lk, std::max(args.appliedOpTime, args.durableOpTime)); + _wakeReadyWaiters(lk, maxRemoteOpTime); } - - _rescheduleLivenessUpdate_inlock(args.memberId); - return Status::OK(); } bool ReplicationCoordinatorImpl::isCommitQuorumSatisfied( @@ -4415,7 +4419,7 @@ void ReplicationCoordinatorImpl::_errorOnPromisesIfHorizonChanged(WithLock lk, HelloMetrics::get(opCtx)->resetNumAwaitingTopologyChanges(); } - if (oldIndex >= 0 && newIndex >= 0) { + if (oldIndex >= 0) { invariant(_sniToValidConfigPromiseMap.empty()); const auto oldHorizonMappings = oldConfig.getMemberAt(oldIndex).getHorizonMappings(); @@ -5079,18 +5083,22 @@ void ReplicationCoordinatorImpl::_wakeReadyWaiters(WithLock lk, boost::optional< Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates) { stdx::unique_lock<Latch> lock(_mutex); Status status = Status::OK(); - bool somethingChanged = false; + bool gotValidUpdate = false; + OpTime maxRemoteOpTime; for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin(); update != updates.updatesEnd(); ++update) { - status = _setLastOptime(lock, *update); - if (!status.isOK()) { + auto statusWithOpTime = _setLastOptimeForMember(lock, *update); + if (!statusWithOpTime.isOK()) { + status = statusWithOpTime.getStatus(); break; } - somethingChanged = true; + maxRemoteOpTime = std::max(maxRemoteOpTime, statusWithOpTime.getValue()); + gotValidUpdate = true; } + _updateStateAfterRemoteOpTimeUpdates(lock, maxRemoteOpTime); - if (somethingChanged && !_getMemberState_inlock().primary()) { + if (gotValidUpdate && !_getMemberState_inlock().primary()) { lock.unlock(); // Must do this outside _mutex _externalState->forwardSecondaryProgress(); @@ -5716,28 +5724,27 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ invariant(-1 != rbid); } - stdx::lock_guard<Latch> lk(_mutex); + boost::optional<rpc::ReplSetMetadata> replSetMetadata; + boost::optional<rpc::OplogQueryMetadata> oplogQueryMetadata; + { + stdx::lock_guard<Latch> lk(_mutex); - if (hasReplSetMetadata) { - _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder); - } + if (hasReplSetMetadata) { + OpTime lastVisibleOpTime = + std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock()); + replSetMetadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime); + } - if (hasOplogQueryMetadata) { - _prepareOplogQueryMetadata_inlock(rbid, builder); + if (hasOplogQueryMetadata) { + oplogQueryMetadata = _topCoord->prepareOplogQueryMetadata(rbid); + } } -} - -void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) const { - OpTime lastVisibleOpTime = - std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock()); - auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime); - metadata.writeToMetadata(builder).transitional_ignore(); -} -void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid, - BSONObjBuilder* builder) const { - _topCoord->prepareOplogQueryMetadata(rbid).writeToMetadata(builder).transitional_ignore(); + // Do BSON serialization outside lock. + if (replSetMetadata) + invariantStatusOK(replSetMetadata->writeToMetadata(builder)); + if (oplogQueryMetadata) + invariantStatusOK(oplogQueryMetadata->writeToMetadata(builder)); } bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal() { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index a6dc8fe9066..9ac44fdc62e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -469,7 +469,7 @@ public: executor::TaskExecutor::CallbackHandle getCatchupTakeoverCbh_forTest() const; /** - * Simple wrappers around _setLastOptime to make it easier to test. + * Simple wrappers around _setLastOptimeForMember to make it easier to test. */ Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, @@ -1099,8 +1099,19 @@ private: * This is only valid to call on replica sets. * "configVersion" will be populated with our config version if it and the configVersion * of "args" differ. + * + * If either applied or durable optime has changed, returns the later of the two (even if + * that's not the one which changed). Otherwise returns a null optime. + */ + StatusWith<OpTime> _setLastOptimeForMember(WithLock lk, + const UpdatePositionArgs::UpdateInfo& args); + + /** + * Helper for processReplSetUpdatePosition, companion to _setLastOptimeForMember above. Updates + * replication coordinator state and notifies waiters after remote optime updates. Must be + * called within the same critical section as _setLastOptimeForMember. */ - Status _setLastOptime(WithLock lk, const UpdatePositionArgs::UpdateInfo& args); + void _updateStateAfterRemoteOpTimeUpdates(WithLock lk, const OpTime& maxRemoteOpTime); /** * This function will report our position externally (like upstream) if necessary. @@ -1463,17 +1474,6 @@ private: EventHandle _processReplSetMetadata_inlock(const rpc::ReplSetMetadata& replMetadata); /** - * Prepares a metadata object for ReplSetMetadata. - */ - void _prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) const; - - /** - * Prepares a metadata object for OplogQueryMetadata. - */ - void _prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const; - - /** * Blesses a snapshot to be used for new committed reads. * * Returns true if the value was updated to `newCommittedSnapshot`. @@ -1719,9 +1719,6 @@ private: // Current ReplicaSet state. MemberState _memberState; // (M) - // Used to signal threads waiting for changes to _memberState. - stdx::condition_variable _drainFinishedCond; // (M) - ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M) // Used to signal threads waiting for changes to _rsConfigState. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index cfb8b355366..1392cceb923 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -661,51 +661,51 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply( const ReplSetConfig& config) { + if (!_settings.isServerless() || !config.isSplitConfig()) { + return {config, false}; + } + stdx::unique_lock<Latch> lk(_mutex); - if (config.isSplitConfig()) { - if (!_rsConfig.isInitialized()) { - // Unlock the lock because isSelf performs network I/O. - lk.unlock(); + if (!_rsConfig.isInitialized()) { + // Unlock the lock because isSelf performs network I/O. + lk.unlock(); - // If this node is listed in the members of incoming config, accept the config. - const auto foundSelfInMembers = - std::any_of(config.membersBegin(), - config.membersEnd(), - [externalState = _externalState.get()](const MemberConfig& config) { - return externalState->isSelf(config.getHostAndPort(), - getGlobalServiceContext()); - }); - - if (foundSelfInMembers) { - return {config, false}; - } + // If this node is listed in the members of incoming config, accept the config. + const auto foundSelfInMembers = std::any_of( + config.membersBegin(), + config.membersEnd(), + [externalState = _externalState.get()](const MemberConfig& config) { + return externalState->isSelf(config.getHostAndPort(), getGlobalServiceContext()); + }); - return {Status(ErrorCodes::NotYetInitialized, - "Cannot apply a split config if the current config is uninitialized"), - false}; + if (foundSelfInMembers) { + return {config, false}; } - auto recipientConfig = config.getRecipientConfig(); - const auto& selfMember = _rsConfig.getMemberAt(_selfIndex); - if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) { - if (selfMember.getNumVotes() > 0) { - return { - Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"), - false}; - } + return {Status(ErrorCodes::NotYetInitialized, + "Cannot apply a split config if the current config is uninitialized"), + false}; + } - if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) { - return {Status(ErrorCodes::InvalidReplicaSetConfig, - "Cannot apply recipient config since current config and recipient " - "config have the same set name."), - false}; - } + auto recipientConfig = config.getRecipientConfig(); + const auto& selfMember = _rsConfig.getMemberAt(_selfIndex); + if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) { + if (selfMember.getNumVotes() > 0) { + return {Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"), + false}; + } - auto mutableConfig = recipientConfig->getMutable(); - mutableConfig.setConfigVersion(1); - mutableConfig.setConfigTerm(1); - return {ReplSetConfig(std::move(mutableConfig)), true}; + if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) { + return {Status(ErrorCodes::InvalidReplicaSetConfig, + "Cannot apply recipient config since current config and recipient " + "config have the same set name."), + false}; } + + auto mutableConfig = recipientConfig->getMutable(); + mutableConfig.setConfigVersion(1); + mutableConfig.setConfigTerm(1); + return {ReplSetConfig(std::move(mutableConfig)), true}; } return {config, false}; diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 5203980b575..e619276b129 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -58,6 +58,7 @@ namespace { using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +using InNetworkGuard = NetworkInterfaceMock::InNetworkGuard; TEST(ReplSetHeartbeatArgs, AcceptsUnknownField) { ReplSetHeartbeatArgsV1 hbArgs; @@ -116,7 +117,8 @@ protected: void processResponseFromPrimary(const ReplSetConfig& config, long long version = -2, - long long term = OpTime::kInitialTerm); + long long term = OpTime::kInitialTerm, + const HostAndPort& target = HostAndPort{"h1", 1}); }; void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) { @@ -160,13 +162,14 @@ ReplCoordHBV1Test::performSyncToFinishReconfigHeartbeat() { void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config, long long version, - long long term) { + long long term, + const HostAndPort& target) { NetworkInterfaceMock* net = getNet(); const Date_t startDate = getNet()->now(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); - ASSERT_EQUALS(HostAndPort("h1", 1), request.target); + ASSERT_EQUALS(target, request.target); ReplSetHeartbeatArgsV1 hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); ASSERT_EQUALS("mySet", hbArgs.getSetName()); @@ -266,6 +269,85 @@ TEST_F(ReplCoordHBV1Test, ASSERT_TRUE(getExternalState()->threadsStarted()); } +TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) { + auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kDefault, + logv2::LogSeverity::Debug(3)}; + + // Start up with three nodes, and assume the role of "node2" as a secondary. Notably, the local + // node is NOT started in serverless mode. "node2" is configured as having no votes, no + // priority, so that we can pass validation for accepting a split config. + assertStartSuccess(BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 2 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345" + << "votes" << 0 << "priority" << 0) + << BSON("_id" << 3 << "host" + << "node3:12345"))), + HostAndPort("node2", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + getReplCoord()->updateTerm_forTest(1, nullptr); + ASSERT_EQ(getReplCoord()->getTerm(), 1); + // respond to initial heartbeat requests + for (int j = 0; j < 2; ++j) { + replyToReceivedHeartbeatV1(); + } + + // Verify that there are no further heartbeat requests, since the heartbeat requests should be + // scheduled for the future. + { + InNetworkGuard guard(getNet()); + assertMemberState(MemberState::RS_SECONDARY); + ASSERT_FALSE(getNet()->hasReadyRequests()); + } + + ReplSetConfig splitConfig = + assertMakeRSConfig(BSON("_id" + << "mySet" + << "version" << 3 << "term" << 1 << "protocolVersion" << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345")) + << "recipientConfig" + << BSON("_id" + << "recipientSet" + << "version" << 1 << "term" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))))); + + // Accept a heartbeat from `node1` which has a split config. The split config lists this node + // ("node2") in the recipient member list, but a node started not in serverless mode should not + // accept and install the recipient config. + receiveHeartbeatFrom(splitConfig, 1, HostAndPort("node1", 12345)); + + { + InNetworkGuard guard(getNet()); + processResponseFromPrimary(splitConfig, 2, 1, HostAndPort{"node1", 12345}); + assertMemberState(MemberState::RS_SECONDARY); + OperationContextNoop opCtx; + auto storedConfig = ReplSetConfig::parse( + unittest::assertGet(getExternalState()->loadLocalConfigDocument(&opCtx))); + ASSERT_OK(storedConfig.validate()); + + // Verify that the recipient config was not accepted. A successfully applied splitConfig + // will install at version and term {1, 1}. + ASSERT_EQUALS(ConfigVersionAndTerm(3, 1), storedConfig.getConfigVersionAndTerm()); + ASSERT_EQUALS("mySet", storedConfig.getReplSetName()); + } + + ASSERT_TRUE(getExternalState()->threadsStarted()); +} + TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) { ReplSetConfig rsConfig = assertMakeRSConfig(BSON("_id" @@ -556,6 +638,10 @@ TEST_F( class ReplCoordHBV1SplitConfigTest : public ReplCoordHBV1Test { public: void startUp(const std::string& hostAndPort) { + ReplSettings settings; + settings.setServerlessMode(); + init(settings); + BSONObj configBson = BSON("_id" << _donorSetName << "version" << _configVersion << "term" << _configTerm << "members" << _members << "protocolVersion" << 1); @@ -740,7 +826,6 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeApplyConfig) { validateNextRequest("", _recipientSetName, 1, 1); } -using InNetworkGuard = NetworkInterfaceMock::InNetworkGuard; TEST_F(ReplCoordHBV1SplitConfigTest, RejectMismatchedSetNameInHeartbeatResponse) { startUp(_recipientSecondaryNode); @@ -813,9 +898,9 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeNonZeroVotes) { getNet()->runReadyNetworkOperations(); // The node rejected the config as it's a voting node and its version has not changed. - ASSERT_EQ(getReplCoord()->getConfigVersion(), _configVersion); - ASSERT_EQ(getReplCoord()->getConfigTerm(), _configTerm); - ASSERT_EQ(getReplCoord()->getSettings().ourSetName(), _donorSetName); + auto config = getReplCoord()->getConfig(); + ASSERT_EQ(config.getConfigVersionAndTerm(), ConfigVersionAndTerm(_configVersion, _configTerm)); + ASSERT_EQ(config.getReplSetName(), _donorSetName); } class ReplCoordHBV1ReconfigTest : public ReplCoordHBV1Test { diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index bbe14690c7a..31a307a96b0 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -236,11 +236,11 @@ void ReplicationCoordinatorMock::setMyHeartbeatMessage(const std::string& msg) { } void ReplicationCoordinatorMock::_setMyLastAppliedOpTimeAndWallTime( - const OpTimeAndWallTime& opTimeAndWallTime) { + WithLock lk, const OpTimeAndWallTime& opTimeAndWallTime) { _myLastAppliedOpTime = opTimeAndWallTime.opTime; _myLastAppliedWallTime = opTimeAndWallTime.wallTime; - setCurrentCommittedSnapshotOpTime(opTimeAndWallTime.opTime); + _setCurrentCommittedSnapshotOpTime(lk, opTimeAndWallTime.opTime); if (auto storageEngine = _service->getStorageEngine()) { if (auto snapshotManager = storageEngine->getSnapshotManager()) { @@ -253,7 +253,7 @@ void ReplicationCoordinatorMock::setMyLastAppliedOpTimeAndWallTime( const OpTimeAndWallTime& opTimeAndWallTime) { stdx::lock_guard<Mutex> lk(_mutex); - _setMyLastAppliedOpTimeAndWallTime(opTimeAndWallTime); + _setMyLastAppliedOpTimeAndWallTime(lk, opTimeAndWallTime); } void ReplicationCoordinatorMock::setMyLastDurableOpTimeAndWallTime( @@ -269,7 +269,7 @@ void ReplicationCoordinatorMock::setMyLastAppliedOpTimeAndWallTimeForward( stdx::lock_guard<Mutex> lk(_mutex); if (opTimeAndWallTime.opTime > _myLastAppliedOpTime) { - _setMyLastAppliedOpTimeAndWallTime(opTimeAndWallTime); + _setMyLastAppliedOpTimeAndWallTime(lk, opTimeAndWallTime); } } @@ -657,11 +657,17 @@ Status ReplicationCoordinatorMock::updateTerm(OperationContext* opCtx, long long void ReplicationCoordinatorMock::clearCommittedSnapshot() {} -void ReplicationCoordinatorMock::setCurrentCommittedSnapshotOpTime(OpTime time) { +void ReplicationCoordinatorMock::_setCurrentCommittedSnapshotOpTime(WithLock lk, OpTime time) { _currentCommittedSnapshotOpTime = time; } +void ReplicationCoordinatorMock::setCurrentCommittedSnapshotOpTime(OpTime time) { + stdx::lock_guard<Mutex> lk(_mutex); + _setCurrentCommittedSnapshotOpTime(lk, time); +} + OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const { + stdx::lock_guard<Mutex> lk(_mutex); return _currentCommittedSnapshotOpTime; } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 3ac7686ea34..dbe7b28ef83 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -422,7 +422,9 @@ public: virtual WriteConcernTagChanges* getWriteConcernTagChanges() override; private: - void _setMyLastAppliedOpTimeAndWallTime(const OpTimeAndWallTime& opTimeAndWallTime); + void _setMyLastAppliedOpTimeAndWallTime(WithLock lk, + const OpTimeAndWallTime& opTimeAndWallTime); + void _setCurrentCommittedSnapshotOpTime(WithLock lk, OpTime time); ServiceContext* const _service; ReplSettings _settings; diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp index b71765e33d3..70421f959e1 100644 --- a/src/mongo/db/repl/roll_back_local_operations_test.cpp +++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp @@ -321,7 +321,8 @@ public: DBClientConnectionForTest(int numInitFailures) : _initFailuresLeft(numInitFailures) {} std::unique_ptr<DBClientCursor> find(FindCommandRequest findRequest, - const ReadPreferenceSetting& readPref) override { + const ReadPreferenceSetting& readPref, + ExhaustMode exhaustMode) override { if (_initFailuresLeft > 0) { _initFailuresLeft--; LOGV2(21657, diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp index 9c56b0ff21e..8b427be197c 100644 --- a/src/mongo/db/repl/rollback_source_impl.cpp +++ b/src/mongo/db/repl/rollback_source_impl.cpp @@ -94,7 +94,8 @@ std::pair<BSONObj, NamespaceString> RollbackSourceImpl::findOneByUUID(const std: auto cursor = std::make_unique<DBClientCursor>(_getConnection(), std::move(findRequest), - ReadPreferenceSetting{ReadPreference::SecondaryPreferred}); + ReadPreferenceSetting{ReadPreference::SecondaryPreferred}, + false /*isExhaust*/); uassert(6138500, "find one by UUID failed", cursor->init()); BSONObj result = cursor->more() ? cursor->nextSafe() : BSONObj{}; NamespaceString nss = cursor->getNamespaceString(); diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index e527aa204eb..8777903803c 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -949,7 +949,7 @@ void rollbackCreateIndexes(OperationContext* opCtx, UUID uuid, std::set<std::str "indexName"_attr = indexName); WriteUnitOfWork wuow(opCtx); - dropIndex(opCtx, collection.getWritableCollection(), indexName, *nss); + dropIndex(opCtx, collection.getWritableCollection(opCtx), indexName, *nss); wuow.commit(); LOGV2_DEBUG(21673, @@ -1634,12 +1634,12 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, WriteUnitOfWork wuow(opCtx); // Set collection to whatever temp status is on the sync source. - collection.getWritableCollection()->setIsTemp(opCtx, options.temp); + collection.getWritableCollection(opCtx)->setIsTemp(opCtx, options.temp); // Set any document validation options. We update the validator fields without // parsing/validation, since we fetched the options object directly from the sync // source, and we should set our validation options to match it exactly. - auto validatorStatus = collection.getWritableCollection()->updateValidator( + auto validatorStatus = collection.getWritableCollection(opCtx)->updateValidator( opCtx, options.validator, options.validationLevel, options.validationAction); if (!validatorStatus.isOK()) { throw RSFatalException(str::stream() @@ -1811,16 +1811,16 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, // RecordId loc = Helpers::findById(nsd, pattern); if (!loc.isNull()) { try { - writeConflictRetry(opCtx, - "cappedTruncateAfter", - collection->ns().ns(), - [&] { - WriteUnitOfWork wunit(opCtx); - collection.getWritableCollection() - ->cappedTruncateAfter( - opCtx, loc, true); - wunit.commit(); - }); + writeConflictRetry( + opCtx, + "cappedTruncateAfter", + collection->ns().ns(), + [&] { + WriteUnitOfWork wunit(opCtx); + collection.getWritableCollection(opCtx) + ->cappedTruncateAfter(opCtx, loc, true); + wunit.commit(); + }); } catch (const DBException& e) { if (e.code() == 13415) { // hack: need to just make cappedTruncate do this... @@ -1828,7 +1828,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, opCtx, "truncate", collection->ns().ns(), [&] { WriteUnitOfWork wunit(opCtx); uassertStatusOK( - collection.getWritableCollection() + collection.getWritableCollection(opCtx) ->truncate(opCtx)); wunit.commit(); }); @@ -2012,14 +2012,6 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, validator->resetKeyManagerCache(); } - // Force the config server to update its shard registry on next access. Otherwise it may have - // the stale data that has been just rolled back. - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (auto shardRegistry = Grid::get(opCtx)->shardRegistry()) { - shardRegistry->clearEntries(); - } - } - // Force the default read/write concern cache to reload on next access in case the defaults // document was rolled back. ReadWriteConcernDefaults::get(opCtx).invalidate(); diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 1a90e3a57c8..22d7c7648e4 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -50,6 +50,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/exception_util.h" @@ -323,12 +324,6 @@ template <typename AutoGetCollectionType> StatusWith<const CollectionPtr*> getCollection(const AutoGetCollectionType& autoGetCollection, const NamespaceStringOrUUID& nsOrUUID, const std::string& message) { - if (!autoGetCollection.getDb()) { - StringData dbName = nsOrUUID.nss() ? nsOrUUID.nss()->db() : nsOrUUID.dbname(); - return {ErrorCodes::NamespaceNotFound, - str::stream() << "Database [" << dbName << "] not found. " << message}; - } - const auto& collection = autoGetCollection.getCollection(); if (!collection) { return {ErrorCodes::NamespaceNotFound, @@ -347,6 +342,8 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, boost::optional<AutoGetOplog> autoOplog; const CollectionPtr* collection; + bool shouldWriteToChangeCollections = false; + auto nss = nsOrUUID.nss(); if (nss && nss->isOplog()) { // Simplify locking rules for oplog collection. @@ -355,6 +352,9 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, if (!*collection) { return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"}; } + + shouldWriteToChangeCollections = + ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive(); } else { autoColl.emplace(opCtx, nsOrUUID, MODE_IX); auto collectionResult = getCollection( @@ -371,6 +371,18 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx, if (!status.isOK()) { return status; } + + // Insert oplog entries to change collections if we are running in the serverless and the 'nss' + // is 'local.oplog.rs'. + if (shouldWriteToChangeCollections) { + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + status = changeCollectionManager.insertDocumentsToChangeCollection( + opCtx, begin, end, nullOpDebug); + if (!status.isOK()) { + return status; + } + } + wunit.commit(); return Status::OK(); diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 3c942ed7361..14362a56821 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -2684,7 +2684,6 @@ TEST_F(StorageInterfaceImplTest, auto doc = BSON("_id" << 0 << "x" << 1); auto status = storage.upsertById(opCtx, nss, doc["_id"], doc); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); - ASSERT_EQUALS("Database [nosuchdb] not found. Unable to update document.", status.reason()); } TEST_F(StorageInterfaceImplTest, @@ -2879,10 +2878,6 @@ TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsNamespaceNotFoundWhenDatab auto filter = BSON("x" << 1); auto status = storage.deleteByFilter(opCtx, nss, filter); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); - ASSERT_EQUALS(std::string(str::stream() - << "Database [nosuchdb] not found. Unable to delete documents in " - << nss.ns() << " using filter " << filter), - status.reason()); } TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsBadValueWhenFilterContainsUnknownOperator) { diff --git a/src/mongo/db/repl/storage_timestamp_test.cpp b/src/mongo/db/repl/storage_timestamp_test.cpp index cc0f88d0779..fb9325c1978 100644 --- a/src/mongo/db/repl/storage_timestamp_test.cpp +++ b/src/mongo/db/repl/storage_timestamp_test.cpp @@ -162,7 +162,7 @@ Status createIndexFromSpec(OperationContext* opCtx, } WriteUnitOfWork wunit(opCtx); ASSERT_OK(indexer.commit(opCtx, - collection.getWritableCollection(), + collection.getWritableCollection(opCtx), MultiIndexBlock::kNoopOnCreateEachFn, MultiIndexBlock::kNoopOnCommitFn)); LogicalTime indexTs = clock->tickClusterTime(1); @@ -394,7 +394,7 @@ public: // Timestamping index completion. Primaries write an oplog entry. ASSERT_OK( indexer.commit(_opCtx, - coll.getWritableCollection(), + coll.getWritableCollection(_opCtx), [&](const BSONObj& indexSpec) { _opCtx->getServiceContext()->getOpObserver()->onCreateIndex( _opCtx, coll->ns(), coll->uuid(), indexSpec, false); @@ -2787,7 +2787,7 @@ TEST_F(StorageTimestampTest, IndexBuildsResolveErrorsDuringStateChangeToPrimary) WriteUnitOfWork wuow(_opCtx); ASSERT_OK( indexer.commit(_opCtx, - collection.getWritableCollection(), + collection.getWritableCollection(_opCtx), [&](const BSONObj& indexSpec) { _opCtx->getServiceContext()->getOpObserver()->onCreateIndex( _opCtx, collection->ns(), collection->uuid(), indexSpec, false); diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp index 9e6d5f7e02a..165538954bd 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.cpp +++ b/src/mongo/db/repl/tenant_collection_cloner.cpp @@ -474,36 +474,42 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() { } void TenantCollectionCloner::runQuery() { - const BSONObj& filter = _lastDocId.isEmpty() - ? BSONObj{} // Use $expr and the aggregation version of $gt to avoid type bracketing. - : BSON("$expr" << BSON("$gt" << BSON_ARRAY("$_id" << _lastDocId["_id"]))); - - auto query = _collectionOptions.clusteredIndex - // RecordIds are _id values and has no separate _id index - ? Query().hint(BSON("$natural" << 1)) - : Query().hint(BSON("_id" << 1)); - - - // Any errors that are thrown here (including NamespaceNotFound) will be handled on the stage - // level. - getClient()->query_DEPRECATED( - [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, - _sourceDbAndUuid, - filter, - query, - nullptr /* fieldsToReturn */, - QueryOption_NoCursorTimeout | QueryOption_SecondaryOk | - (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), - _collectionClonerBatchSize, - ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); + FindCommandRequest findCmd{_sourceDbAndUuid}; + + findCmd.setFilter( + _lastDocId.isEmpty() + ? BSONObj{} // Use $expr and the aggregation version of $gt to avoid type bracketing. + : BSON("$expr" << BSON("$gt" << BSON_ARRAY("$_id" << _lastDocId["_id"])))); + + if (_collectionOptions.clusteredIndex) { + findCmd.setHint(BSON("$natural" << 1)); + } else { + findCmd.setHint(BSON("_id" << 1)); + } + + findCmd.setNoCursorTimeout(true); + findCmd.setReadConcern(ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); + if (_collectionClonerBatchSize) { + findCmd.setBatchSize(_collectionClonerBatchSize); + } + + ExhaustMode exhaustMode = collectionClonerUsesExhaust ? ExhaustMode::kOn : ExhaustMode::kOff; + + auto cursor = getClient()->find( + std::move(findCmd), ReadPreferenceSetting{ReadPreference::SecondaryPreferred}, exhaustMode); + + // Process the results of the cursor one batch at a time. + while (cursor->more()) { + handleNextBatch(*cursor); + } } -void TenantCollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { +void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) { { stdx::lock_guard<Latch> lk(_mutex); _stats.receivedBatches++; - while (iter.moreInCurrentBatch()) { - _documentsToInsert.emplace_back(iter.nextSafe()); + while (cursor.moreInCurrentBatch()) { + _documentsToInsert.emplace_back(cursor.nextSafe()); } } diff --git a/src/mongo/db/repl/tenant_collection_cloner.h b/src/mongo/db/repl/tenant_collection_cloner.h index b9c22928917..12bd9bbb832 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.h +++ b/src/mongo/db/repl/tenant_collection_cloner.h @@ -209,10 +209,10 @@ private: AfterStageBehavior queryStage(); /** - * Put all results from a query batch into a buffer to be inserted, and schedule - * it to be inserted. + * Put all results from a query batch into a buffer to be inserted, and schedule it to be + * inserted. */ - void handleNextBatch(DBClientCursorBatchIterator& iter); + void handleNextBatch(DBClientCursor& cursor); /** * Called whenever there is a new batch of documents ready from the DBClientConnection. diff --git a/src/mongo/db/repl/tenant_file_cloner.cpp b/src/mongo/db/repl/tenant_file_cloner.cpp index 83ae3c65fc8..b909039eed1 100644 --- a/src/mongo/db/repl/tenant_file_cloner.cpp +++ b/src/mongo/db/repl/tenant_file_cloner.cpp @@ -188,8 +188,7 @@ void TenantFileCloner::runQuery() { getClient(), std::move(aggRequest), true /* secondaryOk */, useExhaust)); try { while (cursor->more()) { - DBClientCursorBatchIterator iter(*cursor); - handleNextBatch(iter); + handleNextBatch(*cursor); } } catch (const DBException& e) { // We cannot continue after an error when processing exhaust cursors. Instead we must @@ -207,7 +206,7 @@ void TenantFileCloner::runQuery() { } } -void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { +void TenantFileCloner::handleNextBatch(DBClientCursor& cursor) { LOGV2_DEBUG(6113307, 3, "TenantFileCloner handleNextBatch", @@ -215,7 +214,7 @@ void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { "backupId"_attr = _backupId, "remoteFile"_attr = _remoteFileName, "fileOffset"_attr = getFileOffset(), - "moreInCurrentBatch"_attr = iter.moreInCurrentBatch()); + "moreInCurrentBatch"_attr = cursor.moreInCurrentBatch()); { stdx::lock_guard<TenantMigrationSharedData> lk(*getSharedData()); if (!getSharedData()->getStatus(lk).isOK()) { @@ -225,11 +224,11 @@ void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { str::stream() << message << ": " << getSharedData()->getStatus(lk)); } } - while (iter.moreInCurrentBatch()) { + while (cursor.moreInCurrentBatch()) { stdx::lock_guard<Latch> lk(_mutex); _stats.receivedBatches++; - while (iter.moreInCurrentBatch()) { - _dataToWrite.emplace_back(iter.nextSafe()); + while (cursor.moreInCurrentBatch()) { + _dataToWrite.emplace_back(cursor.nextSafe()); } } diff --git a/src/mongo/db/repl/tenant_file_cloner.h b/src/mongo/db/repl/tenant_file_cloner.h index 90e37946224..27ff89fbc3a 100644 --- a/src/mongo/db/repl/tenant_file_cloner.h +++ b/src/mongo/db/repl/tenant_file_cloner.h @@ -160,7 +160,7 @@ private: /** * Put all results from a query batch into a buffer, and schedule it to be written to disk. */ - void handleNextBatch(DBClientCursorBatchIterator& iter); + void handleNextBatch(DBClientCursor& cursor); /** * Called whenever there is a new batch of documents ready from the DBClientConnection. diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp index 85d95d7e22d..af565c3c713 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.cpp +++ b/src/mongo/db/repl/tenant_file_importer_service.cpp @@ -118,14 +118,21 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic void TenantFileImporterService::startMigration(const UUID& migrationId, const StringData& donorConnectionString) { stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) { + return; + } + _reset(lk); _migrationId = migrationId; _donorConnectionString = donorConnectionString.toString(); - _eventQueue = std::make_unique<Queue>(); - _state.setState(ImporterState::State::kStarted); + _eventQueue = std::make_shared<Queue>(); + _state = State::kStarted; - _thread = std::make_unique<stdx::thread>([this] { + _thread = std::make_unique<stdx::thread>([this, migrationId] { Client::initThread("TenantFileImporterService"); + LOGV2_INFO(6378904, + "TenantFileImporterService starting worker thread", + "migrationId"_attr = migrationId.toString()); auto opCtx = cc().makeOperationContext(); _handleEvents(opCtx.get()); }); @@ -134,48 +141,55 @@ void TenantFileImporterService::startMigration(const UUID& migrationId, void TenantFileImporterService::learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc) { stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) { + return; + } + tassert(8423347, "Called learnedFilename with migrationId {}, but {} is active"_format( migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"), migrationId == _migrationId); - _state.setState(ImporterState::State::kLearnedFilename); + _state = State::kLearnedFilename; ImporterEvent event{ImporterEvent::Type::kLearnedFileName, migrationId}; event.metadataDoc = metadataDoc.getOwned(); + invariant(_eventQueue); auto success = _eventQueue->tryPush(std::move(event)); - uassert(6378904, + uassert(6378903, "TenantFileImporterService failed to push '{}' event without blocking"_format( - _state.toString()), + stateToString(_state)), success); } void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) { stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) { + return; + } + tassert(8423345, "Called learnedAllFilenames with migrationId {}, but {} is active"_format( migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"), migrationId == _migrationId); - _state.setState(ImporterState::State::kLearnedAllFilenames); + _state = State::kLearnedAllFilenames; + invariant(_eventQueue); auto success = _eventQueue->tryPush({ImporterEvent::Type::kLearnedAllFilenames, migrationId}); - uassert(6378905, + uassert(6378902, "TenantFileImporterService failed to push '{}' event without blocking"_format( - _state.toString()), + stateToString(_state)), success); } void TenantFileImporterService::interrupt(const UUID& migrationId) { stdx::lock_guard lk(_mutex); - if (!_migrationId) { - return; - } if (migrationId != _migrationId) { LOGV2_WARNING( - 6378907, + 6378901, "Called interrupt with migrationId {migrationId}, but {activeMigrationId} is active", "migrationId"_attr = migrationId.toString(), - "activeMigrationId"_attr = _migrationId->toString()); + "activeMigrationId"_attr = _migrationId ? _migrationId->toString() : "no migration"); return; } _interrupt(lk); @@ -195,8 +209,11 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) { std::string donorConnectionString; boost::optional<UUID> migrationId; + std::shared_ptr<Queue> eventQueueRef; { stdx::lock_guard lk(_mutex); + invariant(_eventQueue); + eventQueueRef = _eventQueue; donorConnectionString = _donorConnectionString; migrationId = _migrationId; } @@ -206,9 +223,9 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) { opCtx->checkForInterrupt(); try { - event = _eventQueue->pop(opCtx); + event = eventQueueRef->pop(opCtx); } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) { - LOGV2_WARNING(6378908, "Event queue was interrupted", "error"_attr = err); + LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err); break; } @@ -259,7 +276,7 @@ void TenantFileImporterService::_voteImportedFiles(OperationContext* opCtx) { } void TenantFileImporterService::_interrupt(WithLock) { - if (_state.is(ImporterState::State::kInterrupted)) { + if (_state == State::kInterrupted) { return; } @@ -276,11 +293,16 @@ void TenantFileImporterService::_interrupt(WithLock) { // _opCtx->markKilled(ErrorCodes::Interrupted); } - _state.setState(ImporterState::State::kInterrupted); + _state = State::kInterrupted; } void TenantFileImporterService::_reset(WithLock) { - _migrationId.reset(); + if (_migrationId) { + LOGV2_INFO(6378905, + "TenantFileImporterService resetting migration", + "migrationId"_attr = _migrationId->toString()); + _migrationId.reset(); + } if (_thread && _thread->joinable()) { _thread->join(); @@ -292,6 +314,6 @@ void TenantFileImporterService::_reset(WithLock) { } // TODO SERVER-66907: how should we be resetting _opCtx? - _state.setState(ImporterState::State::kUninitialized); + _state = State::kUninitialized; } } // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h index 9a27af816da..d7188f9a0e6 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.h +++ b/src/mongo/db/repl/tenant_file_importer_service.h @@ -82,75 +82,35 @@ private: boost::optional<UUID> _migrationId; std::string _donorConnectionString; Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex"); - class ImporterState { - public: - enum class State { - kUninitialized, - kStarted, - kLearnedFilename, - kLearnedAllFilenames, - kInterrupted - }; - void setState(State nextState) { - tassert(6114403, - str::stream() << "current state: " << toString(_state) - << ", new state: " << toString(nextState), - isValidTransition(nextState)); - _state = nextState; - } - - bool is(State state) const { - return _state == state; - } - - StringData toString() const { - return toString(_state); - } - private: - static StringData toString(State value) { - switch (value) { - case State::kUninitialized: - return "uninitialized"; - case State::kStarted: - return "started"; - case State::kLearnedFilename: - return "learned filename"; - case State::kLearnedAllFilenames: - return "learned all filenames"; - case State::kInterrupted: - return "interrupted"; - } - MONGO_UNREACHABLE; - return StringData(); - } + // Explicit State enum ordering defined here because we rely on comparison + // operators for state checking in various TenantFileImporterService methods. + enum class State { + kUninitialized = 0, + kStarted = 1, + kLearnedFilename = 2, + kLearnedAllFilenames = 3, + kInterrupted = 4 + }; - bool isValidTransition(State newState) { - if (_state == newState) { - return true; - } - - switch (_state) { - case State::kUninitialized: - return newState == State::kStarted || newState == State::kInterrupted; - case State::kStarted: - return newState == State::kInterrupted || newState == State::kLearnedFilename || - newState == State::kLearnedAllFilenames; - case State::kLearnedFilename: - return newState == State::kInterrupted || newState == State::kLearnedFilename || - newState == State::kLearnedAllFilenames; - case State::kLearnedAllFilenames: - return newState == State::kInterrupted; - case State::kInterrupted: - return newState == State::kUninitialized || newState == State::kStarted; - } - MONGO_UNREACHABLE; + static StringData stateToString(State state) { + switch (state) { + case State::kUninitialized: + return "uninitialized"; + case State::kStarted: + return "started"; + case State::kLearnedFilename: + return "learned filename"; + case State::kLearnedAllFilenames: + return "learned all filenames"; + case State::kInterrupted: + return "interrupted"; } + MONGO_UNREACHABLE; + return StringData(); + } - State _state = State::kUninitialized; - }; - - ImporterState _state; + State _state; struct ImporterEvent { enum class Type { kNone, kLearnedFileName, kLearnedAllFilenames }; @@ -166,6 +126,6 @@ private: MultiProducerSingleConsumerQueue<ImporterEvent, producer_consumer_queue_detail::DefaultCostFunction>; - std::unique_ptr<Queue> _eventQueue; + std::shared_ptr<Queue> _eventQueue; }; } // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index 53e7b24f135..fc693f64c20 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -437,7 +437,7 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) { // Recover TenantMigrationDonorAccessBlockers for ShardSplit. PersistentTaskStore<ShardSplitDonorDocument> shardSplitDonorStore( - NamespaceString::kTenantSplitDonorsNamespace); + NamespaceString::kShardSplitDonorsNamespace); shardSplitDonorStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) { // Skip creating a TenantMigrationDonorAccessBlocker for terminal shard split that have been @@ -462,6 +462,8 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) { .add(tenantId.toString(), mtab); switch (doc.getState()) { + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + break; case ShardSplitDonorStateEnum::kBlocking: invariant(doc.getBlockTimestamp()); mtab->startBlockingWrites(); diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index 34700086793..4cfdb60b43c 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -282,11 +282,11 @@ void TenantMigrationRecipientOpObserver::onDelete(OperationContext* opCtx, if (nss == NamespaceString::kTenantMigrationRecipientsNamespace && !tenant_migration_access_blocker::inRecoveryMode(opCtx)) { if (tenantIdToDeleteDecoration(opCtx)) { + auto tenantId = tenantIdToDeleteDecoration(opCtx).get(); LOGV2_INFO(8423337, "Removing expired 'multitenant migration' migration"); - opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { + opCtx->recoveryUnit()->onCommit([opCtx, tenantId](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .remove(tenantIdToDeleteDecoration(opCtx).get(), - TenantMigrationAccessBlocker::BlockerType::kRecipient); + .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kRecipient); }); } @@ -297,8 +297,7 @@ void TenantMigrationRecipientOpObserver::onDelete(OperationContext* opCtx, "migrationId"_attr = migrationId); opCtx->recoveryUnit()->onCommit([opCtx, migrationId](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .removeRecipientAccessBlockersForMigration( - migrationIdToDeleteDecoration(opCtx).get()); + .removeRecipientAccessBlockersForMigration(migrationId); repl::TenantFileImporterService::get(opCtx->getServiceContext()) ->interrupt(migrationId); }); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index facaf190ab8..f355b7a3ac6 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -43,6 +43,7 @@ #include "mongo/db/commands/tenant_migration_donor_cmds_gen.h" #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/concurrency/replication_state_transition_lock_guard.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" @@ -213,7 +214,7 @@ public: // Tenant migration does not require the metadata from the oplog query. void processMetadata(const rpc::ReplSetMetadata& replMetadata, - rpc::OplogQueryMetadata oqMetadata) final {} + const rpc::OplogQueryMetadata& oqMetadata) final {} // Tenant migration does not change sync source depending on metadata. ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, @@ -2516,7 +2517,8 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() { } void TenantMigrationRecipientService::Instance::_setup() { - auto opCtx = cc().makeOperationContext(); + auto uniqueOpCtx = cc().makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); { stdx::lock_guard lk(_mutex); // Do not set the internal states if the migration is already interrupted. @@ -2543,12 +2545,23 @@ void TenantMigrationRecipientService::Instance::_setup() { _sharedData = std::make_unique<TenantMigrationSharedData>( getGlobalServiceContext()->getFastClockSource(), getMigrationUUID(), resumePhase); - _createOplogBuffer(lk, opCtx.get()); + _createOplogBuffer(lk, opCtx); } // Start the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. try { - _donorOplogBuffer->startup(opCtx.get()); + // It is illegal to start the replicated donor buffer when the node is not primary. + // So ensure we are primary before trying to startup the oplog buffer. + repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX); + + auto oplogBufferNS = getOplogBufferNs(getMigrationUUID()); + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( + opCtx, oplogBufferNS.db())) { + uassertStatusOK( + Status(ErrorCodes::NotWritablePrimary, "Recipient node is no longer a primary.")); + } + + _donorOplogBuffer->startup(opCtx); } catch (DBException& ex) { ex.addContext("Failed to create oplog buffer collection."); throw; diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 4215b04043a..864960d84d7 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -201,10 +201,14 @@ private: logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)}; }; +// TODO SERVER-67155 Remove all calls to DatabaseName::toStringWithTenantId() once the OplogEntry +// deserializer passes "tid" to the NamespaceString constructor TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) { std::vector<OplogEntry> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen())); - srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(_dbName, "bar"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 2, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); @@ -235,7 +239,8 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { std::vector<OplogEntry> srcOps; // This should be big enough to use several threads to do the writing for (int i = 0; i < 64; i++) { - srcOps.push_back(makeInsertOplogEntry(i + 1, NamespaceString(_dbName, "foo"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + i + 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen())); } pushOps(srcOps); @@ -266,10 +271,14 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { std::vector<OplogEntry> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen())); - srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(_dbName, "bar"), UUID::gen())); - srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(_dbName, "baz"), UUID::gen())); - srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(_dbName, "bif"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 2, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 3, NamespaceString(_dbName.toStringWithTenantId(), "baz"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 4, NamespaceString(_dbName.toStringWithTenantId(), "bif"), UUID::gen())); auto writerPool = makeTenantMigrationWriterPool(); @@ -305,14 +314,20 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { std::vector<OplogEntry> innerOps1; - innerOps1.push_back(makeInsertOplogEntry(11, NamespaceString(_dbName, "bar"), UUID::gen())); - innerOps1.push_back(makeInsertOplogEntry(12, NamespaceString(_dbName, "bar"), UUID::gen())); + innerOps1.push_back(makeInsertOplogEntry( + 11, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); + innerOps1.push_back(makeInsertOplogEntry( + 12, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); std::vector<OplogEntry> innerOps2; - innerOps2.push_back(makeInsertOplogEntry(21, NamespaceString(_dbName, "bar"), UUID::gen())); - innerOps2.push_back(makeInsertOplogEntry(22, NamespaceString(_dbName, "bar"), UUID::gen())); + innerOps2.push_back(makeInsertOplogEntry( + 21, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); + innerOps2.push_back(makeInsertOplogEntry( + 22, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); std::vector<OplogEntry> innerOps3; - innerOps3.push_back(makeInsertOplogEntry(31, NamespaceString(_dbName, "bar"), UUID::gen())); - innerOps3.push_back(makeInsertOplogEntry(32, NamespaceString(_dbName, "bar"), UUID::gen())); + innerOps3.push_back(makeInsertOplogEntry( + 31, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); + innerOps3.push_back(makeInsertOplogEntry( + 32, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen())); // Makes entries with ts from range [2, 5). std::vector<OplogEntry> srcOps = makeMultiEntryTransactionOplogEntries( @@ -353,7 +368,7 @@ TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); } - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto lsid = makeLogicalSessionId(_opCtx.get()); TxnNumber txnNum(0); @@ -411,7 +426,8 @@ TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) } TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) { - auto entry = makeInsertOplogEntry(1, NamespaceString(_dbName, "bar"), UUID::gen()); + auto entry = makeInsertOplogEntry( + 1, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()); bool onInsertsCalled = false; _opObserver->onInsertsFn = [&](OperationContext* opCtx, const NamespaceString&, @@ -439,7 +455,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) { TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) { createDatabase(_opCtx.get(), _dbName.toString()); - auto entry = makeInsertOplogEntry(1, NamespaceString(_dbName, "bar"), UUID::gen()); + auto entry = makeInsertOplogEntry( + 1, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()); bool onInsertsCalled = false; _opObserver->onInsertsFn = [&](OperationContext* opCtx, const NamespaceString&, @@ -466,7 +483,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) { } TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, @@ -504,7 +521,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) { } TEST_F(TenantOplogApplierTest, ApplyInsert_UniqueKey_InsertExisting) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); // Create unique key index on the collection. @@ -545,7 +562,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_UniqueKey_InsertExisting) { } TEST_F(TenantOplogApplierTest, ApplyInsert_Success) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto entry = makeInsertOplogEntry(1, nss, uuid); bool onInsertsCalled = false; @@ -553,7 +570,9 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_Success) { [&](OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { ASSERT_FALSE(onInsertsCalled); onInsertsCalled = true; - ASSERT_EQUALS(nss.db(), _dbName.toString()); + // TODO Check that (nss.dbName() == _dbName) once the OplogEntry deserializer passes + // "tid" to the NamespaceString constructor + ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId()); ASSERT_EQUALS(nss.coll(), "bar"); ASSERT_EQUALS(1, docs.size()); ASSERT_BSONOBJ_EQ(docs[0], entry.getObject()); @@ -581,9 +600,9 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_Success) { TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { // TODO(SERVER-50256): remove nss_workaround, which is used to work around a bug where // the first operation assigned to a worker cannot be grouped. - NamespaceString nss_workaround(_dbName, "a"); - NamespaceString nss1(_dbName, "bar"); - NamespaceString nss2(_dbName, "baz"); + NamespaceString nss_workaround(_dbName.toStringWithTenantId(), "a"); + NamespaceString nss1(_dbName.toStringWithTenantId(), "bar"); + NamespaceString nss2(_dbName.toStringWithTenantId(), "baz"); auto uuid1 = createCollectionWithUuid(_opCtx.get(), nss1); auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); std::vector<OplogEntry> entries; @@ -641,7 +660,7 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { } TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto entry = makeOplogEntry( repl::OpTypeEnum::kUpdate, nss, uuid, BSON("$set" << BSON("a" << 1)), BSON("_id" << 0)); @@ -676,7 +695,7 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) { } TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0)); auto entry = makeOplogEntry( @@ -708,7 +727,8 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) { } TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) { - auto entry = makeOplogEntry(OpTypeEnum::kDelete, NamespaceString(_dbName, "bar"), UUID::gen()); + auto entry = makeOplogEntry( + OpTypeEnum::kDelete, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()); bool onDeleteCalled = false; _opObserver->onDeleteFn = [&](OperationContext* opCtx, const NamespaceString&, @@ -738,7 +758,8 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) { TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) { createDatabase(_opCtx.get(), _dbName.toString()); - auto entry = makeOplogEntry(OpTypeEnum::kDelete, NamespaceString(_dbName, "bar"), UUID::gen()); + auto entry = makeOplogEntry( + OpTypeEnum::kDelete, NamespaceString(_dbName.toStringWithTenantId(), "bar"), UUID::gen()); bool onDeleteCalled = false; _opObserver->onDeleteFn = [&](OperationContext* opCtx, const NamespaceString&, @@ -767,7 +788,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) { } TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0)); bool onDeleteCalled = false; @@ -798,7 +819,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) { } TEST_F(TenantOplogApplierTest, ApplyDelete_Success) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), nss, {BSON("_id" << 0)}, 0)); auto entry = makeOplogEntry(OpTypeEnum::kDelete, nss, uuid, BSON("_id" << 0)); @@ -814,7 +835,9 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_Success) { ASSERT_TRUE(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX)); ASSERT_TRUE(opCtx->writesAreReplicated()); ASSERT_FALSE(args.fromMigrate); - ASSERT_EQUALS(nss.db(), _dbName.toString()); + // TODO SERVER-66708 Check that (nss.dbName() == _dbName) once the OplogEntry deserializer + // passes "tid" to the NamespaceString constructor + ASSERT_EQUALS(nss.dbName().db(), _dbName.toStringWithTenantId()); ASSERT_EQUALS(nss.coll(), "bar"); ASSERT_EQUALS(uuid, observer_uuid); }; @@ -839,7 +862,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_Success) { } TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_CollExisting) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto op = BSON("op" << "c" @@ -874,8 +897,8 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_CollExisting) { } TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) { - NamespaceString nss1(_dbName, "foo"); - NamespaceString nss2(_dbName, "bar"); + NamespaceString nss1(_dbName.toStringWithTenantId(), "foo"); + NamespaceString nss2(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss2); auto op = BSON("op" @@ -914,7 +937,7 @@ TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) { } TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_Success) { - NamespaceString nss(_dbName, "t"); + NamespaceString nss(_dbName.toStringWithTenantId(), "t"); auto op = BSON("op" << "c" @@ -954,7 +977,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_Success) { } TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) { - NamespaceString nss(_dbName, "t"); + NamespaceString nss(_dbName.toStringWithTenantId(), "t"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto op = BSON("op" @@ -1001,7 +1024,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) { } TEST_F(TenantOplogApplierTest, ApplyStartIndexBuildCommand_Failure) { - NamespaceString nss(_dbName, "t"); + NamespaceString nss(_dbName.toStringWithTenantId(), "t"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto op = BSON("op" << "c" @@ -1066,7 +1089,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_WrongNSS) { } TEST_F(TenantOplogApplierTest, ApplyDropIndexesCommand_IndexNotFound) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto op = BSON("op" << "c" @@ -1104,7 +1127,7 @@ TEST_F(TenantOplogApplierTest, ApplyDropIndexesCommand_IndexNotFound) { } TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) { - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto op = BSON("op" << "c" @@ -1148,7 +1171,7 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) { TEST_F(TenantOplogApplierTest, ApplyCollModCommand_CollectionMissing) { createDatabase(_opCtx.get(), _dbName.toString()); - NamespaceString nss(_dbName, "bar"); + NamespaceString nss(_dbName.toStringWithTenantId(), "bar"); UUID uuid(UUID::gen()); auto op = BSON("op" << "c" @@ -1312,7 +1335,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) { TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Success) { std::vector<OplogEntry> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen())); srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg)); pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); @@ -1349,7 +1373,8 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success) { std::vector<OplogEntry> srcOps; srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg)); - srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(_dbName, "foo"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 2, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen())); pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); @@ -1380,7 +1405,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Success) { std::vector<OplogEntry> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen())); srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg)); pushOps(srcOps); ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime()); @@ -1413,7 +1439,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { std::vector<OplogEntry> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(_dbName, "foo"), UUID::gen())); + srcOps.push_back(makeInsertOplogEntry( + 1, NamespaceString(_dbName.toStringWithTenantId(), "foo"), UUID::gen())); srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg)); pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); @@ -1445,8 +1472,8 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { TEST_F(TenantOplogApplierTest, ApplyInsert_MultiKeyIndex) { createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); - NamespaceString indexedNss(_dbName, "indexedColl"); - NamespaceString nonIndexedNss(_dbName, "nonIndexedColl"); + NamespaceString indexedNss(_dbName.toStringWithTenantId(), "indexedColl"); + NamespaceString nonIndexedNss(_dbName.toStringWithTenantId(), "nonIndexedColl"); auto indexedCollUUID = createCollectionWithUuid(_opCtx.get(), indexedNss); createCollection(_opCtx.get(), nonIndexedNss, CollectionOptions()); diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 7f30b7b113d..c72bb2ddfb3 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -1364,14 +1364,14 @@ void TopologyCoordinator::setMyLastDurableOpTimeAndWallTime(OpTimeAndWallTime op myMemberData.setLastDurableOpTimeAndWallTime(opTimeAndWallTime, now); } -StatusWith<bool> TopologyCoordinator::setLastOptime(const UpdatePositionArgs::UpdateInfo& args, - Date_t now) { +StatusWith<bool> TopologyCoordinator::setLastOptimeForMember( + const UpdatePositionArgs::UpdateInfo& args, Date_t now) { if (_selfIndex == -1) { // Ignore updates when we're in state REMOVED. return Status(ErrorCodes::NotPrimaryOrSecondary, "Received replSetUpdatePosition command but we are in state REMOVED"); } - invariant(_rsConfig.isInitialized()); // Can only use setLastOptime in replSet mode. + invariant(_rsConfig.isInitialized()); // Can only use setLastOptimeForMember in replSet mode. MemberId memberId; try { diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index fb9f7a196f7..3285a5b4825 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -585,7 +585,7 @@ public: * Returns a Status if the position could not be set, false if the last optimes for the node * did not change, or true if either the last applied or last durable optime did change. */ - StatusWith<bool> setLastOptime(const UpdatePositionArgs::UpdateInfo& args, Date_t now); + StatusWith<bool> setLastOptimeForMember(const UpdatePositionArgs::UpdateInfo& args, Date_t now); /** * Sets the latest optime committed in the previous config to the current lastCommitted optime. |