summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/global_user_write_block_state.cpp18
-rw-r--r--src/mongo/db/s/global_user_write_block_state.h15
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator.cpp14
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator_service.cpp16
-rw-r--r--src/mongo/db/s/sharding_ddl_coordinator_service.h6
-rw-r--r--src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp12
-rw-r--r--src/mongo/db/user_write_block_mode_op_observer.cpp13
8 files changed, 94 insertions, 1 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 8a897f2f13f..8f37f180c1b 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -181,6 +181,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/replica_set_aware_service',
'$BUILD_DIR/mongo/db/rw_concern_d',
'sharding_api_d',
+ 'sharding_commands_d',
]
)
diff --git a/src/mongo/db/s/global_user_write_block_state.cpp b/src/mongo/db/s/global_user_write_block_state.cpp
index c7648075483..47422440e8a 100644
--- a/src/mongo/db/s/global_user_write_block_state.cpp
+++ b/src/mongo/db/s/global_user_write_block_state.cpp
@@ -65,4 +65,22 @@ void GlobalUserWriteBlockState::checkUserWritesAllowed(OperationContext* opCtx,
nss.isOnInternalDb());
}
+void GlobalUserWriteBlockState::enableUserShardedDDLBlocking(OperationContext* opCtx) {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+ _userShardedDDLBlocked.store(true);
+}
+
+void GlobalUserWriteBlockState::disableUserShardedDDLBlocking(OperationContext* opCtx) {
+ _userShardedDDLBlocked.store(false);
+}
+
+void GlobalUserWriteBlockState::checkShardedDDLAllowedToStart(OperationContext* opCtx,
+ const NamespaceString& nss) const {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+ uassert(ErrorCodes::OperationFailed,
+ "User writes blocked",
+ !_userShardedDDLBlocked.load() ||
+ WriteBlockBypass::get(opCtx).isWriteBlockBypassEnabled() || nss.isOnInternalDb());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/global_user_write_block_state.h b/src/mongo/db/s/global_user_write_block_state.h
index 8325b717b15..f354f14a186 100644
--- a/src/mongo/db/s/global_user_write_block_state.h
+++ b/src/mongo/db/s/global_user_write_block_state.h
@@ -49,12 +49,25 @@ public:
/**
* Checks that user writes are allowed on the specified namespace. Callers must hold the
- * GlobalLock in any mode.
+ * GlobalLock in any mode. Throws OperationFailed if user writes are disallowed.
*/
void checkUserWritesAllowed(OperationContext* opCtx, const NamespaceString& nss) const;
+ /**
+ * Methods to enable/disable blocking new sharded DDL operations.
+ */
+ void enableUserShardedDDLBlocking(OperationContext* opCtx);
+ void disableUserShardedDDLBlocking(OperationContext* opCtx);
+
+ /**
+ * Checks that new sharded DDL operations are allowed to start. Throws OperationFailed if
+ * starting new sharded DDL operations is disallowed.
+ */
+ void checkShardedDDLAllowedToStart(OperationContext* opCtx, const NamespaceString& nss) const;
+
private:
bool _globalUserWritesBlocked{false};
+ AtomicWord<bool> _userShardedDDLBlocked{false};
};
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index 3c05fbc25c0..505ae360b9a 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/db/s/global_user_write_block_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_ddl_coordinator_gen.h"
#include "mongo/db/s/sharding_ddl_util.h"
@@ -188,6 +189,19 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
.then([this, executor, token, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ // Check if this coordinator is allowed to start according to the user-writes blocking
+ // critical section. If it is not the first execution, it means it had started already
+ // and we are recovering this coordinator. In this case, let it be completed even though
+ // new DDL operations may be prohibited now.
+ if (_firstExecution) {
+ GlobalUserWriteBlockState::get(opCtx)->checkShardedDDLAllowedToStart(opCtx, nss());
+ }
+ })
+ .then([this, executor, token, anchor = shared_from_this()] {
return _acquireLockAsync(executor, token, nss().db());
})
.then([this, executor, token, anchor = shared_from_this()] {
diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
index 1c01f4882dc..27d44ea57a3 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp
@@ -165,6 +165,22 @@ void ShardingDDLCoordinatorService::waitForCoordinatorsOfGivenTypeToComplete(
});
}
+void ShardingDDLCoordinatorService::waitForOngoingCoordinatorsToFinish(OperationContext* opCtx) {
+ std::vector<SharedSemiFuture<void>> futuresToWait;
+
+ const auto instances = getAllInstances(opCtx);
+ for (const auto& instance : instances) {
+ auto typedInstance = checked_pointer_cast<ShardingDDLCoordinator>(instance);
+ // TODO: SERVER-63724 Wait only for coordinators that don't have the user-write-blocking
+ // bypass enabled.
+ futuresToWait.emplace_back(typedInstance->getCompletionFuture());
+ }
+
+ for (auto&& future : futuresToWait) {
+ future.wait(opCtx);
+ }
+}
+
void ShardingDDLCoordinatorService::_afterStepDown() {
stdx::lock_guard lg(_mutex);
_state = State::kPaused;
diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.h b/src/mongo/db/s/sharding_ddl_coordinator_service.h
index e45ce768b41..a2c4aae17cc 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator_service.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator_service.h
@@ -75,6 +75,12 @@ public:
void waitForCoordinatorsOfGivenTypeToComplete(OperationContext* opCtx,
DDLCoordinatorTypeEnum type) const;
+ /**
+ * Waits for all currently running coordinators to finish. While waiting here, new coordinators
+ * may start, but they will not be waited for.
+ */
+ void waitForOngoingCoordinatorsToFinish(OperationContext* opCtx);
+
private:
ExecutorFuture<void> _rebuildService(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) override;
diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
index 6215032f7b0..c1457176934 100644
--- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
+++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/s/global_user_write_block_state.h"
+#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/s/user_writes_critical_section_document_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
@@ -177,6 +178,11 @@ void UserWritesRecoverableCriticalSectionService::
// Take the user writes critical section blocking only ShardingDDLCoordinators.
acquireRecoverableCriticalSection(
opCtx, nss, true /* blockShardedDDL */, false /* blockUserWrites */);
+
+ // Wait for ongoing ShardingDDLCoordinators to finish. This ensures that all coordinators that
+ // started before enabling blocking have finish, and that any new coordinator that is started
+ // after this point will see the blocking is enabled.
+ ShardingDDLCoordinatorService::getService(opCtx)->waitForOngoingCoordinatorsToFinish(opCtx);
}
void UserWritesRecoverableCriticalSectionService::
@@ -333,6 +339,7 @@ void UserWritesRecoverableCriticalSectionService::recoverRecoverableCriticalSect
OperationContext* opCtx) {
LOGV2_DEBUG(6351912, 2, "Recovering all user writes recoverable critical sections");
+ GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx);
GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
// Read the persisted critical section documents and restore the state into memory.
@@ -340,6 +347,11 @@ void UserWritesRecoverableCriticalSectionService::recoverRecoverableCriticalSect
NamespaceString::kUserWritesCriticalSectionsNamespace);
store.forEach(opCtx, BSONObj{}, [&opCtx](const UserWriteBlockingCriticalSectionDocument& doc) {
invariant(doc.getNss().isEmpty());
+
+ if (doc.getBlockNewUserShardedDDL()) {
+ GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx);
+ }
+
if (doc.getBlockUserWrites()) {
GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
}
diff --git a/src/mongo/db/user_write_block_mode_op_observer.cpp b/src/mongo/db/user_write_block_mode_op_observer.cpp
index 2d7c7de00f2..c024231909d 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.cpp
+++ b/src/mongo/db/user_write_block_mode_op_observer.cpp
@@ -75,6 +75,12 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx,
if (!isStandaloneOrPrimary(opCtx)) {
globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
}
+
+ if (blockShardedDDL) {
+ GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(
+ opCtx);
+ }
+
if (blockWrites) {
GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
}
@@ -108,6 +114,12 @@ void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx,
globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
}
+ if (blockShardedDDL) {
+ GlobalUserWriteBlockState::get(opCtx)->enableUserShardedDDLBlocking(opCtx);
+ } else {
+ GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx);
+ }
+
if (blockWrites) {
GlobalUserWriteBlockState::get(opCtx)->enableUserWriteBlocking(opCtx);
} else {
@@ -155,6 +167,7 @@ void UserWriteBlockModeOpObserver::onDelete(OperationContext* opCtx,
globalLockIfNotPrimary.emplace(opCtx, MODE_IX);
}
+ GlobalUserWriteBlockState::get(opCtx)->disableUserShardedDDLBlocking(opCtx);
GlobalUserWriteBlockState::get(opCtx)->disableUserWriteBlocking(opCtx);
});
}