author     Simon Graetzer <simon.gratzer@mongodb.com>          2021-07-21 12:22:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-07-21 12:47:44 +0000
commit     26862ed87cd1001577565f20f1d14b5bdb742ed2 (patch)
tree       f586d30bbadc708856354d56cd1a6158eb199022 /src/mongo
parent     4a399e6d1f7da8022e7d8d489bb9500ffded6d58 (diff)
download   mongo-26862ed87cd1001577565f20f1d14b5bdb742ed2.tar.gz
SERVER-57559 Implement LSID cache for DDL coordinators
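In brief, this commit replaces the per-coordinator _insertStateDocument/_updateStateDocument (and _insertCoordinatorDocument/_updateCoordinatorDocument) members with templated helpers on the ShardingDDLCoordinator base class, and introduces ShardingDDLCoordinator::SessionCache, a ServiceContext decoration that pools logical sessions (ShardingDDLSession: an lsid plus a txnNumber) so DDL coordinators can reuse sessions while never reusing an (lsid, txnNumber) pair. The standalone sketch below models only the pooling scheme; Session, the "lsid-N" string ids, and the use of std::mutex/std::string in place of MongoDB's Latch and LogicalSessionId are illustrative stand-ins, not the real types (the real implementation is in sharding_ddl_coordinator.cpp/.h below).

// sketch_session_cache.cpp -- simplified model of the commit's SessionCache.
#include <cstdint>
#include <iostream>
#include <mutex>
#include <stack>
#include <string>
#include <utility>

using TxnNumber = std::int64_t;

struct Session {
    std::string lsid;     // stand-in for LogicalSessionId
    TxnNumber txnNumber;  // highest transaction number used with this lsid
};

class SessionCache {
public:
    // Pop a pooled session if one exists; otherwise mint a fresh one with
    // txnNumber 0 (mirrors acquire() in sharding_ddl_coordinator.cpp).
    Session acquire() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (!_cache.empty()) {
            Session s = std::move(_cache.top());
            _cache.pop();
            return s;
        }
        return Session{"lsid-" + std::to_string(_nextId++), 0};
    }

    // Bump the txnNumber past the highest value the releasing coordinator
    // used, then return the session to the pool, so the next acquirer can
    // never reuse an (lsid, txnNumber) pair.
    void release(Session s) {
        ++s.txnNumber;
        std::lock_guard<std::mutex> lk(_mutex);
        _cache.push(std::move(s));
    }

private:
    std::stack<Session> _cache;
    std::mutex _mutex;
    int _nextId = 0;
};

int main() {
    SessionCache cache;
    Session a = cache.acquire();  // lsid-0, txnNumber 0
    cache.release(std::move(a));  // pooled as (lsid-0, 1)
    Session b = cache.acquire();  // reuses lsid-0 with txnNumber 1
    std::cout << b.lsid << ' ' << b.txnNumber << '\n';
    return 0;
}

Note that release() increments before pooling, which is the whole invariant: a coordinator that crashes mid-operation simply never returns its session, and the lsid is discarded rather than reused.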
Diffstat (limited to 'src/mongo')
 src/mongo/db/s/create_collection_coordinator.cpp               |  28
 src/mongo/db/s/create_collection_coordinator.h                 |   6
 src/mongo/db/s/drop_collection_coordinator.cpp                 |  46
 src/mongo/db/s/drop_collection_coordinator.h                   |   6
 src/mongo/db/s/drop_database_coordinator.cpp                   |  27
 src/mongo/db/s/drop_database_coordinator.h                     |   6
 src/mongo/db/s/rename_collection_coordinator.cpp               |  26
 src/mongo/db/s/rename_collection_coordinator.h                 |   6
 src/mongo/db/s/sharding_ddl_coordinator.cpp                    |  89
 src/mongo/db/s/sharding_ddl_coordinator.h                      |  78
 src/mongo/db/s/sharding_ddl_coordinator.idl                    |  14
 src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp | 120
 12 files changed, 308 insertions(+), 144 deletions(-)
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 86c0f9c592d..11a787cae82 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -799,29 +799,7 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
         opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
 }
 
-// Phase change and document handling API.
-void CreateCollectionCoordinator::_insertCoordinatorDocument(CoordDoc&& doc) {
-    auto docBSON = _doc.toBSON();
-    auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
-    coorMetadata.setRecoveredFromDisk(true);
-    doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
-    auto opCtx = cc().makeOperationContext();
-    PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
-    _doc = std::move(doc);
-}
-
-void CreateCollectionCoordinator::_updateCoordinatorDocument(CoordDoc&& newDoc) {
-    auto opCtx = cc().makeOperationContext();
-    PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.update(opCtx.get(),
-                 BSON(CoordDoc::kIdFieldName << _doc.getId().toBSON()),
-                 newDoc.toBSON(),
-                 WriteConcerns::kMajorityWriteConcern);
-
-    _doc = std::move(newDoc);
-}
+// Phase change API.
 
 void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
     CoordDoc newDoc(_doc);
@@ -835,10 +813,10 @@ void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
         "oldPhase"_attr = CreateCollectionCoordinatorPhase_serializer(_doc.getPhase()));
 
     if (_doc.getPhase() == Phase::kUnset) {
-        _insertCoordinatorDocument(std::move(newDoc));
+        _doc = _insertStateDocument(std::move(newDoc));
        return;
    }
-    _updateCoordinatorDocument(std::move(newDoc));
+    _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
 }
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index c7cc1b2f418..bb1f303e3c3 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -66,6 +66,10 @@ public:
     }
 
 private:
+    ShardingDDLCoordinatorMetadata const& metadata() const override {
+        return _doc.getShardingDDLCoordinatorMetadata();
+    }
+
     ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                   const CancellationToken& token) noexcept override;
 
@@ -86,8 +90,6 @@ private:
         };
     };
 
-    void _insertCoordinatorDocument(CoordDoc&& doc);
-    void _updateCoordinatorDocument(CoordDoc&& newStateDoc);
     void _enterPhase(Phase newState);
 
     /**
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp
index 3156329d35d..2136e1f2f96 100644
--- a/src/mongo/db/s/drop_collection_coordinator.cpp
+++ b/src/mongo/db/s/drop_collection_coordinator.cpp
@@ -31,7 +31,6 @@
 
 #include "mongo/db/s/drop_collection_coordinator.h"
 
-#include "mongo/db/persistent_task_store.h"
 #include "mongo/db/s/sharding_ddl_util.h"
 #include "mongo/db/s/sharding_logging.h"
 #include "mongo/db/s/sharding_state.h"
@@ -68,27 +67,6 @@ boost::optional<BSONObj> DropCollectionCoordinator::reportForCurrentOp(
     bob.append("active", true);
     return bob.obj();
 }
-void DropCollectionCoordinator::_insertStateDocument(StateDoc&& doc) {
-    auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
-    coorMetadata.setRecoveredFromDisk(true);
-    doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
-    auto opCtx = cc().makeOperationContext();
-    PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
-    _doc = std::move(doc);
-}
-
-void DropCollectionCoordinator::_updateStateDocument(StateDoc&& newDoc) {
-    auto opCtx = cc().makeOperationContext();
-    PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.update(opCtx.get(),
-                 BSON(StateDoc::kIdFieldName << _doc.getId().toBSON()),
-                 newDoc.toBSON(),
-                 WriteConcerns::kMajorityWriteConcern);
-
-    _doc = std::move(newDoc);
-}
 
 DropReply DropCollectionCoordinator::dropCollectionLocally(OperationContext* opCtx,
                                                            const NamespaceString& nss) {
@@ -125,10 +103,10 @@ void DropCollectionCoordinator::_enterPhase(Phase newPhase) {
         "oldPhase"_attr = DropCollectionCoordinatorPhase_serializer(_doc.getPhase()));
 
     if (_doc.getPhase() == Phase::kUnset) {
-        _insertStateDocument(std::move(newDoc));
+        _doc = _insertStateDocument(std::move(newDoc));
        return;
    }
-    _updateStateDocument(std::move(newDoc));
+    _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
 }
 
 ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
@@ -177,15 +155,16 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
                     sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss());
                 }
 
+                // get a Lsid and an incremented txnNumber. Ensures we are the primary
+                _doc = _updateSession(opCtx, _doc);
+
                 const auto primaryShardId = ShardingState::get(opCtx)->shardId();
                 const ShardsvrDropCollectionParticipant dropCollectionParticipant(nss());
+                const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(
+                    dropCollectionParticipant.toBSON({}));
+
                 sharding_ddl_util::sendAuthenticatedCommandToShards(
-                    opCtx,
-                    nss().db(),
-                    CommandHelpers::appendMajorityWriteConcern(
-                        dropCollectionParticipant.toBSON({})),
-                    {primaryShardId},
-                    **executor);
+                    opCtx, nss().db(), cmdObj, {primaryShardId}, **executor);
 
                 // We need to send the drop to all the shards because both movePrimary and
                 // moveChunk leave garbage behind for sharded collections.
@@ -195,12 +174,7 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
                     std::remove(participants.begin(), participants.end(), primaryShardId),
                     participants.end());
 
                 sharding_ddl_util::sendAuthenticatedCommandToShards(
-                    opCtx,
-                    nss().db(),
-                    CommandHelpers::appendMajorityWriteConcern(
-                        dropCollectionParticipant.toBSON({})),
-                    participants,
-                    **executor);
+                    opCtx, nss().db(), cmdObj, participants, **executor);
 
                 ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns());
                 LOGV2(5390503, "Collection dropped", "namespace"_attr = nss());
diff --git a/src/mongo/db/s/drop_collection_coordinator.h b/src/mongo/db/s/drop_collection_coordinator.h
index 9baf2f47d6f..1f43b204de4 100644
--- a/src/mongo/db/s/drop_collection_coordinator.h
+++ b/src/mongo/db/s/drop_collection_coordinator.h
@@ -57,6 +57,10 @@ public:
     static DropReply dropCollectionLocally(OperationContext* opCtx, const NamespaceString& nss);
 
 private:
+    ShardingDDLCoordinatorMetadata const& metadata() const override {
+        return _doc.getShardingDDLCoordinatorMetadata();
+    }
+
     ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                   const CancellationToken& token) noexcept override;
 
@@ -77,8 +81,6 @@ private:
         };
    }
 
-    void _insertStateDocument(StateDoc&& doc);
-    void _updateStateDocument(StateDoc&& newStateDoc);
     void _enterPhase(Phase newPhase);
 
     DropCollectionCoordinatorDocument _doc;
diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp
index 17d2b612f24..e20a72dbab1 100644
--- a/src/mongo/db/s/drop_database_coordinator.cpp
+++ b/src/mongo/db/s/drop_database_coordinator.cpp
@@ -117,26 +117,6 @@ boost::optional<BSONObj> DropDatabaseCoordinator::reportForCurrentOp(
     return bob.obj();
 }
 
-void DropDatabaseCoordinator::_insertStateDocument(OperationContext* opCtx, StateDoc&& doc) {
-    auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
-    coorMetadata.setRecoveredFromDisk(true);
-    doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
-    PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.add(opCtx, doc, WriteConcerns::kMajorityWriteConcern);
-    _doc = std::move(doc);
-}
-
-void DropDatabaseCoordinator::_updateStateDocument(OperationContext* opCtx, StateDoc&& newDoc) {
-    PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.update(opCtx,
-                 BSON(StateDoc::kIdFieldName << _doc.getId().toBSON()),
-                 newDoc.toBSON(),
-                 WriteConcerns::kMajorityWriteConcern);
-
-    _doc = std::move(newDoc);
-}
-
 void DropDatabaseCoordinator::_enterPhase(Phase newPhase) {
     StateDoc newDoc(_doc);
     newDoc.setPhase(newPhase);
@@ -148,12 +128,11 @@ void DropDatabaseCoordinator::_enterPhase(Phase newPhase) {
         "newPhase"_attr = DropDatabaseCoordinatorPhase_serializer(newDoc.getPhase()),
         "oldPhase"_attr = DropDatabaseCoordinatorPhase_serializer(_doc.getPhase()));
 
-    auto opCtx = cc().makeOperationContext();
     if (_doc.getPhase() == Phase::kUnset) {
-        _insertStateDocument(opCtx.get(), std::move(newDoc));
+        _doc = _insertStateDocument(std::move(newDoc));
        return;
    }
-    _updateStateDocument(opCtx.get(), std::move(newDoc));
+    _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
 }
 
 ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
@@ -191,7 +170,7 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
 
                     auto newStateDoc = _doc;
                     newStateDoc.setCollInfo(coll);
-                    _updateStateDocument(opCtx, std::move(newStateDoc));
+                    _doc = _updateStateDocument(opCtx, std::move(newStateDoc));
 
                     dropShardedCollection(opCtx, coll, executor);
                 }
diff --git a/src/mongo/db/s/drop_database_coordinator.h b/src/mongo/db/s/drop_database_coordinator.h
index 085e0b6d8cd..2b9d4774088 100644
--- a/src/mongo/db/s/drop_database_coordinator.h
+++ b/src/mongo/db/s/drop_database_coordinator.h
@@ -49,6 +49,10 @@ public:
         MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override;
 
 private:
+    ShardingDDLCoordinatorMetadata const& metadata() const override {
+        return _doc.getShardingDDLCoordinatorMetadata();
+    }
+
     ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                   const CancellationToken& token) noexcept override;
 
@@ -69,8 +73,6 @@ private:
         };
    }
 
-    void _insertStateDocument(OperationContext* opCtx, StateDoc&& doc);
-    void _updateStateDocument(OperationContext* opCtx, StateDoc&& newStateDoc);
     void _enterPhase(Phase newPhase);
 
     DropDatabaseCoordinatorDocument _doc;
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
index 508eb01f51f..cf943e4a559 100644
--- a/src/mongo/db/s/rename_collection_coordinator.cpp
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -111,28 +111,6 @@ boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp(
     return bob.obj();
 }
 
-void RenameCollectionCoordinator::_insertStateDocument(StateDoc&& doc) {
-    auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
-    coorMetadata.setRecoveredFromDisk(true);
-    doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
-    auto opCtx = cc().makeOperationContext();
-    PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
-    _doc = std::move(doc);
-}
-
-void RenameCollectionCoordinator::_updateStateDocument(StateDoc&& newDoc) {
-    auto opCtx = cc().makeOperationContext();
-    PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
-    store.update(opCtx.get(),
-                 BSON(StateDoc::kIdFieldName << _doc.getId().toBSON()),
-                 newDoc.toBSON(),
-                 WriteConcerns::kMajorityWriteConcern);
-
-    _doc = std::move(newDoc);
-}
-
 void RenameCollectionCoordinator::_enterPhase(Phase newPhase) {
     StateDoc newDoc(_doc);
     newDoc.setPhase(newPhase);
@@ -146,10 +124,10 @@ void RenameCollectionCoordinator::_enterPhase(Phase newPhase) {
         "oldPhase"_attr = RenameCollectionCoordinatorPhase_serializer(_doc.getPhase()));
 
     if (_doc.getPhase() == Phase::kUnset) {
-        _insertStateDocument(std::move(newDoc));
+        _doc = _insertStateDocument(std::move(newDoc));
        return;
    }
-    _updateStateDocument(std::move(newDoc));
+    _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
 }
 
 ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h
index c3779b6c289..badc1a59ee9 100644
--- a/src/mongo/db/s/rename_collection_coordinator.h
+++ b/src/mongo/db/s/rename_collection_coordinator.h
@@ -59,6 +59,10 @@ public:
     }
 
 private:
+    ShardingDDLCoordinatorMetadata const& metadata() const override {
+        return _doc.getShardingDDLCoordinatorMetadata();
+    }
+
     ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                   const CancellationToken& token) noexcept override;
 
@@ -81,8 +85,6 @@ private:
         };
    }
 
-    void _insertStateDocument(StateDoc&& doc);
-    void _updateStateDocument(StateDoc&& newStateDoc);
     void _enterPhase(Phase newPhase);
 
     RenameCollectionCoordinatorDocument _doc;
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index e519eec4787..a638319db54 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -34,6 +34,7 @@
 #include "mongo/db/s/sharding_ddl_coordinator.h"
 
 #include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_id_helpers.h"
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/s/database_sharding_state.h"
 #include "mongo/db/s/operation_sharding_state.h"
@@ -62,9 +63,8 @@ ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONO
 ShardingDDLCoordinator::ShardingDDLCoordinator(ShardingDDLCoordinatorService* service,
                                                const BSONObj& coorDoc)
     : _service(service),
-      _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)),
-      _coorName(DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType())),
-      _recoveredFromDisk(_coorMetadata.getRecoveredFromDisk()) {}
+      _coordId(extractShardingDDLCoordinatorMetadata(coorDoc).getId()),
+      _recoveredFromDisk(extractShardingDDLCoordinatorMetadata(coorDoc).getRecoveredFromDisk()) {}
 
 ShardingDDLCoordinator::~ShardingDDLCoordinator() {
     invariant(_constructionCompletionPromise.getFuture().isReady());
@@ -79,8 +79,7 @@ bool ShardingDDLCoordinator::_removeDocument(OperationContext* opCtx) {
     deleteOp.setDeletes({[&] {
         write_ops::DeleteOpEntry entry;
-        entry.setQ(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName
-                        << _coorMetadata.getId().toBSON()));
+        entry.setQ(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName << _coordId.toBSON()));
         entry.setMulti(true);
         return entry;
     }()});
@@ -116,8 +115,9 @@ ExecutorFuture<void> ShardingDDLCoordinator::_acquireLockAsync(
             auto* opCtx = opCtxHolder.get();
 
             auto distLockManager = DistLockManager::get(opCtx);
+            const auto coorName = DDLCoordinatorType_serializer(_coordId.getOperationType());
             auto dbDistLock = uassertStatusOK(distLockManager->lock(
-                opCtx, resource, _coorName, DistLockManager::kDefaultLockTimeout));
+                opCtx, resource, coorName, DistLockManager::kDefaultLockTimeout));
             _scopedLocks.emplace(dbDistLock.moveToAnotherThread());
         })
         .until([this](Status status) { return (!_recoveredFromDisk) || status.isOK(); })
@@ -129,7 +129,7 @@ void ShardingDDLCoordinator::interrupt(Status status) {
     LOGV2_DEBUG(5390535,
                 1,
                 "Sharding DDL Coordinator received an interrupt",
-                "coordinatorId"_attr = _coorMetadata.getId(),
+                "coordinatorId"_attr = _coordId,
                 "reason"_attr = redact(status));
 
     // Resolve any unresolved promises to avoid hanging.
@@ -153,10 +153,10 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
             if (!nss().isConfigDB() && !_recoveredFromDisk) {
                 auto opCtxHolder = cc().makeOperationContext();
                 auto* opCtx = opCtxHolder.get();
-                invariant(_coorMetadata.getDatabaseVersion());
+                invariant(metadata().getDatabaseVersion());
 
                 OperationShardingState::get(opCtx).initializeClientRoutingVersions(
-                    nss(), boost::none /* ChunkVersion */, _coorMetadata.getDatabaseVersion());
+                    nss(), boost::none /* ChunkVersion */, metadata().getDatabaseVersion());
                 // Check under the dbLock if this is still the primary shard for the database
                 DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db());
             };
@@ -188,10 +188,8 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
         .onError([this, anchor = shared_from_this()](const Status& status) {
             static constexpr auto& errorMsg =
                 "Failed to complete construction of sharding DDL coordinator";
-            LOGV2_ERROR(5390530,
-                        errorMsg,
-                        "coordinatorId"_attr = _coorMetadata.getId(),
-                        "reason"_attr = redact(status));
+            LOGV2_ERROR(
+                5390530, errorMsg, "coordinatorId"_attr = _coordId, "reason"_attr = redact(status));
 
             stdx::lock_guard<Latch> lg(_mutex);
             if (!_constructionCompletionPromise.getFuture().isReady()) {
@@ -222,7 +220,7 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
                 LOGV2_DEBUG(5656000,
                             1,
                             "Re-executing sharding DDL coordinator",
-                            "coordinatorId"_attr = _coorMetadata.getId(),
+                            "coordinatorId"_attr = _coordId,
                             "reason"_attr = redact(status));
                 return false;
             }
@@ -240,25 +238,31 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
             // Release the coordinator only if we are not stepping down
             if (!status.isA<ErrorCategory::NotPrimaryError>() &&
                 !status.isA<ErrorCategory::ShutdownError>()) {
-
                 try {
                     LOGV2(5565601,
                           "Releasing sharding DDL coordinator",
-                          "coordinatorId"_attr = _coorMetadata.getId());
+                          "coordinatorId"_attr = _coordId);
 
+                    auto session = metadata().getSession();
                     const auto docWasRemoved = _removeDocument(opCtx);
+
                     if (!docWasRemoved) {
                         // Release the instance without interrupting it
-                        _service->releaseInstance(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName
-                                                       << _coorMetadata.getId().toBSON()),
-                                                  Status::OK());
+                        _service->releaseInstance(
+                            BSON(ShardingDDLCoordinatorMetadata::kIdFieldName << _coordId.toBSON()),
+                            Status::OK());
+                    }
+
+                    if (status.isOK() && session) {
+                        // Return lsid to the SessionCache. If status is not OK, let the lsid be
+                        // discarded.
+                        SessionCache::get(opCtx)->release(*session);
                     }
                 } catch (DBException& ex) {
                     static constexpr auto errMsg = "Failed to release sharding DDL coordinator";
                     LOGV2_WARNING(5565605,
                                   errMsg,
-                                  "coordinatorId"_attr = _coorMetadata.getId(),
+                                  "coordinatorId"_attr = _coordId,
                                   "error"_attr = redact(ex));
                     completionStatus = ex.toStatus(errMsg);
                 }
@@ -297,4 +301,49 @@ SemiFuture<void> ShardingDDLCoordinator_NORESILIENT::run(OperationContext* opCtx
     return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
 }
 
+const auto serviceDecorator =
+    ServiceContext::declareDecoration<ShardingDDLCoordinator::SessionCache>();
+
+auto ShardingDDLCoordinator::SessionCache::get(ServiceContext* serviceContext) -> SessionCache* {
+    return &serviceDecorator(serviceContext);
+}
+
+auto ShardingDDLCoordinator::SessionCache::get(OperationContext* opCtx) -> SessionCache* {
+    return get(opCtx->getServiceContext());
+}
+
+ShardingDDLSession ShardingDDLCoordinator::SessionCache::acquire() {
+    const ShardingDDLSession session = [&] {
+        stdx::unique_lock<Latch> lock(_cacheMutex);
+
+        if (!_cache.empty()) {
+            auto session = std::move(_cache.top());
+            _cache.pop();
+            return session;
+        } else {
+            return ShardingDDLSession(makeSystemLogicalSessionId(), TxnNumber(0));
+        }
+    }();
+
+    LOGV2_DEBUG(5565606,
+                2,
+                "Acquired new DDL logical session",
+                "lsid"_attr = session.getLsid(),
+                "txnNumber"_attr = session.getTxnNumber());
+
+    return session;
+}
+
+void ShardingDDLCoordinator::SessionCache::release(ShardingDDLSession session) {
+    LOGV2_DEBUG(5565607,
+                2,
+                "Released DDL logical session",
+                "lsid"_attr = session.getLsid(),
+                "highestUsedTxnNumber"_attr = session.getTxnNumber());
+
+    session.setTxnNumber(session.getTxnNumber() + 1);
+
+    stdx::unique_lock<Latch> lock(_cacheMutex);
+    _cache.push(std::move(session));
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index 07397e7c4dc..49372a81080 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -31,6 +31,7 @@
 
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/persistent_task_store.h"
 #include "mongo/db/s/dist_lock_manager.h"
 #include "mongo/db/s/forwardable_operation_metadata.h"
 #include "mongo/db/s/sharding_ddl_coordinator_gen.h"
@@ -81,24 +82,89 @@ public:
     }
 
     const NamespaceString& nss() const {
-        return _coorMetadata.getId().getNss();
+        return _coordId.getNss();
     }
 
     const ForwardableOperationMetadata& getForwardableOpMetadata() const {
-        invariant(_coorMetadata.getForwardableOpMetadata());
-        return _coorMetadata.getForwardableOpMetadata().get();
+        invariant(metadata().getForwardableOpMetadata());
+        return metadata().getForwardableOpMetadata().get();
     }
 
+    // Cached LSIDs shared between DDL coordinator instances
+    class SessionCache {
+
+    public:
+        SessionCache() = default;
+
+        static SessionCache* get(ServiceContext* serviceContext);
+        static SessionCache* get(OperationContext* opCtx);
+
+        ShardingDDLSession acquire();
+
+        void release(ShardingDDLSession);
+
+    private:
+        std::stack<ShardingDDLSession> _cache;
+
+        // Protects _cache.
+        mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("SessionCache::_cacheMutex");
+    };
+
 protected:
     virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
         return {};
     };
 
+    virtual ShardingDDLCoordinatorMetadata const& metadata() const = 0;
+
+    template <typename StateDoc>
+    StateDoc _insertStateDocument(StateDoc&& newDoc) {
+        auto copyMetadata = newDoc.getShardingDDLCoordinatorMetadata();
+        copyMetadata.setRecoveredFromDisk(true);
+        newDoc.setShardingDDLCoordinatorMetadata(copyMetadata);
+
+        auto opCtx = cc().makeOperationContext();
+        PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
+        store.add(opCtx.get(), newDoc, WriteConcerns::kMajorityWriteConcern);
+
+        return std::move(newDoc);
+    }
+
+    template <typename StateDoc>
+    StateDoc _updateStateDocument(OperationContext* opCtx, StateDoc&& newDoc) {
+        PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
+        invariant(newDoc.getShardingDDLCoordinatorMetadata().getRecoveredFromDisk());
+        store.update(opCtx,
+                     BSON(StateDoc::kIdFieldName << newDoc.getId().toBSON()),
+                     newDoc.toBSON(),
+                     WriteConcerns::kMajorityWriteConcern);
+        return std::move(newDoc);
+    }
+
+    // lazily acqiure Logical Session ID and a txn number
+    template <typename StateDoc>
+    StateDoc _updateSession(OperationContext* opCtx, StateDoc const& doc) {
+        auto newShardingDDLCoordinatorMetadata = doc.getShardingDDLCoordinatorMetadata();
+
+        auto optSession = newShardingDDLCoordinatorMetadata.getSession();
+        if (optSession) {
+            auto txnNumber = optSession->getTxnNumber();
+            optSession->setTxnNumber(++txnNumber);
+            newShardingDDLCoordinatorMetadata.setSession(optSession);
+        } else {
+            newShardingDDLCoordinatorMetadata.setSession(SessionCache::get(opCtx)->acquire());
+        }
+
+        StateDoc newDoc(doc);
+        newDoc.setShardingDDLCoordinatorMetadata(std::move(newShardingDDLCoordinatorMetadata));
+        return _updateStateDocument(opCtx, std::move(newDoc));
+    }
+
+protected:
     ShardingDDLCoordinatorService* _service;
-    ShardingDDLCoordinatorMetadata _coorMetadata;
-    StringData _coorName;
+    const ShardingDDLCoordinatorId _coordId;
 
-    bool _recoveredFromDisk;
+    const bool _recoveredFromDisk;
     bool _completeOnError{false};
 
 private:
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl
index 3a28d9f2e67..6aeb578be09 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.idl
+++ b/src/mongo/db/s/sharding_ddl_coordinator.idl
@@ -36,6 +36,7 @@ global:
 
 imports:
     - "mongo/idl/basic_types.idl"
+    - "mongo/db/logical_session_id.idl"
 
 enums:
     DDLCoordinatorType:
@@ -74,6 +75,15 @@ structs:
             operationType:
                 description: "Type of the sharding DDL coordinator."
                 type: DDLCoordinatorType
+
+    ShardingDDLSession:
+        description: "Container for DDL coordinator session."
+        strict: true
+        fields:
+            lsid:
+                type: LogicalSessionId
+            txnNumber:
+                type: TxnNumber
 
     ShardingDDLCoordinatorMetadata:
         description: "Commong metadata for all sharding DDL coordinator."
@@ -92,4 +102,6 @@ structs:
             databaseVersion:
                 type: DatabaseVersion
                 optional: true
-
+            session:
+                type: ShardingDDLSession
+                optional: true
diff --git a/src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp b/src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp
new file mode 100644
index 00000000000..caeb86a724c
--- /dev/null
+++ b/src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp
@@ -0,0 +1,120 @@
+/**
+ *    Copyright (C) 2018-present MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/action_set.h"
+#include "mongo/db/auth/action_type.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/privilege.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/idl/idl_parser.h"
+#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/configure_collection_auto_split_gen.h"
+
+
+namespace mongo {
+namespace {
+
+class ConfigCollAutoSplitCmd final : public TypedCommand<ConfigCollAutoSplitCmd> {
+public:
+    using Request = ConfigureCollAutoSplit;
+    using Response = void;
+
+    class Invocation final : public InvocationBase {
+    public:
+        using InvocationBase::InvocationBase;
+
+        StringData kStatusField = "status"_sd;
+
+        Response typedRun(OperationContext* opCtx) {
+            const NamespaceString& nss = ns();
+
+            ConfigsvrConfigureCollAutoSplit configsvrRequest(nss);
+            configsvrRequest.setConfigureCollAutoSplit(request().getConfigureCollAutoSplit());
+            configsvrRequest.setDbName(request().getDbName());
+
+            auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+            auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+                opCtx,
+                ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+                NamespaceString::kAdminDb.toString(),
+                configsvrRequest.toBSON({}),
+                Shard::RetryPolicy::kIdempotent));
+
+            uassertStatusOK(cmdResponse.commandStatus);
+        }
+
+    private:
+        NamespaceString ns() const override {
+            return request().getCommandParameter();
+        }
+
+        bool supportsWriteConcern() const override {
+            return false;
+        }
+
+        void doCheckAuthorization(OperationContext* opCtx) const override {
+            ActionSet actions({ActionType::splitChunk});
+            if (request().getBalancerShouldMergeChunks().get_value_or(false)) {
+                actions.addAction(ActionType::moveChunk);
+            }
+            uassert(ErrorCodes::Unauthorized,
+                    "Unauthorized",
+                    AuthorizationSession::get(opCtx->getClient())
+                        ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns()),
+                                                           actions));
+        }
+    };
+
+    std::string help() const override {
+        return "command to check whether the chunks of a given collection are in a quiesced state "
+               "or there are any which need to be moved because of (1) draining shards, (2) zone "
+               "violation or (3) imbalance between shards";
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+
+} balancerConfigureAutoSplitCmd;
+
+}  // namespace
+}  // namespace mongo
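For context, a minimal model of the lazy-acquisition step (_updateSession in sharding_ddl_coordinator.h above): the first call attaches a pooled session to the coordinator's metadata, and every later call bumps the txnNumber in place before the state document is re-persisted. Metadata, Session, acquireFromPool, and updateSession below are illustrative stand-ins for the IDL-generated types and the real helpers, not the actual MongoDB API.

// sketch_update_session.cpp -- simplified model of _updateSession.
#include <cassert>
#include <optional>

struct Session {
    long long txnNumber = 0;
};

struct Metadata {
    std::optional<Session> session;
};

// Stand-in for SessionCache::get(opCtx)->acquire().
Session acquireFromPool() {
    return Session{0};
}

// First call: pull a session from the pool. Subsequent calls: reuse the
// same lsid with an incremented txnNumber, so each retried DDL step gets
// a fresh (lsid, txnNumber) pair.
void updateSession(Metadata& metadata) {
    if (metadata.session) {
        ++metadata.session->txnNumber;
    } else {
        metadata.session = acquireFromPool();
    }
    // The real helper then persists the updated state document with
    // majority write concern via _updateStateDocument().
}

int main() {
    Metadata md;
    updateSession(md);  // attaches a pooled session, txnNumber 0
    updateSession(md);  // same session, txnNumber 1
    assert(md.session && md.session->txnNumber == 1);
    return 0;
}

This is what DropCollectionCoordinator::_runImpl relies on before sending ShardsvrDropCollectionParticipant to the shards: each send happens under a session whose txnNumber is guaranteed fresh, making the participant command safely retryable.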