summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJordi Serra Torrens <jordi.serra-torrens@mongodb.com>2022-03-29 10:59:19 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-29 11:57:57 +0000
commitce908a66890bcdd87e709b584682c6b3a3a851be (patch)
tree1440e57ae5afdd2d16f03c47b9679d6ba166c3ac /src
parent7a6435b219660136ddb73e63cba97b5003e14d8f (diff)
downloadmongo-ce908a66890bcdd87e709b584682c6b3a3a851be.tar.gz
SERVER-64943 Ensure replay safety for _shardsvrSetUserWriteBlockMode command
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/config/configsvr_coordinator.cpp20
-rw-r--r--src/mongo/db/s/config/configsvr_coordinator.h37
-rw-r--r--src/mongo/db/s/config/configsvr_coordinator.idl16
-rw-r--r--src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp4
-rw-r--r--src/mongo/db/s/config/set_cluster_parameter_coordinator.h2
-rw-r--r--src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp40
-rw-r--r--src/mongo/db/s/config/set_user_write_block_mode_coordinator.h2
-rw-r--r--src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp28
-rw-r--r--src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp25
-rw-r--r--src/mongo/db/transaction_validation.cpp3
10 files changed, 148 insertions, 29 deletions
diff --git a/src/mongo/db/s/config/configsvr_coordinator.cpp b/src/mongo/db/s/config/configsvr_coordinator.cpp
index af9e21db355..0160c830e8b 100644
--- a/src/mongo/db/s/config/configsvr_coordinator.cpp
+++ b/src/mongo/db/s/config/configsvr_coordinator.cpp
@@ -72,6 +72,17 @@ void ConfigsvrCoordinator::_removeStateDocument(OperationContext* opCtx) {
WriteConcerns::kMajorityWriteConcernNoTimeout);
}
+OperationSessionInfo ConfigsvrCoordinator::_getCurrentSession() const {
+ const auto& coordinatorMetadata = metadata();
+ invariant(coordinatorMetadata.getSession());
+ ConfigsvrCoordinatorSession coordinatorSession = *coordinatorMetadata.getSession();
+
+ OperationSessionInfo osi;
+ osi.setSessionId(coordinatorSession.getLsid());
+ osi.setTxnNumber(coordinatorSession.getTxnNumber());
+ return osi;
+}
+
void ConfigsvrCoordinator::interrupt(Status status) noexcept {
LOGV2_DEBUG(6347303,
1,
@@ -113,6 +124,15 @@ SemiFuture<void> ConfigsvrCoordinator::run(std::shared_ptr<executor::ScopedTaskE
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
_removeStateDocument(opCtx);
+
+ const auto session = metadata().getSession();
+ if (session && status.isOK()) {
+ // Return lsid to the InternalSessionPool. If status is not OK, let the session
+ // be discarded.
+ InternalSessionPool::get(opCtx)->release(
+ {session->getLsid(), session->getTxnNumber()});
+ }
+
} catch (DBException& ex) {
LOGV2_WARNING(6347302,
"Failed to remove ConfigsvrCoordinator state document",
diff --git a/src/mongo/db/s/config/configsvr_coordinator.h b/src/mongo/db/s/config/configsvr_coordinator.h
index 426223e8dae..6c7facecb9a 100644
--- a/src/mongo/db/s/config/configsvr_coordinator.h
+++ b/src/mongo/db/s/config/configsvr_coordinator.h
@@ -65,10 +65,47 @@ protected:
virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept = 0;
+ virtual const ConfigsvrCoordinatorMetadata& metadata() const = 0;
+
void interrupt(Status status) noexcept override final;
void _removeStateDocument(OperationContext* opCtx);
+ template <typename StateDoc>
+ void _updateStateDocument(OperationContext* opCtx, const StateDoc& newDoc) {
+ PersistentTaskStore<StateDoc> store(NamespaceString::kConfigsvrCoordinatorsNamespace);
+ store.update(opCtx,
+ BSON(StateDoc::kIdFieldName << newDoc.getId().toBSON()),
+ newDoc.toBSON(),
+ WriteConcerns::kMajorityWriteConcernShardingTimeout);
+ }
+
+ template <typename StateDoc>
+ StateDoc _updateSession(OperationContext* opCtx, const StateDoc& doc) {
+ const auto newCoordinatorMetadata = [&] {
+ ConfigsvrCoordinatorMetadata newMetadata = doc.getConfigsvrCoordinatorMetadata();
+
+ const auto optPrevSession = doc.getSession();
+ if (optPrevSession) {
+ newMetadata.setSession(ConfigsvrCoordinatorSession(
+ optPrevSession->getLsid(), optPrevSession->getTxnNumber() + 1));
+ } else {
+ const auto newSession = InternalSessionPool::get(opCtx)->acquireSystemSession();
+ newMetadata.setSession(ConfigsvrCoordinatorSession(newSession.getSessionId(),
+ newSession.getTxnNumber()));
+ }
+
+ return newMetadata;
+ }();
+
+ StateDoc newDoc(doc);
+ newDoc.setConfigsvrCoordinatorMetadata(std::move(newCoordinatorMetadata));
+ _updateStateDocument(opCtx, newDoc);
+ return newDoc;
+ }
+
+ OperationSessionInfo _getCurrentSession() const;
+
Mutex _mutex = MONGO_MAKE_LATCH("ConfigsvrCoordinator::_mutex");
SharedPromise<void> _completionPromise;
};
diff --git a/src/mongo/db/s/config/configsvr_coordinator.idl b/src/mongo/db/s/config/configsvr_coordinator.idl
index dccf776b84d..b97e29fe6a4 100644
--- a/src/mongo/db/s/config/configsvr_coordinator.idl
+++ b/src/mongo/db/s/config/configsvr_coordinator.idl
@@ -52,6 +52,16 @@ structs:
description: "Type of the ConfigsvrCoordinator"
type: ConfigsvrCoordinatorType
+ ConfigsvrCoordinatorSession:
+ description: "Container for configsvr coordinator session used for guaranteeing remote
+ command idempotency."
+ strict: false
+ fields:
+ lsid:
+ type: LogicalSessionId
+ txnNumber:
+ type: TxnNumber
+
ConfigsvrCoordinatorMetadata:
description: "Common metadata for all ConfigsvrCoordinators"
generate_comparison_operators: false
@@ -60,6 +70,6 @@ structs:
_id:
cpp_name: id
type: ConfigsvrCoordinatorId
- recoveredFromDisk:
- type: bool
- default: false
+ session:
+ type: ConfigsvrCoordinatorSession
+ optional: true
diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp
index 1a1a95b253e..209ebf8a2f3 100644
--- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp
+++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp
@@ -87,6 +87,10 @@ void SetClusterParameterCoordinator::_enterPhase(Phase newPhase) {
_doc = std::move(newDoc);
}
+const ConfigsvrCoordinatorMetadata& SetClusterParameterCoordinator::metadata() const {
+ return _doc.getConfigsvrCoordinatorMetadata();
+}
+
ExecutorFuture<void> SetClusterParameterCoordinator::_runImpl(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h
index 95aa2823608..ec0f0fead04 100644
--- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h
+++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h
@@ -57,6 +57,8 @@ private:
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
+ const ConfigsvrCoordinatorMetadata& metadata() const override;
+
template <typename Func>
auto _executePhase(const Phase& newPhase, Func&& func) {
return [=] {
diff --git a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp
index 28ff0648424..e719bdecc40 100644
--- a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp
+++ b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp
@@ -62,18 +62,19 @@ ShardsvrSetUserWriteBlockMode makeShardsvrSetUserWriteBlockModeCommand(
void sendSetUserWriteBlockModeCmdToAllShards(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
bool block,
- ShardsvrSetUserWriteBlockModePhaseEnum phase) {
+ ShardsvrSetUserWriteBlockModePhaseEnum phase,
+ const OperationSessionInfo& osi) {
const auto allShards = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
const auto shardsvrSetUserWriteBlockModeCmd =
makeShardsvrSetUserWriteBlockModeCommand(block, phase);
- sharding_util::sendCommandToShards(
- opCtx,
- shardsvrSetUserWriteBlockModeCmd.getDbName(),
- CommandHelpers::appendMajorityWriteConcern(shardsvrSetUserWriteBlockModeCmd.toBSON({})),
- allShards,
- executor);
+ sharding_util::sendCommandToShards(opCtx,
+ shardsvrSetUserWriteBlockModeCmd.getDbName(),
+ CommandHelpers::appendMajorityWriteConcern(
+ shardsvrSetUserWriteBlockModeCmd.toBSON(osi.toBSON())),
+ allShards,
+ executor);
}
} // namespace
@@ -115,15 +116,16 @@ void SetUserWriteBlockModeCoordinator::_enterPhase(Phase newPhase) {
if (_doc.getPhase() == Phase::kUnset) {
store.add(opCtx.get(), newDoc, WriteConcerns::kMajorityWriteConcernShardingTimeout);
} else {
- store.update(opCtx.get(),
- BSON(StateDoc::kIdFieldName << _coordId.toBSON()),
- newDoc.toBSON(),
- WriteConcerns::kMajorityWriteConcernNoTimeout);
+ _updateStateDocument(opCtx.get(), newDoc);
}
_doc = std::move(newDoc);
}
+const ConfigsvrCoordinatorMetadata& SetUserWriteBlockModeCoordinator::metadata() const {
+ return _doc.getConfigsvrCoordinatorMetadata();
+}
+
ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
@@ -135,6 +137,11 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ // Get an incremented {lsid, txnNumber} pair that will be attached to the command
+ // sent to the shards to guarantee message replay protection.
+ _doc = _updateSession(opCtx, _doc);
+ const auto session = _getCurrentSession();
+
// Ensure the topology is stable so we don't miss propagating the write blocking
// state to any concurrently added shard. Keep it stable until we have persisted the
// user write blocking state on the configsvr so that new shards that get added will
@@ -147,7 +154,8 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl(
opCtx,
executor,
_doc.getBlock(),
- ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare);
+ ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare,
+ session);
// Durably store the state on the configsvr.
if (_doc.getBlock()) {
@@ -177,6 +185,11 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ // Get an incremented {lsid, txnNumber} pair that will be attached to the command sent
+ // to the shards to guarantee message replay protection.
+ _doc = _updateSession(opCtx, _doc);
+ const auto session = _getCurrentSession();
+
// Ensure the topology is stable so we don't miss propagating the write blocking state
// to any concurrently added shard. Keep it stable until we have persisted the user
// write blocking state on the configsvr so that new shards that get added will see
@@ -189,7 +202,8 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl(
opCtx,
executor,
_doc.getBlock(),
- ShardsvrSetUserWriteBlockModePhaseEnum::kComplete);
+ ShardsvrSetUserWriteBlockModePhaseEnum::kComplete,
+ session);
// Durably store the state on the configsvr.
if (_doc.getBlock()) {
diff --git a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h
index f704ac4087b..6ab9d93304b 100644
--- a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h
+++ b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h
@@ -57,6 +57,8 @@ private:
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
+ const ConfigsvrCoordinatorMetadata& metadata() const override;
+
template <typename Func>
auto _executePhase(const Phase& newPhase, Func&& func) {
return [=] {
diff --git a/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp b/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp
index ef4f067f483..5ecfdc73bd6 100644
--- a/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp
+++ b/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/sharding_ddl_coordinator.h"
#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/s/user_writes_recoverable_critical_section_service.h"
@@ -63,10 +65,25 @@ public:
hangInShardsvrSetUserWriteBlockMode.pauseWhileSet();
- const auto startBlocking = request().getGlobal();
+ _runImpl(opCtx, request());
+
+ // Since it is possible that no actual write happened with this txnNumber, we need to
+ // make a dummy write so that secondaries can be aware of this txn.
+ DBDirectClient client(opCtx);
+ client.update(NamespaceString::kServerConfigurationNamespace.ns(),
+ BSON("_id"
+ << "SetUseWriteBlockModeStats"),
+ BSON("$inc" << BSON("count" << 1)),
+ true /* upsert */,
+ false /* multi */);
+ }
+
+ private:
+ void _runImpl(OperationContext* opCtx, const Request& request) {
+ const auto startBlocking = request.getGlobal();
if (startBlocking) {
- switch (request().getPhase()) {
+ switch (request.getPhase()) {
case ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare:
UserWritesRecoverableCriticalSectionService::get(opCtx)
->acquireRecoverableCriticalSectionBlockNewShardedDDL(
@@ -77,8 +94,8 @@ public:
// Wait for ongoing ShardingDDLCoordinators to finish. This ensures that all
// coordinators that started before enabling blocking have finished, and that
// any new coordinator that is started after this point will see the
- // blocking is enabled. Wait only for coordinators that don't have
- // the user-write-blocking bypass enabled -- the ones allowed to bypass user
+ // blocking is enabled. Wait only for coordinators that don't have the
+ // user-write-blocking bypass enabled -- the ones allowed to bypass user
// write blocking don't care about the write blocking state.
{
const auto shouldWaitPred =
@@ -113,7 +130,7 @@ public:
MONGO_UNREACHABLE;
}
} else {
- switch (request().getPhase()) {
+ switch (request.getPhase()) {
case ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare:
UserWritesRecoverableCriticalSectionService::get(opCtx)
->demoteRecoverableCriticalSectionToNoLongerBlockUserWrites(
@@ -134,7 +151,6 @@ public:
}
}
- private:
NamespaceString ns() const override {
return NamespaceString();
}
diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
index e54abafe898..5aec4a6521f 100644
--- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
+++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp
@@ -334,12 +334,25 @@ void UserWritesRecoverableCriticalSectionService::releaseRecoverableCriticalSect
// Release the critical section by deleting the critical section document. The OpObserver
// will release the in-memory CS when reacting to the delete event.
- PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store(
- NamespaceString::kUserWritesCriticalSectionsNamespace);
- store.remove(
- opCtx,
- BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()),
- ShardingCatalogClient::kLocalWriteConcern);
+ DBDirectClient dbClient(opCtx);
+ const auto cmdResponse = dbClient.runCommand([&] {
+ write_ops::DeleteCommandRequest deleteOp(
+ NamespaceString::kUserWritesCriticalSectionsNamespace);
+
+ deleteOp.setDeletes({[&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName
+ << nss.toString()));
+ // At most one doc can possibly match the above query.
+ entry.setMulti(false);
+ return entry;
+ }()});
+
+ return deleteOp.serialize({});
+ }());
+
+ const auto commandReply = cmdResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
}
LOGV2_DEBUG(
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index 39811a02295..5ae750ec2f5 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -60,7 +60,8 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1},
{"_configsvrRenameCollectionMetadata", 1},
{"_shardsvrParticipantBlock", 1},
{"_configsvrCollMod", 1},
- {"_shardsvrCollModParticipant", 1}};
+ {"_shardsvrCollModParticipant", 1},
+ {"_shardsvrSetUserWriteBlockMode", 1}};
// Commands that can be sent with session info but should not check out a session.
const StringMap<int> skipSessionCheckoutList = {