author     Simon Graetzer <simon.gratzer@mongodb.com>        2021-07-21 12:22:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-07-21 12:47:44 +0000
commit     26862ed87cd1001577565f20f1d14b5bdb742ed2 (patch)
tree       f586d30bbadc708856354d56cd1a6158eb199022 /src/mongo
parent     4a399e6d1f7da8022e7d8d489bb9500ffded6d58 (diff)
download   mongo-26862ed87cd1001577565f20f1d14b5bdb742ed2.tar.gz
SERVER-57559 Implement LSID cache for DDL coordinators
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp                |  28
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h                  |   6
-rw-r--r--  src/mongo/db/s/drop_collection_coordinator.cpp                  |  46
-rw-r--r--  src/mongo/db/s/drop_collection_coordinator.h                    |   6
-rw-r--r--  src/mongo/db/s/drop_database_coordinator.cpp                    |  27
-rw-r--r--  src/mongo/db/s/drop_database_coordinator.h                      |   6
-rw-r--r--  src/mongo/db/s/rename_collection_coordinator.cpp                |  26
-rw-r--r--  src/mongo/db/s/rename_collection_coordinator.h                  |   6
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.cpp                     |  89
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.h                       |  78
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.idl                     |  14
-rw-r--r--  src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp | 120
12 files changed, 308 insertions(+), 144 deletions(-)
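Before the per-file diffs, a note on the shape of the change: the commit hoists the duplicated state-document persistence helpers into the ShardingDDLCoordinator base class and adds a service-wide cache of logical sessions (an lsid plus a txnNumber) that DDL coordinators acquire lazily and return on successful completion. A minimal standalone sketch of that cache pattern, using simplified stand-in types rather than the real LogicalSessionId/TxnNumber/Latch machinery:

#include <mutex>
#include <stack>

// Simplified stand-ins for LogicalSessionId / TxnNumber.
struct Session {
    int lsid;
    long long txnNumber;
};

class SessionCache {
public:
    // Reuse a pooled session when possible; otherwise mint a fresh one,
    // mirroring ShardingDDLCoordinator::SessionCache::acquire() below.
    Session acquire() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (!_cache.empty()) {
            Session s = _cache.top();
            _cache.pop();
            return s;
        }
        return Session{_nextLsid++, 0};
    }

    // Bump txnNumber before pooling so the next coordinator that reuses
    // this lsid cannot collide with retryable writes from the previous one.
    void release(Session s) {
        ++s.txnNumber;
        std::lock_guard<std::mutex> lk(_mutex);
        _cache.push(s);
    }

private:
    std::mutex _mutex;  // protects _cache
    std::stack<Session> _cache;
    int _nextLsid = 0;
};

Incrementing the txnNumber on release is the invariant that makes lsid reuse safe: a coordinator that receives a pooled session can never replay a transaction number the previous holder already used.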
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 86c0f9c592d..11a787cae82 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -799,29 +799,7 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
}
-// Phase change and document handling API.
-void CreateCollectionCoordinator::_insertCoordinatorDocument(CoordDoc&& doc) {
- auto docBSON = _doc.toBSON();
- auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
- coorMetadata.setRecoveredFromDisk(true);
- doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
- auto opCtx = cc().makeOperationContext();
- PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
- _doc = std::move(doc);
-}
-
-void CreateCollectionCoordinator::_updateCoordinatorDocument(CoordDoc&& newDoc) {
- auto opCtx = cc().makeOperationContext();
- PersistentTaskStore<CoordDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.update(opCtx.get(),
- BSON(CoordDoc::kIdFieldName << _doc.getId().toBSON()),
- newDoc.toBSON(),
- WriteConcerns::kMajorityWriteConcern);
-
- _doc = std::move(newDoc);
-}
+// Phase change API.
void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
CoordDoc newDoc(_doc);
@@ -835,10 +813,10 @@ void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
"oldPhase"_attr = CreateCollectionCoordinatorPhase_serializer(_doc.getPhase()));
if (_doc.getPhase() == Phase::kUnset) {
- _insertCoordinatorDocument(std::move(newDoc));
+ _doc = _insertStateDocument(std::move(newDoc));
return;
}
- _updateCoordinatorDocument(std::move(newDoc));
+ _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
}
} // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index c7cc1b2f418..bb1f303e3c3 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -66,6 +66,10 @@ public:
}
private:
+ ShardingDDLCoordinatorMetadata const& metadata() const override {
+ return _doc.getShardingDDLCoordinatorMetadata();
+ }
+
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
@@ -86,8 +90,6 @@ private:
};
};
- void _insertCoordinatorDocument(CoordDoc&& doc);
- void _updateCoordinatorDocument(CoordDoc&& newStateDoc);
void _enterPhase(Phase newState);
/**
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp
index 3156329d35d..2136e1f2f96 100644
--- a/src/mongo/db/s/drop_collection_coordinator.cpp
+++ b/src/mongo/db/s/drop_collection_coordinator.cpp
@@ -31,7 +31,6 @@
#include "mongo/db/s/drop_collection_coordinator.h"
-#include "mongo/db/persistent_task_store.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_logging.h"
#include "mongo/db/s/sharding_state.h"
@@ -68,27 +67,6 @@ boost::optional<BSONObj> DropCollectionCoordinator::reportForCurrentOp(
bob.append("active", true);
return bob.obj();
}
-void DropCollectionCoordinator::_insertStateDocument(StateDoc&& doc) {
- auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
- coorMetadata.setRecoveredFromDisk(true);
- doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
- auto opCtx = cc().makeOperationContext();
- PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
- _doc = std::move(doc);
-}
-
-void DropCollectionCoordinator::_updateStateDocument(StateDoc&& newDoc) {
- auto opCtx = cc().makeOperationContext();
- PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.update(opCtx.get(),
- BSON(StateDoc::kIdFieldName << _doc.getId().toBSON()),
- newDoc.toBSON(),
- WriteConcerns::kMajorityWriteConcern);
-
- _doc = std::move(newDoc);
-}
DropReply DropCollectionCoordinator::dropCollectionLocally(OperationContext* opCtx,
const NamespaceString& nss) {
@@ -125,10 +103,10 @@ void DropCollectionCoordinator::_enterPhase(Phase newPhase) {
"oldPhase"_attr = DropCollectionCoordinatorPhase_serializer(_doc.getPhase()));
if (_doc.getPhase() == Phase::kUnset) {
- _insertStateDocument(std::move(newDoc));
+ _doc = _insertStateDocument(std::move(newDoc));
return;
}
- _updateStateDocument(std::move(newDoc));
+ _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
}
ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
@@ -177,15 +155,16 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss());
}
+ // Get an lsid and an incremented txnNumber. Also ensures that we are the primary.
+ _doc = _updateSession(opCtx, _doc);
+
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
const ShardsvrDropCollectionParticipant dropCollectionParticipant(nss());
+ const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(
+ dropCollectionParticipant.toBSON({}));
+
sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx,
- nss().db(),
- CommandHelpers::appendMajorityWriteConcern(
- dropCollectionParticipant.toBSON({})),
- {primaryShardId},
- **executor);
+ opCtx, nss().db(), cmdObj, {primaryShardId}, **executor);
// We need to send the drop to all the shards because both movePrimary and
// moveChunk leave garbage behind for sharded collections.
@@ -195,12 +174,7 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
std::remove(participants.begin(), participants.end(), primaryShardId),
participants.end());
sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx,
- nss().db(),
- CommandHelpers::appendMajorityWriteConcern(
- dropCollectionParticipant.toBSON({})),
- participants,
- **executor);
+ opCtx, nss().db(), cmdObj, participants, **executor);
ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns());
LOGV2(5390503, "Collection dropped", "namespace"_attr = nss());
diff --git a/src/mongo/db/s/drop_collection_coordinator.h b/src/mongo/db/s/drop_collection_coordinator.h
index 9baf2f47d6f..1f43b204de4 100644
--- a/src/mongo/db/s/drop_collection_coordinator.h
+++ b/src/mongo/db/s/drop_collection_coordinator.h
@@ -57,6 +57,10 @@ public:
static DropReply dropCollectionLocally(OperationContext* opCtx, const NamespaceString& nss);
private:
+ ShardingDDLCoordinatorMetadata const& metadata() const override {
+ return _doc.getShardingDDLCoordinatorMetadata();
+ }
+
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
@@ -77,8 +81,6 @@ private:
};
}
- void _insertStateDocument(StateDoc&& doc);
- void _updateStateDocument(StateDoc&& newStateDoc);
void _enterPhase(Phase newPhase);
DropCollectionCoordinatorDocument _doc;
diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp
index 17d2b612f24..e20a72dbab1 100644
--- a/src/mongo/db/s/drop_database_coordinator.cpp
+++ b/src/mongo/db/s/drop_database_coordinator.cpp
@@ -117,26 +117,6 @@ boost::optional<BSONObj> DropDatabaseCoordinator::reportForCurrentOp(
return bob.obj();
}
-void DropDatabaseCoordinator::_insertStateDocument(OperationContext* opCtx, StateDoc&& doc) {
- auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
- coorMetadata.setRecoveredFromDisk(true);
- doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
- PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.add(opCtx, doc, WriteConcerns::kMajorityWriteConcern);
- _doc = std::move(doc);
-}
-
-void DropDatabaseCoordinator::_updateStateDocument(OperationContext* opCtx, StateDoc&& newDoc) {
- PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.update(opCtx,
- BSON(StateDoc::kIdFieldName << _doc.getId().toBSON()),
- newDoc.toBSON(),
- WriteConcerns::kMajorityWriteConcern);
-
- _doc = std::move(newDoc);
-}
-
void DropDatabaseCoordinator::_enterPhase(Phase newPhase) {
StateDoc newDoc(_doc);
newDoc.setPhase(newPhase);
@@ -148,12 +128,11 @@ void DropDatabaseCoordinator::_enterPhase(Phase newPhase) {
"newPhase"_attr = DropDatabaseCoordinatorPhase_serializer(newDoc.getPhase()),
"oldPhase"_attr = DropDatabaseCoordinatorPhase_serializer(_doc.getPhase()));
- auto opCtx = cc().makeOperationContext();
if (_doc.getPhase() == Phase::kUnset) {
- _insertStateDocument(opCtx.get(), std::move(newDoc));
+ _doc = _insertStateDocument(std::move(newDoc));
return;
}
- _updateStateDocument(opCtx.get(), std::move(newDoc));
+ _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
}
ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
@@ -191,7 +170,7 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
auto newStateDoc = _doc;
newStateDoc.setCollInfo(coll);
- _updateStateDocument(opCtx, std::move(newStateDoc));
+ _doc = _updateStateDocument(opCtx, std::move(newStateDoc));
dropShardedCollection(opCtx, coll, executor);
}
diff --git a/src/mongo/db/s/drop_database_coordinator.h b/src/mongo/db/s/drop_database_coordinator.h
index 085e0b6d8cd..2b9d4774088 100644
--- a/src/mongo/db/s/drop_database_coordinator.h
+++ b/src/mongo/db/s/drop_database_coordinator.h
@@ -49,6 +49,10 @@ public:
MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override;
private:
+ ShardingDDLCoordinatorMetadata const& metadata() const override {
+ return _doc.getShardingDDLCoordinatorMetadata();
+ }
+
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
@@ -69,8 +73,6 @@ private:
};
}
- void _insertStateDocument(OperationContext* opCtx, StateDoc&& doc);
- void _updateStateDocument(OperationContext* opCtx, StateDoc&& newStateDoc);
void _enterPhase(Phase newPhase);
DropDatabaseCoordinatorDocument _doc;
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
index 508eb01f51f..cf943e4a559 100644
--- a/src/mongo/db/s/rename_collection_coordinator.cpp
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -111,28 +111,6 @@ boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp(
return bob.obj();
}
-void RenameCollectionCoordinator::_insertStateDocument(StateDoc&& doc) {
- auto coorMetadata = doc.getShardingDDLCoordinatorMetadata();
- coorMetadata.setRecoveredFromDisk(true);
- doc.setShardingDDLCoordinatorMetadata(coorMetadata);
-
- auto opCtx = cc().makeOperationContext();
- PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
- _doc = std::move(doc);
-}
-
-void RenameCollectionCoordinator::_updateStateDocument(StateDoc&& newDoc) {
- auto opCtx = cc().makeOperationContext();
- PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
- store.update(opCtx.get(),
- BSON(StateDoc::kIdFieldName << _doc.getId().toBSON()),
- newDoc.toBSON(),
- WriteConcerns::kMajorityWriteConcern);
-
- _doc = std::move(newDoc);
-}
-
void RenameCollectionCoordinator::_enterPhase(Phase newPhase) {
StateDoc newDoc(_doc);
newDoc.setPhase(newPhase);
@@ -146,10 +124,10 @@ void RenameCollectionCoordinator::_enterPhase(Phase newPhase) {
"oldPhase"_attr = RenameCollectionCoordinatorPhase_serializer(_doc.getPhase()));
if (_doc.getPhase() == Phase::kUnset) {
- _insertStateDocument(std::move(newDoc));
+ _doc = _insertStateDocument(std::move(newDoc));
return;
}
- _updateStateDocument(std::move(newDoc));
+ _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
}
ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h
index c3779b6c289..badc1a59ee9 100644
--- a/src/mongo/db/s/rename_collection_coordinator.h
+++ b/src/mongo/db/s/rename_collection_coordinator.h
@@ -59,6 +59,10 @@ public:
}
private:
+ ShardingDDLCoordinatorMetadata const& metadata() const override {
+ return _doc.getShardingDDLCoordinatorMetadata();
+ }
+
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
@@ -81,8 +85,6 @@ private:
};
}
- void _insertStateDocument(StateDoc&& doc);
- void _updateStateDocument(StateDoc&& newStateDoc);
void _enterPhase(Phase newPhase);
RenameCollectionCoordinatorDocument _doc;
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index e519eec4787..a638319db54 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/sharding_ddl_coordinator.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
@@ -62,9 +63,8 @@ ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONO
ShardingDDLCoordinator::ShardingDDLCoordinator(ShardingDDLCoordinatorService* service,
const BSONObj& coorDoc)
: _service(service),
- _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)),
- _coorName(DDLCoordinatorType_serializer(_coorMetadata.getId().getOperationType())),
- _recoveredFromDisk(_coorMetadata.getRecoveredFromDisk()) {}
+ _coordId(extractShardingDDLCoordinatorMetadata(coorDoc).getId()),
+ _recoveredFromDisk(extractShardingDDLCoordinatorMetadata(coorDoc).getRecoveredFromDisk()) {}
ShardingDDLCoordinator::~ShardingDDLCoordinator() {
invariant(_constructionCompletionPromise.getFuture().isReady());
@@ -79,8 +79,7 @@ bool ShardingDDLCoordinator::_removeDocument(OperationContext* opCtx) {
deleteOp.setDeletes({[&] {
write_ops::DeleteOpEntry entry;
- entry.setQ(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName
- << _coorMetadata.getId().toBSON()));
+ entry.setQ(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName << _coordId.toBSON()));
entry.setMulti(true);
return entry;
}()});
@@ -116,8 +115,9 @@ ExecutorFuture<void> ShardingDDLCoordinator::_acquireLockAsync(
auto* opCtx = opCtxHolder.get();
auto distLockManager = DistLockManager::get(opCtx);
+ const auto coorName = DDLCoordinatorType_serializer(_coordId.getOperationType());
auto dbDistLock = uassertStatusOK(distLockManager->lock(
- opCtx, resource, _coorName, DistLockManager::kDefaultLockTimeout));
+ opCtx, resource, coorName, DistLockManager::kDefaultLockTimeout));
_scopedLocks.emplace(dbDistLock.moveToAnotherThread());
})
.until([this](Status status) { return (!_recoveredFromDisk) || status.isOK(); })
@@ -129,7 +129,7 @@ void ShardingDDLCoordinator::interrupt(Status status) {
LOGV2_DEBUG(5390535,
1,
"Sharding DDL Coordinator received an interrupt",
- "coordinatorId"_attr = _coorMetadata.getId(),
+ "coordinatorId"_attr = _coordId,
"reason"_attr = redact(status));
// Resolve any unresolved promises to avoid hanging.
@@ -153,10 +153,10 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
if (!nss().isConfigDB() && !_recoveredFromDisk) {
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
- invariant(_coorMetadata.getDatabaseVersion());
+ invariant(metadata().getDatabaseVersion());
OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- nss(), boost::none /* ChunkVersion */, _coorMetadata.getDatabaseVersion());
+ nss(), boost::none /* ChunkVersion */, metadata().getDatabaseVersion());
// Check under the dbLock if this is still the primary shard for the database
DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db());
};
@@ -188,10 +188,8 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
.onError([this, anchor = shared_from_this()](const Status& status) {
static constexpr auto& errorMsg =
"Failed to complete construction of sharding DDL coordinator";
- LOGV2_ERROR(5390530,
- errorMsg,
- "coordinatorId"_attr = _coorMetadata.getId(),
- "reason"_attr = redact(status));
+ LOGV2_ERROR(
+ 5390530, errorMsg, "coordinatorId"_attr = _coordId, "reason"_attr = redact(status));
stdx::lock_guard<Latch> lg(_mutex);
if (!_constructionCompletionPromise.getFuture().isReady()) {
@@ -222,7 +220,7 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
LOGV2_DEBUG(5656000,
1,
"Re-executing sharding DDL coordinator",
- "coordinatorId"_attr = _coorMetadata.getId(),
+ "coordinatorId"_attr = _coordId,
"reason"_attr = redact(status));
return false;
}
@@ -240,25 +238,31 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
// Release the coordinator only if we are not stepping down
if (!status.isA<ErrorCategory::NotPrimaryError>() &&
!status.isA<ErrorCategory::ShutdownError>()) {
-
try {
LOGV2(5565601,
"Releasing sharding DDL coordinator",
- "coordinatorId"_attr = _coorMetadata.getId());
+ "coordinatorId"_attr = _coordId);
+ auto session = metadata().getSession();
const auto docWasRemoved = _removeDocument(opCtx);
if (!docWasRemoved) {
// Release the instance without interrupting it
- _service->releaseInstance(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName
- << _coorMetadata.getId().toBSON()),
- Status::OK());
+ _service->releaseInstance(
+ BSON(ShardingDDLCoordinatorMetadata::kIdFieldName << _coordId.toBSON()),
+ Status::OK());
+ }
+
+ if (status.isOK() && session) {
+ // Return lsid to the SessionCache. If status is not OK, let the lsid be
+ // discarded.
+ SessionCache::get(opCtx)->release(*session);
}
} catch (DBException& ex) {
static constexpr auto errMsg = "Failed to release sharding DDL coordinator";
LOGV2_WARNING(5565605,
errMsg,
- "coordinatorId"_attr = _coorMetadata.getId(),
+ "coordinatorId"_attr = _coordId,
"error"_attr = redact(ex));
completionStatus = ex.toStatus(errMsg);
}
@@ -297,4 +301,49 @@ SemiFuture<void> ShardingDDLCoordinator_NORESILIENT::run(OperationContext* opCtx
return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
}
+const auto serviceDecorator =
+ ServiceContext::declareDecoration<ShardingDDLCoordinator::SessionCache>();
+
+auto ShardingDDLCoordinator::SessionCache::get(ServiceContext* serviceContext) -> SessionCache* {
+ return &serviceDecorator(serviceContext);
+}
+
+auto ShardingDDLCoordinator::SessionCache::get(OperationContext* opCtx) -> SessionCache* {
+ return get(opCtx->getServiceContext());
+}
+
+ShardingDDLSession ShardingDDLCoordinator::SessionCache::acquire() {
+ const ShardingDDLSession session = [&] {
+ stdx::unique_lock<Latch> lock(_cacheMutex);
+
+ if (!_cache.empty()) {
+ auto session = std::move(_cache.top());
+ _cache.pop();
+ return session;
+ } else {
+ return ShardingDDLSession(makeSystemLogicalSessionId(), TxnNumber(0));
+ }
+ }();
+
+ LOGV2_DEBUG(5565606,
+ 2,
+ "Acquired new DDL logical session",
+ "lsid"_attr = session.getLsid(),
+ "txnNumber"_attr = session.getTxnNumber());
+
+ return session;
+}
+
+void ShardingDDLCoordinator::SessionCache::release(ShardingDDLSession session) {
+ LOGV2_DEBUG(5565607,
+ 2,
+ "Released DDL logical session",
+ "lsid"_attr = session.getLsid(),
+ "highestUsedTxnNumber"_attr = session.getTxnNumber());
+
+ session.setTxnNumber(session.getTxnNumber() + 1);
+ stdx::unique_lock<Latch> lock(_cacheMutex);
+ _cache.push(std::move(session));
+}
+
} // namespace mongo
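The completion path above returns the session to the cache only when the coordinator finished cleanly. A hedged rendering of that rule as a free function (a hypothetical helper that mirrors the hunk above, not an API from this commit):

void finishDDLCoordinator(OperationContext* opCtx,
                          const Status& completionStatus,
                          boost::optional<ShardingDDLSession> session) {
    // Pool the lsid only on success. On failure the session is discarded,
    // so a txnNumber in an uncertain state is never handed to another
    // coordinator; release() bumps the txnNumber before re-pooling.
    if (completionStatus.isOK() && session) {
        ShardingDDLCoordinator::SessionCache::get(opCtx)->release(*session);
    }
}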
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h
index 07397e7c4dc..49372a81080 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.h
+++ b/src/mongo/db/s/sharding_ddl_coordinator.h
@@ -31,6 +31,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/s/dist_lock_manager.h"
#include "mongo/db/s/forwardable_operation_metadata.h"
#include "mongo/db/s/sharding_ddl_coordinator_gen.h"
@@ -81,24 +82,89 @@ public:
}
const NamespaceString& nss() const {
- return _coorMetadata.getId().getNss();
+ return _coordId.getNss();
}
const ForwardableOperationMetadata& getForwardableOpMetadata() const {
- invariant(_coorMetadata.getForwardableOpMetadata());
- return _coorMetadata.getForwardableOpMetadata().get();
+ invariant(metadata().getForwardableOpMetadata());
+ return metadata().getForwardableOpMetadata().get();
}
+ // Cached LSIDs shared between DDL coordinator instances
+ class SessionCache {
+
+ public:
+ SessionCache() = default;
+
+ static SessionCache* get(ServiceContext* serviceContext);
+ static SessionCache* get(OperationContext* opCtx);
+
+ ShardingDDLSession acquire();
+
+ void release(ShardingDDLSession);
+
+ private:
+ std::stack<ShardingDDLSession> _cache;
+
+ // Protects _cache.
+ mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("SessionCache::_cacheMutex");
+ };
+
protected:
virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
return {};
};
+ virtual ShardingDDLCoordinatorMetadata const& metadata() const = 0;
+
+ template <typename StateDoc>
+ StateDoc _insertStateDocument(StateDoc&& newDoc) {
+ auto copyMetadata = newDoc.getShardingDDLCoordinatorMetadata();
+ copyMetadata.setRecoveredFromDisk(true);
+ newDoc.setShardingDDLCoordinatorMetadata(copyMetadata);
+
+ auto opCtx = cc().makeOperationContext();
+ PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
+ store.add(opCtx.get(), newDoc, WriteConcerns::kMajorityWriteConcern);
+
+ return std::move(newDoc);
+ }
+
+ template <typename StateDoc>
+ StateDoc _updateStateDocument(OperationContext* opCtx, StateDoc&& newDoc) {
+ PersistentTaskStore<StateDoc> store(NamespaceString::kShardingDDLCoordinatorsNamespace);
+ invariant(newDoc.getShardingDDLCoordinatorMetadata().getRecoveredFromDisk());
+ store.update(opCtx,
+ BSON(StateDoc::kIdFieldName << newDoc.getId().toBSON()),
+ newDoc.toBSON(),
+ WriteConcerns::kMajorityWriteConcern);
+ return std::move(newDoc);
+ }
+
+ // Lazily acquire a logical session ID and a txn number
+ template <typename StateDoc>
+ StateDoc _updateSession(OperationContext* opCtx, StateDoc const& doc) {
+ auto newShardingDDLCoordinatorMetadata = doc.getShardingDDLCoordinatorMetadata();
+
+ auto optSession = newShardingDDLCoordinatorMetadata.getSession();
+ if (optSession) {
+ auto txnNumber = optSession->getTxnNumber();
+ optSession->setTxnNumber(++txnNumber);
+ newShardingDDLCoordinatorMetadata.setSession(optSession);
+ } else {
+ newShardingDDLCoordinatorMetadata.setSession(SessionCache::get(opCtx)->acquire());
+ }
+
+ StateDoc newDoc(doc);
+ newDoc.setShardingDDLCoordinatorMetadata(std::move(newShardingDDLCoordinatorMetadata));
+ return _updateStateDocument(opCtx, std::move(newDoc));
+ }
+
+protected:
ShardingDDLCoordinatorService* _service;
- ShardingDDLCoordinatorMetadata _coorMetadata;
- StringData _coorName;
+ const ShardingDDLCoordinatorId _coordId;
- bool _recoveredFromDisk;
+ const bool _recoveredFromDisk;
bool _completeOnError{false};
private:
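How a concrete coordinator consumes `_updateSession` is only partially visible in this diff (see the drop-collection hunk above). A sketch of a plausible call site, where `attachSession` is a hypothetical helper introduced here for illustration, not something this commit ships:

// Hypothetical helper: stamp the cached session onto a participant command
// so the remote DDL step carries retryable-write metadata.
BSONObj attachSession(const ShardingDDLSession& session, const BSONObj& cmd) {
    BSONObjBuilder bob;
    bob.appendElements(cmd);
    bob.append("lsid", session.getLsid().toBSON());
    bob.append("txnNumber", session.getTxnNumber());
    return bob.obj();
}

// Inside a coordinator step:
//   _doc = _updateSession(opCtx, _doc);  // acquire an lsid or bump its txnNumber
//   auto session = _doc.getShardingDDLCoordinatorMetadata().getSession();
//   invariant(session);
//   const auto cmdObj = attachSession(*session, participantCmd.toBSON({}));

Persisting the session on the state document with majority write concern is what lets a coordinator recovered after failover keep using the same lsid rather than leaking one per retry.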
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl
index 3a28d9f2e67..6aeb578be09 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.idl
+++ b/src/mongo/db/s/sharding_ddl_coordinator.idl
@@ -36,6 +36,7 @@ global:
imports:
- "mongo/idl/basic_types.idl"
+ - "mongo/db/logical_session_id.idl"
enums:
DDLCoordinatorType:
@@ -74,6 +75,15 @@ structs:
operationType:
description: "Type of the sharding DDL coordinator."
type: DDLCoordinatorType
+
+ ShardingDDLSession:
+ description: "Container for DDL coordinator session."
+ strict: true
+ fields:
+ lsid:
+ type: LogicalSessionId
+ txnNumber:
+ type: TxnNumber
ShardingDDLCoordinatorMetadata:
description: "Commong metadata for all sharding DDL coordinator."
@@ -92,4 +102,6 @@ structs:
databaseVersion:
type: DatabaseVersion
optional: true
-
+ session:
+ type: ShardingDDLSession
+ optional: true
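Assuming standard IDL code generation, the new ShardingDDLSession struct surfaces in C++ with the constructor and accessors that the SessionCache code above already exercises; a small illustration:

void sessionRoundTripExample() {
    // Constructor and accessors as used by SessionCache::acquire()/release():
    ShardingDDLSession session(makeSystemLogicalSessionId(), TxnNumber(0));
    const LogicalSessionId& lsid = session.getLsid();
    TxnNumber txn = session.getTxnNumber();
    session.setTxnNumber(txn + 1);  // what release() does before re-pooling

    // Serializes to the persisted form: {lsid: {...}, txnNumber: NumberLong(...)}
    const BSONObj wire = session.toBSON();
}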
diff --git a/src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp b/src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp
new file mode 100644
index 00000000000..caeb86a724c
--- /dev/null
+++ b/src/mongo/s/commands/cluster_balancer_configure_auto_split.cpp
@@ -0,0 +1,120 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/action_set.h"
+#include "mongo/db/auth/action_type.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/privilege.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/idl/idl_parser.h"
+#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/configure_collection_auto_split_gen.h"
+
+
+namespace mongo {
+namespace {
+
+class ConfigCollAutoSplitCmd final : public TypedCommand<ConfigCollAutoSplitCmd> {
+public:
+ using Request = ConfigureCollAutoSplit;
+ using Response = void;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ StringData kStatusField = "status"_sd;
+
+ Response typedRun(OperationContext* opCtx) {
+ const NamespaceString& nss = ns();
+
+ ConfigsvrConfigureCollAutoSplit configsvrRequest(nss);
+ configsvrRequest.setConfigureCollAutoSplit(request().getConfigureCollAutoSplit());
+ configsvrRequest.setDbName(request().getDbName());
+
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ configsvrRequest.toBSON({}),
+ Shard::RetryPolicy::kIdempotent));
+
+ uassertStatusOK(cmdResponse.commandStatus);
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return request().getCommandParameter();
+ }
+
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ ActionSet actions({ActionType::splitChunk});
+ if (request().getBalancerShouldMergeChunks().get_value_or(false)) {
+ actions.addAction(ActionType::moveChunk);
+ }
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns()),
+ actions));
+ }
+ };
+
+ std::string help() const override {
+ return "command to check whether the chunks of a given collection are in a quiesced state "
+ "or there are any which need to be moved because of (1) draining shards, (2) zone "
+ "violation or (3) imbalance between shards";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+} balancerConfigureAutoSplitCmd;
+
+} // namespace
+} // namespace mongo