diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2021-03-31 17:02:12 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-31 21:45:49 +0000 |
commit | 9f857d7249b9dc5a8aa8032a2c55e533fc1e980a (patch) | |
tree | 1e25e84508d71ed494e4ea84c171e75bd9e17791 /src/mongo/db/repl | |
parent | bed9b4820e4421d63ff5457cd854671cf51bf4f6 (diff) | |
download | mongo-9f857d7249b9dc5a8aa8032a2c55e533fc1e980a.tar.gz |
SERVER-54486: Clear resharding filtering metadata on primary stepUp.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 190 |
1 files changed, 101 insertions, 89 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ff76b5c5cf2..cab39ffea0a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -82,6 +82,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/periodic_sharded_index_consistency_checker.h" +#include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" @@ -171,8 +172,8 @@ auto makeTaskExecutor(ServiceContext* service, } /** - * Schedules a task using the executor. This task is always run unless the task executor is shutting - * down. + * Schedules a task using the executor. This task is always run unless the task executor is + * shutting down. */ void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) { auto cbh = executor->scheduleWork( @@ -226,9 +227,9 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( invariant(!_oplogApplier); - // Using noop observer now that BackgroundSync no longer implements the OplogApplier::Observer - // interface. During steady state replication, there is no need to log details on every batch - // we apply. + // Using noop observer now that BackgroundSync no longer implements the + // OplogApplier::Observer interface. During steady state replication, there is no need to + // log details on every batch we apply. _oplogApplier = std::make_unique<OplogApplierImpl>( _oplogApplierTaskExecutor.get(), _oplogBuffer.get(), @@ -291,8 +292,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock( } // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but - // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it is - // waiting for an operation to be past the slaveDelay point. + // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it + // is waiting for an operation to be past the slaveDelay point. if (oldOplogBuffer) { oldOplogBuffer->clear(opCtx); } @@ -352,8 +353,8 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown } // Ensure that all writes are visible before reading. If we failed mid-batch, it would be - // possible to read from a kNoOverlap ReadSource where not all writes to the minValid document - // are visible, generating a writeConflict that would not resolve. + // possible to read from a kNoOverlap ReadSource where not all writes to the minValid + // document are visible, generating a writeConflict that would not resolve. invariant(RecoveryUnit::ReadSource::kNoTimestamp == opCtx->recoveryUnit()->getTimestampReadSource()); @@ -362,13 +363,14 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown loadLastOpTimeAndWallTimeResult.isOK() && loadLastOpTimeAndWallTimeResult.getValue().opTime == _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx)) { - // Clear the appliedThrough marker to indicate we are consistent with the top of the oplog. + // Clear the appliedThrough marker to indicate we are consistent with the top of the + // oplog. // - // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are - // any outstanding checkpoints being taken, they should only reflect this write if they see - // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on - // that timestamp, making it not safe to write to, even as we're holding the RSTL in - // exclusive mode. + // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there + // are any outstanding checkpoints being taken, they should only reflect this write if + // they see all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can + // have readers on that timestamp, making it not safe to write to, even as we're holding + // the RSTL in exclusive mode. invariant(opCtx->lockState()->isRSTLExclusive()); _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp()); @@ -393,24 +395,24 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) // Perform additional shutdown steps below that must be done outside _threadMutex. - // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the NoOpWriter - // itself can block on the ReplicationCoordinator mutex. It is safe to access _noopWriter - // outside of _threadMutex because _noopWriter is protected by its own mutex. + // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the + // NoOpWriter itself can block on the ReplicationCoordinator mutex. It is safe to access + // _noopWriter outside of _threadMutex because _noopWriter is protected by its own mutex. invariant(_noopWriter); LOGV2_DEBUG(21308, 1, "Stopping noop writer"); _noopWriter->stopWritingPeriodicNoops(); - // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to run - // the dropPendingCollectionReaper, which takes database locks. It is safe to access + // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to + // run the dropPendingCollectionReaper, which takes database locks. It is safe to access // _taskExecutor outside of _threadMutex because once _startedThreads is set to true, the // _taskExecutor pointer never changes. _taskExecutor->join(); // The oplog truncate after point must be cleared, if we are still primary for shutdown, so - // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean primary - // shutdown. Stepdown is similarly safe from holes and halts updates to and clears the truncate - // point. The other replication states do need truncation if the truncate point is set: e.g. - // interruption mid batch application can leave oplog holes. + // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean + // primary shutdown. Stepdown is similarly safe from holes and halts updates to and clears + // the truncate point. The other replication states do need truncation if the truncate point + // is set: e.g. interruption mid batch application can leave oplog holes. if (!storageGlobalParams.readOnly && _replicationProcess->getConsistencyMarkers() ->isOplogTruncateAfterPointBeingUsedForPrimary()) { @@ -449,10 +451,11 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati _service->getOpObserver()->onOpMessage(opCtx, msgObj); wuow.commit(); // ReplSetTest assumes that immediately after the replSetInitiate - // command returns, it can allow other nodes to initial sync with no - // retries and they will succeed. Unfortunately, initial sync will - // fail if it finds its sync source has an empty oplog. Thus, we - // need to wait here until the seed document is visible in our oplog. + // command returns, it can allow other nodes to initial sync with + // no retries and they will succeed. Unfortunately, initial sync + // will fail if it finds its sync source has an empty oplog. + // Thus, we need to wait here until the seed document is visible + // in our oplog. _storageInterface->waitForAllEarlierOplogWritesToBeVisible(opCtx); }); @@ -484,29 +487,31 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC invariant( _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull()); - // A primary periodically updates the oplogTruncateAfterPoint to allow replication to proceed - // without danger of unidentifiable oplog holes on unclean shutdown due to parallel writes. + // A primary periodically updates the oplogTruncateAfterPoint to allow replication to + // proceed without danger of unidentifiable oplog holes on unclean shutdown due to parallel + // writes. // // Initialize the oplogTruncateAfterPoint so that user writes are safe on unclean shutdown // between completion of transition to primary and the first async oplogTruncateAfterPoint // update. _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPointToTopOfOplog(opCtx); - // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use the - // truncate point, rather than last applied, to update the repl durable timestamp. + // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use + // the truncate point, rather than last applied, to update the repl durable timestamp. // - // The truncate point must be used while primary for repl's durable timestamp because otherwise - // we could truncate last applied writes on startup recovery after an unclean shutdown that were - // previously majority confirmed to the user. + // The truncate point must be used while primary for repl's durable timestamp because + // otherwise we could truncate last applied writes on startup recovery after an unclean + // shutdown that were previously majority confirmed to the user. _replicationProcess->getConsistencyMarkers()->startUsingOplogTruncateAfterPointForPrimary(); - // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be - // done before we add anything to our oplog. + // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must + // be done before we add anything to our oplog. // - // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are any - // outstanding checkpoints being taken, they should only reflect this write if they see all - // writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on that - // timestamp, making it not safe to write to, even as we're holding the RSTL in exclusive mode. + // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are + // any outstanding checkpoints being taken, they should only reflect this write if they see + // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on + // that timestamp, making it not safe to write to, even as we're holding the RSTL in + // exclusive mode. _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp()); writeConflictRetry(opCtx, "logging transition to primary to oplog", "local.oplog.rs", [&] { @@ -523,12 +528,12 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC auto opTimeToReturn = loadLastOpTimeAndWallTimeResult.getValue().opTime; auto newTermStartDate = loadLastOpTimeAndWallTimeResult.getValue().wallTime; - // This constant was based on data described in SERVER-44634. It is in relation to how long the - // first majority committed write takes after a new term has started. + // This constant was based on data described in SERVER-44634. It is in relation to how long + // the first majority committed write takes after a new term has started. const auto flowControlGracePeriod = Seconds(4); - // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may stop - // while a node wins election and steps up, it's likely to determine there's majority point - // lag. Moreover, because there are no writes in the system, flow control will believe + // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may + // stop while a node wins election and steps up, it's likely to determine there's majority + // point lag. Moreover, because there are no writes in the system, flow control will believe // secondaries are unable to process oplog entries. This can result in an undesirable "slow // start" phenomena. FlowControl::get(opCtx)->disableUntil(newTermStartDate + flowControlGracePeriod); @@ -673,9 +678,10 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( WriteUnitOfWork wunit(opCtx); // We only want to replace the last vote document if the new last vote document - // would have a higher term. We check the term of the current last vote document and - // insert the new document in a WriteUnitOfWork to synchronize the two operations. - // We have already ensured at startup time that there is an old document. + // would have a higher term. We check the term of the current last vote document + // and insert the new document in a WriteUnitOfWork to synchronize the two + // operations. We have already ensured at startup time that there is an old + // document. BSONObj result; bool exists = Helpers::getSingleton(opCtx, lastVoteCollectionName, result); fassert(51241, exists); @@ -790,24 +796,25 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnStepDownHook() { void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTruncateAfterPoint() { auto opCtx = cc().getOperationContext(); // Temporarily turn off flow control ticketing. Getting a ticket can stall on a ticket being - // available, which may have to wait for the ticket refresher to run, which in turn blocks on - // the repl _mutex to check whether we are primary or not: this is a deadlock because stepdown - // already holds the repl _mutex! + // available, which may have to wait for the ticket refresher to run, which in turn blocks + // on the repl _mutex to check whether we are primary or not: this is a deadlock because + // stepdown already holds the repl _mutex! FlowControl::Bypass flowControlBypass(opCtx); - // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go back to - // using last applied to update repl's durable timestamp instead of the truncate point. + // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go + // back to using last applied to update repl's durable timestamp instead of the truncate + // point. _replicationProcess->getConsistencyMarkers()->stopUsingOplogTruncateAfterPointForPrimary(); // Interrupt the current JournalFlusher thread round, so it recognizes that it is no longer // primary. Otherwise the asynchronously running thread could race with setting the truncate - // point to null below. This would leave the truncate point potentially stale in a non-PRIMARY - // state, where last applied would be used to update repl's durable timestamp and confirm - // majority writes. Startup recovery could truncate majority confirmed writes back to the stale - // truncate after point. + // point to null below. This would leave the truncate point potentially stale in a + // non-PRIMARY state, where last applied would be used to update repl's durable timestamp + // and confirm majority writes. Startup recovery could truncate majority confirmed writes + // back to the stale truncate after point. // - // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold - // before doing an update write to the truncate point. + // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might + // hold before doing an update write to the truncate point. JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange(); // Wait for another round of journal flushing. This will ensure that we wait for the current @@ -816,8 +823,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr // above, if writing is imminent, so we must make sure that the code completes fully. JournalFlusher::get(_service)->waitForJournalFlush(); - // Writes to non-replicated collections do not need concurrency control with the OplogApplier - // that never accesses them. Skip taking the PBWM. + // Writes to non-replicated collections do not need concurrency control with the + // OplogApplier that never accesses them. Skip taking the PBWM. ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState()); // We can clear the oplogTruncateAfterPoint because we know there are no user writes during @@ -830,9 +837,10 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { Status status = ShardingCatalogManager::get(opCtx)->initializeConfigDatabaseIfNeeded(opCtx); if (!status.isOK() && status != ErrorCodes::AlreadyInitialized) { - // If the node is shutting down or it lost quorum just as it was becoming primary, don't - // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is - // already idempotent, so the machinery will remain in the stepped down state. + // If the node is shutting down or it lost quorum just as it was becoming primary, + // don't run the sharding onStepUp machinery. The onStepDown counterpart to these + // methods is already idempotent, so the machinery will remain in the stepped down + // state. if (ErrorCodes::isShutdownError(status.code()) || ErrorCodes::isNotPrimaryError(status.code())) { return; @@ -844,14 +852,14 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook } if (status.isOK()) { - // Load the clusterId into memory. Use local readConcern, since we can't use majority - // readConcern in drain mode because the global lock prevents replication. This is - // safe, since if the clusterId write is rolled back, any writes that depend on it will - // also be rolled back. + // Load the clusterId into memory. Use local readConcern, since we can't use + // majority readConcern in drain mode because the global lock prevents replication. + // This is safe, since if the clusterId write is rolled back, any writes that depend + // on it will also be rolled back. // - // Since we *just* wrote the cluster ID to the config.version document (via the call to - // ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can only - // meaningfully fail if the node is shutting down. + // Since we *just* wrote the cluster ID to the config.version document (via the call + // to ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can + // only meaningfully fail if the node is shutting down. status = ClusterIdentityLoader::get(opCtx)->loadClusterId( opCtx, repl::ReadConcernLevel::kLocalReadConcern); @@ -873,9 +881,9 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook } else if (ShardingState::get(opCtx)->enabled()) { Status status = ShardingStateRecovery::recover(opCtx); - // If the node is shutting down or it lost quorum just as it was becoming primary, don't run - // the sharding onStepUp machinery. The onStepDown counterpart to these methods is already - // idempotent, so the machinery will remain in the stepped down state. + // If the node is shutting down or it lost quorum just as it was becoming primary, don't + // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is + // already idempotent, so the machinery will remain in the stepped down state. if (ErrorCodes::isShutdownError(status.code()) || ErrorCodes::isNotPrimaryError(status.code())) { return; @@ -900,6 +908,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook sharding_ddl_util::retakeInMemoryRecoverableCriticalSections(opCtx); + const bool scheduleAsyncRefresh = true; + resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh); } else { // unsharded if (auto validator = LogicalTimeValidator::get(_service)) { validator->enableKeyGenerator(opCtx, true); @@ -997,8 +1007,8 @@ void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters( const OpTime& committedOpTime) { signalOplogWaiters(); - // Notify the DropPendingCollectionReaper if there are any drop-pending collections with drop - // optimes before or at the committed optime. + // Notify the DropPendingCollectionReaper if there are any drop-pending collections with + // drop optimes before or at the committed optime. if (auto earliestDropOpTime = _dropPendingCollectionReaper->getEarliestDropOpTime()) { if (committedOpTime >= *earliestDropOpTime) { auto reaper = _dropPendingCollectionReaper; @@ -1060,12 +1070,13 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM } JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(OperationContext* opCtx) { - // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp in - // order to avoid majority confirming any writes that could later be truncated. + // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp + // in order to avoid majority confirming any writes that could later be truncated. // - // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a nullptr - // for the opCtx. The ephemeral engine does not do parallel writes to cause oplog holes, - // therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks oplog holes. + // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a + // nullptr for the opCtx. The ephemeral engine does not do parallel writes to cause oplog + // holes, therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks + // oplog holes. if (MONGO_likely(opCtx)) { auto truncatePoint = repl::ReplicationProcess::get(opCtx) ->getConsistencyMarkers() @@ -1078,14 +1089,15 @@ JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(Operati // All other repl states use the 'lastApplied'. // // Setting 'rollbackSafe' will ensure that a safe lastApplied value is returned if we're in - // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch of - // history during rollback, so a benign default value will be returned instead to prevent a - // divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback. + // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch + // of history during rollback, so a benign default value will be returned instead to prevent + // a divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback. // - // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK after - // getting the token because the JournalFlusher is shut down during rollback, before a divergent - // 'lastApplied' value is present. The JournalFlusher will start up again in ROLLBACK and never - // transition from non-ROLLBACK to ROLLBACK with a divergent 'lastApplied' value. + // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK + // after getting the token because the JournalFlusher is shut down during rollback, before a + // divergent 'lastApplied' value is present. The JournalFlusher will start up again in + // ROLLBACK and never transition from non-ROLLBACK to ROLLBACK with a divergent + // 'lastApplied' value. return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime( /*rollbackSafe=*/true); } |