summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/reshard_collection_coordinator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/reshard_collection_coordinator.cpp')
-rw-r--r--src/mongo/db/s/reshard_collection_coordinator.cpp146
1 files changed, 105 insertions, 41 deletions
diff --git a/src/mongo/db/s/reshard_collection_coordinator.cpp b/src/mongo/db/s/reshard_collection_coordinator.cpp
index 76d75706614..9a9b8af12ba 100644
--- a/src/mongo/db/s/reshard_collection_coordinator.cpp
+++ b/src/mongo/db/s/reshard_collection_coordinator.cpp
@@ -39,51 +39,115 @@
namespace mongo {
-ReshardCollectionCoordinator::ReshardCollectionCoordinator(
- OperationContext* opCtx, const ShardsvrReshardCollection& reshardCollectionParams)
- : ShardingDDLCoordinator_NORESILIENT(opCtx, reshardCollectionParams.getCommandParameter()),
- _serviceContext(opCtx->getServiceContext()),
- _requestObj(reshardCollectionParams.serialize({})),
- _request(ShardsvrReshardCollection::parse(IDLParserErrorContext("_shardsvrReshardCollection"),
- _requestObj)),
- _nss(_request.getCommandParameter()) {}
-
-SemiFuture<void> ReshardCollectionCoordinator::runImpl(
- std::shared_ptr<executor::TaskExecutor> executor) {
- return ExecutorFuture<void>(executor, Status::OK())
- .then([this, anchor = shared_from_this()]() {
- ThreadClient tc("ReshardCollectionCoordinator", _serviceContext);
- auto opCtxHolder = tc->makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- _forwardableOpMetadata.setOn(opCtx);
-
- ConfigsvrReshardCollection configsvrReshardCollection(_nss, _request.getKey());
- configsvrReshardCollection.setDbName(_request.getDbName());
- configsvrReshardCollection.setUnique(_request.getUnique());
- configsvrReshardCollection.setCollation(_request.getCollation());
- configsvrReshardCollection.set_presetReshardedChunks(
- _request.get_presetReshardedChunks());
- configsvrReshardCollection.setZones(_request.getZones());
- configsvrReshardCollection.setNumInitialChunks(_request.getNumInitialChunks());
-
- auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- const auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- CommandHelpers::appendMajorityWriteConcern(configsvrReshardCollection.toBSON({}),
- opCtx->getWriteConcern()),
- Shard::RetryPolicy::kIdempotent));
- uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(std::move(cmdResponse)));
- })
+ReshardCollectionCoordinator::ReshardCollectionCoordinator(ShardingDDLCoordinatorService* service,
+ const BSONObj& initialState)
+ : ReshardCollectionCoordinator(service, initialState, true /* persistCoordinatorDocument */) {}
+
+ReshardCollectionCoordinator::ReshardCollectionCoordinator(ShardingDDLCoordinatorService* service,
+ const BSONObj& initialState,
+ bool persistCoordinatorDocument)
+ : ShardingDDLCoordinator(service, initialState),
+ _initialState(initialState.getOwned()),
+ _doc(ReshardCollectionCoordinatorDocument::parse(
+ IDLParserErrorContext("ReshardCollectionCoordinatorDocument"), _initialState)),
+ _persistCoordinatorDocument(persistCoordinatorDocument) {}
+
+void ReshardCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
+ const auto otherDoc = ReshardCollectionCoordinatorDocument::parse(
+ IDLParserErrorContext("ReshardCollectionCoordinatorDocument"), doc);
+
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Another reshard collection with different arguments is already running for the same "
+ "namespace",
+ SimpleBSONObjComparator::kInstance.evaluate(
+ _doc.getReshardCollectionRequest().toBSON() ==
+ otherDoc.getReshardCollectionRequest().toBSON()));
+}
+
+boost::optional<BSONObj> ReshardCollectionCoordinator::reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode connMode,
+ MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
+ BSONObjBuilder cmdBob;
+ if (const auto& optComment = getForwardableOpMetadata().getComment()) {
+ cmdBob.append(optComment.get().firstElement());
+ }
+ cmdBob.appendElements(_doc.getReshardCollectionRequest().toBSON());
+
+ BSONObjBuilder bob;
+ bob.append("type", "op");
+ bob.append("desc", "ReshardCollectionCoordinator");
+ bob.append("op", "command");
+ bob.append("ns", nss().toString());
+ bob.append("command", cmdBob.obj());
+ bob.append("active", true);
+ return bob.obj();
+}
+
+void ReshardCollectionCoordinator::_enterPhase(Phase newPhase) {
+ if (!_persistCoordinatorDocument) {
+ return;
+ }
+
+ StateDoc newDoc(_doc);
+ newDoc.setPhase(newPhase);
+
+ LOGV2_DEBUG(6206400,
+ 2,
+ "Reshard collection coordinator phase transition",
+ "namespace"_attr = nss(),
+ "newPhase"_attr = ReshardCollectionCoordinatorPhase_serializer(newDoc.getPhase()),
+ "oldPhase"_attr = ReshardCollectionCoordinatorPhase_serializer(_doc.getPhase()));
+
+ if (_doc.getPhase() == Phase::kUnset) {
+ _doc = _insertStateDocument(std::move(newDoc));
+ return;
+ }
+ _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
+}
+
+ExecutorFuture<void> ReshardCollectionCoordinator::_runImpl(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then(_executePhase(
+ Phase::kReshard,
+ [this, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ ConfigsvrReshardCollection configsvrReshardCollection(nss(), _doc.getKey());
+ configsvrReshardCollection.setDbName(nss().db());
+ configsvrReshardCollection.setUnique(_doc.getUnique());
+ configsvrReshardCollection.setCollation(_doc.getCollation());
+ configsvrReshardCollection.set_presetReshardedChunks(
+ _doc.get_presetReshardedChunks());
+ configsvrReshardCollection.setZones(_doc.getZones());
+ configsvrReshardCollection.setNumInitialChunks(_doc.getNumInitialChunks());
+
+ const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+ const auto cmdResponse =
+ uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(
+ configsvrReshardCollection.toBSON({}), opCtx->getWriteConcern()),
+ Shard::RetryPolicy::kIdempotent));
+ uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(std::move(cmdResponse)));
+ }))
.onError([this, anchor = shared_from_this()](const Status& status) {
- LOGV2_ERROR(5276001,
+ LOGV2_ERROR(6206401,
"Error running reshard collection",
- "namespace"_attr = _nss,
+ "namespace"_attr = nss(),
"error"_attr = redact(status));
return status;
- })
- .semi();
+ });
}
+ReshardCollectionCoordinator_NORESILIENT::ReshardCollectionCoordinator_NORESILIENT(
+ ShardingDDLCoordinatorService* service, const BSONObj& initialState)
+ : ReshardCollectionCoordinator(service, initialState, false /* persistCoordinatorDocument */) {}
+
} // namespace mongo