diff options
9 files changed, 271 insertions, 92 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ff76b5c5cf2..cab39ffea0a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -82,6 +82,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/periodic_sharded_index_consistency_checker.h" +#include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" @@ -171,8 +172,8 @@ auto makeTaskExecutor(ServiceContext* service, } /** - * Schedules a task using the executor. This task is always run unless the task executor is shutting - * down. + * Schedules a task using the executor. This task is always run unless the task executor is + * shutting down. */ void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) { auto cbh = executor->scheduleWork( @@ -226,9 +227,9 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( invariant(!_oplogApplier); - // Using noop observer now that BackgroundSync no longer implements the OplogApplier::Observer - // interface. During steady state replication, there is no need to log details on every batch - // we apply. + // Using noop observer now that BackgroundSync no longer implements the + // OplogApplier::Observer interface. During steady state replication, there is no need to + // log details on every batch we apply. _oplogApplier = std::make_unique<OplogApplierImpl>( _oplogApplierTaskExecutor.get(), _oplogBuffer.get(), @@ -291,8 +292,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock( } // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but - // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it is - // waiting for an operation to be past the slaveDelay point. + // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it + // is waiting for an operation to be past the slaveDelay point. if (oldOplogBuffer) { oldOplogBuffer->clear(opCtx); } @@ -352,8 +353,8 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown } // Ensure that all writes are visible before reading. If we failed mid-batch, it would be - // possible to read from a kNoOverlap ReadSource where not all writes to the minValid document - // are visible, generating a writeConflict that would not resolve. + // possible to read from a kNoOverlap ReadSource where not all writes to the minValid + // document are visible, generating a writeConflict that would not resolve. invariant(RecoveryUnit::ReadSource::kNoTimestamp == opCtx->recoveryUnit()->getTimestampReadSource()); @@ -362,13 +363,14 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown loadLastOpTimeAndWallTimeResult.isOK() && loadLastOpTimeAndWallTimeResult.getValue().opTime == _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx)) { - // Clear the appliedThrough marker to indicate we are consistent with the top of the oplog. + // Clear the appliedThrough marker to indicate we are consistent with the top of the + // oplog. // - // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are - // any outstanding checkpoints being taken, they should only reflect this write if they see - // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on - // that timestamp, making it not safe to write to, even as we're holding the RSTL in - // exclusive mode. + // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there + // are any outstanding checkpoints being taken, they should only reflect this write if + // they see all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can + // have readers on that timestamp, making it not safe to write to, even as we're holding + // the RSTL in exclusive mode. invariant(opCtx->lockState()->isRSTLExclusive()); _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp()); @@ -393,24 +395,24 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) // Perform additional shutdown steps below that must be done outside _threadMutex. - // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the NoOpWriter - // itself can block on the ReplicationCoordinator mutex. It is safe to access _noopWriter - // outside of _threadMutex because _noopWriter is protected by its own mutex. + // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the + // NoOpWriter itself can block on the ReplicationCoordinator mutex. It is safe to access + // _noopWriter outside of _threadMutex because _noopWriter is protected by its own mutex. invariant(_noopWriter); LOGV2_DEBUG(21308, 1, "Stopping noop writer"); _noopWriter->stopWritingPeriodicNoops(); - // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to run - // the dropPendingCollectionReaper, which takes database locks. It is safe to access + // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to + // run the dropPendingCollectionReaper, which takes database locks. It is safe to access // _taskExecutor outside of _threadMutex because once _startedThreads is set to true, the // _taskExecutor pointer never changes. _taskExecutor->join(); // The oplog truncate after point must be cleared, if we are still primary for shutdown, so - // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean primary - // shutdown. Stepdown is similarly safe from holes and halts updates to and clears the truncate - // point. The other replication states do need truncation if the truncate point is set: e.g. - // interruption mid batch application can leave oplog holes. + // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean + // primary shutdown. Stepdown is similarly safe from holes and halts updates to and clears + // the truncate point. The other replication states do need truncation if the truncate point + // is set: e.g. interruption mid batch application can leave oplog holes. if (!storageGlobalParams.readOnly && _replicationProcess->getConsistencyMarkers() ->isOplogTruncateAfterPointBeingUsedForPrimary()) { @@ -449,10 +451,11 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati _service->getOpObserver()->onOpMessage(opCtx, msgObj); wuow.commit(); // ReplSetTest assumes that immediately after the replSetInitiate - // command returns, it can allow other nodes to initial sync with no - // retries and they will succeed. Unfortunately, initial sync will - // fail if it finds its sync source has an empty oplog. Thus, we - // need to wait here until the seed document is visible in our oplog. + // command returns, it can allow other nodes to initial sync with + // no retries and they will succeed. Unfortunately, initial sync + // will fail if it finds its sync source has an empty oplog. + // Thus, we need to wait here until the seed document is visible + // in our oplog. _storageInterface->waitForAllEarlierOplogWritesToBeVisible(opCtx); }); @@ -484,29 +487,31 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC invariant( _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull()); - // A primary periodically updates the oplogTruncateAfterPoint to allow replication to proceed - // without danger of unidentifiable oplog holes on unclean shutdown due to parallel writes. + // A primary periodically updates the oplogTruncateAfterPoint to allow replication to + // proceed without danger of unidentifiable oplog holes on unclean shutdown due to parallel + // writes. // // Initialize the oplogTruncateAfterPoint so that user writes are safe on unclean shutdown // between completion of transition to primary and the first async oplogTruncateAfterPoint // update. _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPointToTopOfOplog(opCtx); - // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use the - // truncate point, rather than last applied, to update the repl durable timestamp. + // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use + // the truncate point, rather than last applied, to update the repl durable timestamp. // - // The truncate point must be used while primary for repl's durable timestamp because otherwise - // we could truncate last applied writes on startup recovery after an unclean shutdown that were - // previously majority confirmed to the user. + // The truncate point must be used while primary for repl's durable timestamp because + // otherwise we could truncate last applied writes on startup recovery after an unclean + // shutdown that were previously majority confirmed to the user. _replicationProcess->getConsistencyMarkers()->startUsingOplogTruncateAfterPointForPrimary(); - // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be - // done before we add anything to our oplog. + // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must + // be done before we add anything to our oplog. // - // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are any - // outstanding checkpoints being taken, they should only reflect this write if they see all - // writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on that - // timestamp, making it not safe to write to, even as we're holding the RSTL in exclusive mode. + // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are + // any outstanding checkpoints being taken, they should only reflect this write if they see + // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on + // that timestamp, making it not safe to write to, even as we're holding the RSTL in + // exclusive mode. _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp()); writeConflictRetry(opCtx, "logging transition to primary to oplog", "local.oplog.rs", [&] { @@ -523,12 +528,12 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC auto opTimeToReturn = loadLastOpTimeAndWallTimeResult.getValue().opTime; auto newTermStartDate = loadLastOpTimeAndWallTimeResult.getValue().wallTime; - // This constant was based on data described in SERVER-44634. It is in relation to how long the - // first majority committed write takes after a new term has started. + // This constant was based on data described in SERVER-44634. It is in relation to how long + // the first majority committed write takes after a new term has started. const auto flowControlGracePeriod = Seconds(4); - // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may stop - // while a node wins election and steps up, it's likely to determine there's majority point - // lag. Moreover, because there are no writes in the system, flow control will believe + // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may + // stop while a node wins election and steps up, it's likely to determine there's majority + // point lag. Moreover, because there are no writes in the system, flow control will believe // secondaries are unable to process oplog entries. This can result in an undesirable "slow // start" phenomena. FlowControl::get(opCtx)->disableUntil(newTermStartDate + flowControlGracePeriod); @@ -673,9 +678,10 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( WriteUnitOfWork wunit(opCtx); // We only want to replace the last vote document if the new last vote document - // would have a higher term. We check the term of the current last vote document and - // insert the new document in a WriteUnitOfWork to synchronize the two operations. - // We have already ensured at startup time that there is an old document. + // would have a higher term. We check the term of the current last vote document + // and insert the new document in a WriteUnitOfWork to synchronize the two + // operations. We have already ensured at startup time that there is an old + // document. BSONObj result; bool exists = Helpers::getSingleton(opCtx, lastVoteCollectionName, result); fassert(51241, exists); @@ -790,24 +796,25 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnStepDownHook() { void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTruncateAfterPoint() { auto opCtx = cc().getOperationContext(); // Temporarily turn off flow control ticketing. Getting a ticket can stall on a ticket being - // available, which may have to wait for the ticket refresher to run, which in turn blocks on - // the repl _mutex to check whether we are primary or not: this is a deadlock because stepdown - // already holds the repl _mutex! + // available, which may have to wait for the ticket refresher to run, which in turn blocks + // on the repl _mutex to check whether we are primary or not: this is a deadlock because + // stepdown already holds the repl _mutex! FlowControl::Bypass flowControlBypass(opCtx); - // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go back to - // using last applied to update repl's durable timestamp instead of the truncate point. + // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go + // back to using last applied to update repl's durable timestamp instead of the truncate + // point. _replicationProcess->getConsistencyMarkers()->stopUsingOplogTruncateAfterPointForPrimary(); // Interrupt the current JournalFlusher thread round, so it recognizes that it is no longer // primary. Otherwise the asynchronously running thread could race with setting the truncate - // point to null below. This would leave the truncate point potentially stale in a non-PRIMARY - // state, where last applied would be used to update repl's durable timestamp and confirm - // majority writes. Startup recovery could truncate majority confirmed writes back to the stale - // truncate after point. + // point to null below. This would leave the truncate point potentially stale in a + // non-PRIMARY state, where last applied would be used to update repl's durable timestamp + // and confirm majority writes. Startup recovery could truncate majority confirmed writes + // back to the stale truncate after point. // - // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold - // before doing an update write to the truncate point. + // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might + // hold before doing an update write to the truncate point. JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange(); // Wait for another round of journal flushing. This will ensure that we wait for the current @@ -816,8 +823,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr // above, if writing is imminent, so we must make sure that the code completes fully. JournalFlusher::get(_service)->waitForJournalFlush(); - // Writes to non-replicated collections do not need concurrency control with the OplogApplier - // that never accesses them. Skip taking the PBWM. + // Writes to non-replicated collections do not need concurrency control with the + // OplogApplier that never accesses them. Skip taking the PBWM. ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState()); // We can clear the oplogTruncateAfterPoint because we know there are no user writes during @@ -830,9 +837,10 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { Status status = ShardingCatalogManager::get(opCtx)->initializeConfigDatabaseIfNeeded(opCtx); if (!status.isOK() && status != ErrorCodes::AlreadyInitialized) { - // If the node is shutting down or it lost quorum just as it was becoming primary, don't - // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is - // already idempotent, so the machinery will remain in the stepped down state. + // If the node is shutting down or it lost quorum just as it was becoming primary, + // don't run the sharding onStepUp machinery. The onStepDown counterpart to these + // methods is already idempotent, so the machinery will remain in the stepped down + // state. if (ErrorCodes::isShutdownError(status.code()) || ErrorCodes::isNotPrimaryError(status.code())) { return; @@ -844,14 +852,14 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook } if (status.isOK()) { - // Load the clusterId into memory. Use local readConcern, since we can't use majority - // readConcern in drain mode because the global lock prevents replication. This is - // safe, since if the clusterId write is rolled back, any writes that depend on it will - // also be rolled back. + // Load the clusterId into memory. Use local readConcern, since we can't use + // majority readConcern in drain mode because the global lock prevents replication. + // This is safe, since if the clusterId write is rolled back, any writes that depend + // on it will also be rolled back. // - // Since we *just* wrote the cluster ID to the config.version document (via the call to - // ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can only - // meaningfully fail if the node is shutting down. + // Since we *just* wrote the cluster ID to the config.version document (via the call + // to ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can + // only meaningfully fail if the node is shutting down. status = ClusterIdentityLoader::get(opCtx)->loadClusterId( opCtx, repl::ReadConcernLevel::kLocalReadConcern); @@ -873,9 +881,9 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook } else if (ShardingState::get(opCtx)->enabled()) { Status status = ShardingStateRecovery::recover(opCtx); - // If the node is shutting down or it lost quorum just as it was becoming primary, don't run - // the sharding onStepUp machinery. The onStepDown counterpart to these methods is already - // idempotent, so the machinery will remain in the stepped down state. + // If the node is shutting down or it lost quorum just as it was becoming primary, don't + // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is + // already idempotent, so the machinery will remain in the stepped down state. if (ErrorCodes::isShutdownError(status.code()) || ErrorCodes::isNotPrimaryError(status.code())) { return; @@ -900,6 +908,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook sharding_ddl_util::retakeInMemoryRecoverableCriticalSections(opCtx); + const bool scheduleAsyncRefresh = true; + resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh); } else { // unsharded if (auto validator = LogicalTimeValidator::get(_service)) { validator->enableKeyGenerator(opCtx, true); @@ -997,8 +1007,8 @@ void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters( const OpTime& committedOpTime) { signalOplogWaiters(); - // Notify the DropPendingCollectionReaper if there are any drop-pending collections with drop - // optimes before or at the committed optime. + // Notify the DropPendingCollectionReaper if there are any drop-pending collections with + // drop optimes before or at the committed optime. if (auto earliestDropOpTime = _dropPendingCollectionReaper->getEarliestDropOpTime()) { if (committedOpTime >= *earliestDropOpTime) { auto reaper = _dropPendingCollectionReaper; @@ -1060,12 +1070,13 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM } JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(OperationContext* opCtx) { - // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp in - // order to avoid majority confirming any writes that could later be truncated. + // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp + // in order to avoid majority confirming any writes that could later be truncated. // - // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a nullptr - // for the opCtx. The ephemeral engine does not do parallel writes to cause oplog holes, - // therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks oplog holes. + // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a + // nullptr for the opCtx. The ephemeral engine does not do parallel writes to cause oplog + // holes, therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks + // oplog holes. if (MONGO_likely(opCtx)) { auto truncatePoint = repl::ReplicationProcess::get(opCtx) ->getConsistencyMarkers() @@ -1078,14 +1089,15 @@ JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(Operati // All other repl states use the 'lastApplied'. // // Setting 'rollbackSafe' will ensure that a safe lastApplied value is returned if we're in - // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch of - // history during rollback, so a benign default value will be returned instead to prevent a - // divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback. + // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch + // of history during rollback, so a benign default value will be returned instead to prevent + // a divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback. // - // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK after - // getting the token because the JournalFlusher is shut down during rollback, before a divergent - // 'lastApplied' value is present. The JournalFlusher will start up again in ROLLBACK and never - // transition from non-ROLLBACK to ROLLBACK with a divergent 'lastApplied' value. + // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK + // after getting the token because the JournalFlusher is shut down during rollback, before a + // divergent 'lastApplied' value is present. The JournalFlusher will start up again in + // ROLLBACK and never transition from non-ROLLBACK to ROLLBACK with a divergent + // 'lastApplied' value. return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime( /*rollbackSafe=*/true); } diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp index 86744c1594f..518f5405085 100644 --- a/src/mongo/db/s/collection_metadata.cpp +++ b/src/mongo/db/s/collection_metadata.cpp @@ -184,6 +184,12 @@ void CollectionMetadata::toBSONBasic(BSONObjBuilder& bb) const { } } +BSONObj CollectionMetadata::toBSON() const { + BSONObjBuilder builder; + toBSONBasic(builder); + return builder.obj(); +} + std::string CollectionMetadata::toStringBasic() const { if (isSharded()) { return str::stream() << "collection version: " << _cm->getVersion().toString() diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index 13fe79d78e0..a742b77726f 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -170,11 +170,17 @@ public: */ void toBSONBasic(BSONObjBuilder& bb) const; + BSONObj toBSON() const; + /** * String output of the collection and shard versions. */ std::string toStringBasic() const; + std::string toString() const { + return toStringBasic(); + } + // // Methods used for orphan filtering and general introspection of the chunks owned by the shard // diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index df03a16bddb..8c95024d108 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -32,14 +32,17 @@ #include "mongo/platform/basic.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" -#include "mongo/db/storage/duplicate_key_error_info.h" #include <fmt/format.h> +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/storage/duplicate_key_error_info.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/grid.h" +#include "mongo/stdx/unordered_set.h" namespace mongo { namespace resharding { @@ -376,6 +379,49 @@ void processReshardingFieldsForCollection(OperationContext* opCtx, } } +void clearFilteringMetadata(OperationContext* opCtx, bool scheduleAsyncRefresh) { + stdx::unordered_set<NamespaceString> namespacesToRefresh; + for (const NamespaceString homeToReshardingDocs : + {NamespaceString::kDonorReshardingOperationsNamespace, + NamespaceString::kRecipientReshardingOperationsNamespace}) { + PersistentTaskStore<CommonReshardingMetadata> store(homeToReshardingDocs); + + store.forEach(opCtx, Query(), [&](CommonReshardingMetadata reshardingDoc) -> bool { + namespacesToRefresh.insert(reshardingDoc.getSourceNss()); + namespacesToRefresh.insert(reshardingDoc.getTempReshardingNss()); + + return true; + }); + } + + for (const auto& nss : namespacesToRefresh) { + AutoGetCollection autoColl(opCtx, nss, MODE_IX); + CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); + + if (!scheduleAsyncRefresh) { + continue; + } + + ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) + .then([svcCtx = opCtx->getServiceContext(), nss] { + ThreadClient tc("TriggerReshardingRecovery", svcCtx); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + + auto opCtx = tc->makeOperationContext(); + onShardVersionMismatch(opCtx.get(), nss, boost::none /* shardVersionReceived */); + }) + .onError([](const Status& status) { + LOGV2_WARNING(5498101, + "Error on deferred shardVersion recovery execution", + "error"_attr = redact(status)); + }) + .getAsync([](auto) {}); + } +} + } // namespace resharding } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h index bf016473582..0a103ef3619 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h @@ -76,6 +76,8 @@ void processReshardingFieldsForCollection(OperationContext* opCtx, const CollectionMetadata& metadata, const ReshardingFields& reshardingFields); +void clearFilteringMetadata(OperationContext* opCtx, bool scheduleAsyncRefresh); + } // namespace resharding } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index 6bfdd45bbe8..9768b725524 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -31,12 +31,15 @@ #include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h" +#include "mongo/db/catalog/drop_database.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/resharding/donor_document_gen.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/unittest/death_test.h" @@ -213,6 +216,81 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessReshardingFieldsWithoutDonorOr 5274201); } +TEST_F(ReshardingDonorRecipientCommonTest, ClearReshardingFilteringMetaData) { + OperationContext* opCtx = operationContext(); + + const bool scheduleAsyncRefresh = false; + auto doSetupFunc = [&] { + // Clear out the resharding donor/recipient metadata collections. + for (auto const& nss : {NamespaceString::kDonorReshardingOperationsNamespace, + NamespaceString::kRecipientReshardingOperationsNamespace}) { + dropDatabase(opCtx, nss.db().toString()).ignore(); + } + + // Assert the prestate has no filtering metadata. + for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { + AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, nss); + ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); + } + + // Add filtering metadata for the collection being resharded. + { + AutoGetCollection autoColl(opCtx, kOriginalNss, LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, kOriginalNss); + csr->setFilteringMetadata(opCtx, + makeShardedMetadataForOriginalCollection(opCtx, kThisShard)); + ASSERT(csr->getCurrentMetadataIfKnown()); + } + + // Add filtering metadata for the temporary resharding namespace. + { + AutoGetCollection autoColl(opCtx, kTemporaryReshardingNss, LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, kTemporaryReshardingNss); + csr->setFilteringMetadata( + opCtx, makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard)); + ASSERT(csr->getCurrentMetadataIfKnown()); + } + + // Prior to adding a resharding document, assert that attempting to clear filtering does + // nothing. + resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh); + + for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { + AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, nss); + ASSERT(csr->getCurrentMetadataIfKnown()); + } + }; + + doSetupFunc(); + // Add a resharding donor document that targets the namespaces involved in resharding. + ReshardingDonorDocument donorDoc = makeDonorStateDoc(); + ReshardingDonorService::DonorStateMachine::insertStateDocument(opCtx, donorDoc); + + // Clear the filtering metadata (without scheduling a refresh) and assert the metadata is gone. + resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh); + + for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { + AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, nss); + ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); + } + + doSetupFunc(); + // Add a resharding recipient document that targets the namespaces involved in resharding. + ReshardingRecipientDocument recipDoc = makeRecipientStateDoc(); + ReshardingRecipientService::RecipientStateMachine::insertStateDocument(opCtx, recipDoc); + + // Clear the filtering metadata (without scheduling a refresh) and assert the metadata is gone. + resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh); + + for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { + AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, nss); + ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); + } +} } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h index b707a975444..f2f4ee7001f 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h @@ -126,6 +126,36 @@ protected: return CollectionMetadata(std::move(cm), kThisShard); } + ReshardingDonorDocument makeDonorStateDoc() { + DonorShardContext donorCtx; + donorCtx.setState(DonorStateEnum::kPreparingToDonate); + + ReshardingDonorDocument doc(std::move(donorCtx), {kThisShard, kOtherShard}); + + NamespaceString sourceNss = kOriginalNss; + auto sourceUUID = UUID::gen(); + auto commonMetadata = CommonReshardingMetadata( + UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern); + + doc.setCommonReshardingMetadata(std::move(commonMetadata)); + return doc; + } + + ReshardingRecipientDocument makeRecipientStateDoc() { + RecipientShardContext recipCtx; + recipCtx.setState(RecipientStateEnum::kCloning); + + ReshardingRecipientDocument doc(std::move(recipCtx), {kThisShard, kOtherShard}, 1000); + + NamespaceString sourceNss = kOriginalNss; + auto sourceUUID = UUID::gen(); + auto commonMetadata = CommonReshardingMetadata( + UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern); + + doc.setCommonReshardingMetadata(std::move(commonMetadata)); + return doc; + } + ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID, CoordinatorStateEnum state) { auto fields = ReshardingFields(reshardingUUID); diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 10bb3fcc583..b96711fd30b 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -177,7 +177,6 @@ void validateReshardedChunks(const std::vector<mongo::BSONObj>& chunks, Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk.getRecipientShardId())); validChunks.push_back(chunk); } - checkForHolesAndOverlapsInChunks(validChunks, keyPattern); } @@ -527,5 +526,4 @@ NamespaceString getLocalConflictStashNamespace(UUID existingUUID, ShardId donorS "localReshardingConflictStash.{}.{}"_format(existingUUID.toString(), donorShardId.toString())}; } - } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/Conscript b/src/mongo/db/storage/wiredtiger/Conscript new file mode 120000 index 00000000000..2672a145b97 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/Conscript @@ -0,0 +1 @@ +/home/dgottlieb/xgen/mongo/src/mongo/db/storage/wiredtiger/oplog_stone_parameters.idl
\ No newline at end of file |