diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/global_user_write_block_state.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/s/global_user_write_block_state.h | 15 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/user_write_block_mode_op_observer.cpp | 13 |
8 files changed, 94 insertions, 1 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 8a897f2f13f..8f37f180c1b 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -181,6 +181,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/replica_set_aware_service', '$BUILD_DIR/mongo/db/rw_concern_d', 'sharding_api_d', + 'sharding_commands_d', ] ) diff --git a/src/mongo/db/s/global_user_write_block_state.cpp b/src/mongo/db/s/global_user_write_block_state.cpp index c7648075483..47422440e8a 100644 --- a/src/mongo/db/s/global_user_write_block_state.cpp +++ b/src/mongo/db/s/global_user_write_block_state.cpp @@ -65,4 +65,22 @@ void GlobalUserWriteBlockState::checkUserWritesAllowed(OperationContext* opCtx, nss.isOnInternalDb()); } +void GlobalUserWriteBlockState::enableUserShardedDDLBlocking(OperationContext* opCtx) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + _userShardedDDLBlocked.store(true); +} + +void GlobalUserWriteBlockState::disableUserShardedDDLBlocking(OperationContext* opCtx) { + _userShardedDDLBlocked.store(false); +} + +void GlobalUserWriteBlockState::checkShardedDDLAllowedToStart(OperationContext* opCtx, + const NamespaceString& nss) const { + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + uassert(ErrorCodes::OperationFailed, + "User writes blocked", + !_userShardedDDLBlocked.load() || + WriteBlockBypass::get(opCtx).isWriteBlockBypassEnabled() || nss.isOnInternalDb()); +} + } // namespace mongo diff --git a/src/mongo/db/s/global_user_write_block_state.h b/src/mongo/db/s/global_user_write_block_state.h index 8325b717b15..f354f14a186 100644 --- a/src/mongo/db/s/global_user_write_block_state.h +++ b/src/mongo/db/s/global_user_write_block_state.h @@ -49,12 +49,25 @@ public: /** * Checks that user writes are allowed on the specified namespace. Callers must hold the - * GlobalLock in any mode. + * GlobalLock in any mode. Throws OperationFailed if user writes are disallowed. */ void checkUserWritesAllowed(OperationContext* opCtx, const NamespaceString& nss) const; + /** + * Methods to enable/disable blocking new sharded DDL operations. + */ + void enableUserShardedDDLBlocking(OperationContext* opCtx); + void disableUserShardedDDLBlocking(OperationContext* opCtx); + + /** + * Checks that new sharded DDL operations are allowed to start. Throws OperationFailed if + * starting new sharded DDL operations is disallowed. + */ + void checkShardedDDLAllowedToStart(OperationContext* opCtx, const NamespaceString& nss) const; + private: bool _globalUserWritesBlocked{false}; + AtomicWord<bool> _userShardedDDLBlocked{false}; }; } // namespace mongo diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index 3c05fbc25c0..505ae360b9a 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -37,6 +37,7 @@ #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/database_sharding_state.h" +#include "mongo/db/s/global_user_write_block_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_ddl_coordinator_gen.h" #include "mongo/db/s/sharding_ddl_util.h" @@ -188,6 +189,19 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas const CancellationToken& token) noexcept { return ExecutorFuture<void>(**executor) .then([this, executor, token, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + getForwardableOpMetadata().setOn(opCtx); + + // Check if this coordinator is allowed to start according to the user-writes blocking + // critical section. If it is not the first execution, it means it had started already + // and we are recovering this coordinator. In this case, let it be completed even though + // new DDL operations may be prohibited now. + if (_firstExecution) { + GlobalUserWriteBlockState::get(opCtx)->checkShardedDDLAllowedToStart(opCtx, nss()); + } + }) + .then([this, executor, token, anchor = shared_from_this()] { return _acquireLockAsync(executor, token, nss().db()); }) .then([this, executor, token, anchor = shared_from_this()] { diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp index 1c01f4882dc..27d44ea57a3 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp @@ -165,6 +165,22 @@ void ShardingDDLCoordinatorService::waitForCoordinatorsOfGivenTypeToComplete( }); } +void ShardingDDLCoordinatorService::waitForOngoingCoordinatorsToFinish(OperationContext* opCtx) { + std::vector<SharedSemiFuture<void>> futuresToWait; + + const auto instances = getAllInstances(opCtx); + for (const auto& instance : instances) { + auto typedInstance = checked_pointer_cast<ShardingDDLCoordinator>(instance); + // TODO: SERVER-63724 Wait only for coordinators that don't have the user-write-blocking + // bypass enabled. + futuresToWait.emplace_back(typedInstance->getCompletionFuture()); + } + + for (auto&& future : futuresToWait) { + future.wait(opCtx); + } +} + void ShardingDDLCoordinatorService::_afterStepDown() { stdx::lock_guard lg(_mutex); _state = State::kPaused; diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.h b/src/mongo/db/s/sharding_ddl_coordinator_service.h index e45ce768b41..a2c4aae17cc 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator_service.h +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.h @@ -75,6 +75,12 @@ public: void waitForCoordinatorsOfGivenTypeToComplete(OperationContext* opCtx, DDLCoordinatorTypeEnum type) const; + /** + * Waits for all currently running coordinators to finish. While waiting here, new coordinators + * may start, but they will not be waited for. + */ + void waitForOngoingCoordinatorsToFinish(OperationContext* opCtx); + private: ExecutorFuture<void> _rebuildService(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) override; diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp index 6215032f7b0..c1457176934 100644 --- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp +++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp @@ -37,6 +37,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/s/global_user_write_block_state.h" +#include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/db/s/user_writes_critical_section_document_gen.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/sharding_catalog_client.h" @@ -177,6 +178,11 @@ void UserWritesRecoverableCriticalSectionService:: // Take the user writes critical section blocking only ShardingDDLCoordinators. acquireRecoverableCriticalSection( opCtx, nss, true /* blockShardedDDL */, false /* blockUserWrites */); + + // Wait for ongoing ShardingDDLCoordinators to finish. This ensures that all coordinators that + // started before enabling blocking have finish, and that any new coordinator that is started + // after this point will see the blocking is enabled. + ShardingDDLCoordinatorService::getService(opCtx)->waitForOngoingCoordinatorsToFinish(opCtx); } void UserWritesRecoverableCriticalSectionService:: @@ -333,6 +339,7 @@ void UserWritesRecoverableCriticalSectionService::recoverRecoverableCriticalSect OperationContext* opCtx) { LOGV2_DEBUG(6351912, 2, "Recovering all user writes recoverable critical sections"); + GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx); GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); // Read the persisted critical section documents and restore the state into memory. @@ -340,6 +347,11 @@ void UserWritesRecoverableCriticalSectionService::recoverRecoverableCriticalSect NamespaceString::kUserWritesCriticalSectionsNamespace); store.forEach(opCtx, BSONObj{}, [&opCtx](const UserWriteBlockingCriticalSectionDocument& doc) { invariant(doc.getNss().isEmpty()); + + if (doc.getBlockNewUserShardedDDL()) { + GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx); + } + if (doc.getBlockUserWrites()) { GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); } diff --git a/src/mongo/db/user_write_block_mode_op_observer.cpp b/src/mongo/db/user_write_block_mode_op_observer.cpp index 2d7c7de00f2..c024231909d 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.cpp +++ b/src/mongo/db/user_write_block_mode_op_observer.cpp @@ -75,6 +75,12 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx, if (!isStandaloneOrPrimary(opCtx)) { globalLockIfNotPrimary.emplace(opCtx, MODE_IX); } + + if (blockShardedDDL) { + GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking( + opCtx); + } + if (blockWrites) { GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); } @@ -108,6 +114,12 @@ void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx, globalLockIfNotPrimary.emplace(opCtx, MODE_IX); } + if (blockShardedDDL) { + GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx); + } else { + GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx); + } + if (blockWrites) { GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx); } else { @@ -155,6 +167,7 @@ void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx, globalLockIfNotPrimary.emplace(opCtx, MODE_IX); } + GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx); GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx); }); } |