commit     9f857d7249b9dc5a8aa8032a2c55e533fc1e980a (patch)
author     Daniel Gottlieb <daniel.gottlieb@mongodb.com>  2021-03-31 17:02:12 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-03-31 21:45:49 +0000
tree       1e25e84508d71ed494e4ea84c171e75bd9e17791 /src/mongo/db/repl
parent     bed9b4820e4421d63ff5457cd854671cf51bf4f6 (diff)
SERVER-54486: Clear resharding filtering metadata on primary stepUp.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 190
1 file changed, 101 insertions, 89 deletions
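Note on reading this diff: the functional change is small. It consists of the new resharding_donor_recipient_common.h include and a resharding::clearFilteringMetadata() call added to _shardingOnTransitionToPrimaryHook() (see the sketch after the @@ -900,6 +908,8 hunk below). Every other hunk re-wraps existing comments to a narrower column width without changing behavior.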
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ff76b5c5cf2..cab39ffea0a 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -82,6 +82,7 @@
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/periodic_balancer_config_refresher.h"
#include "mongo/db/s/periodic_sharded_index_consistency_checker.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state_recovery.h"
@@ -171,8 +172,8 @@ auto makeTaskExecutor(ServiceContext* service,
}
/**
- * Schedules a task using the executor. This task is always run unless the task executor is shutting
- * down.
+ * Schedules a task using the executor. This task is always run unless the task executor is
+ * shutting down.
*/
void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) {
auto cbh = executor->scheduleWork(
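A hedged usage sketch of the scheduleWork helper above, assuming (per its comment) that the wrapper absorbs executor shutdown so callers can treat the task as fire-and-forget. Only the helper's signature comes from this diff; the task body is hypothetical:

    // 'executor' is a started executor::TaskExecutor*.
    scheduleWork(executor, [](const executor::TaskExecutor::CallbackArgs& args) {
        // Reached unless the executor is shutting down, per the contract above.
        doOneShotTask();  // hypothetical task body
    });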
@@ -226,9 +227,9 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
invariant(!_oplogApplier);
- // Using noop observer now that BackgroundSync no longer implements the OplogApplier::Observer
- // interface. During steady state replication, there is no need to log details on every batch
- // we apply.
+ // Using noop observer now that BackgroundSync no longer implements the
+ // OplogApplier::Observer interface. During steady state replication, there is no need to
+ // log details on every batch we apply.
_oplogApplier = std::make_unique<OplogApplierImpl>(
_oplogApplierTaskExecutor.get(),
_oplogBuffer.get(),
@@ -291,8 +292,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(
}
// Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
- // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it is
- // waiting for an operation to be past the slaveDelay point.
+ // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it
+ // is waiting for an operation to be past the slaveDelay point.
if (oldOplogBuffer) {
oldOplogBuffer->clear(opCtx);
}
@@ -352,8 +353,8 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown
}
// Ensure that all writes are visible before reading. If we failed mid-batch, it would be
- // possible to read from a kNoOverlap ReadSource where not all writes to the minValid document
- // are visible, generating a writeConflict that would not resolve.
+ // possible to read from a kNoOverlap ReadSource where not all writes to the minValid
+ // document are visible, generating a writeConflict that would not resolve.
invariant(RecoveryUnit::ReadSource::kNoTimestamp ==
opCtx->recoveryUnit()->getTimestampReadSource());
@@ -362,13 +363,14 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown
loadLastOpTimeAndWallTimeResult.isOK() &&
loadLastOpTimeAndWallTimeResult.getValue().opTime ==
_replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx)) {
- // Clear the appliedThrough marker to indicate we are consistent with the top of the oplog.
+ // Clear the appliedThrough marker to indicate we are consistent with the top of the
+ // oplog.
//
- // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are
- // any outstanding checkpoints being taken, they should only reflect this write if they see
- // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on
- // that timestamp, making it not safe to write to, even as we're holding the RSTL in
- // exclusive mode.
+ // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there
+ // are any outstanding checkpoints being taken, they should only reflect this write if
+ // they see all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can
+ // have readers on that timestamp, making it not safe to write to, even as we're holding
+ // the RSTL in exclusive mode.
invariant(opCtx->lockState()->isRSTLExclusive());
_replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
@@ -393,24 +395,24 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx)
// Perform additional shutdown steps below that must be done outside _threadMutex.
- // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the NoOpWriter
- // itself can block on the ReplicationCoordinator mutex. It is safe to access _noopWriter
- // outside of _threadMutex because _noopWriter is protected by its own mutex.
+ // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the
+ // NoOpWriter itself can block on the ReplicationCoordinator mutex. It is safe to access
+ // _noopWriter outside of _threadMutex because _noopWriter is protected by its own mutex.
invariant(_noopWriter);
LOGV2_DEBUG(21308, 1, "Stopping noop writer");
_noopWriter->stopWritingPeriodicNoops();
- // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to run
- // the dropPendingCollectionReaper, which takes database locks. It is safe to access
+ // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to
+ // run the dropPendingCollectionReaper, which takes database locks. It is safe to access
// _taskExecutor outside of _threadMutex because once _startedThreads is set to true, the
// _taskExecutor pointer never changes.
_taskExecutor->join();
// The oplog truncate after point must be cleared, if we are still primary for shutdown, so
- // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean primary
- // shutdown. Stepdown is similarly safe from holes and halts updates to and clears the truncate
- // point. The other replication states do need truncation if the truncate point is set: e.g.
- // interruption mid batch application can leave oplog holes.
+ // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean
+ // primary shutdown. Stepdown is similarly safe from holes and halts updates to and clears
+ // the truncate point. The other replication states do need truncation if the truncate point
+ // is set: e.g. interruption mid batch application can leave oplog holes.
if (!storageGlobalParams.readOnly &&
_replicationProcess->getConsistencyMarkers()
->isOplogTruncateAfterPointBeingUsedForPrimary()) {
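A condensed, hedged paraphrase of the shutdown ordering this hunk documents. The first two calls and the guard appear verbatim above; the final clearing step is cut off by the hunk and is assumed to be the _stopAsyncUpdatesOfAndClearOplogTruncateAfterPoint() helper shown later in this diff:

    _noopWriter->stopWritingPeriodicNoops();  // before _threadMutex: NoOpWriter can block on the repl mutex
    _taskExecutor->join();                    // outside _threadMutex: its reaper work takes database locks
    if (!storageGlobalParams.readOnly &&
        _replicationProcess->getConsistencyMarkers()
            ->isOplogTruncateAfterPointBeingUsedForPrimary()) {
        // Still primary: clear the truncate point so nothing is truncated
        // unnecessarily on startup after a clean shutdown (assumed call).
        _stopAsyncUpdatesOfAndClearOplogTruncateAfterPoint();
    }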
@@ -449,10 +451,11 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
_service->getOpObserver()->onOpMessage(opCtx, msgObj);
wuow.commit();
// ReplSetTest assumes that immediately after the replSetInitiate
- // command returns, it can allow other nodes to initial sync with no
- // retries and they will succeed. Unfortunately, initial sync will
- // fail if it finds its sync source has an empty oplog. Thus, we
- // need to wait here until the seed document is visible in our oplog.
+ // command returns, it can allow other nodes to initial sync with
+ // no retries and they will succeed. Unfortunately, initial sync
+ // will fail if it finds its sync source has an empty oplog.
+ // Thus, we need to wait here until the seed document is visible
+ // in our oplog.
_storageInterface->waitForAllEarlierOplogWritesToBeVisible(opCtx);
});
@@ -484,29 +487,31 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
invariant(
_replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
- // A primary periodically updates the oplogTruncateAfterPoint to allow replication to proceed
- // without danger of unidentifiable oplog holes on unclean shutdown due to parallel writes.
+ // A primary periodically updates the oplogTruncateAfterPoint to allow replication to
+ // proceed without danger of unidentifiable oplog holes on unclean shutdown due to parallel
+ // writes.
//
// Initialize the oplogTruncateAfterPoint so that user writes are safe on unclean shutdown
// between completion of transition to primary and the first async oplogTruncateAfterPoint
// update.
_replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPointToTopOfOplog(opCtx);
- // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use the
- // truncate point, rather than last applied, to update the repl durable timestamp.
+ // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use
+ // the truncate point, rather than last applied, to update the repl durable timestamp.
//
- // The truncate point must be used while primary for repl's durable timestamp because otherwise
- // we could truncate last applied writes on startup recovery after an unclean shutdown that were
- // previously majority confirmed to the user.
+ // The truncate point must be used while primary for repl's durable timestamp because
+ // otherwise we could truncate last applied writes on startup recovery after an unclean
+ // shutdown that were previously majority confirmed to the user.
_replicationProcess->getConsistencyMarkers()->startUsingOplogTruncateAfterPointForPrimary();
- // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
- // done before we add anything to our oplog.
+ // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must
+ // be done before we add anything to our oplog.
//
- // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are any
- // outstanding checkpoints being taken, they should only reflect this write if they see all
- // writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on that
- // timestamp, making it not safe to write to, even as we're holding the RSTL in exclusive mode.
+ // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are
+ // any outstanding checkpoints being taken, they should only reflect this write if they see
+ // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on
+ // that timestamp, making it not safe to write to, even as we're holding the RSTL in
+ // exclusive mode.
_replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
writeConflictRetry(opCtx, "logging transition to primary to oplog", "local.oplog.rs", [&] {
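The three consistency-marker calls in this hunk run in a deliberate order. A condensed restatement, with the calls verbatim from the hunk and only the numbering editorial:

    auto markers = _replicationProcess->getConsistencyMarkers();
    // 1. Seed the truncate point at the top of the oplog so user writes are
    //    recoverable on unclean shutdown before the first async update.
    markers->setOplogTruncateAfterPointToTopOfOplog(opCtx);
    // 2. Switch repl's durable timestamp from lastApplied to the truncate
    //    point for this primary term, so startup recovery cannot truncate
    //    majority-confirmed writes.
    markers->startUsingOplogTruncateAfterPointForPrimary();
    // 3. Clear appliedThrough before the first oplog write of the new term.
    markers->clearAppliedThrough(opCtx, Timestamp());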
@@ -523,12 +528,12 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
auto opTimeToReturn = loadLastOpTimeAndWallTimeResult.getValue().opTime;
auto newTermStartDate = loadLastOpTimeAndWallTimeResult.getValue().wallTime;
- // This constant was based on data described in SERVER-44634. It is in relation to how long the
- // first majority committed write takes after a new term has started.
+ // This constant was based on data described in SERVER-44634. It is in relation to how long
+ // the first majority committed write takes after a new term has started.
const auto flowControlGracePeriod = Seconds(4);
- // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may stop
- // while a node wins election and steps up, it's likely to determine there's majority point
- // lag. Moreover, because there are no writes in the system, flow control will believe
+ // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may
+ // stop while a node wins election and steps up, it's likely to determine there's majority
+ // point lag. Moreover, because there are no writes in the system, flow control will believe
// secondaries are unable to process oplog entries. This can result in an undesirable "slow
// start" phenomena.
FlowControl::get(opCtx)->disableUntil(newTermStartDate + flowControlGracePeriod);
@@ -673,9 +678,10 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
WriteUnitOfWork wunit(opCtx);
// We only want to replace the last vote document if the new last vote document
- // would have a higher term. We check the term of the current last vote document and
- // insert the new document in a WriteUnitOfWork to synchronize the two operations.
- // We have already ensured at startup time that there is an old document.
+ // would have a higher term. We check the term of the current last vote document
+ // and insert the new document in a WriteUnitOfWork to synchronize the two
+ // operations. We have already ensured at startup time that there is an old
+ // document.
BSONObj result;
bool exists = Helpers::getSingleton(opCtx, lastVoteCollectionName, result);
fassert(51241, exists);
@@ -790,24 +796,25 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnStepDownHook() {
void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTruncateAfterPoint() {
auto opCtx = cc().getOperationContext();
// Temporarily turn off flow control ticketing. Getting a ticket can stall on a ticket being
- // available, which may have to wait for the ticket refresher to run, which in turn blocks on
- // the repl _mutex to check whether we are primary or not: this is a deadlock because stepdown
- // already holds the repl _mutex!
+ // available, which may have to wait for the ticket refresher to run, which in turn blocks
+ // on the repl _mutex to check whether we are primary or not: this is a deadlock because
+ // stepdown already holds the repl _mutex!
FlowControl::Bypass flowControlBypass(opCtx);
- // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go back to
- // using last applied to update repl's durable timestamp instead of the truncate point.
+ // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go
+ // back to using last applied to update repl's durable timestamp instead of the truncate
+ // point.
_replicationProcess->getConsistencyMarkers()->stopUsingOplogTruncateAfterPointForPrimary();
// Interrupt the current JournalFlusher thread round, so it recognizes that it is no longer
// primary. Otherwise the asynchronously running thread could race with setting the truncate
- // point to null below. This would leave the truncate point potentially stale in a non-PRIMARY
- // state, where last applied would be used to update repl's durable timestamp and confirm
- // majority writes. Startup recovery could truncate majority confirmed writes back to the stale
- // truncate after point.
+ // point to null below. This would leave the truncate point potentially stale in a
+ // non-PRIMARY state, where last applied would be used to update repl's durable timestamp
+ // and confirm majority writes. Startup recovery could truncate majority confirmed writes
+ // back to the stale truncate after point.
//
- // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold
- // before doing an update write to the truncate point.
+ // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might
+ // hold before doing an update write to the truncate point.
JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange();
// Wait for another round of journal flushing. This will ensure that we wait for the current
@@ -816,8 +823,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr
// above, if writing is imminent, so we must make sure that the code completes fully.
JournalFlusher::get(_service)->waitForJournalFlush();
- // Writes to non-replicated collections do not need concurrency control with the OplogApplier
- // that never accesses them. Skip taking the PBWM.
+ // Writes to non-replicated collections do not need concurrency control with the
+ // OplogApplier that never accesses them. Skip taking the PBWM.
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState());
// We can clear the oplogTruncateAfterPoint because we know there are no user writes during
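Taken together, the last two hunks spell out a strict stepdown sequence. A condensed, hedged restatement; all calls are verbatim from the diff, and the final step is cut off by the hunk above:

    FlowControl::Bypass flowControlBypass(opCtx);       // 1. no ticket waits while stepdown holds the repl _mutex
    _replicationProcess->getConsistencyMarkers()
        ->stopUsingOplogTruncateAfterPointForPrimary(); // 2. durable timestamp reverts to lastApplied
    JournalFlusher::get(_service)
        ->interruptJournalFlusherForReplStateChange();  // 3. don't race the in-flight flusher round
    JournalFlusher::get(_service)->waitForJournalFlush();  // 4. let that round complete fully
    ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
        opCtx->lockState());                            // 5. skip the PBWM for a non-replicated write
    // 6. ...and only then is the truncate point itself cleared.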
@@ -830,9 +837,10 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
Status status = ShardingCatalogManager::get(opCtx)->initializeConfigDatabaseIfNeeded(opCtx);
if (!status.isOK() && status != ErrorCodes::AlreadyInitialized) {
- // If the node is shutting down or it lost quorum just as it was becoming primary, don't
- // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is
- // already idempotent, so the machinery will remain in the stepped down state.
+ // If the node is shutting down or it lost quorum just as it was becoming primary,
+ // don't run the sharding onStepUp machinery. The onStepDown counterpart to these
+ // methods is already idempotent, so the machinery will remain in the stepped down
+ // state.
if (ErrorCodes::isShutdownError(status.code()) ||
ErrorCodes::isNotPrimaryError(status.code())) {
return;
@@ -844,14 +852,14 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
}
if (status.isOK()) {
- // Load the clusterId into memory. Use local readConcern, since we can't use majority
- // readConcern in drain mode because the global lock prevents replication. This is
- // safe, since if the clusterId write is rolled back, any writes that depend on it will
- // also be rolled back.
+ // Load the clusterId into memory. Use local readConcern, since we can't use
+ // majority readConcern in drain mode because the global lock prevents replication.
+ // This is safe, since if the clusterId write is rolled back, any writes that depend
+ // on it will also be rolled back.
//
- // Since we *just* wrote the cluster ID to the config.version document (via the call to
- // ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can only
- // meaningfully fail if the node is shutting down.
+ // Since we *just* wrote the cluster ID to the config.version document (via the call
+ // to ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can
+ // only meaningfully fail if the node is shutting down.
status = ClusterIdentityLoader::get(opCtx)->loadClusterId(
opCtx, repl::ReadConcernLevel::kLocalReadConcern);
@@ -873,9 +881,9 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
} else if (ShardingState::get(opCtx)->enabled()) {
Status status = ShardingStateRecovery::recover(opCtx);
- // If the node is shutting down or it lost quorum just as it was becoming primary, don't run
- // the sharding onStepUp machinery. The onStepDown counterpart to these methods is already
- // idempotent, so the machinery will remain in the stepped down state.
+ // If the node is shutting down or it lost quorum just as it was becoming primary, don't
+ // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is
+ // already idempotent, so the machinery will remain in the stepped down state.
if (ErrorCodes::isShutdownError(status.code()) ||
ErrorCodes::isNotPrimaryError(status.code())) {
return;
@@ -900,6 +908,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
sharding_ddl_util::retakeInMemoryRecoverableCriticalSections(opCtx);
+ const bool scheduleAsyncRefresh = true;
+ resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh);
} else { // unsharded
if (auto validator = LogicalTimeValidator::get(_service)) {
validator->enableKeyGenerator(opCtx, true);
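This hunk is the functional heart of the commit: on transition to primary, a sharded node now discards its cached resharding filtering metadata and schedules an asynchronous refresh, so the new primary rebuilds that state instead of trusting what it cached before stepUp. In context, with only the two NEW lines verbatim and the surrounding shape paraphrased from the hunk:

    // Inside _shardingOnTransitionToPrimaryHook(), sharded branch:
    sharding_ddl_util::retakeInMemoryRecoverableCriticalSections(opCtx);

    // NEW (SERVER-54486): clear resharding filtering metadata and kick
    // off an async refresh on primary stepUp.
    const bool scheduleAsyncRefresh = true;
    resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh);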
@@ -997,8 +1007,8 @@ void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters(
const OpTime& committedOpTime) {
signalOplogWaiters();
- // Notify the DropPendingCollectionReaper if there are any drop-pending collections with drop
- // optimes before or at the committed optime.
+ // Notify the DropPendingCollectionReaper if there are any drop-pending collections with
+ // drop optimes before or at the committed optime.
if (auto earliestDropOpTime = _dropPendingCollectionReaper->getEarliestDropOpTime()) {
if (committedOpTime >= *earliestDropOpTime) {
auto reaper = _dropPendingCollectionReaper;
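The hunk stops just before the reaper work is dispatched. In the surrounding source this dispatch plausibly uses the scheduleWork helper from the first hunk; a hedged continuation sketch, with the lambda body assumed from context rather than shown in this diff:

    auto reaper = _dropPendingCollectionReaper;
    scheduleWork(_taskExecutor.get(),
                 [reaper, committedOpTime](const executor::TaskExecutor::CallbackArgs&) {
                     // Assumed: drop the collections whose drop optimes are
                     // now majority committed.
                     auto opCtx = cc().makeOperationContext();
                     reaper->dropCollectionsOlderThan(opCtx.get(), committedOpTime);
                 });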
@@ -1060,12 +1070,13 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM
}
JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(OperationContext* opCtx) {
- // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp in
- // order to avoid majority confirming any writes that could later be truncated.
+ // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp
+ // in order to avoid majority confirming any writes that could later be truncated.
//
- // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a nullptr
- // for the opCtx. The ephemeral engine does not do parallel writes to cause oplog holes,
- // therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks oplog holes.
+ // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a
+ // nullptr for the opCtx. The ephemeral engine does not do parallel writes to cause oplog
+ // holes, therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks
+ // oplog holes.
if (MONGO_likely(opCtx)) {
auto truncatePoint = repl::ReplicationProcess::get(opCtx)
->getConsistencyMarkers()
@@ -1078,14 +1089,15 @@ JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(Operati
// All other repl states use the 'lastApplied'.
//
// Setting 'rollbackSafe' will ensure that a safe lastApplied value is returned if we're in
- // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch of
- // history during rollback, so a benign default value will be returned instead to prevent a
- // divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback.
+ // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch
+ // of history during rollback, so a benign default value will be returned instead to prevent
+ // a divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback.
//
- // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK after
- // getting the token because the JournalFlusher is shut down during rollback, before a divergent
- // 'lastApplied' value is present. The JournalFlusher will start up again in ROLLBACK and never
- // transition from non-ROLLBACK to ROLLBACK with a divergent 'lastApplied' value.
+ // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK
+ // after getting the token because the JournalFlusher is shut down during rollback, before a
+ // divergent 'lastApplied' value is present. The JournalFlusher will start up again in
+ // ROLLBACK and never transition from non-ROLLBACK to ROLLBACK with a divergent
+ // 'lastApplied' value.
return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime(
/*rollbackSafe=*/true);
}