summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp190
-rw-r--r--src/mongo/db/s/collection_metadata.cpp6
-rw-r--r--src/mongo/db/s/collection_metadata.h6
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp48
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp78
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h30
-rw-r--r--src/mongo/db/s/resharding_util.cpp2
l---------src/mongo/db/storage/wiredtiger/Conscript1
9 files changed, 271 insertions, 92 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ff76b5c5cf2..cab39ffea0a 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -82,6 +82,7 @@
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/periodic_balancer_config_refresher.h"
#include "mongo/db/s/periodic_sharded_index_consistency_checker.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state_recovery.h"
@@ -171,8 +172,8 @@ auto makeTaskExecutor(ServiceContext* service,
}
/**
- * Schedules a task using the executor. This task is always run unless the task executor is shutting
- * down.
+ * Schedules a task using the executor. This task is always run unless the task executor is
+ * shutting down.
*/
void scheduleWork(executor::TaskExecutor* executor, executor::TaskExecutor::CallbackFn work) {
auto cbh = executor->scheduleWork(
@@ -226,9 +227,9 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
invariant(!_oplogApplier);
- // Using noop observer now that BackgroundSync no longer implements the OplogApplier::Observer
- // interface. During steady state replication, there is no need to log details on every batch
- // we apply.
+ // Using noop observer now that BackgroundSync no longer implements the
+ // OplogApplier::Observer interface. During steady state replication, there is no need to
+ // log details on every batch we apply.
_oplogApplier = std::make_unique<OplogApplierImpl>(
_oplogApplierTaskExecutor.get(),
_oplogBuffer.get(),
@@ -291,8 +292,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(
}
// Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
- // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it is
- // waiting for an operation to be past the slaveDelay point.
+ // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it
+ // is waiting for an operation to be past the slaveDelay point.
if (oldOplogBuffer) {
oldOplogBuffer->clear(opCtx);
}
@@ -352,8 +353,8 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown
}
// Ensure that all writes are visible before reading. If we failed mid-batch, it would be
- // possible to read from a kNoOverlap ReadSource where not all writes to the minValid document
- // are visible, generating a writeConflict that would not resolve.
+ // possible to read from a kNoOverlap ReadSource where not all writes to the minValid
+ // document are visible, generating a writeConflict that would not resolve.
invariant(RecoveryUnit::ReadSource::kNoTimestamp ==
opCtx->recoveryUnit()->getTimestampReadSource());
@@ -362,13 +363,14 @@ void ReplicationCoordinatorExternalStateImpl::clearAppliedThroughIfCleanShutdown
loadLastOpTimeAndWallTimeResult.isOK() &&
loadLastOpTimeAndWallTimeResult.getValue().opTime ==
_replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx)) {
- // Clear the appliedThrough marker to indicate we are consistent with the top of the oplog.
+ // Clear the appliedThrough marker to indicate we are consistent with the top of the
+ // oplog.
//
- // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are
- // any outstanding checkpoints being taken, they should only reflect this write if they see
- // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on
- // that timestamp, making it not safe to write to, even as we're holding the RSTL in
- // exclusive mode.
+ // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there
+ // are any outstanding checkpoints being taken, they should only reflect this write if
+ // they see all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can
+ // have readers on that timestamp, making it not safe to write to, even as we're holding
+ // the RSTL in exclusive mode.
invariant(opCtx->lockState()->isRSTLExclusive());
_replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
@@ -393,24 +395,24 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx)
// Perform additional shutdown steps below that must be done outside _threadMutex.
- // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the NoOpWriter
- // itself can block on the ReplicationCoordinator mutex. It is safe to access _noopWriter
- // outside of _threadMutex because _noopWriter is protected by its own mutex.
+ // Stop the NoOpWriter before grabbing the mutex to avoid creating a deadlock as the
+ // NoOpWriter itself can block on the ReplicationCoordinator mutex. It is safe to access
+ // _noopWriter outside of _threadMutex because _noopWriter is protected by its own mutex.
invariant(_noopWriter);
LOGV2_DEBUG(21308, 1, "Stopping noop writer");
_noopWriter->stopWritingPeriodicNoops();
- // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to run
- // the dropPendingCollectionReaper, which takes database locks. It is safe to access
+ // We must wait for _taskExecutor outside of _threadMutex, since _taskExecutor is used to
+ // run the dropPendingCollectionReaper, which takes database locks. It is safe to access
// _taskExecutor outside of _threadMutex because once _startedThreads is set to true, the
// _taskExecutor pointer never changes.
_taskExecutor->join();
// The oplog truncate after point must be cleared, if we are still primary for shutdown, so
- // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean primary
- // shutdown. Stepdown is similarly safe from holes and halts updates to and clears the truncate
- // point. The other replication states do need truncation if the truncate point is set: e.g.
- // interruption mid batch application can leave oplog holes.
+ // nothing gets truncated unnecessarily on startup. There are no oplog holes on clean
+ // primary shutdown. Stepdown is similarly safe from holes and halts updates to and clears
+ // the truncate point. The other replication states do need truncation if the truncate point
+ // is set: e.g. interruption mid batch application can leave oplog holes.
if (!storageGlobalParams.readOnly &&
_replicationProcess->getConsistencyMarkers()
->isOplogTruncateAfterPointBeingUsedForPrimary()) {
@@ -449,10 +451,11 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
_service->getOpObserver()->onOpMessage(opCtx, msgObj);
wuow.commit();
// ReplSetTest assumes that immediately after the replSetInitiate
- // command returns, it can allow other nodes to initial sync with no
- // retries and they will succeed. Unfortunately, initial sync will
- // fail if it finds its sync source has an empty oplog. Thus, we
- // need to wait here until the seed document is visible in our oplog.
+ // command returns, it can allow other nodes to initial sync with
+ // no retries and they will succeed. Unfortunately, initial sync
+ // will fail if it finds its sync source has an empty oplog.
+ // Thus, we need to wait here until the seed document is visible
+ // in our oplog.
_storageInterface->waitForAllEarlierOplogWritesToBeVisible(opCtx);
});
@@ -484,29 +487,31 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
invariant(
_replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
- // A primary periodically updates the oplogTruncateAfterPoint to allow replication to proceed
- // without danger of unidentifiable oplog holes on unclean shutdown due to parallel writes.
+ // A primary periodically updates the oplogTruncateAfterPoint to allow replication to
+ // proceed without danger of unidentifiable oplog holes on unclean shutdown due to parallel
+ // writes.
//
// Initialize the oplogTruncateAfterPoint so that user writes are safe on unclean shutdown
// between completion of transition to primary and the first async oplogTruncateAfterPoint
// update.
_replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPointToTopOfOplog(opCtx);
- // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use the
- // truncate point, rather than last applied, to update the repl durable timestamp.
+ // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use
+ // the truncate point, rather than last applied, to update the repl durable timestamp.
//
- // The truncate point must be used while primary for repl's durable timestamp because otherwise
- // we could truncate last applied writes on startup recovery after an unclean shutdown that were
- // previously majority confirmed to the user.
+ // The truncate point must be used while primary for repl's durable timestamp because
+ // otherwise we could truncate last applied writes on startup recovery after an unclean
+ // shutdown that were previously majority confirmed to the user.
_replicationProcess->getConsistencyMarkers()->startUsingOplogTruncateAfterPointForPrimary();
- // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
- // done before we add anything to our oplog.
+ // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must
+ // be done before we add anything to our oplog.
//
- // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are any
- // outstanding checkpoints being taken, they should only reflect this write if they see all
- // writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on that
- // timestamp, making it not safe to write to, even as we're holding the RSTL in exclusive mode.
+ // TODO SERVER-53642: We used to record this update at the 'lastAppliedOpTime'. If there are
+ // any outstanding checkpoints being taken, they should only reflect this write if they see
+ // all writes up to our 'lastAppliedOpTime'. But with Lock Free Reads we can have readers on
+ // that timestamp, making it not safe to write to, even as we're holding the RSTL in
+ // exclusive mode.
_replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
writeConflictRetry(opCtx, "logging transition to primary to oplog", "local.oplog.rs", [&] {
@@ -523,12 +528,12 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
auto opTimeToReturn = loadLastOpTimeAndWallTimeResult.getValue().opTime;
auto newTermStartDate = loadLastOpTimeAndWallTimeResult.getValue().wallTime;
- // This constant was based on data described in SERVER-44634. It is in relation to how long the
- // first majority committed write takes after a new term has started.
+ // This constant was based on data described in SERVER-44634. It is in relation to how long
+ // the first majority committed write takes after a new term has started.
const auto flowControlGracePeriod = Seconds(4);
- // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may stop
- // while a node wins election and steps up, it's likely to determine there's majority point
- // lag. Moreover, because there are no writes in the system, flow control will believe
+ // SERVER-44634: Disable flow control for a grace period after stepup. Because writes may
+ // stop while a node wins election and steps up, it's likely to determine there's majority
+ // point lag. Moreover, because there are no writes in the system, flow control will believe
// secondaries are unable to process oplog entries. This can result in an undesirable "slow
// start" phenomena.
FlowControl::get(opCtx)->disableUntil(newTermStartDate + flowControlGracePeriod);
@@ -673,9 +678,10 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
WriteUnitOfWork wunit(opCtx);
// We only want to replace the last vote document if the new last vote document
- // would have a higher term. We check the term of the current last vote document and
- // insert the new document in a WriteUnitOfWork to synchronize the two operations.
- // We have already ensured at startup time that there is an old document.
+ // would have a higher term. We check the term of the current last vote document
+ // and insert the new document in a WriteUnitOfWork to synchronize the two
+ // operations. We have already ensured at startup time that there is an old
+ // document.
BSONObj result;
bool exists = Helpers::getSingleton(opCtx, lastVoteCollectionName, result);
fassert(51241, exists);
@@ -790,24 +796,25 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnStepDownHook() {
void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTruncateAfterPoint() {
auto opCtx = cc().getOperationContext();
// Temporarily turn off flow control ticketing. Getting a ticket can stall on a ticket being
- // available, which may have to wait for the ticket refresher to run, which in turn blocks on
- // the repl _mutex to check whether we are primary or not: this is a deadlock because stepdown
- // already holds the repl _mutex!
+ // available, which may have to wait for the ticket refresher to run, which in turn blocks
+ // on the repl _mutex to check whether we are primary or not: this is a deadlock because
+ // stepdown already holds the repl _mutex!
FlowControl::Bypass flowControlBypass(opCtx);
- // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go back to
- // using last applied to update repl's durable timestamp instead of the truncate point.
+ // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go
+ // back to using last applied to update repl's durable timestamp instead of the truncate
+ // point.
_replicationProcess->getConsistencyMarkers()->stopUsingOplogTruncateAfterPointForPrimary();
// Interrupt the current JournalFlusher thread round, so it recognizes that it is no longer
// primary. Otherwise the asynchronously running thread could race with setting the truncate
- // point to null below. This would leave the truncate point potentially stale in a non-PRIMARY
- // state, where last applied would be used to update repl's durable timestamp and confirm
- // majority writes. Startup recovery could truncate majority confirmed writes back to the stale
- // truncate after point.
+ // point to null below. This would leave the truncate point potentially stale in a
+ // non-PRIMARY state, where last applied would be used to update repl's durable timestamp
+ // and confirm majority writes. Startup recovery could truncate majority confirmed writes
+ // back to the stale truncate after point.
//
- // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold
- // before doing an update write to the truncate point.
+ // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might
+ // hold before doing an update write to the truncate point.
JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange();
// Wait for another round of journal flushing. This will ensure that we wait for the current
@@ -816,8 +823,8 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr
// above, if writing is imminent, so we must make sure that the code completes fully.
JournalFlusher::get(_service)->waitForJournalFlush();
- // Writes to non-replicated collections do not need concurrency control with the OplogApplier
- // that never accesses them. Skip taking the PBWM.
+ // Writes to non-replicated collections do not need concurrency control with the
+ // OplogApplier that never accesses them. Skip taking the PBWM.
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState());
// We can clear the oplogTruncateAfterPoint because we know there are no user writes during
@@ -830,9 +837,10 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
Status status = ShardingCatalogManager::get(opCtx)->initializeConfigDatabaseIfNeeded(opCtx);
if (!status.isOK() && status != ErrorCodes::AlreadyInitialized) {
- // If the node is shutting down or it lost quorum just as it was becoming primary, don't
- // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is
- // already idempotent, so the machinery will remain in the stepped down state.
+ // If the node is shutting down or it lost quorum just as it was becoming primary,
+ // don't run the sharding onStepUp machinery. The onStepDown counterpart to these
+ // methods is already idempotent, so the machinery will remain in the stepped down
+ // state.
if (ErrorCodes::isShutdownError(status.code()) ||
ErrorCodes::isNotPrimaryError(status.code())) {
return;
@@ -844,14 +852,14 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
}
if (status.isOK()) {
- // Load the clusterId into memory. Use local readConcern, since we can't use majority
- // readConcern in drain mode because the global lock prevents replication. This is
- // safe, since if the clusterId write is rolled back, any writes that depend on it will
- // also be rolled back.
+ // Load the clusterId into memory. Use local readConcern, since we can't use
+ // majority readConcern in drain mode because the global lock prevents replication.
+ // This is safe, since if the clusterId write is rolled back, any writes that depend
+ // on it will also be rolled back.
//
- // Since we *just* wrote the cluster ID to the config.version document (via the call to
- // ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can only
- // meaningfully fail if the node is shutting down.
+ // Since we *just* wrote the cluster ID to the config.version document (via the call
+ // to ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can
+ // only meaningfully fail if the node is shutting down.
status = ClusterIdentityLoader::get(opCtx)->loadClusterId(
opCtx, repl::ReadConcernLevel::kLocalReadConcern);
@@ -873,9 +881,9 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
} else if (ShardingState::get(opCtx)->enabled()) {
Status status = ShardingStateRecovery::recover(opCtx);
- // If the node is shutting down or it lost quorum just as it was becoming primary, don't run
- // the sharding onStepUp machinery. The onStepDown counterpart to these methods is already
- // idempotent, so the machinery will remain in the stepped down state.
+ // If the node is shutting down or it lost quorum just as it was becoming primary, don't
+ // run the sharding onStepUp machinery. The onStepDown counterpart to these methods is
+ // already idempotent, so the machinery will remain in the stepped down state.
if (ErrorCodes::isShutdownError(status.code()) ||
ErrorCodes::isNotPrimaryError(status.code())) {
return;
@@ -900,6 +908,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
sharding_ddl_util::retakeInMemoryRecoverableCriticalSections(opCtx);
+ const bool scheduleAsyncRefresh = true;
+ resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh);
} else { // unsharded
if (auto validator = LogicalTimeValidator::get(_service)) {
validator->enableKeyGenerator(opCtx, true);
@@ -997,8 +1007,8 @@ void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters(
const OpTime& committedOpTime) {
signalOplogWaiters();
- // Notify the DropPendingCollectionReaper if there are any drop-pending collections with drop
- // optimes before or at the committed optime.
+ // Notify the DropPendingCollectionReaper if there are any drop-pending collections with
+ // drop optimes before or at the committed optime.
if (auto earliestDropOpTime = _dropPendingCollectionReaper->getEarliestDropOpTime()) {
if (committedOpTime >= *earliestDropOpTime) {
auto reaper = _dropPendingCollectionReaper;
@@ -1060,12 +1070,13 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM
}
JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(OperationContext* opCtx) {
- // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp in
- // order to avoid majority confirming any writes that could later be truncated.
+ // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp
+ // in order to avoid majority confirming any writes that could later be truncated.
//
- // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a nullptr
- // for the opCtx. The ephemeral engine does not do parallel writes to cause oplog holes,
- // therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks oplog holes.
+ // TODO (SERVER-45847): temporary hack for the ephemeral storage engine that passes in a
+ // nullptr for the opCtx. The ephemeral engine does not do parallel writes to cause oplog
+ // holes, therefore it is safe to skip updating the oplogTruncateAfterPoint that tracks
+ // oplog holes.
if (MONGO_likely(opCtx)) {
auto truncatePoint = repl::ReplicationProcess::get(opCtx)
->getConsistencyMarkers()
@@ -1078,14 +1089,15 @@ JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(Operati
// All other repl states use the 'lastApplied'.
//
// Setting 'rollbackSafe' will ensure that a safe lastApplied value is returned if we're in
- // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch of
- // history during rollback, so a benign default value will be returned instead to prevent a
- // divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback.
+ // ROLLBACK state. 'lastApplied' may be momentarily set to an opTime from a divergent branch
+ // of history during rollback, so a benign default value will be returned instead to prevent
+ // a divergent 'lastApplied' from being used to forward the 'lastDurable' after rollback.
//
- // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK after
- // getting the token because the JournalFlusher is shut down during rollback, before a divergent
- // 'lastApplied' value is present. The JournalFlusher will start up again in ROLLBACK and never
- // transition from non-ROLLBACK to ROLLBACK with a divergent 'lastApplied' value.
+ // No concurrency control is necessary and it is still safe if the node goes into ROLLBACK
+ // after getting the token because the JournalFlusher is shut down during rollback, before a
+ // divergent 'lastApplied' value is present. The JournalFlusher will start up again in
+ // ROLLBACK and never transition from non-ROLLBACK to ROLLBACK with a divergent
+ // 'lastApplied' value.
return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime(
/*rollbackSafe=*/true);
}
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index 86744c1594f..518f5405085 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -184,6 +184,12 @@ void CollectionMetadata::toBSONBasic(BSONObjBuilder& bb) const {
}
}
+BSONObj CollectionMetadata::toBSON() const {
+ BSONObjBuilder builder;
+ toBSONBasic(builder);
+ return builder.obj();
+}
+
std::string CollectionMetadata::toStringBasic() const {
if (isSharded()) {
return str::stream() << "collection version: " << _cm->getVersion().toString()
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index 13fe79d78e0..a742b77726f 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -170,11 +170,17 @@ public:
*/
void toBSONBasic(BSONObjBuilder& bb) const;
+ BSONObj toBSON() const;
+
/**
* String output of the collection and shard versions.
*/
std::string toStringBasic() const;
+ std::string toString() const {
+ return toStringBasic();
+ }
+
//
// Methods used for orphan filtering and general introspection of the chunks owned by the shard
//
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index df03a16bddb..8c95024d108 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -32,14 +32,17 @@
#include "mongo/platform/basic.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
-#include "mongo/db/storage/duplicate_key_error_info.h"
#include <fmt/format.h>
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/grid.h"
+#include "mongo/stdx/unordered_set.h"
namespace mongo {
namespace resharding {
@@ -376,6 +379,49 @@ void processReshardingFieldsForCollection(OperationContext* opCtx,
}
}
+void clearFilteringMetadata(OperationContext* opCtx, bool scheduleAsyncRefresh) {
+ stdx::unordered_set<NamespaceString> namespacesToRefresh;
+ for (const NamespaceString homeToReshardingDocs :
+ {NamespaceString::kDonorReshardingOperationsNamespace,
+ NamespaceString::kRecipientReshardingOperationsNamespace}) {
+ PersistentTaskStore<CommonReshardingMetadata> store(homeToReshardingDocs);
+
+ store.forEach(opCtx, Query(), [&](CommonReshardingMetadata reshardingDoc) -> bool {
+ namespacesToRefresh.insert(reshardingDoc.getSourceNss());
+ namespacesToRefresh.insert(reshardingDoc.getTempReshardingNss());
+
+ return true;
+ });
+ }
+
+ for (const auto& nss : namespacesToRefresh) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx);
+
+ if (!scheduleAsyncRefresh) {
+ continue;
+ }
+
+ ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
+ .then([svcCtx = opCtx->getServiceContext(), nss] {
+ ThreadClient tc("TriggerReshardingRecovery", svcCtx);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
+ }
+
+ auto opCtx = tc->makeOperationContext();
+ onShardVersionMismatch(opCtx.get(), nss, boost::none /* shardVersionReceived */);
+ })
+ .onError([](const Status& status) {
+ LOGV2_WARNING(5498101,
+ "Error on deferred shardVersion recovery execution",
+ "error"_attr = redact(status));
+ })
+ .getAsync([](auto) {});
+ }
+}
+
} // namespace resharding
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
index bf016473582..0a103ef3619 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
@@ -76,6 +76,8 @@ void processReshardingFieldsForCollection(OperationContext* opCtx,
const CollectionMetadata& metadata,
const ReshardingFields& reshardingFields);
+void clearFilteringMetadata(OperationContext* opCtx, bool scheduleAsyncRefresh);
+
} // namespace resharding
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index 6bfdd45bbe8..9768b725524 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -31,12 +31,15 @@
#include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h"
+#include "mongo/db/catalog/drop_database.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/resharding/donor_document_gen.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/unittest/death_test.h"
@@ -213,6 +216,81 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessReshardingFieldsWithoutDonorOr
5274201);
}
+TEST_F(ReshardingDonorRecipientCommonTest, ClearReshardingFilteringMetaData) {
+ OperationContext* opCtx = operationContext();
+
+ const bool scheduleAsyncRefresh = false;
+ auto doSetupFunc = [&] {
+ // Clear out the resharding donor/recipient metadata collections.
+ for (auto const& nss : {NamespaceString::kDonorReshardingOperationsNamespace,
+ NamespaceString::kRecipientReshardingOperationsNamespace}) {
+ dropDatabase(opCtx, nss.db().toString()).ignore();
+ }
+
+ // Assert the prestate has no filtering metadata.
+ for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) {
+ AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS);
+ auto csr = CollectionShardingRuntime::get(opCtx, nss);
+ ASSERT(csr->getCurrentMetadataIfKnown() == boost::none);
+ }
+
+ // Add filtering metadata for the collection being resharded.
+ {
+ AutoGetCollection autoColl(opCtx, kOriginalNss, LockMode::MODE_IS);
+ auto csr = CollectionShardingRuntime::get(opCtx, kOriginalNss);
+ csr->setFilteringMetadata(opCtx,
+ makeShardedMetadataForOriginalCollection(opCtx, kThisShard));
+ ASSERT(csr->getCurrentMetadataIfKnown());
+ }
+
+ // Add filtering metadata for the temporary resharding namespace.
+ {
+ AutoGetCollection autoColl(opCtx, kTemporaryReshardingNss, LockMode::MODE_IS);
+ auto csr = CollectionShardingRuntime::get(opCtx, kTemporaryReshardingNss);
+ csr->setFilteringMetadata(
+ opCtx, makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard));
+ ASSERT(csr->getCurrentMetadataIfKnown());
+ }
+
+ // Prior to adding a resharding document, assert that attempting to clear filtering does
+ // nothing.
+ resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh);
+
+ for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) {
+ AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS);
+ auto csr = CollectionShardingRuntime::get(opCtx, nss);
+ ASSERT(csr->getCurrentMetadataIfKnown());
+ }
+ };
+
+ doSetupFunc();
+ // Add a resharding donor document that targets the namespaces involved in resharding.
+ ReshardingDonorDocument donorDoc = makeDonorStateDoc();
+ ReshardingDonorService::DonorStateMachine::insertStateDocument(opCtx, donorDoc);
+
+ // Clear the filtering metadata (without scheduling a refresh) and assert the metadata is gone.
+ resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh);
+
+ for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) {
+ AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS);
+ auto csr = CollectionShardingRuntime::get(opCtx, nss);
+ ASSERT(csr->getCurrentMetadataIfKnown() == boost::none);
+ }
+
+ doSetupFunc();
+ // Add a resharding recipient document that targets the namespaces involved in resharding.
+ ReshardingRecipientDocument recipDoc = makeRecipientStateDoc();
+ ReshardingRecipientService::RecipientStateMachine::insertStateDocument(opCtx, recipDoc);
+
+ // Clear the filtering metadata (without scheduling a refresh) and assert the metadata is gone.
+ resharding::clearFilteringMetadata(opCtx, scheduleAsyncRefresh);
+
+ for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) {
+ AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS);
+ auto csr = CollectionShardingRuntime::get(opCtx, nss);
+ ASSERT(csr->getCurrentMetadataIfKnown() == boost::none);
+ }
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
index b707a975444..f2f4ee7001f 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -126,6 +126,36 @@ protected:
return CollectionMetadata(std::move(cm), kThisShard);
}
+ ReshardingDonorDocument makeDonorStateDoc() {
+ DonorShardContext donorCtx;
+ donorCtx.setState(DonorStateEnum::kPreparingToDonate);
+
+ ReshardingDonorDocument doc(std::move(donorCtx), {kThisShard, kOtherShard});
+
+ NamespaceString sourceNss = kOriginalNss;
+ auto sourceUUID = UUID::gen();
+ auto commonMetadata = CommonReshardingMetadata(
+ UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern);
+
+ doc.setCommonReshardingMetadata(std::move(commonMetadata));
+ return doc;
+ }
+
+ ReshardingRecipientDocument makeRecipientStateDoc() {
+ RecipientShardContext recipCtx;
+ recipCtx.setState(RecipientStateEnum::kCloning);
+
+ ReshardingRecipientDocument doc(std::move(recipCtx), {kThisShard, kOtherShard}, 1000);
+
+ NamespaceString sourceNss = kOriginalNss;
+ auto sourceUUID = UUID::gen();
+ auto commonMetadata = CommonReshardingMetadata(
+ UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern);
+
+ doc.setCommonReshardingMetadata(std::move(commonMetadata));
+ return doc;
+ }
+
ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID,
CoordinatorStateEnum state) {
auto fields = ReshardingFields(reshardingUUID);
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 10bb3fcc583..b96711fd30b 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -177,7 +177,6 @@ void validateReshardedChunks(const std::vector<mongo::BSONObj>& chunks,
Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk.getRecipientShardId()));
validChunks.push_back(chunk);
}
-
checkForHolesAndOverlapsInChunks(validChunks, keyPattern);
}
@@ -527,5 +526,4 @@ NamespaceString getLocalConflictStashNamespace(UUID existingUUID, ShardId donorS
"localReshardingConflictStash.{}.{}"_format(existingUUID.toString(),
donorShardId.toString())};
}
-
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/Conscript b/src/mongo/db/storage/wiredtiger/Conscript
new file mode 120000
index 00000000000..2672a145b97
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/Conscript
@@ -0,0 +1 @@
+/home/dgottlieb/xgen/mongo/src/mongo/db/storage/wiredtiger/oplog_stone_parameters.idl \ No newline at end of file